From 0b00bac7df8de78617450debdfc047a2e04dea7f Mon Sep 17 00:00:00 2001 From: cheetah Date: Mon, 3 Jul 2023 07:55:09 -0500 Subject: [PATCH] added ZSTD Layer --- .gitignore | 3 +- common/common.go | 34 +++-- fix/fix.go | 258 +++++++++++++++++++++++++++++++ go.mod | 1 + main.go | 47 +++++- storageserver/storageserver.go | 270 +++++++++++++++++++++++---------- zstd-tar-test/tartest.go | 76 ++++++++++ 7 files changed, 586 insertions(+), 103 deletions(-) create mode 100644 fix/fix.go create mode 100644 zstd-tar-test/tartest.go diff --git a/.gitignore b/.gitignore index 9431c61..6efbedf 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ temp/ main storageserver/storageserver gmad_linux -.vscode/ \ No newline at end of file +.vscode/ +storageserver/test/ \ No newline at end of file diff --git a/common/common.go b/common/common.go index b7c6806..6f4c28f 100644 --- a/common/common.go +++ b/common/common.go @@ -3,7 +3,6 @@ package common import ( "fmt" "io" - "io/ioutil" "mime/multipart" "net/http" "os" @@ -32,13 +31,14 @@ type DB_GMA struct { } type DB_File struct { - ID string `json:"_key"` - BatchID string `json:"batch"` - InitialPath string `json:"initialPath"` - Extension string `json:"extension"` - Size int64 `json:"size"` - CRC uint32 `json:"crc"` - Hash string `json:"hash"` + ID string `json:"_key"` + BatchID string `json:"batch"` + InitialPath string `json:"initialPath"` + Extension string `json:"extension"` + Created time.Time `json:"created"` + Size int64 `json:"size"` + CRC uint32 `json:"crc"` + Hash string `json:"hash"` } type DB_GMA2File struct { @@ -62,9 +62,10 @@ type DB_GMA2File struct { type DB_Chunk struct { ID string `json:"_key"` - NotReady bool `json:"notReady"` - Finalized bool `json:"finalized"` - ReadOnly bool `json:"readOnly"` + NotReady bool `json:"notReady"` + Finalized bool `json:"finalized"` + ReadOnly bool `json:"readOnly"` + Created time.Time `json:"created"` FileCount int `json:"fileCount"` Size int64 `json:"size"` @@ -77,13 +78,13 @@ type DB_File2Chunk struct { File string `json:"_from"` } -func MultipartUpload(client *http.Client, url string, path string, jsonBytes []byte) (err error) { +func MultipartUpload(client *http.Client, url string, path string, jsonBytes []byte, workerID string) (err error) { //fmt.Printf("\nMultipartUpload(%s, %s)\n", url, path) file, err := os.Open(path) if err != nil { return err } - fileContents, err := ioutil.ReadAll(file) + fileContents, err := io.ReadAll(file) if err != nil { return err } @@ -103,6 +104,11 @@ func MultipartUpload(client *http.Client, url string, path string, jsonBytes []b return } + err = writer.WriteField("worker", fmt.Sprintf("gma-%s", workerID)) + if err != nil { + return + } + part, err := writer.CreateFormFile("file", fi.Name()) if err != nil { return @@ -142,7 +148,7 @@ func MultipartUpload(client *http.Client, url string, path string, jsonBytes []b // Discard Response Body, to clean up tcp socket defer res.Body.Close() - _, err = io.Copy(ioutil.Discard, res.Body) + _, err = io.Copy(io.Discard, res.Body) if err != nil { return err } diff --git a/fix/fix.go b/fix/fix.go new file mode 100644 index 0000000..6879d91 --- /dev/null +++ b/fix/fix.go @@ -0,0 +1,258 @@ +package main + +import ( + "archive/tar" + "context" + "crypto/sha256" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "os" + "path/filepath" + "strings" + "time" + + "git.cheetah.cat/worksucc/gma-puzzles/common" + adriver "github.com/arangodb/go-driver" + ahttp "github.com/arangodb/go-driver/http" + "github.com/klauspost/compress/zstd" +) + +var ( + arangoDB adriver.Database + arangoCTX context.Context + colChunk adriver.Collection + colFile adriver.Collection + colFile2Chunk adriver.Collection +) + +func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) { + log.Println("connectDB:", "Starting Connection Process...") + + // Retry Loop for Failed Connections + for i := 0; i < 6; i++ { + if i == 5 { + return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i) + } else if i > 0 { + time.Sleep(30 * time.Second) + } + + // Connect to ArangoDB URL + conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{ + Endpoints: []string{baseURL}, + TLSConfig: &tls.Config{ /*...*/ }, + }) + if err != nil { + log.Println("connectDB:", "Cannot Connect to ArangoDB!", err) + continue + } + + // Connect Driver to User + client, err := adriver.NewClient(adriver.ClientConfig{ + Connection: conn, + Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD), + }) + if err != nil { + log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err) + continue + } + + // Create Context for Database Access + ctx = context.Background() + driver, err = client.Database(ctx, arangoDatabase) + if err != nil { + log.Println("connectDB:", "Cannot Load ArangoDB Database!", err) + continue + } + log.Println("connectDB:", "Connection Sucessful!") + return driver, ctx, nil + } + + return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?") +} +func InitDatabase() (err error) { + arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator") + if err != nil { + return err + } + + colChunk, err = arangoDB.Collection(arangoCTX, "chunk") + if err != nil { + return err + } + colFile, err = arangoDB.Collection(arangoCTX, "file") + if err != nil { + return err + } + colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map") + if err != nil { + return err + } + return nil +} + +type PoolRecoveryData struct { + PoolID string `json:"_key"` + Size uint64 `json:"size"` + Created time.Time `json:"date"` + Hash string `json:"hash"` + + ItemCount int `json:"itemCount"` + Items []string `json:"items"` + RecoveryData []common.DB_File `json:"recoveryData"` +} + +func bla() error { + entries, err := os.ReadDir("/mnt/SC9000/storagePools/") + if err != nil { + return err + } + for _, e := range entries { + if !e.IsDir() { + fmt.Printf("Scanning For Local Pools, found %s:", e.Name()) + + tarFinalPath := filepath.Join("/mnt/SC9000/storagePools/", e.Name()) + stats, err := os.Stat(tarFinalPath) + if err != nil { + return err + } + parts := strings.Split(e.Name(), ".") + var chunk common.DB_Chunk + _, err = colChunk.ReadDocument(arangoCTX, parts[0], &chunk) + if err != nil { + return err + } + chunk.Finalized = true + chunk.NotReady = false + chunk.ReadOnly = true + chunk.Size = stats.Size() + + zstFile, err := os.Open(tarFinalPath) + if err != nil { + return err + } + shaHasher := sha256.New() + _, err = io.Copy(shaHasher, zstFile) + if err != nil { + return err + } + + jsonPath := filepath.Join("/mnt/SC9000/storagePools/", fmt.Sprintf("%s.json", parts[0])) + + _, err = os.Stat(jsonPath) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + + // rewrite json from db + + zstFile.Seek(0, 0) + decompressor, err := zstd.NewReader(zstFile) + if err != nil { + panic(err) + } + defer decompressor.Close() + + items := []string{} + tarReader := tar.NewReader(decompressor) + for { + header, err := tarReader.Next() + if err == io.EOF { + break + } + if err != nil { + return err + } + items = append(items, header.Name) + } + + poolRecoveryData := PoolRecoveryData{ + PoolID: parts[0], + Size: uint64(stats.Size()), + Created: time.Now(), + Hash: fmt.Sprintf("%x", shaHasher.Sum(nil)), + ItemCount: 500, + Items: items, + //RecoveryData, + } + chunk.Hash = poolRecoveryData.Hash + + //TODO: fetch RecoveryData from DB + poolRecoveryData.RecoveryData = make([]common.DB_File, len(items)) + _, _, err = colFile.ReadDocuments(arangoCTX, items, poolRecoveryData.RecoveryData) + if err != nil { + return fmt.Errorf("error @ReadDocuments %v", err) + } + + json, err := json.MarshalIndent(poolRecoveryData, "", "\t") + if err != nil { + return fmt.Errorf("error @json.MarshalIndent %v", err) + } + + recoveryFile, err := os.Create(jsonPath) + if err != nil { + return err + } + _, err = recoveryFile.Write(json) + if err != nil { + return fmt.Errorf("error @recoveryFile.Write %v", err) + } + } + } else { + var poolRecoveryData PoolRecoveryData + + readJSONFile, err := os.Open(jsonPath) + if err != nil { + return err + } + defer readJSONFile.Close() + + readBytes, err := io.ReadAll(readJSONFile) + if err != nil { + return err + } + err = json.Unmarshal(readBytes, &poolRecoveryData) + if err != nil { + return err + } + + poolRecoveryData.Size = uint64(stats.Size()) + poolRecoveryData.Created = time.Now() + poolRecoveryData.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil)) + chunk.Hash = poolRecoveryData.Hash + + json, err := json.MarshalIndent(poolRecoveryData, "", "\t") + if err != nil { + return fmt.Errorf("error @json.MarshalIndent %v", err) + } + recoveryFile, err := os.Create(jsonPath) + if err != nil { + return err + } + _, err = recoveryFile.Write(json) + if err != nil { + return fmt.Errorf("error @recoveryFile.Write %v", err) + } + } + _, err = colChunk.UpdateDocument(arangoCTX, parts[0], &chunk) + if err != nil { + return err + } + + } + } + return nil +} +func main() { + err := InitDatabase() + if err != nil { + panic(err) + } + + err = bla() + if err != nil { + panic(err) + } +} diff --git a/go.mod b/go.mod index e33d182..020e1c7 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/djherbis/times v1.5.0 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect + github.com/klauspost/compress v1.16.6 // indirect github.com/labstack/echo v3.3.10+incompatible // indirect github.com/labstack/echo/v4 v4.10.2 // indirect github.com/labstack/gommon v0.4.0 // indirect diff --git a/main.go b/main.go index 09385e1..3f80fd4 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( "git.cheetah.cat/worksucc/gma-puzzles/common" "git.cheetah.cat/worksucc/gma-puzzles/gma" + "github.com/arangodb/go-driver" adriver "github.com/arangodb/go-driver" ahttp "github.com/arangodb/go-driver/http" "github.com/schollz/progressbar/v3" @@ -35,6 +36,8 @@ var ( //colFile2Chunk adriver.Collection colGMA adriver.Collection colGMA2File adriver.Collection + + workerID string ) func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) { @@ -117,12 +120,14 @@ var GlobalWriteLock sync.Mutex func main() { folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "a string") skipNameP := flag.String("skip", "", "skip until this name") + workerNameP := flag.String("worker", "def", "worker name") + debug.SetMemoryLimit(6e9) flag.Parse() folderPath := *folderPathP skipName := *skipNameP skipNameEnabled := len(skipName) > 0 - + workerID = *workerNameP go func() { log.Println(http.ListenAndServe("0.0.0.0:6060", nil)) }() @@ -191,7 +196,7 @@ func main() { } func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) { - log.Printf("undoBatch(%x, %s)\n", undoBatch, gmaID) + log.Printf("undoBatch(%v, %s)\n", undoBatch, gmaID) /* _, err = colGMA.RemoveDocument(arangoCTX, gmaID) if err != nil { @@ -220,7 +225,7 @@ func ProcessGMA(filePath string) (err error) { fmt.Println("aquired global write lock") defer unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway defer fmt.Println("unlocking GlobalWriteLock") - time.Sleep(5 * time.Second) + //time.Sleep(5 * time.Second) var ( fileIDs []string @@ -231,6 +236,31 @@ func ProcessGMA(filePath string) (err error) { dboGMA.OriginalPath = filePath dboGMA.ProcessingStart = time.Now() + + cursor, err := arangoDB.Query(arangoCTX, fmt.Sprintf("FOR g IN gma FILTER g.originalPath == '%s' RETURN g", dboGMA.OriginalPath), nil) + if err != nil { + return err + } + + defer cursor.Close() + skipHashCheck := false + if cursor.Count() > 0 || cursor.HasMore() { + for { + gma := common.DB_GMA{} + _, err = cursor.ReadDocument(arangoCTX, &gma) + if driver.IsNoMoreDocuments(err) { + break + } else if err != nil { + return err + } + if gma.Success { + return fmt.Errorf("GMA with ID %s was successfull at %v", gma.ID, gma.ProcessingEnd) + } else { + skipHashCheck = true + } + } + } + fileStat, err := os.Stat(filePath) if err != nil { return err @@ -241,6 +271,7 @@ func ProcessGMA(filePath string) (err error) { if dboGMA.GMASize < 200 { return fmt.Errorf("GMA File too small, skipping") } + log.Printf("Opening %s\n", filePath) gmaReader, err := gma.NewReader(filePath) if err != nil { @@ -262,9 +293,15 @@ func ProcessGMA(filePath string) (err error) { if err != nil { return err } - if dboIDExists { + if dboIDExists && !skipHashCheck { return fmt.Errorf("GMA with ID %s exists", dboGMA.ID) } + if dboIDExists && skipHashCheck { + _, err = colGMA.RemoveDocument(arangoCTX, dboGMA.ID) + if err != nil { + return err + } + } header, err := gmaReader.ReadHeader() if err != nil { @@ -444,7 +481,7 @@ func ProcessGMA(filePath string) (err error) { } uploadBar.Describe("Uploading") - err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON) + err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON, workerID) if err != nil { log.Println("err @common.MultipartUpload") log.Println(err) diff --git a/storageserver/storageserver.go b/storageserver/storageserver.go index 94cef95..8487ceb 100644 --- a/storageserver/storageserver.go +++ b/storageserver/storageserver.go @@ -22,6 +22,7 @@ import ( "git.cheetah.cat/worksucc/gma-puzzles/common" adriver "github.com/arangodb/go-driver" ahttp "github.com/arangodb/go-driver/http" + "github.com/klauspost/compress/zstd" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" "github.com/twinj/uuid" @@ -68,7 +69,7 @@ type PoolFile struct { type PoolMaster struct { cachePath string finalPath string - CurrentPool *Pool + CurrentPool map[string]*Pool lock sync.Mutex WORMLock sync.Mutex @@ -175,7 +176,14 @@ func (p *Pool) OpenTar() (err error) { return err } p.items = []string{} - p.tarReader = tar.NewReader(p.file) + + decompressor, err := zstd.NewReader(p.file, zstd.WithDecoderConcurrency(8)) + if err != nil { + panic(err) + } + defer decompressor.Close() + + p.tarReader = tar.NewReader(decompressor) for { header, err := p.tarReader.Next() if err == io.EOF { @@ -229,6 +237,7 @@ func (p *Pool) Unload() { func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) { poolMaster.finalPath = finalPath poolMaster.cachePath = cachePath + poolMaster.CurrentPool = make(map[string]*Pool) poolMaster.WORMPools = make(map[string]*Pool) //poolMaster.lock = sync.Mutex{} @@ -266,30 +275,33 @@ func (p *PoolMaster) NewPool() (*Pool, error) { return &pool, nil } -func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) { +func (p *PoolMaster) GetCurrentWriteablePool(workerID string) (pool *Pool, err error) { //fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount) - if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems { + if p.CurrentPool[workerID] != nil && p.CurrentPool[workerID].itemCount >= PoolMaxItems { fmt.Printf("Aquiring Lock for GetCurrentWriteablepool\n") p.lock.Lock() defer p.lock.Unlock() defer fmt.Printf("unlock GetCurrentWriteablepool\n") fmt.Printf("Aquired Lock success for GetCurrentWriteablepool\n") - p.CurrentPool.ReadOnly = true - p.FullPools = append(p.FullPools, p.CurrentPool) + + p.CurrentPool[workerID].ReadOnly = true + p.CurrentPool[workerID].LastTouchy = time.Now() + p.FullPools = append(p.FullPools, p.CurrentPool[workerID]) // queue for compression - fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool.PoolID, p.CurrentPool.itemCount) - p.CurrentPool = nil + fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool[workerID].PoolID, p.CurrentPool[workerID].itemCount) + p.CurrentPool[workerID] = nil } - if p.CurrentPool == nil { + if p.CurrentPool[workerID] == nil { fmt.Printf("Creating new Pool") - p.CurrentPool, err = p.AcquireNewOrRecoverPool() - fmt.Printf("... got [%s]\n", p.CurrentPool.PoolID) + p.CurrentPool[workerID], err = p.AcquireNewOrRecoverPool() + err := os.WriteFile(filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", p.CurrentPool[workerID].PoolID)), []byte(workerID), os.ModePerm) if err != nil { return pool, err } + fmt.Printf("... got [%s]\n", p.CurrentPool[workerID].PoolID) } - return p.CurrentPool, nil + return p.CurrentPool[workerID], nil } func RestorePoolFromFolder(folderPath string) (pool *Pool, err error) { pool = &Pool{} @@ -324,7 +336,7 @@ func (p *PoolMaster) ScanForLocalPools() (err error) { if e.IsDir() { fmt.Printf("Scanning For Local Pools, found %s:", e.Name()) - tarFinalPath := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", e.Name())) + tarFinalPath := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", e.Name())) _, err = os.Stat(tarFinalPath) finalPathExists := false if err != nil { @@ -358,8 +370,27 @@ func (p *PoolMaster) ScanForLocalPools() (err error) { if err != nil { return err } + + workerCacheFileName := filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", e.Name())) fmt.Printf("is readonly %v itemCount=%d\n", restoredPool.ReadOnly, restoredPool.itemCount) - p.LocalPools = append(p.LocalPools, restoredPool) + if restoredPool.itemCount == 500 { + p.LocalPools = append(p.LocalPools, restoredPool) + } else { + _, err = os.Stat(workerCacheFileName) //if we have a worker assingment file + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + // if not exists + p.LocalPools = append(p.LocalPools, restoredPool) + } else { + workerBytes, err := os.ReadFile(workerCacheFileName) + if err != nil { + return err + } + p.CurrentPool[string(workerBytes)] = restoredPool + } + } } } return nil @@ -373,6 +404,7 @@ func (p *PoolMaster) ImportPoolPackResult(packResult PoolPackResult) (err error) Hash: packResult.Hash, Size: packResult.Size, FileCount: packResult.FileCount, + Created: time.Now(), NotReady: true, ReadOnly: true, Finalized: true, @@ -429,7 +461,7 @@ func (p *PoolMaster) ImportPoolPackResult(packResult PoolPackResult) (err error) func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) { startTime := time.Now() - targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", packResult.PoolID)) + targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", packResult.PoolID)) err = common.MoveFile(packResult.outputFileName, targetFileName) if err != nil { return err @@ -501,19 +533,25 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err startTime := time.Now() packResult.PoolID = poolID - packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar", poolID)) + packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar.zst", poolID)) tarFile, err := os.Create(packResult.outputFileName) if err != nil { - return packResult, err + return packResult, fmt.Errorf("os.Create: %v", err) } defer tarFile.Close() - tw := tar.NewWriter(tarFile) + compressor, err := zstd.NewWriter(tarFile, zstd.WithEncoderLevel(4)) + if err != nil { + return packResult, fmt.Errorf("zstd.NewWriter: %v", err) + } + defer compressor.Close() + + tw := tar.NewWriter(compressor) defer tw.Close() entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool", poolID)) if err != nil { - return packResult, err + return packResult, fmt.Errorf("os.ReadDir: %v", err) } //fmt.Printf("len(entries) == %d\n", len(entries)) if len(entries) != PoolMaxItems { @@ -524,13 +562,13 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err file, err := os.Open(originalPath) if err != nil { - return packResult, err + return packResult, fmt.Errorf("for os.Open: %v", err) } defer file.Close() info, err := file.Stat() if err != nil { - return packResult, err + return packResult, fmt.Errorf("for fs.Stat: %v", err) } tarFileHeader, err := tar.FileInfoHeader(info, info.Name()) @@ -548,47 +586,88 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err } packResult.FileCount++ packResult.Files = append(packResult.Files, e.Name()) + fmt.Printf("*") } - err = tw.Flush() + err = tw.Close() + if err != nil { + return packResult, err + } + err = compressor.Close() if err != nil { return packResult, err } - err = tarFile.Close() if err != nil { return packResult, err } // re-open and check + { + tarFileCheck, err := os.Open(packResult.outputFileName) + if err != nil { + return packResult, err + } + defer tarFileCheck.Close() - tarFileCheck, err := os.Open(packResult.outputFileName) - if err != nil { - return packResult, err - } - defer tarFileCheck.Close() + shaHasher := sha256.New() + hashedBytes, err := io.Copy(shaHasher, tarFileCheck) + if err != nil { + return packResult, err + } + packResult.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil)) + fmt.Printf("PackPoolTar hash is %s\n", packResult.Hash) + + tarFileCheck.Seek(0, 0) + // validate written tar-chunk + + decompressor, err := zstd.NewReader(tarFileCheck, zstd.WithDecoderConcurrency(8)) + if err != nil { + panic(err) + } + defer decompressor.Close() + tarFileCheckReader := tar.NewReader(decompressor) + + //filenamesReadBackList := []string{} + for { + header, err := tarFileCheckReader.Next() + //header.PAXRecords + if err == io.EOF { + break + } + if err != nil { + return packResult, err + } + hasher := sha256.New() + hashedBytes, err := io.Copy(hasher, tarFileCheckReader) + if err != nil { + return packResult, err + } + readBackChecksum := fmt.Sprintf("%x", hasher.Sum(nil)) + if hashedBytes != header.Size { + return packResult, fmt.Errorf("validation on output archive, incorrect size file %s has %d should be %d", header.Name, hashedBytes, header.Size) + } + if header.Name != readBackChecksum { + return packResult, fmt.Errorf("validation on output archive, incorrect checksum file %s has %s", header.Name, readBackChecksum) + } + //filenamesReadBackList = append(filenamesReadBackList, header.Name) + } + + packFileStats, err := tarFileCheck.Stat() + if err != nil { + return packResult, err + } + packResult.Size = packFileStats.Size() + if hashedBytes != packResult.Size { + return packResult, fmt.Errorf("WORM Copy HashedBytes %d != FileSize %d", hashedBytes, packResult.Size) + } + fmt.Printf("PackPool Duration %dms\n", time.Since(startTime).Milliseconds()) - shaHasher := sha256.New() - hashedBytes, err := io.Copy(shaHasher, tarFileCheck) - if err != nil { - return packResult, err } - packResult.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil)) - fmt.Printf("PackPoolTar hash is %s\n", packResult.Hash) // TODO: write index json describing the files inside, pure hash list, aswell // as dictionary containing all the infos for restore/disaster-recovery into Arango - packFileStats, err := tarFileCheck.Stat() - if err != nil { - return packResult, err - } - packResult.Size = packFileStats.Size() - if hashedBytes != packResult.Size { - return packResult, fmt.Errorf("WORM Copy HashedBytes %d != FileSize %d", hashedBytes, packResult.Size) - } - fmt.Printf("PackPool Duration %dms\n", time.Since(startTime).Milliseconds()) - return packResult, nil } @@ -603,10 +682,12 @@ func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) { } func (p *PoolMaster) Lookup(id string) (exists bool) { - if p.CurrentPool != nil { // CurrentPool - for _, poolItem := range p.CurrentPool.items { - if poolItem == id { - return true + for _, cp := range p.CurrentPool { + if cp != nil { // CurrentPool + for _, poolItem := range cp.items { + if poolItem == id { + return true + } } } } @@ -643,13 +724,15 @@ func (p *PoolMaster) Lookup(id string) (exists bool) { func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writer) (err error) { fmt.Printf("FetchLoadWORM(chunkID %s, fileID %s, ...)\n", chunkID, fileID) // search within loaded worm-pools + //fmt.Println("WormPool For Start") for wormID, wormPool := range p.WORMPools { + //fmt.Printf("WORMPool[%s] for-iter\n", wormID) if wormID != chunkID { continue } for _, poolItem := range wormPool.items { if poolItem == fileID { - fmt.Printf("Fetch WORMPool %s file %s\n", wormID, fileID) + //fmt.Printf("Fetch WORMPool %s file %s\n", wormID, fileID) return wormPool.Fetch(fileID, writer) } } @@ -660,11 +743,11 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ // TODO: every Method here should have locked the WORMLock!! /* - fmt.Printf("Aquiring WORMLock for FetchLoadWORM\n") - p.WORMLock.Lock() - defer p.WORMLock.Unlock() - defer fmt.Printf("unlock WORMLock FetchLoadWORM\n") - fmt.Printf("Aquired WORMLock success for FetchLoadWORM\n") + fmt.Printf("Aquiring WORMLock for FetchLoadWORM\n") + p.WORMLock.Lock() + defer p.WORMLock.Unlock() + 0 defer fmt.Printf("unlock WORMLock FetchLoadWORM\n") + fmt.Printf("Aquired WORMLock success for FetchLoadWORM\n") */ fmt.Printf("Aquiring Lock for FetchLoadWORM\n") @@ -685,15 +768,17 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ Finalized: dboChunk.Finalized, LastTouchy: time.Now(), - filePath: filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", dboChunk.ID)), + filePath: filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", dboChunk.ID)), } - fmt.Println("initialized loadedWormPool, Opening tar...") + fmt.Printf("initialized loadedWormPool (%s), Opening tar...\n", dboChunk.ID) err = loadedWormPool.OpenTar() if err != nil { + fmt.Println(err) return err } - fmt.Println("extracted") + fmt.Printf("extracted and key = %s\n", loadedWormPool.PoolID) p.WORMPools[loadedWormPool.PoolID] = &loadedWormPool + fmt.Println(p.WORMPools) return loadedWormPool.Fetch(fileID, writer) //return nil } @@ -714,24 +799,26 @@ func (p *PoolMaster) CleanWORMTemp() (err error) { } func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { - if p.CurrentPool != nil { - for _, poolItem := range p.CurrentPool.items { - if poolItem == id { - fmt.Printf("Fetch CurrentPool %s\n", id) - poolLocalFilePath := filepath.Join(p.cachePath, "pool", p.CurrentPool.PoolID, id) - //fmt.Println(poolLocalFilePath) - //fmt.Printf("%s %s\n", p.CurrentPool.PoolID, poolItem) - srcLocalFile, err := os.Open(poolLocalFilePath) - if err != nil { - return err - } - //fmt.Println("Closer") - defer srcLocalFile.Close() - //fmt.Println("io.Copy") - if _, err = io.Copy(writer, srcLocalFile); err != nil { - return err + for _, cp := range p.CurrentPool { + if cp != nil { + for _, poolItem := range cp.items { + if poolItem == id { + fmt.Printf("Fetch CurrentPool %s\n", id) + poolLocalFilePath := filepath.Join(p.cachePath, "pool", cp.PoolID, id) + //fmt.Println(poolLocalFilePath) + //fmt.Printf("%s %s\n", p.CurrentPool.PoolID, poolItem) + srcLocalFile, err := os.Open(poolLocalFilePath) + if err != nil { + return err + } + //fmt.Println("Closer") + defer srcLocalFile.Close() + //fmt.Println("io.Copy") + if _, err = io.Copy(writer, srcLocalFile); err != nil { + return err + } + return nil } - return nil } } } @@ -750,6 +837,7 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { for _, poolItem := range fullPool.items { if poolItem == id { fmt.Printf("Fetch FullPool %s\n", id) + fullPool.LastTouchy = time.Now() poolLocalFilePath := filepath.Join(p.cachePath, "pool", fullPool.PoolID, id) srcLocalFile, err := os.Open(poolLocalFilePath) if err != nil { @@ -800,9 +888,8 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { } return nil } -func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) { - - pool, err := p.GetCurrentWriteablePool() +func (p *PoolMaster) Store(id string, workerID string, src io.Reader, targetSize int64) (err error) { + pool, err := p.GetCurrentWriteablePool(workerID) if err != nil { return err } @@ -899,26 +986,34 @@ func main() { var deletedPools []string for _, fullPool := range poolMaster.FullPools { + if time.Since(fullPool.LastTouchy).Minutes() < 1 { + continue + } packResult, err := poolMaster.PackPool(fullPool.PoolID) if err != nil { - panic(err) + panic(fmt.Errorf("error @PackPool: %v", err)) } err = poolMaster.ImportPoolPackResult(packResult) if err != nil { - panic(err) + panic(fmt.Errorf("error @ImportPoolPackResult: %v", err)) } err = poolMaster.MovePoolPackToWORM(packResult) if err != nil { - panic(err) + panic(fmt.Errorf("error @MovePoolPackToWORM: %v", err)) } err = poolMaster.WriteRecoveryFile(packResult, fullPool) if err != nil { - panic(err) + panic(fmt.Errorf("error @WriteRecoveryFile: %v", err)) } os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID)) deletedPools = append(deletedPools, fullPool.PoolID) + _, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{ - NotReady: false, + NotReady: false, + Finalized: true, + ReadOnly: true, + Hash: packResult.Hash, + Size: packResult.Size, }) if err != nil { panic(err) @@ -966,6 +1061,10 @@ func main() { if err != nil { panic(err) } + err = poolMaster.WriteRecoveryFile(packResult, localPool) + if err != nil { + panic(err) + } os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) } //os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) @@ -981,6 +1080,9 @@ func main() { e.GET("/", func(c echo.Context) error { return c.String(http.StatusOK, "Hello, World!") }) + e.GET("/pmdump", func(c echo.Context) error { + return c.JSON(http.StatusOK, poolMaster) + }) e.GET("/check/:id", func(c echo.Context) error { id := c.Param("id") @@ -1021,6 +1123,7 @@ func main() { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } + workerID := c.FormValue("worker") infoJSON := c.FormValue("info") fileInfo := common.DB_File{} err = json.Unmarshal([]byte(infoJSON), &fileInfo) @@ -1061,13 +1164,14 @@ func main() { return c.String(http.StatusExpectationFailed, "Error") } defer formStream.Close() - err = poolMaster.Store(id, formStream, sizeVal) + err = poolMaster.Store(id, workerID, formStream, sizeVal) if err != nil { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } fmt.Println("...stashed") fmt.Println(fileInfo) + fileInfo.Created = time.Now() _, err = colFile.CreateDocument(arangoCTX, fileInfo) if err != nil { fmt.Println(err) diff --git a/zstd-tar-test/tartest.go b/zstd-tar-test/tartest.go new file mode 100644 index 0000000..59505f4 --- /dev/null +++ b/zstd-tar-test/tartest.go @@ -0,0 +1,76 @@ +package main + +import ( + "archive/tar" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/klauspost/compress/zstd" +) + +func main() { + tarFile, err := os.Create("test.tar.zst") + if err != nil { + panic(err) + } + defer tarFile.Close() + + compressor, err := zstd.NewWriter(tarFile, zstd.WithEncoderLevel(4)) + if err != nil { + panic(err) + } + defer compressor.Close() + + tw := tar.NewWriter(compressor) + defer tw.Close() + + entries, err := os.ReadDir("/home/cheetah/dev/gma-puzzles/zstd-tar-test/testpayload") + if err != nil { + panic(err) + } + + for _, e := range entries { + originalPath := filepath.Join("/home/cheetah/dev/gma-puzzles/zstd-tar-test/testpayload", e.Name()) + + file, err := os.Open(originalPath) + if err != nil { + panic(err) + } + defer file.Close() + + info, err := file.Stat() + if err != nil { + panic(err) + } + + tarFileHeader, err := tar.FileInfoHeader(info, info.Name()) + if err != nil { + panic(err) + } + err = tw.WriteHeader(tarFileHeader) + if err != nil { + panic(err) + } + + _, err = io.Copy(tw, file) + if err != nil { + panic(err) + } + fmt.Println(info.Name()) + } + + err = tw.Close() + if err != nil { + panic(err) + } + err = compressor.Close() + if err != nil { + panic(err) + } + err = tarFile.Close() + if err != nil { + panic(err) + } +}