From 4ed5059ec68fc62a7a82930636f949daca972d33 Mon Sep 17 00:00:00 2001 From: cheetah Date: Mon, 26 Jun 2023 06:32:36 -0500 Subject: [PATCH] improved, fuckup handling --- go.mod | 7 +- go.sum | 17 +++ main.go | 207 +++++++++++++++++++++++---------- storageserver/storageserver.go | 11 +- 4 files changed, 179 insertions(+), 63 deletions(-) diff --git a/go.mod b/go.mod index d16357f..e33d182 100644 --- a/go.mod +++ b/go.mod @@ -13,13 +13,18 @@ require ( github.com/labstack/gommon v0.4.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.17 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/rivo/uniseg v0.4.4 // indirect + github.com/schollz/progressbar/v3 v3.13.1 // indirect github.com/twinj/uuid v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/net v0.7.0 // indirect - golang.org/x/sys v0.5.0 // indirect + golang.org/x/sys v0.9.0 // indirect + golang.org/x/term v0.9.0 // indirect golang.org/x/text v0.7.0 // indirect golang.org/x/time v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index ce6fa57..806dd6c 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,7 @@ github.com/djherbis/times v1.5.0 h1:79myA211VwPhFTqUk8xehWrsEO+zcIZj0zT8mXPVARU= github.com/djherbis/times v1.5.0/go.mod h1:5q7FDLvbNg1L/KaBmPcWlVR9NmoKo3+ucqUA3ijQhA0= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= github.com/labstack/echo v3.3.10+incompatible h1:pGRcYk231ExFAyoAjAfD85kQzRJCRI8bbnE7CX5OEgg= github.com/labstack/echo v3.3.10+incompatible/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s= github.com/labstack/echo/v4 v4.10.2 h1:n1jAhnq/elIFTHr1EYpiYtyKgx4RW9ccVgkqByZaN2M= @@ -23,10 +24,20 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= +github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/schollz/progressbar/v3 v3.13.1 h1:o8rySDYiQ59Mwzy2FELeHY5ZARXZTVJC7iHD6PEFUiE= +github.com/schollz/progressbar/v3 v3.13.1/go.mod h1:xvrbki8kfT1fzWzBT/UZd9L6GA+jdL7HAgq2RFnO6fQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/twinj/uuid v1.0.0 h1:fzz7COZnDrXGTAOHGuUGYd6sG+JMq+AoE7+Jlu0przk= @@ -46,6 +57,12 @@ golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= +golang.org/x/term v0.9.0 h1:GRRCnKYhdQrD8kfRAdQ6Zcw1P0OcELxGLKJvtjVMZ28= +golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= diff --git a/main.go b/main.go index 8a8d3ef..9f19d9d 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "flag" "fmt" + "io" "log" "net/http" "os" @@ -18,6 +19,7 @@ import ( "git.cheetah.cat/worksucc/gma-puzzles/gma" adriver "github.com/arangodb/go-driver" ahttp "github.com/arangodb/go-driver/http" + "github.com/schollz/progressbar/v3" "github.com/twinj/uuid" _ "net/http/pprof" @@ -143,7 +145,7 @@ func main() { //gma, err := gma.NewReader("2143898000.1593250551.bin.gma") //folderPath := "/mnt/SC9000/TemporaryTestingShit2/" //"/mnt/worksucc/san1/gma/2/5/4/8/" - folderPathTarget := "/mnt/SC9000/ProcessedGMATest/" //"/mnt/worksucc/san1/gma/2/5/4/8/" + //folderPathTarget := "/mnt/SC9000/ProcessedGMATest/" //"/mnt/worksucc/san1/gma/2/5/4/8/" // entries, err := os.ReadDir(folderPath) if err != nil { @@ -181,7 +183,7 @@ func main() { //panic(err) continue } - os.Rename(jobFile, filepath.Join(folderPathTarget, filepath.Base(jobFile))) + //os.Rename(jobFile, filepath.Join(folderPathTarget, filepath.Base(jobFile))) // sem.Release() // Release the slot // wg.Done() // Finish job @@ -247,7 +249,7 @@ func ProcessGMA(filePath string) (err error) { dboGMA.ID = dboGMA.GMAHash gmaReader.FileHandle.Seek(0, 0) - gmaTempPath := filepath.Join("/home/cheetah/dev/gma-puzzles/temp", dboGMA.ID) + gmaTempPath := filepath.Join("/mnt/ramfs/gma-extr-temp", dboGMA.ID) defer os.RemoveAll(gmaTempPath) // clean up under any circumstances dboIDExists, err := colGMA.DocumentExists(arangoCTX, dboGMA.ID) @@ -278,13 +280,17 @@ func ProcessGMA(filePath string) (err error) { dboGMA2Files []common.DB_GMA2File dboFiles []common.DB_File ) + for _, file := range files { + if file.FileSize < 0 { // Something is fucked + return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber) + } //fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber) if file.NextType > uint32(file.FileNumber+10) { // Something is fucked - log.Printf("Current Cursor %d", gmaReader.GetOffset()) + /*log.Printf("Current Cursor %d", gmaReader.GetOffset()) for _, otherFile := range files[file.FileNumber:] { log.Printf("OTHERFILE %s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", otherFile.FileName, otherFile.CRC, otherFile.Offset, otherFile.FileSize, otherFile.NextType, otherFile.FileNumber) - } + }*/ return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber) } destPath := filepath.Join(gmaTempPath, "contents", file.FileName) @@ -357,83 +363,130 @@ func ProcessGMA(filePath string) (err error) { undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err } + /* + importStartTime := time.Now() - importStartTime := time.Now() - - var newUnknownFiles []string - chunkSize := 5 - for { - if len(dboFiles) == 0 { - break - } + var newUnknownFiles []string + chunkSize := 5 + for { + if len(dboFiles) == 0 { + break + } - // necessary check to avoid slicing beyond - // slice capacity - if len(dboFiles) < chunkSize { - chunkSize = len(dboFiles) - } + // necessary check to avoid slicing beyond + // slice capacity + if len(dboFiles) < chunkSize { + chunkSize = len(dboFiles) + } - // process and work withj - metaSlice, errorSlice, _ := colFile.CreateDocuments(arangoCTX, dboFiles[0:chunkSize]) + // process and work withj + metaSlice, errorSlice, _ := colFile.CreateDocuments(arangoCTX, dboFiles[0:chunkSize]) - for _, meta := range metaSlice { - if !meta.ID.IsEmpty() { - newUnknownFiles = append(newUnknownFiles, meta.Key) - fileIDs = append(fileIDs, meta.Key) + for _, meta := range metaSlice { + if !meta.ID.IsEmpty() { + newUnknownFiles = append(newUnknownFiles, meta.Key) + fileIDs = append(fileIDs, meta.Key) + } + } + //fmt.Println("ErrorSlice") + //fmt.Println(errorSlice) + for _, createError := range errorSlice { + if createError != nil && strings.Contains(createError.Error(), "unique constraint violated - in index primary of type primary over '_key'") { + } else if createError != nil { + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + return createError + } } + + dboFiles = dboFiles[chunkSize:] } - //fmt.Println("ErrorSlice") - //fmt.Println(errorSlice) - for _, createError := range errorSlice { - if createError != nil && strings.Contains(createError.Error(), "unique constraint violated - in index primary of type primary over '_key'") { - } else if createError != nil { - undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) - return createError + log.Println() + log.Printf("Imported dboFiles into Arango and now we have %d new files from %d addon files\n", len(newUnknownFiles), len(files)) + deltaFileSize := int64(0) + for _, unknownFile := range newUnknownFiles { + unknownFileID := fmt.Sprintf("file/%s", unknownFile) + for _, dboGMA2File := range dboGMA2Files { + if unknownFileID == dboGMA2File.File { + deltaFileSize += dboGMA2File.FileSize + } } } + dboGMA.OptimizedSize = deltaFileSize + log.Printf("Delta Storage %d bytes\n", deltaFileSize) + */ + /* + _, err = colGMA2File.ImportDocuments(arangoCTX, dboGMA2Files, &adriver.ImportDocumentOptions{ + OnDuplicate: adriver.ImportOnDuplicateIgnore, + //FromPrefix: "gma/", + //ToPrefix: "file/", + Complete: true, // will make it fail if any error occurs (and hopefully reverse the trans-action) + }) + if err != nil { + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + return fmt.Errorf("ImportDocuments File fail: %v", err) + } + //fmt.Printf("Code: %d, Created: %d, Ignored: %d, Errors: %d", statsImportGMA2File.Code, statsImportGMA2File.Created, statsImportGMA2File.Ignored, statsImportGMA2File.Errors) - dboFiles = dboFiles[chunkSize:] - } - log.Println() + log.Printf("Import Duration %dms\n", time.Since(importStartTime).Milliseconds()) + log.Println() + */ + // TODO: Check all dboFiles and dboGMA2Files if they exist, if something is odd, queue reupload - log.Printf("Imported dboFiles into Arango and now we have %d new files from %d addon files\n", len(newUnknownFiles), len(files)) - deltaFileSize := int64(0) - for _, unknownFile := range newUnknownFiles { - unknownFileID := fmt.Sprintf("file/%s", unknownFile) - for _, dboGMA2File := range dboGMA2Files { - if unknownFileID == dboGMA2File.File { - deltaFileSize += dboGMA2File.FileSize - } + dboExistFile := map[string]bool{} + dboExistFile2GMA := map[string]bool{} + for _, dboFile := range dboFiles { + exists, err := colFile.DocumentExists(arangoCTX, dboFile.ID) + if err != nil { + return err } + dboExistFile[dboFile.ID] = exists } - dboGMA.OptimizedSize = deltaFileSize - log.Printf("Delta Storage %d bytes\n", deltaFileSize) - - _, err = colGMA2File.ImportDocuments(arangoCTX, dboGMA2Files, &adriver.ImportDocumentOptions{ - OnDuplicate: adriver.ImportOnDuplicateIgnore, - //FromPrefix: "gma/", - //ToPrefix: "file/", - Complete: true, // will make it fail if any error occurs (and hopefully reverse the trans-action) - }) - if err != nil { - undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) - return fmt.Errorf("ImportDocuments File fail: %v", err) + for _, dboGMA2File := range dboGMA2Files { + exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID) + if err != nil { + return err + } + dboExistFile2GMA[dboGMA2File.ID] = exists } - //fmt.Printf("Code: %d, Created: %d, Ignored: %d, Errors: %d", statsImportGMA2File.Code, statsImportGMA2File.Created, statsImportGMA2File.Ignored, statsImportGMA2File.Errors) - log.Printf("Import Duration %dms\n", time.Since(importStartTime).Milliseconds()) - log.Println() // TODO: upload all unknownNewFiles to StorageServer http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200 var httpClient *http.Client = http.DefaultClient + uploadBar := progressbar.Default(int64(len(dboFiles)), "Uploading to StorageServer") + for _, dboFile := range dboFiles { + dboFileID := fmt.Sprintf("file/%s", dboFile.ID) + //dboFile2ChunkID := fmt.Sprintf("file_chunk_map/%s", dboFile.ID) + + // TODO: Check against Storage backend + res, err := http.Get(fmt.Sprintf("http://127.0.0.1:13371/check/%s", dboFile.ID)) + if err != nil { + return err + } + defer res.Body.Close() + if _, err = io.Copy(io.Discard, res.Body); err != nil { + return err + } + //body, _ := ioutil.ReadAll(res.Body) + if res.StatusCode == http.StatusAlreadyReported { + uploadBar.Add(1) + uploadBar.Describe("Skipping") + continue + } - for _, unknownFile := range newUnknownFiles { - unknownFileID := fmt.Sprintf("file/%s", unknownFile) for _, dboGMA2File := range dboGMA2Files { - if unknownFileID == dboGMA2File.File { + if dboFileID == dboGMA2File.File { // find corresponding dboGMA2File uploadSuccess := true for { - log.Printf("Uploading %s to Storage\n", dboGMA2File.UploadID) + //log.Printf("Uploading %s to Storage\n", dboGMA2File.UploadID) + if !dboExistFile[dboFile.ID] { + _, err := colFile.CreateDocument(arangoCTX, dboFile) + if err != nil { + // TODO: error handling + return err + } + } + + uploadBar.Describe("Uploading") err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName) if err != nil { log.Println(err) @@ -441,11 +494,41 @@ func ProcessGMA(filePath string) (err error) { uploadSuccess = false } else { log.Println("oopsie") + // remove fileObject + if !dboExistFile[dboFile.ID] { + _, _ = colFile.RemoveDocument(arangoCTX, dboFile.ID) + } undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err } } if uploadSuccess { + // Create File and dboGMA2File Object + if !dboExistFile2GMA[dboGMA2File.ID] { + exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID) + if err != nil { + log.Println("oopsie") + // remove fileObject + if !dboExistFile[dboFile.ID] { // if the file did not exist prior to this + _, _ = colFile.RemoveDocument(arangoCTX, dboFile.ID) + } + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + return err + } + if !exists { + _, err = colGMA2File.CreateDocument(arangoCTX, dboGMA2File) + if err != nil { + log.Println("oopsie") + // remove fileObject + if !dboExistFile[dboFile.ID] { // if the file did not exist prior to this + _, _ = colFile.RemoveDocument(arangoCTX, dboFile.ID) + } + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + return err + } + } + } + uploadBar.Add(1) break } time.Sleep(10 * time.Second) @@ -458,6 +541,7 @@ func ProcessGMA(filePath string) (err error) { // TODO : compare hashes { log.Println("rewriting gma") + rewriteBar := progressbar.Default(int64(len(dboGMA2Files)), "Rewriting GMA") destPath := filepath.Join(gmaTempPath, "rewrite.gma") dir := filepath.Dir(destPath) @@ -512,6 +596,7 @@ func ProcessGMA(filePath string) (err error) { undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err } + rewriteBar.Add(1) } gmaWriter.FileHandle.Seek(0, 2) log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC) diff --git a/storageserver/storageserver.go b/storageserver/storageserver.go index 4cbf18a..0c1f4cd 100644 --- a/storageserver/storageserver.go +++ b/storageserver/storageserver.go @@ -643,7 +643,7 @@ func (p *PoolMaster) CleanWORMTemp() (err error) { defer p.lock.Unlock() for _, wormPool := range p.WORMPools { - if time.Since(wormPool.LastTouchy).Minutes() > 1 { + if time.Since(wormPool.LastTouchy).Minutes() > 4 { wormPool.Unload() delete(p.WORMPools, wormPool.PoolID) os.RemoveAll(filepath.Join(poolMaster.cachePath, "worm", wormPool.PoolID)) @@ -903,6 +903,15 @@ func main() { return c.String(http.StatusOK, "Hello, World!") }) + e.GET("/check/:id", func(c echo.Context) error { + id := c.Param("id") + exists := poolMaster.Lookup(id) + if exists { + return c.JSON(http.StatusAlreadyReported, exists) + } + return c.JSON(http.StatusOK, exists) + }) + e.GET("/fetch/:id", func(c echo.Context) error { id := c.Param("id") exists := poolMaster.Lookup(id)