From 06c2ede537a758454174ced50be190750d61189d Mon Sep 17 00:00:00 2001 From: cheetah Date: Sat, 1 Jul 2023 10:50:00 -0500 Subject: [PATCH] working again --- common/common.go | 7 +- main.go | 91 ++++++++++------- storageserver/storageserver.go | 174 +++++++++++++++++++++++++-------- 3 files changed, 195 insertions(+), 77 deletions(-) diff --git a/common/common.go b/common/common.go index 693e89d..b7c6806 100644 --- a/common/common.go +++ b/common/common.go @@ -77,7 +77,7 @@ type DB_File2Chunk struct { File string `json:"_from"` } -func MultipartUpload(client *http.Client, url string, path string) (err error) { +func MultipartUpload(client *http.Client, url string, path string, jsonBytes []byte) (err error) { //fmt.Printf("\nMultipartUpload(%s, %s)\n", url, path) file, err := os.Open(path) if err != nil { @@ -98,6 +98,11 @@ func MultipartUpload(client *http.Client, url string, path string) (err error) { writer := multipart.NewWriter(pw) go func() { defer pw.Close() + err = writer.WriteField("info", string(jsonBytes)) + if err != nil { + return + } + part, err := writer.CreateFormFile("file", fi.Name()) if err != nil { return diff --git a/main.go b/main.go index 9f19d9d..83879a9 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "crypto/tls" + "encoding/json" "flag" "fmt" "io" @@ -113,9 +114,12 @@ var WorkerJobPool chan string func main() { folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "a string") + skipNameP := flag.String("skip", "", "skip until this name") debug.SetMemoryLimit(6e9) flag.Parse() folderPath := *folderPathP + skipName := *skipNameP + skipNameEnabled := len(skipName) > 0 go func() { log.Println(http.ListenAndServe("0.0.0.0:6060", nil)) @@ -151,14 +155,12 @@ func main() { if err != nil { panic(err) } - skipBla := false - var WorkerJobPool []string for _, e := range entries { - if !e.IsDir() && skipBla { - if e.Name() == "2547463094.1626322945.gma" { - skipBla = false + if !e.IsDir() && skipNameEnabled { + if e.Name() == skipName { + skipNameEnabled = false } else { continue } @@ -209,10 +211,12 @@ func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []str return err } */ - _, _, err = colGMA2File.RemoveDocuments(arangoCTX, gma2FileIDs) - if err != nil { - return err - } + /* + _, _, err = colGMA2File.RemoveDocuments(arangoCTX, gma2FileIDs) + if err != nil { + return err + } + */ return nil } func ProcessGMA(filePath string) (err error) { @@ -265,10 +269,10 @@ func ProcessGMA(filePath string) (err error) { return err } dboGMA.Header = header - log.Printf("AddonVersion=%d\n", header.AddonVersion) - log.Printf("FormatVersion=%d\n", header.FormatVersion) - log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte) - //fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset()) + // log.Printf("AddonVersion=%d\n", header.AddonVersion) + // log.Printf("FormatVersion=%d\n", header.FormatVersion) + // log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte) + // //fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset()) firstType, files, err := gmaReader.ReadFiles() if err != nil { return err @@ -280,7 +284,7 @@ func ProcessGMA(filePath string) (err error) { dboGMA2Files []common.DB_GMA2File dboFiles []common.DB_File ) - + // Convert GMA Files into DB Metadata for _, file := range files { if file.FileSize < 0 { // Something is fucked return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber) @@ -452,9 +456,11 @@ func ProcessGMA(filePath string) (err error) { // TODO: upload all unknownNewFiles to StorageServer http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200 var httpClient *http.Client = http.DefaultClient + uploadBar := progressbar.Default(int64(len(dboFiles)), "Uploading to StorageServer") for _, dboFile := range dboFiles { dboFileID := fmt.Sprintf("file/%s", dboFile.ID) + //fmt.Printf("Line 460: %s checking if we need to store this on the server", dboFileID) //dboFile2ChunkID := fmt.Sprintf("file_chunk_map/%s", dboFile.ID) // TODO: Check against Storage backend @@ -467,28 +473,43 @@ func ProcessGMA(filePath string) (err error) { return err } //body, _ := ioutil.ReadAll(res.Body) + //fmt.Printf("res.StatusCode = %d\n", res.StatusCode) if res.StatusCode == http.StatusAlreadyReported { - uploadBar.Add(1) uploadBar.Describe("Skipping") + uploadBar.Add(1) continue } for _, dboGMA2File := range dboGMA2Files { if dboFileID == dboGMA2File.File { // find corresponding dboGMA2File + //log.Println("Found dboFileID == dboGMA2File.Ref ID") uploadSuccess := true for { //log.Printf("Uploading %s to Storage\n", dboGMA2File.UploadID) - if !dboExistFile[dboFile.ID] { + // TODO: move file management to storageserver + /*existsFile, err := colFile.DocumentExists(arangoCTX, dboFile.ID) + if err != nil { + log.Println("err @colFile.DocumentExist") + return err + } + if !existsFile { _, err := colFile.CreateDocument(arangoCTX, dboFile) if err != nil { // TODO: error handling + log.Println("err @colFile.CreateDocument") return err } + }*/ + fileInfoJSON, err := json.Marshal(dboFile) + if err != nil { + log.Println("err @json.Marshal dboFile") + return err } uploadBar.Describe("Uploading") - err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName) + err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON) if err != nil { + log.Println("err @common.MultipartUpload") log.Println(err) if strings.Contains(err.Error(), "cannot assign requested address") { uploadSuccess = false @@ -504,35 +525,38 @@ func ProcessGMA(filePath string) (err error) { } if uploadSuccess { // Create File and dboGMA2File Object - if !dboExistFile2GMA[dboGMA2File.ID] { - exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID) + exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID) + if err != nil { + log.Println("err @colGMA2File.DocumentExists") + log.Println("oopsie") + // remove fileObject + /*if !dboExistFile[dboFile.ID] { // if the file did not exist prior to this + _, _ = colFile.RemoveDocument(arangoCTX, dboFile.ID) + }*/ + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + return err + } + if !exists { + _, err = colGMA2File.CreateDocument(arangoCTX, dboGMA2File) if err != nil { + log.Println("err @colGMA2File.CreateDocument") log.Println("oopsie") // remove fileObject - if !dboExistFile[dboFile.ID] { // if the file did not exist prior to this + /*if !dboExistFile[dboFile.ID] { // if the file did not exist prior to this _, _ = colFile.RemoveDocument(arangoCTX, dboFile.ID) - } + }*/ undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err } - if !exists { - _, err = colGMA2File.CreateDocument(arangoCTX, dboGMA2File) - if err != nil { - log.Println("oopsie") - // remove fileObject - if !dboExistFile[dboFile.ID] { // if the file did not exist prior to this - _, _ = colFile.RemoveDocument(arangoCTX, dboFile.ID) - } - undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) - return err - } - } } uploadBar.Add(1) break } time.Sleep(10 * time.Second) } + if uploadSuccess { + break + } } } } @@ -602,6 +626,7 @@ func ProcessGMA(filePath string) (err error) { log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC) gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC) + // TODO: maybe use io.MultiWriter ?? gmaWriter.FileHandle.Seek(0, 0) writeHash, err := gmaWriter.GetSHA256() if err != nil { diff --git a/storageserver/storageserver.go b/storageserver/storageserver.go index 0c1f4cd..4ea7bce 100644 --- a/storageserver/storageserver.go +++ b/storageserver/storageserver.go @@ -5,6 +5,7 @@ import ( "context" "crypto/sha256" "crypto/tls" + "encoding/json" "errors" "fmt" "io" @@ -32,6 +33,17 @@ var ( PoolPathTemp = "/mnt/ramfs/" ) +// TODO: write Recovery Data after Packing +type PoolRecoveryData struct { + PoolID string `json:"_key"` + Size uint64 `json:"size"` + Created time.Time `json:"date"` + Hash string `json:"hash"` + + ItemCount int `json:"itemCount"` + Items []string `json:"items"` + RecoveryData []common.DB_File `json:"recoveryData"` +} type Pool struct { PoolID string `json:"_key"` Finalized bool `json:"finalized"` @@ -58,6 +70,7 @@ type PoolMaster struct { finalPath string CurrentPool *Pool lock sync.Mutex + WORMLock sync.Mutex LocalPools []*Pool FullPools []*Pool @@ -442,17 +455,52 @@ func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) { fmt.Printf("MoveWORM Duration %dms\n", time.Since(startTime).Milliseconds()) return nil } + +func (p *PoolMaster) WriteRecoveryFile(packResult PoolPackResult, pool *Pool) (err error) { + fileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.json", packResult.PoolID)) + + recoveryFile, err := os.Create(fileName) + if err != nil { + return err + } + defer recoveryFile.Close() + + poolRecoveryData := PoolRecoveryData{ + PoolID: packResult.PoolID, + Size: uint64(packResult.Size), + Created: time.Now(), + Hash: packResult.Hash, + ItemCount: pool.itemCount, + Items: pool.items, + //RecoveryData, + } + + //TODO: fetch RecoveryData from DB + poolRecoveryData.RecoveryData = make([]common.DB_File, len(pool.items)) + _, _, err = colFile.ReadDocuments(arangoCTX, pool.items, poolRecoveryData.RecoveryData) + if err != nil { + return fmt.Errorf("error @ReadDocuments %v", err) + } + + json, err := json.MarshalIndent(poolRecoveryData, "", "\t") + if err != nil { + return fmt.Errorf("error @json.MarshalIndent %v", err) + } + _, err = recoveryFile.Write(json) + if err != nil { + return fmt.Errorf("error @recoveryFile.Write %v", err) + } + + return nil +} + +/* +* Only call this in a locked Sate + */ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err error) { startTime := time.Now() packResult.PoolID = poolID - /* - fmt.Printf("Aquiring Lock for PackPool(%s)\n", poolID) - p.lock.Lock() - defer p.lock.Unlock() - defer fmt.Printf("unlock PackPool\n") - fmt.Printf("Aquired Lock success for PackPool(%s)\n", poolID) - */ packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar", poolID)) tarFile, err := os.Create(packResult.outputFileName) if err != nil { @@ -528,6 +576,9 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err packResult.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil)) fmt.Printf("PackPoolTar hash is %s\n", packResult.Hash) + // TODO: write index json describing the files inside, pure hash list, aswell + // as dictionary containing all the infos for restore/disaster-recovery into Arango + packFileStats, err := tarFileCheck.Stat() if err != nil { return packResult, err @@ -552,7 +603,6 @@ func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) { } func (p *PoolMaster) Lookup(id string) (exists bool) { - // TODO: DB check if p.CurrentPool != nil { // CurrentPool for _, poolItem := range p.CurrentPool.items { if poolItem == id { @@ -560,6 +610,8 @@ func (p *PoolMaster) Lookup(id string) (exists bool) { } } } + p.WORMLock.Lock() + defer p.WORMLock.Unlock() for _, wormPool := range p.WORMPools { // WORM Pools for _, poolItem := range wormPool.items { if poolItem == id { @@ -581,8 +633,6 @@ func (p *PoolMaster) Lookup(id string) (exists bool) { } } } - // TODO : DB Check - // ArangoDB dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id) if err != nil { return false @@ -608,6 +658,15 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ // else load wormPool into disk-cache extract to "worm" // wormMode + // TODO: every Method here should have locked the WORMLock!! + /* + fmt.Printf("Aquiring WORMLock for FetchLoadWORM\n") + p.WORMLock.Lock() + defer p.WORMLock.Unlock() + defer fmt.Printf("unlock WORMLock FetchLoadWORM\n") + fmt.Printf("Aquired WORMLock success for FetchLoadWORM\n") + */ + fmt.Printf("Aquiring Lock for FetchLoadWORM\n") p.lock.Lock() defer p.lock.Unlock() @@ -640,6 +699,8 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ } func (p *PoolMaster) CleanWORMTemp() (err error) { p.lock.Lock() + p.WORMLock.Lock() + defer p.WORMLock.Unlock() defer p.lock.Unlock() for _, wormPool := range p.WORMPools { @@ -674,6 +735,8 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { } } } + p.WORMLock.Lock() + defer p.WORMLock.Unlock() for _, wormPool := range p.WORMPools { for _, poolItem := range wormPool.items { if poolItem == id { @@ -794,7 +857,7 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro newItemCount := 0 for _, e := range entries { if !e.IsDir() { - pool.items = append(pool.items, e.Name()) + //pool.items = append(pool.items, e.Name()) newItemCount++ } } @@ -827,39 +890,51 @@ func main() { go func() { for { - var deletedPools []string - for _, fullPool := range poolMaster.FullPools { + if len(poolMaster.FullPools) > 0 { poolMaster.lock.Lock() - - packResult, err := poolMaster.PackPool(fullPool.PoolID) - if err != nil { - panic(err) - } - err = poolMaster.ImportPoolPackResult(packResult) - if err != nil { - panic(err) - } - err = poolMaster.MovePoolPackToWORM(packResult) - if err != nil { - panic(err) - } - os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID)) - deletedPools = append(deletedPools, fullPool.PoolID) - _, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{ - NotReady: false, - }) - if err != nil { - panic(err) + fmt.Printf("Aquiring WORMLock for Regular FullPool Pack\n") + poolMaster.WORMLock.Lock() + fmt.Printf("Aquired WORMLock success for Regular FullPool Pack\n") + + var deletedPools []string + for _, fullPool := range poolMaster.FullPools { + packResult, err := poolMaster.PackPool(fullPool.PoolID) + if err != nil { + panic(err) + } + err = poolMaster.ImportPoolPackResult(packResult) + if err != nil { + panic(err) + } + err = poolMaster.MovePoolPackToWORM(packResult) + if err != nil { + panic(err) + } + err = poolMaster.WriteRecoveryFile(packResult, fullPool) + if err != nil { + panic(err) + } + os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID)) + deletedPools = append(deletedPools, fullPool.PoolID) + _, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{ + NotReady: false, + }) + if err != nil { + panic(err) + } } - poolMaster.lock.Unlock() - } - for _, deletedPoolID := range deletedPools { - for index, fullPool := range poolMaster.FullPools { - if fullPool.PoolID == deletedPoolID { - poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index) - break + for _, deletedPoolID := range deletedPools { + for index, fullPool := range poolMaster.FullPools { + if fullPool.PoolID == deletedPoolID { + poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index) + break + } } } + + poolMaster.lock.Unlock() + poolMaster.WORMLock.Unlock() + fmt.Printf("unlock WORMLock Regular FullPool Pack\n") } // poolMaster.CleanWORMTemp() @@ -905,7 +980,9 @@ func main() { e.GET("/check/:id", func(c echo.Context) error { id := c.Param("id") + fmt.Printf("/check/%s checking...\n", id) exists := poolMaster.Lookup(id) + fmt.Printf("%s exists = %s\n", id, strconv.FormatBool(exists)) if exists { return c.JSON(http.StatusAlreadyReported, exists) } @@ -914,6 +991,7 @@ func main() { e.GET("/fetch/:id", func(c echo.Context) error { id := c.Param("id") + fmt.Printf("/fetch/%s fetching...\n", id) exists := poolMaster.Lookup(id) if exists { c.Response().Header().Set(echo.HeaderContentType, echo.MIMEOctetStream) @@ -928,9 +1006,7 @@ func main() { fmt.Printf("/fetch/%s does not exist\n", id) return c.String(http.StatusNotFound, "Not Found") } - return nil - //return c.Stream(200, "application/x-octet-stream", nil) }) e.POST("/stash/:id/:size", func(c echo.Context) error { @@ -941,7 +1017,13 @@ func main() { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } - + infoJSON := c.FormValue("info") + fileInfo := common.DB_File{} + err = json.Unmarshal([]byte(infoJSON), &fileInfo) + if err != nil { + fmt.Println(err) + return c.String(http.StatusBadRequest, "Error") + } exists := poolMaster.Lookup(id) if exists { fmt.Printf("/stash/%s exists already\n", id) @@ -981,6 +1063,12 @@ func main() { return c.String(http.StatusExpectationFailed, "Error") } fmt.Println("...stashed") + fmt.Println(fileInfo) + _, err = colFile.CreateDocument(arangoCTX, fileInfo) + if err != nil { + fmt.Println(err) + return c.String(http.StatusExpectationFailed, "Error") + } return c.JSON(http.StatusOK, true) })