added ZSTD Layer

master
cheetah 1 year ago
parent d1bddfd037
commit 0b00bac7df

1
.gitignore vendored

@ -6,3 +6,4 @@ main
storageserver/storageserver
gmad_linux
.vscode/
storageserver/test/

@ -3,7 +3,6 @@ package common
import (
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"os"
@ -36,6 +35,7 @@ type DB_File struct {
BatchID string `json:"batch"`
InitialPath string `json:"initialPath"`
Extension string `json:"extension"`
Created time.Time `json:"created"`
Size int64 `json:"size"`
CRC uint32 `json:"crc"`
Hash string `json:"hash"`
@ -65,6 +65,7 @@ type DB_Chunk struct {
NotReady bool `json:"notReady"`
Finalized bool `json:"finalized"`
ReadOnly bool `json:"readOnly"`
Created time.Time `json:"created"`
FileCount int `json:"fileCount"`
Size int64 `json:"size"`
@ -77,13 +78,13 @@ type DB_File2Chunk struct {
File string `json:"_from"`
}
func MultipartUpload(client *http.Client, url string, path string, jsonBytes []byte) (err error) {
func MultipartUpload(client *http.Client, url string, path string, jsonBytes []byte, workerID string) (err error) {
//fmt.Printf("\nMultipartUpload(%s, %s)\n", url, path)
file, err := os.Open(path)
if err != nil {
return err
}
fileContents, err := ioutil.ReadAll(file)
fileContents, err := io.ReadAll(file)
if err != nil {
return err
}
@ -103,6 +104,11 @@ func MultipartUpload(client *http.Client, url string, path string, jsonBytes []b
return
}
err = writer.WriteField("worker", fmt.Sprintf("gma-%s", workerID))
if err != nil {
return
}
part, err := writer.CreateFormFile("file", fi.Name())
if err != nil {
return
@ -142,7 +148,7 @@ func MultipartUpload(client *http.Client, url string, path string, jsonBytes []b
// Discard Response Body, to clean up tcp socket
defer res.Body.Close()
_, err = io.Copy(ioutil.Discard, res.Body)
_, err = io.Copy(io.Discard, res.Body)
if err != nil {
return err
}

@ -0,0 +1,258 @@
package main
import (
"archive/tar"
"context"
"crypto/sha256"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"time"
"git.cheetah.cat/worksucc/gma-puzzles/common"
adriver "github.com/arangodb/go-driver"
ahttp "github.com/arangodb/go-driver/http"
"github.com/klauspost/compress/zstd"
)
var (
arangoDB adriver.Database
arangoCTX context.Context
colChunk adriver.Collection
colFile adriver.Collection
colFile2Chunk adriver.Collection
)
func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
log.Println("connectDB:", "Starting Connection Process...")
// Retry Loop for Failed Connections
for i := 0; i < 6; i++ {
if i == 5 {
return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i)
} else if i > 0 {
time.Sleep(30 * time.Second)
}
// Connect to ArangoDB URL
conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
Endpoints: []string{baseURL},
TLSConfig: &tls.Config{ /*...*/ },
})
if err != nil {
log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
continue
}
// Connect Driver to User
client, err := adriver.NewClient(adriver.ClientConfig{
Connection: conn,
Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
})
if err != nil {
log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
continue
}
// Create Context for Database Access
ctx = context.Background()
driver, err = client.Database(ctx, arangoDatabase)
if err != nil {
log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
continue
}
log.Println("connectDB:", "Connection Sucessful!")
return driver, ctx, nil
}
return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?")
}
func InitDatabase() (err error) {
arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator")
if err != nil {
return err
}
colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
if err != nil {
return err
}
colFile, err = arangoDB.Collection(arangoCTX, "file")
if err != nil {
return err
}
colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
if err != nil {
return err
}
return nil
}
type PoolRecoveryData struct {
PoolID string `json:"_key"`
Size uint64 `json:"size"`
Created time.Time `json:"date"`
Hash string `json:"hash"`
ItemCount int `json:"itemCount"`
Items []string `json:"items"`
RecoveryData []common.DB_File `json:"recoveryData"`
}
func bla() error {
entries, err := os.ReadDir("/mnt/SC9000/storagePools/")
if err != nil {
return err
}
for _, e := range entries {
if !e.IsDir() {
fmt.Printf("Scanning For Local Pools, found %s:", e.Name())
tarFinalPath := filepath.Join("/mnt/SC9000/storagePools/", e.Name())
stats, err := os.Stat(tarFinalPath)
if err != nil {
return err
}
parts := strings.Split(e.Name(), ".")
var chunk common.DB_Chunk
_, err = colChunk.ReadDocument(arangoCTX, parts[0], &chunk)
if err != nil {
return err
}
chunk.Finalized = true
chunk.NotReady = false
chunk.ReadOnly = true
chunk.Size = stats.Size()
zstFile, err := os.Open(tarFinalPath)
if err != nil {
return err
}
shaHasher := sha256.New()
_, err = io.Copy(shaHasher, zstFile)
if err != nil {
return err
}
jsonPath := filepath.Join("/mnt/SC9000/storagePools/", fmt.Sprintf("%s.json", parts[0]))
_, err = os.Stat(jsonPath)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
// rewrite json from db
zstFile.Seek(0, 0)
decompressor, err := zstd.NewReader(zstFile)
if err != nil {
panic(err)
}
defer decompressor.Close()
items := []string{}
tarReader := tar.NewReader(decompressor)
for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
if err != nil {
return err
}
items = append(items, header.Name)
}
poolRecoveryData := PoolRecoveryData{
PoolID: parts[0],
Size: uint64(stats.Size()),
Created: time.Now(),
Hash: fmt.Sprintf("%x", shaHasher.Sum(nil)),
ItemCount: 500,
Items: items,
//RecoveryData,
}
chunk.Hash = poolRecoveryData.Hash
//TODO: fetch RecoveryData from DB
poolRecoveryData.RecoveryData = make([]common.DB_File, len(items))
_, _, err = colFile.ReadDocuments(arangoCTX, items, poolRecoveryData.RecoveryData)
if err != nil {
return fmt.Errorf("error @ReadDocuments %v", err)
}
json, err := json.MarshalIndent(poolRecoveryData, "", "\t")
if err != nil {
return fmt.Errorf("error @json.MarshalIndent %v", err)
}
recoveryFile, err := os.Create(jsonPath)
if err != nil {
return err
}
_, err = recoveryFile.Write(json)
if err != nil {
return fmt.Errorf("error @recoveryFile.Write %v", err)
}
}
} else {
var poolRecoveryData PoolRecoveryData
readJSONFile, err := os.Open(jsonPath)
if err != nil {
return err
}
defer readJSONFile.Close()
readBytes, err := io.ReadAll(readJSONFile)
if err != nil {
return err
}
err = json.Unmarshal(readBytes, &poolRecoveryData)
if err != nil {
return err
}
poolRecoveryData.Size = uint64(stats.Size())
poolRecoveryData.Created = time.Now()
poolRecoveryData.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil))
chunk.Hash = poolRecoveryData.Hash
json, err := json.MarshalIndent(poolRecoveryData, "", "\t")
if err != nil {
return fmt.Errorf("error @json.MarshalIndent %v", err)
}
recoveryFile, err := os.Create(jsonPath)
if err != nil {
return err
}
_, err = recoveryFile.Write(json)
if err != nil {
return fmt.Errorf("error @recoveryFile.Write %v", err)
}
}
_, err = colChunk.UpdateDocument(arangoCTX, parts[0], &chunk)
if err != nil {
return err
}
}
}
return nil
}
func main() {
err := InitDatabase()
if err != nil {
panic(err)
}
err = bla()
if err != nil {
panic(err)
}
}

@ -8,6 +8,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/djherbis/times v1.5.0 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/labstack/echo v3.3.10+incompatible // indirect
github.com/labstack/echo/v4 v4.10.2 // indirect
github.com/labstack/gommon v0.4.0 // indirect

@ -19,6 +19,7 @@ import (
"git.cheetah.cat/worksucc/gma-puzzles/common"
"git.cheetah.cat/worksucc/gma-puzzles/gma"
"github.com/arangodb/go-driver"
adriver "github.com/arangodb/go-driver"
ahttp "github.com/arangodb/go-driver/http"
"github.com/schollz/progressbar/v3"
@ -35,6 +36,8 @@ var (
//colFile2Chunk adriver.Collection
colGMA adriver.Collection
colGMA2File adriver.Collection
workerID string
)
func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
@ -117,12 +120,14 @@ var GlobalWriteLock sync.Mutex
func main() {
folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "a string")
skipNameP := flag.String("skip", "", "skip until this name")
workerNameP := flag.String("worker", "def", "worker name")
debug.SetMemoryLimit(6e9)
flag.Parse()
folderPath := *folderPathP
skipName := *skipNameP
skipNameEnabled := len(skipName) > 0
workerID = *workerNameP
go func() {
log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
}()
@ -191,7 +196,7 @@ func main() {
}
func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) {
log.Printf("undoBatch(%x, %s)\n", undoBatch, gmaID)
log.Printf("undoBatch(%v, %s)\n", undoBatch, gmaID)
/*
_, err = colGMA.RemoveDocument(arangoCTX, gmaID)
if err != nil {
@ -220,7 +225,7 @@ func ProcessGMA(filePath string) (err error) {
fmt.Println("aquired global write lock")
defer unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway
defer fmt.Println("unlocking GlobalWriteLock")
time.Sleep(5 * time.Second)
//time.Sleep(5 * time.Second)
var (
fileIDs []string
@ -231,6 +236,31 @@ func ProcessGMA(filePath string) (err error) {
dboGMA.OriginalPath = filePath
dboGMA.ProcessingStart = time.Now()
cursor, err := arangoDB.Query(arangoCTX, fmt.Sprintf("FOR g IN gma FILTER g.originalPath == '%s' RETURN g", dboGMA.OriginalPath), nil)
if err != nil {
return err
}
defer cursor.Close()
skipHashCheck := false
if cursor.Count() > 0 || cursor.HasMore() {
for {
gma := common.DB_GMA{}
_, err = cursor.ReadDocument(arangoCTX, &gma)
if driver.IsNoMoreDocuments(err) {
break
} else if err != nil {
return err
}
if gma.Success {
return fmt.Errorf("GMA with ID %s was successfull at %v", gma.ID, gma.ProcessingEnd)
} else {
skipHashCheck = true
}
}
}
fileStat, err := os.Stat(filePath)
if err != nil {
return err
@ -241,6 +271,7 @@ func ProcessGMA(filePath string) (err error) {
if dboGMA.GMASize < 200 {
return fmt.Errorf("GMA File too small, skipping")
}
log.Printf("Opening %s\n", filePath)
gmaReader, err := gma.NewReader(filePath)
if err != nil {
@ -262,9 +293,15 @@ func ProcessGMA(filePath string) (err error) {
if err != nil {
return err
}
if dboIDExists {
if dboIDExists && !skipHashCheck {
return fmt.Errorf("GMA with ID %s exists", dboGMA.ID)
}
if dboIDExists && skipHashCheck {
_, err = colGMA.RemoveDocument(arangoCTX, dboGMA.ID)
if err != nil {
return err
}
}
header, err := gmaReader.ReadHeader()
if err != nil {
@ -444,7 +481,7 @@ func ProcessGMA(filePath string) (err error) {
}
uploadBar.Describe("Uploading")
err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON)
err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON, workerID)
if err != nil {
log.Println("err @common.MultipartUpload")
log.Println(err)

@ -22,6 +22,7 @@ import (
"git.cheetah.cat/worksucc/gma-puzzles/common"
adriver "github.com/arangodb/go-driver"
ahttp "github.com/arangodb/go-driver/http"
"github.com/klauspost/compress/zstd"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/twinj/uuid"
@ -68,7 +69,7 @@ type PoolFile struct {
type PoolMaster struct {
cachePath string
finalPath string
CurrentPool *Pool
CurrentPool map[string]*Pool
lock sync.Mutex
WORMLock sync.Mutex
@ -175,7 +176,14 @@ func (p *Pool) OpenTar() (err error) {
return err
}
p.items = []string{}
p.tarReader = tar.NewReader(p.file)
decompressor, err := zstd.NewReader(p.file, zstd.WithDecoderConcurrency(8))
if err != nil {
panic(err)
}
defer decompressor.Close()
p.tarReader = tar.NewReader(decompressor)
for {
header, err := p.tarReader.Next()
if err == io.EOF {
@ -229,6 +237,7 @@ func (p *Pool) Unload() {
func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) {
poolMaster.finalPath = finalPath
poolMaster.cachePath = cachePath
poolMaster.CurrentPool = make(map[string]*Pool)
poolMaster.WORMPools = make(map[string]*Pool)
//poolMaster.lock = sync.Mutex{}
@ -266,30 +275,33 @@ func (p *PoolMaster) NewPool() (*Pool, error) {
return &pool, nil
}
func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) {
func (p *PoolMaster) GetCurrentWriteablePool(workerID string) (pool *Pool, err error) {
//fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount)
if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems {
if p.CurrentPool[workerID] != nil && p.CurrentPool[workerID].itemCount >= PoolMaxItems {
fmt.Printf("Aquiring Lock for GetCurrentWriteablepool\n")
p.lock.Lock()
defer p.lock.Unlock()
defer fmt.Printf("unlock GetCurrentWriteablepool\n")
fmt.Printf("Aquired Lock success for GetCurrentWriteablepool\n")
p.CurrentPool.ReadOnly = true
p.FullPools = append(p.FullPools, p.CurrentPool)
p.CurrentPool[workerID].ReadOnly = true
p.CurrentPool[workerID].LastTouchy = time.Now()
p.FullPools = append(p.FullPools, p.CurrentPool[workerID])
// queue for compression
fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool.PoolID, p.CurrentPool.itemCount)
p.CurrentPool = nil
fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool[workerID].PoolID, p.CurrentPool[workerID].itemCount)
p.CurrentPool[workerID] = nil
}
if p.CurrentPool == nil {
if p.CurrentPool[workerID] == nil {
fmt.Printf("Creating new Pool")
p.CurrentPool, err = p.AcquireNewOrRecoverPool()
fmt.Printf("... got [%s]\n", p.CurrentPool.PoolID)
p.CurrentPool[workerID], err = p.AcquireNewOrRecoverPool()
err := os.WriteFile(filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", p.CurrentPool[workerID].PoolID)), []byte(workerID), os.ModePerm)
if err != nil {
return pool, err
}
fmt.Printf("... got [%s]\n", p.CurrentPool[workerID].PoolID)
}
return p.CurrentPool, nil
return p.CurrentPool[workerID], nil
}
func RestorePoolFromFolder(folderPath string) (pool *Pool, err error) {
pool = &Pool{}
@ -324,7 +336,7 @@ func (p *PoolMaster) ScanForLocalPools() (err error) {
if e.IsDir() {
fmt.Printf("Scanning For Local Pools, found %s:", e.Name())
tarFinalPath := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", e.Name()))
tarFinalPath := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", e.Name()))
_, err = os.Stat(tarFinalPath)
finalPathExists := false
if err != nil {
@ -358,8 +370,27 @@ func (p *PoolMaster) ScanForLocalPools() (err error) {
if err != nil {
return err
}
workerCacheFileName := filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", e.Name()))
fmt.Printf("is readonly %v itemCount=%d\n", restoredPool.ReadOnly, restoredPool.itemCount)
if restoredPool.itemCount == 500 {
p.LocalPools = append(p.LocalPools, restoredPool)
} else {
_, err = os.Stat(workerCacheFileName) //if we have a worker assingment file
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
// if not exists
p.LocalPools = append(p.LocalPools, restoredPool)
} else {
workerBytes, err := os.ReadFile(workerCacheFileName)
if err != nil {
return err
}
p.CurrentPool[string(workerBytes)] = restoredPool
}
}
}
}
return nil
@ -373,6 +404,7 @@ func (p *PoolMaster) ImportPoolPackResult(packResult PoolPackResult) (err error)
Hash: packResult.Hash,
Size: packResult.Size,
FileCount: packResult.FileCount,
Created: time.Now(),
NotReady: true,
ReadOnly: true,
Finalized: true,
@ -429,7 +461,7 @@ func (p *PoolMaster) ImportPoolPackResult(packResult PoolPackResult) (err error)
func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) {
startTime := time.Now()
targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", packResult.PoolID))
targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", packResult.PoolID))
err = common.MoveFile(packResult.outputFileName, targetFileName)
if err != nil {
return err
@ -501,19 +533,25 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err
startTime := time.Now()
packResult.PoolID = poolID
packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar", poolID))
packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar.zst", poolID))
tarFile, err := os.Create(packResult.outputFileName)
if err != nil {
return packResult, err
return packResult, fmt.Errorf("os.Create: %v", err)
}
defer tarFile.Close()
tw := tar.NewWriter(tarFile)
compressor, err := zstd.NewWriter(tarFile, zstd.WithEncoderLevel(4))
if err != nil {
return packResult, fmt.Errorf("zstd.NewWriter: %v", err)
}
defer compressor.Close()
tw := tar.NewWriter(compressor)
defer tw.Close()
entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool", poolID))
if err != nil {
return packResult, err
return packResult, fmt.Errorf("os.ReadDir: %v", err)
}
//fmt.Printf("len(entries) == %d\n", len(entries))
if len(entries) != PoolMaxItems {
@ -524,13 +562,13 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err
file, err := os.Open(originalPath)
if err != nil {
return packResult, err
return packResult, fmt.Errorf("for os.Open: %v", err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return packResult, err
return packResult, fmt.Errorf("for fs.Stat: %v", err)
}
tarFileHeader, err := tar.FileInfoHeader(info, info.Name())
@ -548,20 +586,24 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err
}
packResult.FileCount++
packResult.Files = append(packResult.Files, e.Name())
fmt.Printf("*")
}
err = tw.Flush()
err = tw.Close()
if err != nil {
return packResult, err
}
err = compressor.Close()
if err != nil {
return packResult, err
}
err = tarFile.Close()
if err != nil {
return packResult, err
}
// re-open and check
{
tarFileCheck, err := os.Open(packResult.outputFileName)
if err != nil {
return packResult, err
@ -576,8 +618,40 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err
packResult.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil))
fmt.Printf("PackPoolTar hash is %s\n", packResult.Hash)
// TODO: write index json describing the files inside, pure hash list, aswell
// as dictionary containing all the infos for restore/disaster-recovery into Arango
tarFileCheck.Seek(0, 0)
// validate written tar-chunk
decompressor, err := zstd.NewReader(tarFileCheck, zstd.WithDecoderConcurrency(8))
if err != nil {
panic(err)
}
defer decompressor.Close()
tarFileCheckReader := tar.NewReader(decompressor)
//filenamesReadBackList := []string{}
for {
header, err := tarFileCheckReader.Next()
//header.PAXRecords
if err == io.EOF {
break
}
if err != nil {
return packResult, err
}
hasher := sha256.New()
hashedBytes, err := io.Copy(hasher, tarFileCheckReader)
if err != nil {
return packResult, err
}
readBackChecksum := fmt.Sprintf("%x", hasher.Sum(nil))
if hashedBytes != header.Size {
return packResult, fmt.Errorf("validation on output archive, incorrect size file %s has %d should be %d", header.Name, hashedBytes, header.Size)
}
if header.Name != readBackChecksum {
return packResult, fmt.Errorf("validation on output archive, incorrect checksum file %s has %s", header.Name, readBackChecksum)
}
//filenamesReadBackList = append(filenamesReadBackList, header.Name)
}
packFileStats, err := tarFileCheck.Stat()
if err != nil {
@ -589,6 +663,11 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err
}
fmt.Printf("PackPool Duration %dms\n", time.Since(startTime).Milliseconds())
}
// TODO: write index json describing the files inside, pure hash list, aswell
// as dictionary containing all the infos for restore/disaster-recovery into Arango
return packResult, nil
}
@ -603,13 +682,15 @@ func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) {
}
func (p *PoolMaster) Lookup(id string) (exists bool) {
if p.CurrentPool != nil { // CurrentPool
for _, poolItem := range p.CurrentPool.items {
for _, cp := range p.CurrentPool {
if cp != nil { // CurrentPool
for _, poolItem := range cp.items {
if poolItem == id {
return true
}
}
}
}
p.lock.Lock()
defer p.lock.Unlock()
for _, wormPool := range p.WORMPools { // WORM Pools
@ -643,13 +724,15 @@ func (p *PoolMaster) Lookup(id string) (exists bool) {
func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writer) (err error) {
fmt.Printf("FetchLoadWORM(chunkID %s, fileID %s, ...)\n", chunkID, fileID)
// search within loaded worm-pools
//fmt.Println("WormPool For Start")
for wormID, wormPool := range p.WORMPools {
//fmt.Printf("WORMPool[%s] for-iter\n", wormID)
if wormID != chunkID {
continue
}
for _, poolItem := range wormPool.items {
if poolItem == fileID {
fmt.Printf("Fetch WORMPool %s file %s\n", wormID, fileID)
//fmt.Printf("Fetch WORMPool %s file %s\n", wormID, fileID)
return wormPool.Fetch(fileID, writer)
}
}
@ -663,7 +746,7 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ
fmt.Printf("Aquiring WORMLock for FetchLoadWORM\n")
p.WORMLock.Lock()
defer p.WORMLock.Unlock()
defer fmt.Printf("unlock WORMLock FetchLoadWORM\n")
0 defer fmt.Printf("unlock WORMLock FetchLoadWORM\n")
fmt.Printf("Aquired WORMLock success for FetchLoadWORM\n")
*/
@ -685,15 +768,17 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ
Finalized: dboChunk.Finalized,
LastTouchy: time.Now(),
filePath: filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", dboChunk.ID)),
filePath: filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", dboChunk.ID)),
}
fmt.Println("initialized loadedWormPool, Opening tar...")
fmt.Printf("initialized loadedWormPool (%s), Opening tar...\n", dboChunk.ID)
err = loadedWormPool.OpenTar()
if err != nil {
fmt.Println(err)
return err
}
fmt.Println("extracted")
fmt.Printf("extracted and key = %s\n", loadedWormPool.PoolID)
p.WORMPools[loadedWormPool.PoolID] = &loadedWormPool
fmt.Println(p.WORMPools)
return loadedWormPool.Fetch(fileID, writer)
//return nil
}
@ -714,11 +799,12 @@ func (p *PoolMaster) CleanWORMTemp() (err error) {
}
func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
if p.CurrentPool != nil {
for _, poolItem := range p.CurrentPool.items {
for _, cp := range p.CurrentPool {
if cp != nil {
for _, poolItem := range cp.items {
if poolItem == id {
fmt.Printf("Fetch CurrentPool %s\n", id)
poolLocalFilePath := filepath.Join(p.cachePath, "pool", p.CurrentPool.PoolID, id)
poolLocalFilePath := filepath.Join(p.cachePath, "pool", cp.PoolID, id)
//fmt.Println(poolLocalFilePath)
//fmt.Printf("%s %s\n", p.CurrentPool.PoolID, poolItem)
srcLocalFile, err := os.Open(poolLocalFilePath)
@ -735,6 +821,7 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
}
}
}
}
p.lock.Lock()
for _, wormPool := range p.WORMPools {
for _, poolItem := range wormPool.items {
@ -750,6 +837,7 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
for _, poolItem := range fullPool.items {
if poolItem == id {
fmt.Printf("Fetch FullPool %s\n", id)
fullPool.LastTouchy = time.Now()
poolLocalFilePath := filepath.Join(p.cachePath, "pool", fullPool.PoolID, id)
srcLocalFile, err := os.Open(poolLocalFilePath)
if err != nil {
@ -800,9 +888,8 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
}
return nil
}
func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) {
pool, err := p.GetCurrentWriteablePool()
func (p *PoolMaster) Store(id string, workerID string, src io.Reader, targetSize int64) (err error) {
pool, err := p.GetCurrentWriteablePool(workerID)
if err != nil {
return err
}
@ -899,26 +986,34 @@ func main() {
var deletedPools []string
for _, fullPool := range poolMaster.FullPools {
if time.Since(fullPool.LastTouchy).Minutes() < 1 {
continue
}
packResult, err := poolMaster.PackPool(fullPool.PoolID)
if err != nil {
panic(err)
panic(fmt.Errorf("error @PackPool: %v", err))
}
err = poolMaster.ImportPoolPackResult(packResult)
if err != nil {
panic(err)
panic(fmt.Errorf("error @ImportPoolPackResult: %v", err))
}
err = poolMaster.MovePoolPackToWORM(packResult)
if err != nil {
panic(err)
panic(fmt.Errorf("error @MovePoolPackToWORM: %v", err))
}
err = poolMaster.WriteRecoveryFile(packResult, fullPool)
if err != nil {
panic(err)
panic(fmt.Errorf("error @WriteRecoveryFile: %v", err))
}
os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID))
deletedPools = append(deletedPools, fullPool.PoolID)
_, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{
NotReady: false,
Finalized: true,
ReadOnly: true,
Hash: packResult.Hash,
Size: packResult.Size,
})
if err != nil {
panic(err)
@ -966,6 +1061,10 @@ func main() {
if err != nil {
panic(err)
}
err = poolMaster.WriteRecoveryFile(packResult, localPool)
if err != nil {
panic(err)
}
os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
}
//os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
@ -981,6 +1080,9 @@ func main() {
e.GET("/", func(c echo.Context) error {
return c.String(http.StatusOK, "Hello, World!")
})
e.GET("/pmdump", func(c echo.Context) error {
return c.JSON(http.StatusOK, poolMaster)
})
e.GET("/check/:id", func(c echo.Context) error {
id := c.Param("id")
@ -1021,6 +1123,7 @@ func main() {
fmt.Println(err)
return c.String(http.StatusExpectationFailed, "Error")
}
workerID := c.FormValue("worker")
infoJSON := c.FormValue("info")
fileInfo := common.DB_File{}
err = json.Unmarshal([]byte(infoJSON), &fileInfo)
@ -1061,13 +1164,14 @@ func main() {
return c.String(http.StatusExpectationFailed, "Error")
}
defer formStream.Close()
err = poolMaster.Store(id, formStream, sizeVal)
err = poolMaster.Store(id, workerID, formStream, sizeVal)
if err != nil {
fmt.Println(err)
return c.String(http.StatusExpectationFailed, "Error")
}
fmt.Println("...stashed")
fmt.Println(fileInfo)
fileInfo.Created = time.Now()
_, err = colFile.CreateDocument(arangoCTX, fileInfo)
if err != nil {
fmt.Println(err)

@ -0,0 +1,76 @@
package main
import (
"archive/tar"
"fmt"
"io"
"os"
"path/filepath"
"github.com/klauspost/compress/zstd"
)
func main() {
tarFile, err := os.Create("test.tar.zst")
if err != nil {
panic(err)
}
defer tarFile.Close()
compressor, err := zstd.NewWriter(tarFile, zstd.WithEncoderLevel(4))
if err != nil {
panic(err)
}
defer compressor.Close()
tw := tar.NewWriter(compressor)
defer tw.Close()
entries, err := os.ReadDir("/home/cheetah/dev/gma-puzzles/zstd-tar-test/testpayload")
if err != nil {
panic(err)
}
for _, e := range entries {
originalPath := filepath.Join("/home/cheetah/dev/gma-puzzles/zstd-tar-test/testpayload", e.Name())
file, err := os.Open(originalPath)
if err != nil {
panic(err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
panic(err)
}
tarFileHeader, err := tar.FileInfoHeader(info, info.Name())
if err != nil {
panic(err)
}
err = tw.WriteHeader(tarFileHeader)
if err != nil {
panic(err)
}
_, err = io.Copy(tw, file)
if err != nil {
panic(err)
}
fmt.Println(info.Name())
}
err = tw.Close()
if err != nil {
panic(err)
}
err = compressor.Close()
if err != nil {
panic(err)
}
err = tarFile.Close()
if err != nil {
panic(err)
}
}
Loading…
Cancel
Save