diff --git a/common/common.go b/common/common.go index 597c1c9..ef2c1cc 100644 --- a/common/common.go +++ b/common/common.go @@ -62,11 +62,13 @@ type DB_GMA2File struct { type DB_Chunk struct { ID string `json:"_key"` + NotReady bool `json:"notReady"` Finalized bool `json:"finalized"` ReadOnly bool `json:"readOnly"` - Size int64 `json:"size"` - Hash string `json:"hash"` + FileCount int `json:"fileCount"` + Size int64 `json:"size"` + Hash string `json:"hash"` } type DB_File2Chunk struct { diff --git a/gma/reader.go b/gma/reader.go index faad072..c188133 100644 --- a/gma/reader.go +++ b/gma/reader.go @@ -138,36 +138,47 @@ func (r *GMAReader) readFileMetadata() (GMAFileMetadata, error) { if err != nil { return metadata, err } - fileName = fileName[:len(fileName)-1] // remove nullbyte + //fmt.Printf("bufio ReadString(byte(0)) = len(%d) data=%x\n", len(fileName), fileName) + /*if len(fileName) == 1 { //fucky retry + fileName, err := r.gmaStreamReader.ReadString(byte(0)) + if err != nil { + return metadata, err + } + fmt.Printf("RETRY!! bufio ReadString(byte(0)) = len(%d) data=%x\n", len(fileName), fileName) + }*/ + fileName = fileName[:len(fileName)-1] // remove nullbyte that causes go string fuckyness r.cursorOffset += uint32(len(fileName) + 1) // Add name length + null byte metadata.FileName = fileName // Read the file size fileSizeBytes := make([]byte, 8) - _, err = r.gmaStreamReader.Read(fileSizeBytes) + _, err = io.ReadFull(r.gmaStreamReader, fileSizeBytes) if err != nil { return metadata, err } r.cursorOffset += 8 + //fmt.Printf("bufio Read([]byte(4)]) fileSizeBytes = bytesRead(%d) data=%x\n", bytesRead, fileSizeBytes) metadata.FileSize = int64(binary.LittleEndian.Uint64(fileSizeBytes)) // Read the file crc crcBytes := make([]byte, 4) - _, err = r.gmaStreamReader.Read(crcBytes) + _, err = io.ReadFull(r.gmaStreamReader, crcBytes) if err != nil { return metadata, err } r.cursorOffset += 4 + //fmt.Printf("bufio Read([]byte(4)]) crcBytes = bytesRead(%d) data=%x\n", bytesRead, crcBytes) metadata.CRC = binary.LittleEndian.Uint32(crcBytes) // Read the next type nextTypeBytes := make([]byte, 4) - _, err = r.gmaStreamReader.Read(nextTypeBytes) + _, err = io.ReadFull(r.gmaStreamReader, nextTypeBytes) if err != nil { return metadata, err } r.cursorOffset += 4 metadata.NextType = binary.LittleEndian.Uint32(nextTypeBytes) + //fmt.Printf("bufio Read([]byte(4)]) nextTypeBytes = bytesRead(%d) data=%x\n", bytesRead, nextTypeBytes) return metadata, nil } diff --git a/main.go b/main.go index 746b334..6023036 100644 --- a/main.go +++ b/main.go @@ -114,24 +114,26 @@ func main() { panic(err) } - err = colGMA.Truncate(arangoCTX) - if err != nil { - panic(err) - } - err = colFile.Truncate(arangoCTX) - if err != nil { - panic(err) - } - err = colGMA2File.Truncate(arangoCTX) - if err != nil { - panic(err) - } + /* + err = colGMA.Truncate(arangoCTX) + if err != nil { + panic(err) + } + err = colFile.Truncate(arangoCTX) + if err != nil { + panic(err) + } + err = colGMA2File.Truncate(arangoCTX) + if err != nil { + panic(err) + }*/ // /mnt/worksucc/san2/gma/2/1/2/3/2123406190.1591573904.gma //fileHandle, err := os.Open("2143898000.1593250551.bin.gma") //2143898000.1593250551.bin") //gma, err := gma.NewReader("2143898000.1593250551.bin.gma") - folderPath := "/mnt/SC9000/TemporaryTestingShit2/" //"/mnt/worksucc/san1/gma/2/5/4/8/" + folderPath := "/mnt/SC9000/TemporaryTestingShit/" //"/mnt/worksucc/san1/gma/2/5/4/8/" + folderPathTarget := "/mnt/SC9000/ProcessedGMATest/" //"/mnt/worksucc/san1/gma/2/5/4/8/" //0 entries, err := os.ReadDir(folderPath) if err != nil { @@ -140,22 +142,26 @@ func main() { skipBla := false for _, e := range entries { if !e.IsDir() && skipBla == true { - if e.Name() == "2548863549.1626463997.gma" { + if e.Name() == "2547463094.1626322945.gma" { skipBla = false + } else { + continue } - continue } if !e.IsDir() { err = ProcessGMA(filepath.Join(folderPath, e.Name())) if err != nil { fmt.Printf("\nERROR: %v\n", err) //panic(err) + continue } + os.Rename(filepath.Join(folderPath, e.Name()), filepath.Join(folderPathTarget, e.Name())) } } } -func undoBatch(gmaID string, fileIDs []string, gma2FileIDs []string) (err error) { +func undoBatch(undoBatch *bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) { + fmt.Printf("undoBatch(%x, %s)\n", undoBatch, gmaID) _, err = colGMA.RemoveDocument(arangoCTX, gmaID) if err != nil { return err @@ -172,8 +178,9 @@ func undoBatch(gmaID string, fileIDs []string, gma2FileIDs []string) (err error) } func ProcessGMA(filePath string) (err error) { var ( - fileIDs []string - gma2FileIDs []string + fileIDs []string + gma2FileIDs []string + failedProcessing = true ) dboGMA := common.DB_GMA{} dboGMA.BatchID = uuid.NewV4().String() // use this for rapid unscheduled dissassembly @@ -187,6 +194,9 @@ func ProcessGMA(filePath string) (err error) { dboGMA.StatModTime = fileStat.ModTime() dboGMA.GMASize = fileStat.Size() + if dboGMA.GMASize < 200 { + return fmt.Errorf("GMA File too small, skipping") + } fmt.Printf("Opening %s\n", filePath) gmaReader, err := gma.NewReader(filePath) if err != nil { @@ -217,6 +227,9 @@ func ProcessGMA(filePath string) (err error) { return err } dboGMA.Header = header + fmt.Printf("AddonVersion=%d\n", header.AddonVersion) + fmt.Printf("FormatVersion=%d\n", header.FormatVersion) + fmt.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte) //fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset()) firstType, files, err := gmaReader.ReadFiles() if err != nil { @@ -233,6 +246,9 @@ func ProcessGMA(filePath string) (err error) { fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber) if file.NextType > uint32(file.FileNumber+10) { // Something is fucked fmt.Printf("Current Cursor %d", gmaReader.GetOffset()) + for _, otherFile := range files[file.FileNumber:] { + fmt.Printf("OTHERFILE %s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", otherFile.FileName, otherFile.CRC, otherFile.Offset, otherFile.FileSize, otherFile.NextType, otherFile.FileNumber) + } return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber) } destPath := filepath.Join(gmaTempPath, "contents", file.FileName) @@ -298,6 +314,8 @@ func ProcessGMA(filePath string) (err error) { dboGMA.ProcessingEnd = time.Now() dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds() + // if anything fails, lets undo the documents we imported + defer undoBatch(&failedProcessing, dboGMA.ID, fileIDs, gma2FileIDs) // TODO: Calculate dboGMA.OptimizedSize dboGMA.OptimizedSize = 0 _, err = colGMA.CreateDocument(arangoCTX, dboGMA) @@ -420,9 +438,12 @@ func ProcessGMA(filePath string) (err error) { return err } } - var httpClient *http.Client = http.DefaultClient + var httpClient *http.Client = &http.Client{ + Timeout: 15 * time.Minute, + } + for _, dboGMA2File := range dboGMA2Files { - //fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID) + fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID) resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID)) if err != nil { return err @@ -434,7 +455,7 @@ func ProcessGMA(filePath string) (err error) { } } gmaWriter.FileHandle.Seek(0, 2) - //fmt.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC) + fmt.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC) gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC) gmaWriter.FileHandle.Seek(0, 0) @@ -471,5 +492,6 @@ func ProcessGMA(filePath string) (err error) { return err } + failedProcessing = false return nil } diff --git a/storageserver/arango.go b/storageserver/arango.go new file mode 100644 index 0000000..06ab7d0 --- /dev/null +++ b/storageserver/arango.go @@ -0,0 +1 @@ +package main diff --git a/storageserver/storageserver.go b/storageserver/storageserver.go index 478120e..c4af500 100644 --- a/storageserver/storageserver.go +++ b/storageserver/storageserver.go @@ -3,7 +3,9 @@ package main import ( "archive/tar" "context" + "crypto/sha256" "crypto/tls" + "errors" "fmt" "io" "log" @@ -12,9 +14,11 @@ import ( "path" "path/filepath" "strconv" + "strings" "sync" "time" + "git.cheetah.cat/worksucc/gma-puzzles/common" adriver "github.com/arangodb/go-driver" ahttp "github.com/arangodb/go-driver/http" "github.com/labstack/echo/v4" @@ -32,13 +36,15 @@ type Pool struct { Finalized bool `json:"finalized"` ReadOnly bool `json:"readOnly"` Size uint64 `json:"size"` - folder string `json:"-"` + //folder string `json:"-"` itemCount int items []string - file *os.File - tarWriter *tar.Writer + wormMode bool + filePath string + file *os.File + //tarWriter *tar.Writer tarReader *tar.Reader } type PoolFile struct { @@ -54,11 +60,154 @@ type PoolMaster struct { LocalPools []Pool FullPools []Pool + WORMPools map[string]Pool +} +type PoolPackResult struct { + PoolID string + Files []string + FileCount int + Size int64 + Hash string + outputFileName string +} + +var ( + arangoDB adriver.Database + arangoCTX context.Context + colChunk adriver.Collection + colFile adriver.Collection + colFile2Chunk adriver.Collection + + // PoolMaster + poolMaster PoolMaster +) + +func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) { + log.Println("connectDB:", "Starting Connection Process...") + + // Retry Loop for Failed Connections + for i := 0; i < 6; i++ { + if i == 5 { + return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i) + } else if i > 0 { + time.Sleep(30 * time.Second) + } + + // Connect to ArangoDB URL + conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{ + Endpoints: []string{baseURL}, + TLSConfig: &tls.Config{ /*...*/ }, + }) + if err != nil { + log.Println("connectDB:", "Cannot Connect to ArangoDB!", err) + continue + } + + // Connect Driver to User + client, err := adriver.NewClient(adriver.ClientConfig{ + Connection: conn, + Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD), + }) + if err != nil { + log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err) + continue + } + + // Create Context for Database Access + ctx = context.Background() + driver, err = client.Database(ctx, arangoDatabase) + if err != nil { + log.Println("connectDB:", "Cannot Load ArangoDB Database!", err) + continue + } + log.Println("connectDB:", "Connection Sucessful!") + return driver, ctx, nil + } + + return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?") +} +func InitDatabase() (err error) { + arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator") + + colChunk, err = arangoDB.Collection(arangoCTX, "chunk") + if err != nil { + return err + } + colFile, err = arangoDB.Collection(arangoCTX, "file") + if err != nil { + return err + } + colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map") + if err != nil { + return err + } + return nil +} + +func (p *Pool) OpenTar() (err error) { + p.wormMode = true + + outputDir := filepath.Join(poolMaster.cachePath, "worm", p.PoolID) + err = os.MkdirAll(outputDir, os.ModePerm) + if err != nil { + return err + } + + p.file, err = os.Open(p.filePath) + if err != nil { + return err + } + p.items = []string{} + p.tarReader = tar.NewReader(p.file) + for { + header, err := p.tarReader.Next() + if err == io.EOF { + break + } + if err != nil { + return err + } + + path := filepath.Join(outputDir, header.Name) + info := header.FileInfo() + file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode()) + if err != nil { + return err + } + defer file.Close() + _, err = io.Copy(file, p.tarReader) + if err != nil { + return err + } + p.items = append(p.items, header.Name) + fmt.Print(".") + } + + return nil +} +func (p *Pool) Fetch(id string, writer io.Writer) (err error) { + for _, poolItem := range p.items { + if poolItem == id { + fmt.Printf("Fetch WORMPool %s\n", id) + poolLocalFilePath := filepath.Join(poolMaster.cachePath, "worm", p.PoolID, id) + srcLocalFile, err := os.Open(poolLocalFilePath) + if err != nil { + return err + } + defer srcLocalFile.Close() + if _, err = io.Copy(writer, srcLocalFile); err != nil { + return err + } + return nil + } + } + return fmt.Errorf("%s not found", id) } func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) { poolMaster.finalPath = finalPath poolMaster.cachePath = cachePath + poolMaster.WORMPools = make(map[string]Pool) //poolMaster.lock = sync.Mutex{} destPath := filepath.Join(poolMaster.cachePath, "pool") @@ -66,6 +215,13 @@ func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, e if err != nil { return poolMaster, err } + + destPath = filepath.Join(poolMaster.cachePath, "worm") + err = os.MkdirAll(destPath, os.ModePerm) + if err != nil { + return poolMaster, err + } + err = os.MkdirAll(poolMaster.finalPath, os.ModePerm) if err != nil { return poolMaster, err @@ -89,12 +245,14 @@ func (p *PoolMaster) NewPool() (pool *Pool, err error) { } func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) { + //fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount) if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems { p.lock.Lock() defer p.lock.Unlock() p.CurrentPool.ReadOnly = true p.FullPools = append(p.FullPools, *p.CurrentPool) // queue for compression + fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one", p.CurrentPool.PoolID, p.CurrentPool.itemCount) p.CurrentPool = nil } if p.CurrentPool == nil { @@ -135,7 +293,37 @@ func (p *PoolMaster) ScanForLocalPools() (err error) { } for _, e := range entries { if e.IsDir() { - fmt.Printf("Scanning For Local Pools, found %s:\n", e.Name()) + fmt.Printf("Scanning For Local Pools, found %s:", e.Name()) + + tarFinalPath := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", e.Name())) + _, err = os.Stat(tarFinalPath) + finalPathExists := false + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + } + dboChunkExists, err := colChunk.DocumentExists(arangoCTX, e.Name()) + if err != nil { + return err + } + if dboChunkExists { + var dboChunk common.DB_Chunk + _, err := colChunk.ReadDocument(arangoCTX, e.Name(), &dboChunk) + if err != nil { + return err + } + finalPathExists = dboChunk.Finalized && dboChunk.ReadOnly && !dboChunk.NotReady + fmt.Printf("is in DB readonly %v finalized %v notready %v itemCount=%d size=%d hash=%s\n", dboChunk.ReadOnly, dboChunk.Finalized, dboChunk.NotReady, dboChunk.FileCount, dboChunk.Size, dboChunk.Hash) + if finalPathExists { + fmt.Println("skipping") + } + } + + if finalPathExists { + continue + } + poolDirPath := filepath.Join(p.cachePath, "pool", e.Name()) restoredPool, err := RestorePoolFromFolder(poolDirPath) if err != nil { @@ -147,6 +335,189 @@ func (p *PoolMaster) ScanForLocalPools() (err error) { } return nil } + +// Pool Packing +func (p *PoolMaster) ImportPoolPackResult(packResult PoolPackResult) (err error) { + startTime := time.Now() + dboChunk := common.DB_Chunk{ + ID: packResult.PoolID, + Hash: packResult.Hash, + Size: packResult.Size, + FileCount: packResult.FileCount, + NotReady: true, + ReadOnly: true, + Finalized: true, + } + var dboChunk2File []common.DB_File2Chunk + for _, prFile := range packResult.Files { + dboChunk2File = append(dboChunk2File, common.DB_File2Chunk{ + ID: prFile, + File: fmt.Sprintf("file/%s", prFile), + Chunk: fmt.Sprintf("chunk/%s", dboChunk.ID), + }) + } + + _, err = colChunk.CreateDocument(arangoCTX, dboChunk) + if err != nil { + return err + } + chunkSize := 500 + for { + if len(dboChunk2File) == 0 { + break + } + + // necessary check to avoid slicing beyond + // slice capacity + if len(dboChunk2File) < chunkSize { + chunkSize = len(dboChunk2File) + } + + _, errorSlice, _ := colFile2Chunk.CreateDocuments(arangoCTX, dboChunk2File[0:chunkSize]) + //metaSlice, errorSlice, _ := colFile2Chunk.CreateDocuments(arangoCTX, dboChunk2File[0:chunkSize]) + + //fmt.Println("Metaslice") + //fmt.Println(metaSlice) + /*for _, meta := range metaSlice { + if !meta.ID.IsEmpty() { + newUnknownFiles = append(newUnknownFiles, meta.Key) + fileIDs = append(fileIDs, meta.Key) + } + }*/ + for _, createError := range errorSlice { + if createError != nil && strings.Contains(createError.Error(), "unique constraint violated - in index primary of type primary over '_key'") { + } else if createError != nil { + return createError + } + } + + dboChunk2File = dboChunk2File[chunkSize:] + } + + fmt.Printf("ImportPool Duration %dms\n", time.Since(startTime).Milliseconds()) + return nil +} +func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) { + startTime := time.Now() + + targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", packResult.PoolID)) + os.Rename(packResult.outputFileName, targetFileName) + + tarFileCheck, err := os.Open(targetFileName) + if err != nil { + return err + } + defer tarFileCheck.Close() + + shaHasher := sha256.New() + _, err = io.Copy(shaHasher, tarFileCheck) + if err != nil { + return err + } + wormHash := fmt.Sprintf("%x", shaHasher.Sum(nil)) + fmt.Printf("WORMTarPool hash is %s , old is %s\n", wormHash, packResult.Hash) + if wormHash != packResult.Hash { + os.Remove(targetFileName) + return err + } + fmt.Printf("MoveWORM Duration %dms\n", time.Since(startTime).Milliseconds()) + return nil +} +func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err error) { + startTime := time.Now() + packResult.PoolID = poolID + + p.lock.Lock() + defer p.lock.Unlock() + + packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar", poolID)) + tarFile, err := os.Create(packResult.outputFileName) + if err != nil { + return packResult, err + } + defer tarFile.Close() + + tw := tar.NewWriter(tarFile) + defer tw.Close() + + entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool", poolID)) + if err != nil { + return packResult, err + } + //fmt.Printf("len(entries) == %d\n", len(entries)) + if len(entries) != PoolMaxItems { + return packResult, fmt.Errorf("Pool contains %d items, but there should be %d", len(entries), PoolMaxItems) + } + for _, e := range entries { + originalPath := filepath.Join(p.cachePath, "pool", poolID, e.Name()) + + file, err := os.Open(originalPath) + if err != nil { + return packResult, err + } + defer file.Close() + + info, err := file.Stat() + if err != nil { + return packResult, err + } + + tarFileHeader, err := tar.FileInfoHeader(info, info.Name()) + if err != nil { + return packResult, err + } + err = tw.WriteHeader(tarFileHeader) + if err != nil { + return packResult, err + } + + _, err = io.Copy(tw, file) + if err != nil { + return packResult, err + } + packResult.FileCount++ + packResult.Files = append(packResult.Files, e.Name()) + } + + err = tw.Flush() + if err != nil { + return packResult, err + } + + err = tarFile.Close() + if err != nil { + return packResult, err + } + + // re-open and check + + tarFileCheck, err := os.Open(packResult.outputFileName) + if err != nil { + return packResult, err + } + defer tarFileCheck.Close() + + shaHasher := sha256.New() + hashedBytes, err := io.Copy(shaHasher, tarFileCheck) + if err != nil { + return packResult, err + } + packResult.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil)) + fmt.Printf("PackPoolTar hash is %s\n", packResult.Hash) + + packFileStats, err := tarFileCheck.Stat() + if err != nil { + return packResult, err + } + packResult.Size = packFileStats.Size() + if hashedBytes != packResult.Size { + return packResult, fmt.Errorf("WORM Copy HashedBytes %d != FileSize %d", hashedBytes, packResult.Size) + } + fmt.Printf("PackPool Duration %dms\n", time.Since(startTime).Milliseconds()) + + return packResult, nil +} + func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) { // p.NewPool() for _, localPool := range p.LocalPools { @@ -156,22 +527,31 @@ func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) { } return p.NewPool() } + func (p *PoolMaster) Lookup(id string) (exists bool) { - if p.CurrentPool != nil { + // TODO: DB check + if p.CurrentPool != nil { // CurrentPool for _, poolItem := range p.CurrentPool.items { if poolItem == id { return true } } } - for _, fullPool := range p.FullPools { + for _, wormPool := range p.WORMPools { // WORM Pools + for _, poolItem := range wormPool.items { + if poolItem == id { + return true + } + } + } + for _, fullPool := range p.FullPools { // Full Pools for _, poolItem := range fullPool.items { if poolItem == id { return true } } } - for _, localPool := range p.LocalPools { + for _, localPool := range p.LocalPools { // Local Pools for _, poolItem := range localPool.items { if poolItem == id { return true @@ -179,8 +559,59 @@ func (p *PoolMaster) Lookup(id string) (exists bool) { } } // TODO : DB Check - return false + // ArangoDB + dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id) + if err != nil { + return false + } + return dboFile2ChunkExists } + +func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writer) (err error) { + fmt.Printf("FetchLoadWORM(chunkID %s, fileID %s, ...)\n", chunkID, fileID) + // search within loaded worm-pools + for wormID, wormPool := range p.WORMPools { + if wormID != chunkID { + continue + } + for _, poolItem := range wormPool.items { + if poolItem == fileID { + fmt.Printf("Fetch WORMPool %s file %s\n", wormID, fileID) + return wormPool.Fetch(fileID, writer) + } + } + break + } + // else load wormPool into disk-cache extract to "worm" + // wormMode + + p.lock.Lock() + defer p.lock.Unlock() + + var dboChunk common.DB_Chunk + _, err = colChunk.ReadDocument(arangoCTX, chunkID, &dboChunk) + if err != nil { + return err + } + loadedWormPool := Pool{ + PoolID: dboChunk.ID, + Size: uint64(dboChunk.Size), + ReadOnly: dboChunk.ReadOnly, + Finalized: dboChunk.Finalized, + + filePath: filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", dboChunk.ID)), + } + fmt.Println("initialized loadedWormPool, Opening tar...") + err = loadedWormPool.OpenTar() + if err != nil { + return err + } + fmt.Println("extracted") + p.WORMPools[loadedWormPool.PoolID] = loadedWormPool + return loadedWormPool.Fetch(fileID, writer) + //return nil +} + func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { if p.CurrentPool != nil { for _, poolItem := range p.CurrentPool.items { @@ -203,20 +634,24 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { } } } + for _, wormPool := range p.WORMPools { + for _, poolItem := range wormPool.items { + if poolItem == id { + fmt.Printf("Fetch WORMPool %s file %s\n", wormPool.PoolID, id) + return wormPool.Fetch(id, writer) + } + } + } for _, fullPool := range p.FullPools { for _, poolItem := range fullPool.items { if poolItem == id { fmt.Printf("Fetch FullPool %s\n", id) poolLocalFilePath := filepath.Join(p.cachePath, "pool", fullPool.PoolID, id) - //fmt.Println(poolLocalFilePath) - //fmt.Printf("%s %s\n", fullPool.PoolID, poolItem) srcLocalFile, err := os.Open(poolLocalFilePath) if err != nil { return err } - //fmt.Println("Closer") defer srcLocalFile.Close() - //fmt.Println("io.Copy") if _, err = io.Copy(writer, srcLocalFile); err != nil { return err } @@ -229,15 +664,11 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { if poolItem == id { fmt.Printf("Fetch LocalPool %s\n", id) poolLocalFilePath := filepath.Join(p.cachePath, "pool", localPool.PoolID, id) - //fmt.Println(poolLocalFilePath) - //fmt.Printf("%s %s\n", localPool.PoolID, poolItem) srcLocalFile, err := os.Open(poolLocalFilePath) if err != nil { return err } - //fmt.Println("Closer") defer srcLocalFile.Close() - //fmt.Println("io.Copy") if _, err = io.Copy(writer, srcLocalFile); err != nil { return err } @@ -245,6 +676,24 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { } } } + + // ArangoDB + dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id) + if err != nil { + return err + } + fmt.Printf("dboFile2ChunkExists %s = %v\n", id, dboFile2ChunkExists) + if dboFile2ChunkExists { + var dboFile2Chunk common.DB_File2Chunk + _, err = colFile2Chunk.ReadDocument(arangoCTX, id, &dboFile2Chunk) + if err != nil { + return err + } + + //FetchFromPoolPack + //dboFile2Chunk.Chunk <- which chunk i need to find + return p.FetchLoadWORM(dboFile2Chunk.Chunk[6:], id, writer) + } return nil } func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) { @@ -253,8 +702,15 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro if err != nil { return err } + if pool.ReadOnly { + return fmt.Errorf("WTF Pool %s is ReadOnly but GetCurrentWriteablePool returned it", pool.PoolID) + } + + fmt.Printf("Store(%s)\n", id) + p.lock.Lock() defer p.lock.Unlock() + // figuring out paths poolFolder := filepath.Join(p.cachePath, "pool", pool.PoolID) destPath := filepath.Join(poolFolder, id) dst, err := os.Create(destPath) @@ -263,7 +719,7 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro return err } defer dst.Close() - + // copy from ioReader to file writtenBytes, err := io.Copy(dst, src) if err != nil { _ = os.Remove(destPath) @@ -273,6 +729,18 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro _ = os.Remove(destPath) return err } + + // check transferred data + dst.Seek(0, 0) + shaHasher := sha256.New() + if _, err := io.Copy(shaHasher, dst); err != nil { + return err + } + outputHash := fmt.Sprintf("%x", shaHasher.Sum(nil)) + if outputHash != id { + return fmt.Errorf("Store() Sha256 Hash Mismatch") + } + pool.itemCount++ pool.items = append(pool.items, id) fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount) @@ -297,83 +765,6 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro return nil } -var ( - arangoDB adriver.Database - arangoCTX context.Context - colChunk adriver.Collection - colFile adriver.Collection - colFile2Chunk adriver.Collection -) - -func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) { - log.Println("connectDB:", "Starting Connection Process...") - - // Retry Loop for Failed Connections - for i := 0; i < 6; i++ { - if i == 5 { - return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i) - } else if i > 0 { - time.Sleep(30 * time.Second) - } - - // Connect to ArangoDB URL - conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{ - Endpoints: []string{baseURL}, - TLSConfig: &tls.Config{ /*...*/ }, - }) - if err != nil { - log.Println("connectDB:", "Cannot Connect to ArangoDB!", err) - continue - } - - // Connect Driver to User - client, err := adriver.NewClient(adriver.ClientConfig{ - Connection: conn, - Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD), - }) - if err != nil { - log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err) - continue - } - - // Create Context for Database Access - ctx = context.Background() - driver, err = client.Database(ctx, arangoDatabase) - if err != nil { - log.Println("connectDB:", "Cannot Load ArangoDB Database!", err) - continue - } - log.Println("connectDB:", "Connection Sucessful!") - return driver, ctx, nil - } - - return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?") -} -func InitDatabase() (err error) { - arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator") - - colChunk, err = arangoDB.Collection(arangoCTX, "chunk") - if err != nil { - return err - } - colFile, err = arangoDB.Collection(arangoCTX, "file") - if err != nil { - return err - } - colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map") - if err != nil { - return err - } - return nil -} - -var ( - poolMaster PoolMaster - //poolFiles = []PoolFile{} - //seq = 1 - //lock = sync.Mutex{} -) - func main() { err := InitDatabase() if err != nil { @@ -390,6 +781,24 @@ func main() { panic(err) } + for _, localPool := range poolMaster.LocalPools { + if localPool.ReadOnly { + fmt.Printf("Packing Pool %s\n", localPool.PoolID) + packResult, err := poolMaster.PackPool(localPool.PoolID) + if err != nil { + panic(err) + } + err = poolMaster.ImportPoolPackResult(packResult) + if err != nil { + panic(err) + } + err = poolMaster.MovePoolPackToWORM(packResult) + if err != nil { + panic(err) + } + } + // packResult.FileCount + } e := echo.New() //e.Use(middleware.Logger())