package main

import (
	"context"
	"crypto/tls"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"runtime/debug"
	"sort"
	"strings"
	"time"

	"git.cheetah.cat/worksucc/gma-puzzles/common"
	"git.cheetah.cat/worksucc/gma-puzzles/gma"
	adriver "github.com/arangodb/go-driver"
	ahttp "github.com/arangodb/go-driver/http"
	"github.com/twinj/uuid"

	_ "net/http/pprof"
)

var (
	arangoDB  adriver.Database
	arangoCTX context.Context
	//colChunk adriver.Collection
	colFile adriver.Collection
	//colFile2Chunk adriver.Collection
	colGMA      adriver.Collection
	colGMA2File adriver.Collection
)

// ConnectDB connects to ArangoDB and opens the given database, retrying up to
// five times with a 30-second pause between attempts.
func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
	log.Println("connectDB:", "Starting Connection Process...")

	// Retry Loop for Failed Connections
	for i := 0; i < 6; i++ {
		if i == 5 {
			return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i)
		} else if i > 0 {
			time.Sleep(30 * time.Second)
		}

		// Connect to ArangoDB URL
		conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
			Endpoints: []string{baseURL},
			TLSConfig: &tls.Config{ /*...*/ },
		})
		if err != nil {
			log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
			continue
		}

		// Connect Driver to User
		client, err := adriver.NewClient(adriver.ClientConfig{
			Connection:     conn,
			Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
		})
		if err != nil {
			log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
			continue
		}

		// Create Context for Database Access
		ctx = context.Background()
		driver, err = client.Database(ctx, arangoDatabase)
		if err != nil {
			log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
			continue
		}
		log.Println("connectDB:", "Connection Successful!")
		return driver, ctx, nil
	}
	// Unreachable: the loop above always returns.
	return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXECUTE?")
}

// InitDatabase opens the database connection and resolves every collection used below.
func InitDatabase() (err error) {
	arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator")
	if err != nil {
		return err
	}
	/*colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
	if err != nil {
		return err
	}*/
	colFile, err = arangoDB.Collection(arangoCTX, "file")
	if err != nil {
		return err
	}
	colGMA, err = arangoDB.Collection(arangoCTX, "gma")
	if err != nil {
		return err
	}
	/*colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
	if err != nil {
		return err
	}*/
	colGMA2File, err = arangoDB.Collection(arangoCTX, "gma_file_map")
	if err != nil {
		return err
	}
	return nil
}

var JobPoolSize int = 5
var ConcurrencyLimit int = 5
var WorkerJobPool chan string

func main() {
	folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "path to the folder containing GMA files to process")
	debug.SetMemoryLimit(6e9)
	flag.Parse()
	folderPath := *folderPathP

	// expose pprof for live profiling
	go func() {
		log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
	}()

	err := InitDatabase()
	if err != nil {
		panic(err)
	}
	/*
		err = colGMA.Truncate(arangoCTX)
		if err != nil {
			panic(err)
		}
		err = colFile.Truncate(arangoCTX)
		if err != nil {
			panic(err)
		}
		err = colGMA2File.Truncate(arangoCTX)
		if err != nil {
			panic(err)
		}
	*/
	// /mnt/worksucc/san2/gma/2/1/2/3/2123406190.1591573904.gma
	//fileHandle, err := os.Open("2143898000.1593250551.bin.gma") //2143898000.1593250551.bin")
	//gma, err := gma.NewReader("2143898000.1593250551.bin.gma")
	//folderPath := "/mnt/SC9000/TemporaryTestingShit2/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
	folderPathTarget := "/mnt/SC9000/ProcessedGMATest/" //"/mnt/worksucc/san1/gma/2/5/4/8/"

	entries, err := os.ReadDir(folderPath)
	if err != nil {
		panic(err)
	}
	skipBla := false
	var jobFiles []string
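	// Collect every regular file in the input directory as a job. skipBla is a
	// crude resume switch: while it is true, entries are skipped until the
	// hardcoded marker file name matches, and queueing resumes from that entry.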
	for _, e := range entries {
		if !e.IsDir() && skipBla {
			if e.Name() == "2547463094.1626322945.gma" {
				skipBla = false
			} else {
				continue
			}
		}
		if !e.IsDir() {
			jobFiles = append(jobFiles, filepath.Join(folderPath, e.Name()))
		}
	}
	/*
		sem := common.NewSemaphore(ConcurrencyLimit)
		wg := sync.WaitGroup{}
	*/
	for _, jobFile := range jobFiles {
		//wg.Add(1)
		//go func(jobFile string, wg *sync.WaitGroup) {
		//	sem.Acquire() // Wait for worker to have slot open
		err = ProcessGMA(jobFile)
		if err != nil {
			log.Printf("\nERROR: %v\n", err)
			//panic(err)
			continue
		}
		// move the finished gma out of the input folder
		err = os.Rename(jobFile, filepath.Join(folderPathTarget, filepath.Base(jobFile)))
		if err != nil {
			log.Printf("\nERROR: moving %s failed: %v\n", jobFile, err)
		}
		//	sem.Release() // Release the slot
		//	wg.Done()     // Finish job
		//}(job, &wg)
	}
	// Wait for all jobs to finish
	//wg.Wait()
}

// undoBatch best-effort removes the documents that were created for a failed batch.
func undoBatch(undo bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) {
	log.Printf("undoBatch(%t, %s)\n", undo, gmaID)
	/*
		_, err = colGMA.RemoveDocument(arangoCTX, gmaID)
		if err != nil {
			return err
		}
	*/
	/*
		_, _, err = colFile.RemoveDocuments(arangoCTX, fileIDs)
		if err != nil {
			return err
		}
	*/
	_, _, err = colGMA2File.RemoveDocuments(arangoCTX, gma2FileIDs)
	if err != nil {
		return err
	}
	return nil
}

func ProcessGMA(filePath string) (err error) {
	var (
		fileIDs     []string
		gma2FileIDs []string
	)

	dboGMA := common.DB_GMA{}
	dboGMA.BatchID = uuid.NewV4().String() // use this for rapid unscheduled disassembly
	dboGMA.OriginalPath = filePath
	dboGMA.ProcessingStart = time.Now()

	fileStat, err := os.Stat(filePath)
	if err != nil {
		return err
	}
	dboGMA.StatModTime = fileStat.ModTime()
	dboGMA.GMASize = fileStat.Size()
	if dboGMA.GMASize < 200 {
		return fmt.Errorf("GMA file too small, skipping")
	}

	log.Printf("Opening %s\n", filePath)
	gmaReader, err := gma.NewReader(filePath)
	if err != nil {
		return err
	}
	defer gmaReader.Close()

	// The SHA256 of the whole archive doubles as the document key.
	dboGMA.GMAHash, err = gmaReader.GetSHA256()
	if err != nil {
		return err
	}
	dboGMA.ID = dboGMA.GMAHash
	gmaReader.FileHandle.Seek(0, 0)

	gmaTempPath := filepath.Join("/home/cheetah/dev/gma-puzzles/temp", dboGMA.ID)
	defer os.RemoveAll(gmaTempPath) // clean up under any circumstances

	dboIDExists, err := colGMA.DocumentExists(arangoCTX, dboGMA.ID)
	if err != nil {
		return err
	}
	if dboIDExists {
		return fmt.Errorf("GMA with ID %s exists", dboGMA.ID)
	}

	header, err := gmaReader.ReadHeader()
	if err != nil {
		return err
	}
	dboGMA.Header = header
	log.Printf("AddonVersion=%d\n", header.AddonVersion)
	log.Printf("FormatVersion=%d\n", header.FormatVersion)
	log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte)
	//fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset())

	firstType, files, err := gmaReader.ReadFiles()
	if err != nil {
		return err
	}
	dboGMA.FirstType = firstType
	if len(files) == 0 {
		// guard: an empty file table would make the footer CRC lookup below panic
		return fmt.Errorf("GMA contains no files")
	}
	//fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset())

	var (
		dboGMA2Files []common.DB_GMA2File
		dboFiles     []common.DB_File
	)
	for _, file := range files {
		//fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber)
		if file.NextType > uint32(file.FileNumber+10) { // something is fucked
			log.Printf("Current Cursor %d\n", gmaReader.GetOffset())
			for _, otherFile := range files[file.FileNumber:] {
				log.Printf("OTHERFILE %s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", otherFile.FileName, otherFile.CRC, otherFile.Offset, otherFile.FileSize, otherFile.NextType, otherFile.FileNumber)
			}
			return fmt.Errorf("GMA header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
		}
		destPath := filepath.Join(gmaTempPath, "contents", file.FileName)
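		// Extract this entry into the temp tree, then describe it twice: as a
		// content-addressed DB_File (keyed by SHA256, so identical payloads across
		// addons collapse into one document) and as a DB_GMA2File edge that records
		// everything needed to reassemble this particular GMA.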
		dir := filepath.Dir(destPath)
		err := os.MkdirAll(dir, os.ModePerm)
		if err != nil {
			return err
		}
		destFile, err := os.Create(destPath)
		if err != nil {
			return err
		}
		extractMeta, err := gmaReader.ExtractFileTo(file, destFile)
		destFile.Close() // close right away; a defer would hold every handle open until the function returns
		if err != nil {
			return err
		}
		if extractMeta.ExtractedCRC != extractMeta.OriginalMeta.CRC {
			log.Printf("gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n", filePath, extractMeta.OriginalMeta.CRC, extractMeta.ExtractedCRC, extractMeta.OriginalMeta.FileName)
		}
		//fmt.Printf("ExtractedMeta %s CRC: %d SHA256: %s\n", file.FileName, extractMeta.ExtractedCRC, extractMeta.ExtractedSHA256)

		dboFile := common.DB_File{
			ID:          extractMeta.ExtractedSHA256, // ID is the SHA256, i guess that is good enough?
			BatchID:     dboGMA.BatchID,
			InitialPath: file.FileName,
			CRC:         file.CRC,
			Size:        file.FileSize,
			Hash:        extractMeta.ExtractedSHA256,
			Extension:   filepath.Ext(file.FileName),
		}
		dboGMA2File := common.DB_GMA2File{
			ID:            fmt.Sprintf("%s_%s", dboGMA.ID, extractMeta.ExtractedSHA256),
			BatchID:       dboGMA.BatchID,
			File:          fmt.Sprintf("file/%s", extractMeta.ExtractedSHA256),
			GMA:           fmt.Sprintf("gma/%s", dboGMA.ID),
			FileNumber:    extractMeta.OriginalMeta.FileNumber,
			FileName:      extractMeta.OriginalMeta.FileName,
			Offset:        extractMeta.OriginalMeta.Offset,
			FileSize:      extractMeta.OriginalMeta.FileSize,
			CRCMatch:      extractMeta.ExtractedCRC == extractMeta.OriginalMeta.CRC,
			CRC:           extractMeta.OriginalMeta.CRC,
			NextType:      extractMeta.OriginalMeta.NextType,
			LocalFileName: destPath,
			UploadID:      extractMeta.ExtractedSHA256,
		}
		//fmt.Println(dboFile)
		// Add fileIDs from new unknowns
		dboFiles = append(dboFiles, dboFile)
		//fmt.Println(dboGMA2File)
		gma2FileIDs = append(gma2FileIDs, dboGMA2File.ID)
		dboGMA2Files = append(dboGMA2Files, dboGMA2File)
	}

	// The addon CRC sits directly behind the last file's payload.
	lastFile := files[len(files)-1]
	dboGMA.FooterAddonCRC, err = gmaReader.ReadAddonCRC(lastFile.Offset + lastFile.FileSize)
	if err != nil {
		return err
	}
	dboGMA.ProcessingEnd = time.Now()
	dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds()

	// TODO: Calculate dboGMA.OptimizedSize
	dboGMA.OptimizedSize = 0
	_, err = colGMA.CreateDocument(arangoCTX, dboGMA)
	if err != nil {
		undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
		return err
	}

	// Insert the file documents in chunks of up to 5. Keys that already exist are
	// skipped; only keys that were actually created count as "new unknown" files
	// whose payload still needs uploading.
	importStartTime := time.Now()
	var newUnknownFiles []string
	chunkSize := 5
	for len(dboFiles) > 0 {
		// necessary check to avoid slicing beyond slice capacity
		if len(dboFiles) < chunkSize {
			chunkSize = len(dboFiles)
		}
		metaSlice, errorSlice, _ := colFile.CreateDocuments(arangoCTX, dboFiles[0:chunkSize])
		for _, meta := range metaSlice {
			if !meta.ID.IsEmpty() {
				newUnknownFiles = append(newUnknownFiles, meta.Key)
				fileIDs = append(fileIDs, meta.Key)
			}
		}
		//fmt.Println("ErrorSlice")
		//fmt.Println(errorSlice)
		for _, createError := range errorSlice {
			// a unique-constraint violation only means the file is already known; anything else is fatal
			if createError != nil && !strings.Contains(createError.Error(), "unique constraint violated - in index primary of type primary over '_key'") {
				undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
				return createError
			}
		}
		dboFiles = dboFiles[chunkSize:]
	}
	log.Println()
	log.Printf("Imported dboFiles into Arango and now we have %d new files from %d addon files\n", len(newUnknownFiles), len(files))

	// OptimizedSize is the storage this GMA actually adds: the summed size of only
	// those files that no earlier addon has contributed.
	deltaFileSize := int64(0)
	for _, unknownFile := range newUnknownFiles {
		unknownFileID := fmt.Sprintf("file/%s", unknownFile)
		for _, dboGMA2File := range dboGMA2Files {
			if unknownFileID == dboGMA2File.File {
				deltaFileSize += dboGMA2File.FileSize
			}
		}
	}
	dboGMA.OptimizedSize = deltaFileSize
	log.Printf("Delta Storage %d bytes\n", deltaFileSize)
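	// Bulk-import the gma->file edges. OnDuplicate=Ignore tolerates edges that
	// already exist, while Complete=true makes the import all-or-nothing so a
	// partial edge set never persists.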
	_, err = colGMA2File.ImportDocuments(arangoCTX, dboGMA2Files, &adriver.ImportDocumentOptions{
		OnDuplicate: adriver.ImportOnDuplicateIgnore,
		//FromPrefix: "gma/",
		//ToPrefix:   "file/",
		Complete: true, // will make it fail if any error occurs (and hopefully reverse the transaction)
	})
	if err != nil {
		undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
		return fmt.Errorf("ImportDocuments File fail: %v", err)
	}
	//fmt.Printf("Code: %d, Created: %d, Ignored: %d, Errors: %d", statsImportGMA2File.Code, statsImportGMA2File.Created, statsImportGMA2File.Ignored, statsImportGMA2File.Errors)
	log.Printf("Import Duration %dms\n", time.Since(importStartTime).Milliseconds())
	log.Println()

	// Upload all new unknown files to the storage server, retrying transient
	// ephemeral-port exhaustion ("cannot assign requested address") after a pause.
	http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200
	var httpClient *http.Client = http.DefaultClient
	for _, unknownFile := range newUnknownFiles {
		unknownFileID := fmt.Sprintf("file/%s", unknownFile)
		for _, dboGMA2File := range dboGMA2Files {
			if unknownFileID == dboGMA2File.File {
				for {
					uploadSuccess := true // reset on every attempt so a successful retry can leave the loop
					log.Printf("Uploading %s to Storage\n", dboGMA2File.UploadID)
					err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName)
					if err != nil {
						log.Println(err)
						if strings.Contains(err.Error(), "cannot assign requested address") {
							uploadSuccess = false
						} else {
							log.Println("oopsie")
							undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
							return err
						}
					}
					if uploadSuccess {
						break
					}
					time.Sleep(10 * time.Second)
				}
			}
		}
	}

	// Fetch all files back from the storage server, write a new gma from the
	// arango info, and compare the hashes.
	{
		log.Println("rewriting gma")
		destPath := filepath.Join(gmaTempPath, "rewrite.gma")
		dir := filepath.Dir(destPath)
		err := os.MkdirAll(dir, os.ModePerm)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		gmaWriter, err := gma.NewWriter(destPath)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		defer gmaWriter.Close()

		//fmt.Printf("Writing Header with FormatVersion: %d\n", dboGMA.Header.FormatVersion)
		err = gmaWriter.WriteHeader(dboGMA.Header)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		err = gmaWriter.WriteFirstType(dboGMA.FirstType)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}

		// restore the original file order before writing the index
		sort.SliceStable(dboGMA2Files, func(i, j int) bool {
			return dboGMA2Files[i].FileNumber < dboGMA2Files[j].FileNumber
		})
		for _, dboGMA2File := range dboGMA2Files {
			//fmt.Printf("WriteFileIndex for %s number %d\n", dboGMA2File.FileName, dboGMA2File.FileNumber)
			err = gmaWriter.WriteFileIndex(dboGMA2File.FileName, dboGMA2File.FileSize, dboGMA2File.CRC, dboGMA2File.NextType)
			if err != nil {
				undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
				return err
			}
		}

		var httpClient *http.Client = &http.Client{
			Timeout: 15 * time.Minute,
		}
		for _, dboGMA2File := range dboGMA2Files {
			//fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID)
			resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID))
			if err != nil {
				undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
				return err
			}
			err = gmaWriter.WriteFile(resp.Body)
			resp.Body.Close() // close per iteration; a defer here would pile connections up until the function returns
			if err != nil {
				undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
				return err
			}
		}
		gmaWriter.FileHandle.Seek(0, 2)
		log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC)
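		// Append the footer CRC, then verify the round trip: the rewrite must match
		// the original in both size and SHA256, otherwise the batch is rolled back.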
		gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC)
		gmaWriter.FileHandle.Seek(0, 0)
		writeHash, err := gmaWriter.GetSHA256()
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		log.Printf("Rewrite Hash is %s %s\n", writeHash, destPath)
		log.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath)
		log.Println()

		writeStat, err := os.Stat(destPath)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		writeSize := writeStat.Size()
		if writeSize != dboGMA.GMASize { // fail
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
		}
		if writeHash != dboGMA.GMAHash { // fail
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
		}
	}

	// TODO: 4... profit?
	dboGMA.ProcessingEnd = time.Now()
	dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds()
	dboGMA.Success = true

	_, err = colGMA.UpdateDocument(arangoCTX, dboGMA.ID, dboGMA)
	if err != nil {
		undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
		return err
	}
	return nil
}