From 79cad442e298856837e2bbcab9e07d15fbea609f Mon Sep 17 00:00:00 2001 From: cheetah Date: Sun, 18 Jun 2023 22:46:53 -0500 Subject: [PATCH] fixin memleak --- common/common.go | 44 +-- gma/gma.go | 2 +- gma/reader.go | 2 +- gma/writer.go | 4 +- main.go | 475 +++++++++++++++++++++++++++++++++ storageserver/storageserver.go | 459 +++++++++++++++++++++++++++++++ 6 files changed, 963 insertions(+), 23 deletions(-) create mode 100644 main.go create mode 100644 storageserver/storageserver.go diff --git a/common/common.go b/common/common.go index d3096e8..597c1c9 100644 --- a/common/common.go +++ b/common/common.go @@ -1,8 +1,8 @@ package common import ( - "bytes" "fmt" + "io" "io/ioutil" "mime/multipart" "net/http" @@ -53,7 +53,7 @@ type DB_GMA2File struct { FileSize int64 `json:"size"` CRC uint32 `json:"crc"` CRCMatch bool `json:"crcMatch"` - NextType int32 `json:"nextType"` + NextType uint32 `json:"nextType"` LocalFileName string `json:"-"` UploadID string `json:"-"` @@ -91,23 +91,29 @@ func MultipartUpload(client *http.Client, url string, path string) (err error) { } file.Close() - body := new(bytes.Buffer) - writer := multipart.NewWriter(body) - part, err := writer.CreateFormFile("file", fi.Name()) - if err != nil { - return err - } - part.Write(fileContents) - - /*for key, val := range params { - _ = writer.WriteField(key, val) - }*/ - err = writer.Close() - if err != nil { - return err - } - - req, err := http.NewRequest("POST", url, body) + //body := new(bytes.Buffer) + pr, pw := io.Pipe() + writer := multipart.NewWriter(pw) + go func() { + defer pw.Close() + part, err := writer.CreateFormFile("file", fi.Name()) + if err != nil { + return + } + part.Write(fileContents) + + //defer body.Reset() + + /*for key, val := range params { + _ = writer.WriteField(key, val) + }*/ + err = writer.Close() + if err != nil { + return + } + }() + + req, err := http.NewRequest("POST", url, pr) req.Header.Set("Content-Type", writer.FormDataContentType()) if err != nil { return err diff --git a/gma/gma.go b/gma/gma.go index 53f1904..87a8eb0 100644 --- a/gma/gma.go +++ b/gma/gma.go @@ -21,7 +21,7 @@ type GMAFileMetadata struct { Offset int64 FileSize int64 CRC uint32 - NextType int32 + NextType uint32 } type GMAExtractionMeta struct { OriginalMeta GMAFileMetadata diff --git a/gma/reader.go b/gma/reader.go index 5f67148..faad072 100644 --- a/gma/reader.go +++ b/gma/reader.go @@ -167,7 +167,7 @@ func (r *GMAReader) readFileMetadata() (GMAFileMetadata, error) { return metadata, err } r.cursorOffset += 4 - metadata.NextType = int32(binary.LittleEndian.Uint32(nextTypeBytes)) + metadata.NextType = binary.LittleEndian.Uint32(nextTypeBytes) return metadata, nil } diff --git a/gma/writer.go b/gma/writer.go index c33db2e..ae10734 100644 --- a/gma/writer.go +++ b/gma/writer.go @@ -79,7 +79,7 @@ func (w *GMAWriter) WriteFirstType(firstType int32) (err error) { return nil } -func (w *GMAWriter) WriteFileIndex(fileName string, fileSize int64, fileCRC uint32, nextType int32) (err error) { +func (w *GMAWriter) WriteFileIndex(fileName string, fileSize int64, fileCRC uint32, nextType uint32) (err error) { w.gmaStreamWriter.WriteString(fileName) w.gmaStreamWriter.WriteByte(0) @@ -92,7 +92,7 @@ func (w *GMAWriter) WriteFileIndex(fileName string, fileSize int64, fileCRC uint w.gmaStreamWriter.Write(crcBytes) nextTypeBytes := make([]byte, 4) - binary.LittleEndian.PutUint32(nextTypeBytes, uint32(nextType)) + binary.LittleEndian.PutUint32(nextTypeBytes, nextType) w.gmaStreamWriter.Write(nextTypeBytes) w.gmaStreamWriter.Flush() diff --git a/main.go b/main.go new file mode 100644 index 0000000..746b334 --- /dev/null +++ b/main.go @@ -0,0 +1,475 @@ +package main + +import ( + "context" + "crypto/tls" + "fmt" + "log" + "net/http" + "os" + "path/filepath" + "runtime/debug" + "sort" + "strings" + "time" + + "git.cheetah.cat/worksucc/gma-puzzles/common" + "git.cheetah.cat/worksucc/gma-puzzles/gma" + adriver "github.com/arangodb/go-driver" + ahttp "github.com/arangodb/go-driver/http" + "github.com/twinj/uuid" + + _ "net/http/pprof" +) + +var ( + arangoDB adriver.Database + arangoCTX context.Context + colChunk adriver.Collection + colFile adriver.Collection + colFile2Chunk adriver.Collection + colGMA adriver.Collection + colGMA2File adriver.Collection +) + +func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) { + log.Println("connectDB:", "Starting Connection Process...") + + // Retry Loop for Failed Connections + for i := 0; i < 6; i++ { + if i == 5 { + return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i) + } else if i > 0 { + time.Sleep(30 * time.Second) + } + + // Connect to ArangoDB URL + conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{ + Endpoints: []string{baseURL}, + TLSConfig: &tls.Config{ /*...*/ }, + }) + if err != nil { + log.Println("connectDB:", "Cannot Connect to ArangoDB!", err) + continue + } + + // Connect Driver to User + client, err := adriver.NewClient(adriver.ClientConfig{ + Connection: conn, + Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD), + }) + if err != nil { + log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err) + continue + } + + // Create Context for Database Access + ctx = context.Background() + driver, err = client.Database(ctx, arangoDatabase) + if err != nil { + log.Println("connectDB:", "Cannot Load ArangoDB Database!", err) + continue + } + log.Println("connectDB:", "Connection Sucessful!") + return driver, ctx, nil + } + + return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?") +} +func InitDatabase() (err error) { + arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator") + + colChunk, err = arangoDB.Collection(arangoCTX, "chunk") + if err != nil { + return err + } + colFile, err = arangoDB.Collection(arangoCTX, "file") + if err != nil { + return err + } + colGMA, err = arangoDB.Collection(arangoCTX, "gma") + if err != nil { + return err + } + colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map") + if err != nil { + return err + } + colGMA2File, err = arangoDB.Collection(arangoCTX, "gma_file_map") + if err != nil { + return err + } + return nil +} + +func main() { + debug.SetMemoryLimit(6e9) + + go func() { + log.Println(http.ListenAndServe("0.0.0.0:6060", nil)) + }() + + err := InitDatabase() + if err != nil { + panic(err) + } + + err = colGMA.Truncate(arangoCTX) + if err != nil { + panic(err) + } + err = colFile.Truncate(arangoCTX) + if err != nil { + panic(err) + } + err = colGMA2File.Truncate(arangoCTX) + if err != nil { + panic(err) + } + + // /mnt/worksucc/san2/gma/2/1/2/3/2123406190.1591573904.gma + //fileHandle, err := os.Open("2143898000.1593250551.bin.gma") //2143898000.1593250551.bin") + //gma, err := gma.NewReader("2143898000.1593250551.bin.gma") + + folderPath := "/mnt/SC9000/TemporaryTestingShit2/" //"/mnt/worksucc/san1/gma/2/5/4/8/" + //0 + entries, err := os.ReadDir(folderPath) + if err != nil { + panic(err) + } + skipBla := false + for _, e := range entries { + if !e.IsDir() && skipBla == true { + if e.Name() == "2548863549.1626463997.gma" { + skipBla = false + } + continue + } + if !e.IsDir() { + err = ProcessGMA(filepath.Join(folderPath, e.Name())) + if err != nil { + fmt.Printf("\nERROR: %v\n", err) + //panic(err) + } + } + } +} + +func undoBatch(gmaID string, fileIDs []string, gma2FileIDs []string) (err error) { + _, err = colGMA.RemoveDocument(arangoCTX, gmaID) + if err != nil { + return err + } + _, _, err = colFile.RemoveDocuments(arangoCTX, fileIDs) + if err != nil { + return err + } + _, _, err = colGMA2File.RemoveDocuments(arangoCTX, gma2FileIDs) + if err != nil { + return err + } + return nil +} +func ProcessGMA(filePath string) (err error) { + var ( + fileIDs []string + gma2FileIDs []string + ) + dboGMA := common.DB_GMA{} + dboGMA.BatchID = uuid.NewV4().String() // use this for rapid unscheduled dissassembly + + dboGMA.OriginalPath = filePath + dboGMA.ProcessingStart = time.Now() + fileStat, err := os.Stat(filePath) + if err != nil { + return err + } + dboGMA.StatModTime = fileStat.ModTime() + dboGMA.GMASize = fileStat.Size() + + fmt.Printf("Opening %s\n", filePath) + gmaReader, err := gma.NewReader(filePath) + if err != nil { + return err + } + defer gmaReader.Close() + + dboGMA.GMAHash, err = gmaReader.GetSHA256() + if err != nil { + return err + } + dboGMA.ID = dboGMA.GMAHash + gmaReader.FileHandle.Seek(0, 0) + + gmaTempPath := filepath.Join("/home/cheetah/dev/gma-puzzles/temp", dboGMA.ID) + defer os.RemoveAll(gmaTempPath) // clean up under any circumstances + + dboIDExists, err := colGMA.DocumentExists(arangoCTX, dboGMA.ID) + if err != nil { + return err + } + if dboIDExists { + return fmt.Errorf("GMA with ID %s exists", dboGMA.ID) + } + + header, err := gmaReader.ReadHeader() + if err != nil { + return err + } + dboGMA.Header = header + //fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset()) + firstType, files, err := gmaReader.ReadFiles() + if err != nil { + return err + } + dboGMA.FirstType = firstType + + //fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset()) + var ( + dboGMA2Files []common.DB_GMA2File + dboFiles []common.DB_File + ) + for _, file := range files { + fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber) + if file.NextType > uint32(file.FileNumber+10) { // Something is fucked + fmt.Printf("Current Cursor %d", gmaReader.GetOffset()) + return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber) + } + destPath := filepath.Join(gmaTempPath, "contents", file.FileName) + dir := filepath.Dir(destPath) + + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return err + } + + destFile, err := os.Create(destPath) + if err != nil { + return err + } + defer destFile.Close() + extractMeta, err := gmaReader.ExtractFileTo(file, destFile) + if err != nil { + return err + } + + if extractMeta.ExtractedCRC != extractMeta.OriginalMeta.CRC { + fmt.Printf("gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n", filePath, extractMeta.OriginalMeta.CRC, extractMeta.ExtractedCRC, extractMeta.OriginalMeta.FileName) + } + //fmt.Printf("ExtractedMeta %s CRC: %d SHA256: %s\n", file.FileName, extractMeta.ExtractedCRC, extractMeta.ExtractedSHA256) + dboFile := common.DB_File{ + ID: extractMeta.ExtractedSHA256, // ID is the SHA256, i guess that is good enough? + BatchID: dboGMA.BatchID, + InitialPath: file.FileName, + CRC: file.CRC, + Size: file.FileSize, + Hash: extractMeta.ExtractedSHA256, + Extension: filepath.Ext(file.FileName), + } + dboGMA2File := common.DB_GMA2File{ + ID: fmt.Sprintf("%s_%s", dboGMA.ID, extractMeta.ExtractedSHA256), + BatchID: dboGMA.BatchID, + File: fmt.Sprintf("file/%s", extractMeta.ExtractedSHA256), + GMA: fmt.Sprintf("gma/%s", dboGMA.ID), + FileNumber: extractMeta.OriginalMeta.FileNumber, + FileName: extractMeta.OriginalMeta.FileName, + Offset: extractMeta.OriginalMeta.Offset, + FileSize: extractMeta.OriginalMeta.FileSize, + CRCMatch: extractMeta.ExtractedCRC == extractMeta.OriginalMeta.CRC, + CRC: extractMeta.OriginalMeta.CRC, + NextType: extractMeta.OriginalMeta.NextType, + + LocalFileName: destPath, + UploadID: extractMeta.ExtractedSHA256, + } + //fmt.Println(dboFile) + // Add fileIDs from new unknowns + dboFiles = append(dboFiles, dboFile) + //fmt.Println(dboGMA2File) + gma2FileIDs = append(gma2FileIDs, dboGMA2File.ID) + dboGMA2Files = append(dboGMA2Files, dboGMA2File) + } + + lastFile := files[len(files)-1] + dboGMA.FooterAddonCRC, err = gmaReader.ReadAddonCRC(lastFile.Offset + lastFile.FileSize) + if err != nil { + return err + } + dboGMA.ProcessingEnd = time.Now() + dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds() + + // TODO: Calculate dboGMA.OptimizedSize + dboGMA.OptimizedSize = 0 + _, err = colGMA.CreateDocument(arangoCTX, dboGMA) + if err != nil { + return err + } + + importStartTime := time.Now() + + var newUnknownFiles []string + chunkSize := 5 + for { + if len(dboFiles) == 0 { + break + } + + // necessary check to avoid slicing beyond + // slice capacity + if len(dboFiles) < chunkSize { + chunkSize = len(dboFiles) + } + + // process and work withj + metaSlice, errorSlice, _ := colFile.CreateDocuments(arangoCTX, dboFiles[0:chunkSize]) + + //fmt.Println("Metaslice") + //fmt.Println(metaSlice) + for _, meta := range metaSlice { + if !meta.ID.IsEmpty() { + newUnknownFiles = append(newUnknownFiles, meta.Key) + fileIDs = append(fileIDs, meta.Key) + } + } + //fmt.Println("ErrorSlice") + //fmt.Println(errorSlice) + for _, createError := range errorSlice { + if createError != nil && strings.Contains(createError.Error(), "unique constraint violated - in index primary of type primary over '_key'") { + } else if createError != nil { + return createError + } + } + + dboFiles = dboFiles[chunkSize:] + } + fmt.Println() + + fmt.Printf("Imported dboFiles into Arango and now we have %d new files from %d addon files\n", len(newUnknownFiles), len(files)) + deltaFileSize := int64(0) + for _, unknownFile := range newUnknownFiles { + unknownFileID := fmt.Sprintf("file/%s", unknownFile) + for _, dboGMA2File := range dboGMA2Files { + if unknownFileID == dboGMA2File.File { + deltaFileSize += dboGMA2File.FileSize + } + } + } + dboGMA.OptimizedSize = deltaFileSize + fmt.Printf("Delta Storage %d bytes\n", deltaFileSize) + + _, err = colGMA2File.ImportDocuments(arangoCTX, dboGMA2Files, &adriver.ImportDocumentOptions{ + OnDuplicate: adriver.ImportOnDuplicateIgnore, + //FromPrefix: "gma/", + //ToPrefix: "file/", + Complete: true, // will make it fail if any error occurs (and hopefully reverse the trans-action) + }) + if err != nil { + return fmt.Errorf("ImportDocuments File fail: %v", err) + } + //fmt.Printf("Code: %d, Created: %d, Ignored: %d, Errors: %d", statsImportGMA2File.Code, statsImportGMA2File.Created, statsImportGMA2File.Ignored, statsImportGMA2File.Errors) + + fmt.Printf("Import Duration %dms\n", time.Since(importStartTime).Milliseconds()) + fmt.Println() + // TODO: upload all unknownNewFiles to StorageServer + var httpClient *http.Client = http.DefaultClient + for _, unknownFile := range newUnknownFiles { + unknownFileID := fmt.Sprintf("file/%s", unknownFile) + for _, dboGMA2File := range dboGMA2Files { + if unknownFileID == dboGMA2File.File { + //fmt.Printf("Uploading %s (local %s) to Storage\n", dboGMA2File.UploadID, dboGMA2File.LocalFileName) + err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName) + if err != nil { + return err + } + } + } + } + // TODO : fetch all files from storageServer + // TODO : write new gma from arangoinfo + // TODO : compare hashes + { + destPath := filepath.Join(gmaTempPath, "rewrite.gma") + dir := filepath.Dir(destPath) + + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return err + } + + gmaWriter, err := gma.NewWriter(destPath) + if err != nil { + return err + } + defer gmaWriter.Close() + + //fmt.Printf("Writing Header with FormatVersion: %d\n", dboGMA.Header.FormatVersion) + err = gmaWriter.WriteHeader(dboGMA.Header) + if err != nil { + return err + } + err = gmaWriter.WriteFirstType(dboGMA.FirstType) + if err != nil { + return err + } + + sort.SliceStable(dboGMA2Files, func(i, j int) bool { return dboGMA2Files[i].FileNumber < dboGMA2Files[j].FileNumber }) + for _, dboGMA2File := range dboGMA2Files { + //fmt.Printf("WriteFileIndex for %s number %d\n", dboGMA2File.FileName, dboGMA2File.FileNumber) + err = gmaWriter.WriteFileIndex(dboGMA2File.FileName, dboGMA2File.FileSize, dboGMA2File.CRC, dboGMA2File.NextType) + if err != nil { + return err + } + } + var httpClient *http.Client = http.DefaultClient + for _, dboGMA2File := range dboGMA2Files { + //fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID) + resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID)) + if err != nil { + return err + } + defer resp.Body.Close() + err = gmaWriter.WriteFile(resp.Body) + if err != nil { + return err + } + } + gmaWriter.FileHandle.Seek(0, 2) + //fmt.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC) + gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC) + + gmaWriter.FileHandle.Seek(0, 0) + writeHash, err := gmaWriter.GetSHA256() + if err != nil { + return err + } + + fmt.Printf("Rewrite Hash is %s %s\n", writeHash, destPath) + fmt.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath) + fmt.Println() + writeStat, err := os.Stat(destPath) + if err != nil { + return err + } + writeSize := writeStat.Size() + if writeSize != dboGMA.GMASize { + //fail + return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize) + } + if writeHash != dboGMA.GMAHash { + //fail + return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize) + } + } + // TODO: 4... profit? + + dboGMA.ProcessingEnd = time.Now() + dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds() + dboGMA.Success = true + + _, err = colGMA.UpdateDocument(arangoCTX, dboGMA.ID, dboGMA) + if err != nil { + return err + } + + return nil +} diff --git a/storageserver/storageserver.go b/storageserver/storageserver.go new file mode 100644 index 0000000..478120e --- /dev/null +++ b/storageserver/storageserver.go @@ -0,0 +1,459 @@ +package main + +import ( + "archive/tar" + "context" + "crypto/tls" + "fmt" + "io" + "log" + "net/http" + "os" + "path" + "path/filepath" + "strconv" + "sync" + "time" + + adriver "github.com/arangodb/go-driver" + ahttp "github.com/arangodb/go-driver/http" + "github.com/labstack/echo/v4" + "github.com/twinj/uuid" +) + +var ( + PoolMaxItems = 2500 + PoolPathFinal = "/mnt/SC9000/storagePools" + PoolPathTemp = "/mnt/SC9000/storageTemp" +) + +type Pool struct { + PoolID string `json:"_key"` + Finalized bool `json:"finalized"` + ReadOnly bool `json:"readOnly"` + Size uint64 `json:"size"` + folder string `json:"-"` + + itemCount int + items []string + + file *os.File + tarWriter *tar.Writer + tarReader *tar.Reader +} +type PoolFile struct { + FileID string + Size uint64 +} + +type PoolMaster struct { + cachePath string + finalPath string + CurrentPool *Pool + lock sync.Mutex + + LocalPools []Pool + FullPools []Pool +} + +func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) { + poolMaster.finalPath = finalPath + poolMaster.cachePath = cachePath + //poolMaster.lock = sync.Mutex{} + + destPath := filepath.Join(poolMaster.cachePath, "pool") + err = os.MkdirAll(destPath, os.ModePerm) + if err != nil { + return poolMaster, err + } + err = os.MkdirAll(poolMaster.finalPath, os.ModePerm) + if err != nil { + return poolMaster, err + } + return poolMaster, nil +} +func (p *PoolMaster) NewPool() (pool *Pool, err error) { + pool = &Pool{} + pool.PoolID = uuid.NewV4().String() + pool.Finalized = false + pool.ReadOnly = false + //TODO : Sync to DB + destPath := filepath.Join(p.cachePath, "pool", pool.PoolID) + //dir := filepath.Dir(destPath) + err = os.MkdirAll(destPath, os.ModePerm) + if err != nil { + return pool, err + } + + return pool, nil +} + +func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) { + if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems { + p.lock.Lock() + defer p.lock.Unlock() + p.CurrentPool.ReadOnly = true + p.FullPools = append(p.FullPools, *p.CurrentPool) + // queue for compression + p.CurrentPool = nil + } + if p.CurrentPool == nil { + pool, err = p.AcquireNewOrRecoverPool() + if err != nil { + return pool, err + } + p.CurrentPool = pool + return pool, nil + } + + return p.CurrentPool, nil +} +func RestorePoolFromFolder(folderPath string) (pool Pool, err error) { + pool.PoolID = path.Base(folderPath) + + entries, err := os.ReadDir(folderPath) + if err != nil { + return pool, err + } + for _, e := range entries { + if !e.IsDir() { + pool.items = append(pool.items, e.Name()) + pool.itemCount++ + } + } + pool.ReadOnly = pool.itemCount >= PoolMaxItems + pool.Finalized = false // we are still local + + return pool, err +} +func (p *PoolMaster) ScanForLocalPools() (err error) { + p.lock.Lock() + defer p.lock.Unlock() + entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool")) + if err != nil { + return err + } + for _, e := range entries { + if e.IsDir() { + fmt.Printf("Scanning For Local Pools, found %s:\n", e.Name()) + poolDirPath := filepath.Join(p.cachePath, "pool", e.Name()) + restoredPool, err := RestorePoolFromFolder(poolDirPath) + if err != nil { + return err + } + fmt.Printf("is readonly %v itemCount=%d\n", restoredPool.ReadOnly, restoredPool.itemCount) + p.LocalPools = append(p.LocalPools, restoredPool) + } + } + return nil +} +func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) { + // p.NewPool() + for _, localPool := range p.LocalPools { + if !localPool.ReadOnly { + return &localPool, nil + } + } + return p.NewPool() +} +func (p *PoolMaster) Lookup(id string) (exists bool) { + if p.CurrentPool != nil { + for _, poolItem := range p.CurrentPool.items { + if poolItem == id { + return true + } + } + } + for _, fullPool := range p.FullPools { + for _, poolItem := range fullPool.items { + if poolItem == id { + return true + } + } + } + for _, localPool := range p.LocalPools { + for _, poolItem := range localPool.items { + if poolItem == id { + return true + } + } + } + // TODO : DB Check + return false +} +func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { + if p.CurrentPool != nil { + for _, poolItem := range p.CurrentPool.items { + if poolItem == id { + fmt.Printf("Fetch CurrentPool %s\n", id) + poolLocalFilePath := filepath.Join(p.cachePath, "pool", p.CurrentPool.PoolID, id) + //fmt.Println(poolLocalFilePath) + //fmt.Printf("%s %s\n", p.CurrentPool.PoolID, poolItem) + srcLocalFile, err := os.Open(poolLocalFilePath) + if err != nil { + return err + } + //fmt.Println("Closer") + defer srcLocalFile.Close() + //fmt.Println("io.Copy") + if _, err = io.Copy(writer, srcLocalFile); err != nil { + return err + } + return nil + } + } + } + for _, fullPool := range p.FullPools { + for _, poolItem := range fullPool.items { + if poolItem == id { + fmt.Printf("Fetch FullPool %s\n", id) + poolLocalFilePath := filepath.Join(p.cachePath, "pool", fullPool.PoolID, id) + //fmt.Println(poolLocalFilePath) + //fmt.Printf("%s %s\n", fullPool.PoolID, poolItem) + srcLocalFile, err := os.Open(poolLocalFilePath) + if err != nil { + return err + } + //fmt.Println("Closer") + defer srcLocalFile.Close() + //fmt.Println("io.Copy") + if _, err = io.Copy(writer, srcLocalFile); err != nil { + return err + } + return nil + } + } + } + for _, localPool := range p.LocalPools { + for _, poolItem := range localPool.items { + if poolItem == id { + fmt.Printf("Fetch LocalPool %s\n", id) + poolLocalFilePath := filepath.Join(p.cachePath, "pool", localPool.PoolID, id) + //fmt.Println(poolLocalFilePath) + //fmt.Printf("%s %s\n", localPool.PoolID, poolItem) + srcLocalFile, err := os.Open(poolLocalFilePath) + if err != nil { + return err + } + //fmt.Println("Closer") + defer srcLocalFile.Close() + //fmt.Println("io.Copy") + if _, err = io.Copy(writer, srcLocalFile); err != nil { + return err + } + return nil + } + } + } + return nil +} +func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) { + + pool, err := p.GetCurrentWriteablePool() + if err != nil { + return err + } + p.lock.Lock() + defer p.lock.Unlock() + poolFolder := filepath.Join(p.cachePath, "pool", pool.PoolID) + destPath := filepath.Join(poolFolder, id) + dst, err := os.Create(destPath) + if err != nil { + _ = os.Remove(destPath) + return err + } + defer dst.Close() + + writtenBytes, err := io.Copy(dst, src) + if err != nil { + _ = os.Remove(destPath) + return err + } + if writtenBytes != targetSize { + _ = os.Remove(destPath) + return err + } + pool.itemCount++ + pool.items = append(pool.items, id) + fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount) + + entries, err := os.ReadDir(poolFolder) + if err != nil { + return err + } + newItemCount := 0 + for _, e := range entries { + if !e.IsDir() { + pool.items = append(pool.items, e.Name()) + newItemCount++ + } + } + pool.itemCount = newItemCount + fmt.Printf("Current Pool %s, Recounted ItemCount = %d\n", pool.PoolID, pool.itemCount) + if pool.itemCount >= PoolMaxItems { + pool.ReadOnly = true + } + + return nil +} + +var ( + arangoDB adriver.Database + arangoCTX context.Context + colChunk adriver.Collection + colFile adriver.Collection + colFile2Chunk adriver.Collection +) + +func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) { + log.Println("connectDB:", "Starting Connection Process...") + + // Retry Loop for Failed Connections + for i := 0; i < 6; i++ { + if i == 5 { + return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i) + } else if i > 0 { + time.Sleep(30 * time.Second) + } + + // Connect to ArangoDB URL + conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{ + Endpoints: []string{baseURL}, + TLSConfig: &tls.Config{ /*...*/ }, + }) + if err != nil { + log.Println("connectDB:", "Cannot Connect to ArangoDB!", err) + continue + } + + // Connect Driver to User + client, err := adriver.NewClient(adriver.ClientConfig{ + Connection: conn, + Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD), + }) + if err != nil { + log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err) + continue + } + + // Create Context for Database Access + ctx = context.Background() + driver, err = client.Database(ctx, arangoDatabase) + if err != nil { + log.Println("connectDB:", "Cannot Load ArangoDB Database!", err) + continue + } + log.Println("connectDB:", "Connection Sucessful!") + return driver, ctx, nil + } + + return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?") +} +func InitDatabase() (err error) { + arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator") + + colChunk, err = arangoDB.Collection(arangoCTX, "chunk") + if err != nil { + return err + } + colFile, err = arangoDB.Collection(arangoCTX, "file") + if err != nil { + return err + } + colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map") + if err != nil { + return err + } + return nil +} + +var ( + poolMaster PoolMaster + //poolFiles = []PoolFile{} + //seq = 1 + //lock = sync.Mutex{} +) + +func main() { + err := InitDatabase() + if err != nil { + panic(err) + } + + poolMaster, err = NewPoolMaster(PoolPathFinal, PoolPathTemp) + if err != nil { + panic(err) + } + // Scan for local existing Pools + err = poolMaster.ScanForLocalPools() + if err != nil { + panic(err) + } + + e := echo.New() + //e.Use(middleware.Logger()) + + e.GET("/", func(c echo.Context) error { + return c.String(http.StatusOK, "Hello, World!") + }) + + e.GET("/fetch/:id", func(c echo.Context) error { + id := c.Param("id") + exists := poolMaster.Lookup(id) + if exists { + c.Response().Header().Set(echo.HeaderContentType, echo.MIMEOctetStream) + c.Response().WriteHeader(http.StatusOK) + err = poolMaster.Fetch(id, c.Response()) + if err != nil { + fmt.Printf("%v", err) + return c.String(http.StatusInternalServerError, err.Error()) + } + c.Response().Flush() + } else { + fmt.Printf("/fetch/%s does not exist\n", id) + return c.String(http.StatusNotFound, "Not Found") + } + + return nil + //return c.Stream(200, "application/x-octet-stream", nil) + }) + + e.POST("/stash/:id/:size", func(c echo.Context) error { + id := c.Param("id") + sizeStr := c.Param("size") + sizeVal, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + fmt.Println(err) + return c.String(http.StatusExpectationFailed, "Error") + } + + exists := poolMaster.Lookup(id) + if exists { + fmt.Printf("/stash/%s exists already\n", id) + return c.String(http.StatusAlreadyReported, "Exists already") + } + fmt.Printf("stashing %s", id) + + file, err := c.FormFile("file") + if err != nil { + fmt.Println(err) + return c.String(http.StatusExpectationFailed, "Error") + } + formStream, err := file.Open() + if err != nil { + fmt.Println(err) + return c.String(http.StatusExpectationFailed, "Error") + } + defer formStream.Close() + err = poolMaster.Store(id, formStream, sizeVal) + if err != nil { + fmt.Println(err) + return c.String(http.StatusExpectationFailed, "Error") + } + fmt.Println("...stashed") + + return c.JSON(http.StatusOK, true) + }) + + e.Logger.Fatal(e.Start(":13371")) +}