From 4d6606d3925413f63c70148b6e943678a30a47e0 Mon Sep 17 00:00:00 2001 From: cheetah Date: Wed, 23 Aug 2023 12:14:19 -0500 Subject: [PATCH] commit before server-exitus lefuckalitis --- common/common.go | 6 ++ integritycheck/integritycheck.go | 8 +- main.go | 140 +++++++++++++++++++++++++++++-- storageserver/storageserver.go | 46 +++++++++- 4 files changed, 184 insertions(+), 16 deletions(-) diff --git a/common/common.go b/common/common.go index 3233a84..059d036 100644 --- a/common/common.go +++ b/common/common.go @@ -40,6 +40,7 @@ type DB_File struct { Size int64 `json:"size"` CRC uint32 `json:"crc"` Hash string `json:"hash"` + G2FRef string `json:"-"` } type DB_GMA2File struct { @@ -79,6 +80,11 @@ type DB_File2Chunk struct { File string `json:"_from"` } +type JSON_GMARecovery struct { + GMA DB_GMA `json:"gma"` + Refs []DB_GMA2File `json:"refs"` +} + func MultipartUpload(client *http.Client, url string, path string, jsonBytes []byte, workerID string) (err error) { //fmt.Printf("\nMultipartUpload(%s, %s)\n", url, path) file, err := os.Open(path) diff --git a/integritycheck/integritycheck.go b/integritycheck/integritycheck.go index 1595149..1baaf91 100644 --- a/integritycheck/integritycheck.go +++ b/integritycheck/integritycheck.go @@ -119,7 +119,7 @@ func bla() error { sem := common.NewSemaphore(ConcurrencyLimit) wg := sync.WaitGroup{} - entries, err := os.ReadDir("/mnt/SC9000/storagePools/") + entries, err := os.ReadDir("/mnt/SC9000/storagePool2/") if err != nil { return err } @@ -156,6 +156,7 @@ func bla() error { }() for _, chunkName := range chunkNames { wg.Add(1) + TotalTaskCount <- 1 go func(job string, wg *sync.WaitGroup) (err error) { sem.Acquire() // Wait for worker to have slot open @@ -164,16 +165,15 @@ func bla() error { defer func() { DoneTaskCount <- 1 }() - //fmt.Printf("Scanning For Local Pools, found %s:", job) - tarFinalPath := filepath.Join("/mnt/SC9000/storagePools/", job) + tarFinalPath := filepath.Join("/mnt/SC9000/storagePool2/", job) _, err = 
os.Stat(tarFinalPath) if err != nil { log.Println(err) return err } parts := strings.Split(job, ".") - jsonPath := filepath.Join("/mnt/SC9000/storagePools/", fmt.Sprintf("%s.json", parts[0])) + jsonPath := filepath.Join("/mnt/SC9000/storagePool2/", fmt.Sprintf("%s.json", parts[0])) _, err = os.Stat(jsonPath) if err != nil { log.Println(err) diff --git a/main.go b/main.go index c5f23a9..91b583d 100644 --- a/main.go +++ b/main.go @@ -149,9 +149,71 @@ func main() { modeIngress(*folderPathP, *skipNameP) case "rebuild": flag.Parse() - modeRebuild(*rebuildIDP) + err = modeRebuild(*rebuildIDP) + if err != nil { + panic(err) + } + case "test": + modeTest() + } +} +func modeTest() (err error) { + filePath := "/mnt/worksucc/san1/gma/2/5/0/0/2500735732.1623884796.gma" + + gmaReader, err := gma.NewReader(filePath) + if err != nil { + return err + } + defer gmaReader.Close() + + hash, err := gmaReader.GetSHA256() + if err != nil { + return err + } + fmt.Printf("GMA Hash: %s\n", hash) + gmaReader.FileHandle.Seek(0, 0) + + header, err := gmaReader.ReadHeader() + if err != nil { + return err + } + log.Printf("Name=%s\n", header.Title) + log.Printf("Desc=%s\n", header.Description) + log.Printf("AddonVersion=%d\n", header.AddonVersion) + log.Printf("FormatVersion=%d\n", header.FormatVersion) + log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte) + firstType, files, err := gmaReader.ReadFiles() + if err != nil { + return err + } + fmt.Printf("firstType = %d\n", firstType) + + for _, file := range files { + if file.FileSize < 0 { // Something is fucked + return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber) + } + //fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber) + if file.NextType > uint32(file.FileNumber+10) { // Something is fucked + /*log.Printf("Current Cursor %d", gmaReader.GetOffset()) + for _, 
otherFile := range files[file.FileNumber:] { + log.Printf("OTHERFILE %s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", otherFile.FileName, otherFile.CRC, otherFile.Offset, otherFile.FileSize, otherFile.NextType, otherFile.FileNumber) + }*/ + return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber) + } + extractMeta, err := gmaReader.ExtractFileTo(file, io.Discard) + if err != nil { + return err + } + + if extractMeta.ExtractedCRC != extractMeta.OriginalMeta.CRC { + fmt.Printf("gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n", filePath, extractMeta.OriginalMeta.CRC, extractMeta.ExtractedCRC, extractMeta.OriginalMeta.FileName) + } + fmt.Printf("#%d = %s -%d bytes [%s]\n", extractMeta.OriginalMeta.FileNumber, extractMeta.OriginalMeta.FileName, extractMeta.OriginalMeta.FileSize, extractMeta.ExtractedSHA256) + //fmt.Printf("Extra } + return nil } + func modeRebuild(id string) (err error) { var ( dboGMA common.DB_GMA @@ -545,9 +607,10 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { Size: file.FileSize, Hash: extractMeta.ExtractedSHA256, Extension: filepath.Ext(file.FileName), + G2FRef: fmt.Sprintf("%s_%s_%d", dboGMA.ID, extractMeta.ExtractedSHA256, extractMeta.OriginalMeta.FileNumber), // reference for the GMA2File Thing so that we can find it again in the list } dboGMA2File := common.DB_GMA2File{ - ID: fmt.Sprintf("%s_%s", dboGMA.ID, extractMeta.ExtractedSHA256), + ID: dboFile.G2FRef, BatchID: dboGMA.BatchID, File: fmt.Sprintf("file/%s", extractMeta.ExtractedSHA256), GMA: fmt.Sprintf("gma/%s", dboGMA.ID), @@ -562,7 +625,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { LocalFileName: destPath, UploadID: extractMeta.ExtractedSHA256, } - //fmt.Println(dboFile) + //fmt.Println(dboGMA2File) // Add fileIDs from new unknowns dboFiles = append(dboFiles, dboFile) //fmt.Println(dboGMA2File) @@ -638,7 +701,7 @@ func ProcessGMA(pw progress.Writer, filePath string) 
(err error) { trackerUpload.UpdateMessage(fmt.Sprintf("Skipping %s", niceName)) for _, dboGMA2File := range dboGMA2Files { - if dboFileID == dboGMA2File.File { + if dboFile.G2FRef == dboGMA2File.ID { // Create File and dboGMA2File Object exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID) if err != nil { @@ -657,6 +720,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { trackerUpload.MarkAsErrored() return err } + //} else { + //log.Println("already exists... weird") } break } @@ -685,6 +750,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { log.Println(err) if strings.Contains(err.Error(), "cannot assign requested address") { uploadSuccess = false + } else if strings.Contains(err.Error(), "refused") { + panic(err) } else { log.Println("oopsie") undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) @@ -711,6 +778,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { trackerUpload.MarkAsErrored() return err } + //} else { + //log.Println("already exists... 
weird") } trackerUpload.Increment(1) break @@ -732,6 +801,32 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { // TODO : write new gma from arangoinfo // TODO : compare hashes { + var ( + rw_dboGMA2Files []common.DB_GMA2File + ) + + cursor, err := arangoDB.Query(arangoCTX, fmt.Sprintf("FOR gf IN gma_file_map FILTER gf._from == 'gma/%s' RETURN gf", dboGMA.ID), nil) + if err != nil { + return err + } + + defer cursor.Close() + if cursor.Count() > 0 || cursor.HasMore() { + for { + gma2File := common.DB_GMA2File{} + _, err = cursor.ReadDocument(arangoCTX, &gma2File) + if driver.IsNoMoreDocuments(err) { + break + } else if err != nil { + return err + } + gma2File.UploadID = gma2File.File[5:] + rw_dboGMA2Files = append(rw_dboGMA2Files, gma2File) + } + } else { + return fmt.Errorf("no files for gma available") + } + trackerRewriteDoneMarker := sync.Once{} trackerRewrite := progress.Tracker{Message: fmt.Sprintf("Rewriting %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault} pw.AppendTracker(&trackerRewrite) @@ -739,7 +834,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { destPath := filepath.Join(gmaTempPath, "rewrite.gma") dir := filepath.Dir(destPath) - err := os.MkdirAll(dir, os.ModePerm) + err = os.MkdirAll(dir, os.ModePerm) if err != nil { undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err @@ -764,8 +859,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { return err } - sort.SliceStable(dboGMA2Files, func(i, j int) bool { return dboGMA2Files[i].FileNumber < dboGMA2Files[j].FileNumber }) - for _, dboGMA2File := range dboGMA2Files { + sort.SliceStable(rw_dboGMA2Files, func(i, j int) bool { return rw_dboGMA2Files[i].FileNumber < rw_dboGMA2Files[j].FileNumber }) + for _, dboGMA2File := range rw_dboGMA2Files { //fmt.Printf("WriteFileIndex for %s number %d\n", dboGMA2File.FileName, dboGMA2File.FileNumber) err = gmaWriter.WriteFileIndex(dboGMA2File.FileName, dboGMA2File.FileSize, 
dboGMA2File.CRC, dboGMA2File.NextType) if err != nil { @@ -777,7 +872,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { Timeout: 15 * time.Minute, } - for _, dboGMA2File := range dboGMA2Files { + for _, dboGMA2File := range rw_dboGMA2Files { //fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID) resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID)) if err != nil { @@ -814,6 +909,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { writeSize := writeStat.Size() if writeSize != dboGMA.GMASize { //fail + //createDebugInformation + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) trackerRewrite.MarkAsErrored() return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize) @@ -825,6 +922,33 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize) } trackerRewriteDoneMarker.Do(trackerRewrite.MarkAsDone) + + recoveryData := common.JSON_GMARecovery{ + GMA: dboGMA, + Refs: rw_dboGMA2Files, + } + recoveryBytes, err := json.MarshalIndent(recoveryData, "", "\t") + if err != nil { + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + return err + } + + recoveryPath := filepath.Join("/mnt/SC9000/gmaRecovery", fmt.Sprintf("%s.json", dboGMA.OriginalPath)) + err = os.MkdirAll(filepath.Dir(recoveryPath), os.ModePerm) + if err != nil { + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + return err + } + recoveryFile, err := os.Create(recoveryPath) + if err != nil { + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + return err + } + defer recoveryFile.Close() // recoveryFile is scoped to this block and closed nowhere else; release it when ProcessGMA returns + if _, err = recoveryFile.Write(recoveryBytes); err != nil { + return fmt.Errorf("error @recoveryFile.Write %v", err) + } + } // TODO: 4... profit? 
diff --git a/storageserver/storageserver.go b/storageserver/storageserver.go index 6596a2a..8411b3e 100644 --- a/storageserver/storageserver.go +++ b/storageserver/storageserver.go @@ -29,8 +29,12 @@ import ( ) var ( + FastCacheEnabled = true + FastCachePath = "/mnt/SC9000/fastCache2" + WORMCachePath = FastCachePath + PoolMaxItems = 500 - PoolPathFinal = "/mnt/SC9000/storagePools" + PoolPathFinal = "/mnt/SC9000/storagePool2" PoolPathTemp = "/mnt/ramfs/" ) @@ -165,7 +169,7 @@ func InitDatabase() (err error) { func (p *Pool) OpenTar() (err error) { p.wormMode = true - outputDir := filepath.Join(poolMaster.cachePath, "worm", p.PoolID) + outputDir := filepath.Join(WORMCachePath, "worm", p.PoolID) err = os.MkdirAll(outputDir, os.ModePerm) if err != nil { return err @@ -215,7 +219,7 @@ func (p *Pool) Fetch(id string, writer io.Writer) (err error) { if poolItem == id { //fmt.Printf("Fetch WORMPool %s\n", id) p.LastTouchy = time.Now() - poolLocalFilePath := filepath.Join(poolMaster.cachePath, "worm", p.PoolID, id) + poolLocalFilePath := filepath.Join(WORMCachePath, "worm", p.PoolID, id) srcLocalFile, err := os.Open(poolLocalFilePath) if err != nil { return err @@ -715,6 +719,7 @@ func (p *PoolMaster) Lookup(id string) (exists bool) { } } } + dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id) if err != nil { return false @@ -962,11 +967,44 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { } //FetchFromPoolPack + apparentPoolID := dboFile2Chunk.Chunk[6:] + fastCachePath := filepath.Join(WORMCachePath, "worm", apparentPoolID, id) + if checkFileExists(fastCachePath) { + fastCacheDir := filepath.Join(WORMCachePath, "worm", apparentPoolID) + srcLocalFile, err := os.Open(fastCachePath) + if err != nil { + return err + } + defer srcLocalFile.Close() + if _, err = io.Copy(writer, srcLocalFile); err != nil { + return err + } + // at this point acquire lock til end + p.lock.Lock() + defer p.lock.Unlock() + if _, ok := 
p.WORMPools[apparentPoolID]; !ok { + // create virtual worm + fastCachePool, err := RestorePoolFromFolder(fastCacheDir) + if err != nil { + return err + } + // fastCachePool + p.WORMPools[fastCachePool.PoolID] = fastCachePool + + } + return nil + } //dboFile2Chunk.Chunk <- which chunk i need to find return p.FetchLoadWORM(dboFile2Chunk.Chunk[6:], id, writer) } return nil } +func checkFileExists(filePath string) bool { + _, err := os.Stat(filePath) + //return !os.IsNotExist(err) + return !errors.Is(err, os.ErrNotExist) // true when Stat succeeds or fails for any reason other than "not exist" +} + func (p *PoolMaster) Store(id string, workerID string, src io.Reader, targetSize int64) (err error) { pool, err := p.GetCurrentWriteablePool(workerID) if err != nil { @@ -1058,7 +1096,7 @@ func main() { go func() { for { poolMaster.PackFullPools() - poolMaster.CleanWORMTemp() + //poolMaster.CleanWORMTemp() time.Sleep(time.Minute * 2) } }()