package main import ( "archive/tar" "context" "crypto/sha256" "crypto/tls" "encoding/json" "errors" "fmt" "io" "log" "net/http" "os" "path" "path/filepath" "strconv" "strings" "sync" "time" "git.cheetah.cat/worksucc/gma-puzzles/common" adriver "github.com/arangodb/go-driver" ahttp "github.com/arangodb/go-driver/http" "github.com/klauspost/compress/zstd" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" "github.com/twinj/uuid" ) var ( FastCacheEnabled = true FastCachePath = "/zpool0/cheetah/workshop/garrysmod/gma-inator/cache" WORMCachePath = FastCachePath PoolMaxItems = 500 PoolPathFinal = "/zpool0/cheetah/workshop/garrysmod/gma-inator/chunks/" PoolPathTemp = "/zpool0/cheetah/workshop/garrysmod/gma-inator/temp" ) type PoolRecoveryData struct { PoolID string `json:"_key"` Size uint64 `json:"size"` Created time.Time `json:"date"` Hash string `json:"hash"` ItemCount int `json:"itemCount"` Items []string `json:"items"` RecoveryData []common.DB_File `json:"recoveryData"` } type Pool struct { PoolID string `json:"_key"` Finalized bool `json:"finalized"` ReadOnly bool `json:"readOnly"` Size uint64 `json:"size"` LastTouchy time.Time `json:"-"` itemCount int items []string wormMode bool filePath string file *os.File //tarWriter *tar.Writer tarReader *tar.Reader } type PoolFile struct { FileID string Size uint64 } type PoolMaster struct { cachePath string finalPath string CurrentPool map[string]*Pool lock sync.Mutex WORMLock sync.Mutex LocalPools []*Pool FullPools []*Pool WORMPools map[string]*Pool } type PoolPackResult struct { PoolID string Files []string FileCount int Size int64 Hash string outputFileName string } var ( arangoDB adriver.Database arangoCTX context.Context colChunk adriver.Collection colFile adriver.Collection colFile2Chunk adriver.Collection // PoolMaster poolMaster PoolMaster ) func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) { log.Println("connectDB:", "Starting Connection Process...") // Retry Loop for Failed Connections for i := 0; i < 6; i++ { if i == 5 { return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i) } else if i > 0 { time.Sleep(30 * time.Second) } // Connect to ArangoDB URL conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{ Endpoints: []string{baseURL}, TLSConfig: &tls.Config{ /*...*/ }, }) if err != nil { log.Println("connectDB:", "Cannot Connect to ArangoDB!", err) continue } // Connect Driver to User client, err := adriver.NewClient(adriver.ClientConfig{ Connection: conn, Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD), }) if err != nil { log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err) continue } // Create Context for Database Access ctx = context.Background() driver, err = client.Database(ctx, arangoDatabase) if err != nil { log.Println("connectDB:", "Cannot Load ArangoDB Database!", err) continue } log.Println("connectDB:", "Connection Sucessful!") return driver, ctx, nil } return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?") } func InitDatabase() (err error) { arangoDB, arangoCTX, err = ConnectDB("http://192.168.133.6:8529", "gma-inator", "gma-inator", "gma-inator") if err != nil { return err } colChunk, err = arangoDB.Collection(arangoCTX, "chunk") if err != nil { return err } colFile, err = arangoDB.Collection(arangoCTX, "file") if err != nil { return err } colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map") if err != nil { return err } return nil } func (p *Pool) OpenTar() (err error) { p.wormMode = true outputDir := filepath.Join(WORMCachePath, "worm", p.PoolID) err = os.MkdirAll(outputDir, os.ModePerm) if err != nil { return err } p.file, err = os.Open(p.filePath) if err != nil { return err } p.items = []string{} decompressor, err := zstd.NewReader(p.file, zstd.WithDecoderConcurrency(8)) if err != nil { panic(err) } defer decompressor.Close() p.tarReader = tar.NewReader(decompressor) for { header, err := p.tarReader.Next() if err == io.EOF { break } if err != nil { return err } path := filepath.Join(outputDir, header.Name) info := header.FileInfo() file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode()) if err != nil { return err } defer file.Close() _, err = io.Copy(file, p.tarReader) if err != nil { return err } p.items = append(p.items, header.Name) fmt.Print(".") } return nil } func (p *Pool) Fetch(id string, writer io.Writer) (err error) { for _, poolItem := range p.items { if poolItem == id { //fmt.Printf("Fetch WORMPool %s\n", id) p.LastTouchy = time.Now() poolLocalFilePath := filepath.Join(WORMCachePath, "worm", p.PoolID, id) srcLocalFile, err := os.Open(poolLocalFilePath) if err != nil { return err } defer srcLocalFile.Close() if _, err = io.Copy(writer, srcLocalFile); err != nil { return err } return nil } } return fmt.Errorf("%s not found", id) } func (p *Pool) Unload() { log.Printf("Unloading WORMPool [%s]\n", p.PoolID) p.file.Close() } func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) { poolMaster.finalPath = finalPath poolMaster.cachePath = cachePath poolMaster.CurrentPool = make(map[string]*Pool) poolMaster.WORMPools = make(map[string]*Pool) //poolMaster.lock = sync.Mutex{} destPath := filepath.Join(poolMaster.cachePath, "pool") err = os.MkdirAll(destPath, os.ModePerm) if err != nil { return PoolMaster{}, err } destPath = filepath.Join(poolMaster.cachePath, "worm") err = os.MkdirAll(destPath, os.ModePerm) if err != nil { return PoolMaster{}, err } err = os.MkdirAll(poolMaster.finalPath, os.ModePerm) if err != nil { return PoolMaster{}, err } return } func (p *PoolMaster) NewPool() (*Pool, error) { var err error pool := Pool{} pool.PoolID = uuid.NewV4().String() pool.Finalized = false pool.ReadOnly = false //TODO : Sync to DB destPath := filepath.Join(p.cachePath, "pool", pool.PoolID) //dir := filepath.Dir(destPath) err = os.MkdirAll(destPath, os.ModePerm) if err != nil { return &pool, err } return &pool, nil } func (p *PoolMaster) GetCurrentWriteablePool(workerID string) (pool *Pool, err error) { //fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount) if p.CurrentPool[workerID] != nil && p.CurrentPool[workerID].itemCount >= PoolMaxItems { fmt.Printf("Aquiring Lock for GetCurrentWriteablepool\n") p.lock.Lock() defer p.lock.Unlock() defer fmt.Printf("unlock GetCurrentWriteablepool\n") fmt.Printf("Aquired Lock success for GetCurrentWriteablepool\n") p.CurrentPool[workerID].ReadOnly = true p.CurrentPool[workerID].LastTouchy = time.Now() p.FullPools = append(p.FullPools, p.CurrentPool[workerID]) // queue for compression fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool[workerID].PoolID, p.CurrentPool[workerID].itemCount) p.CurrentPool[workerID] = nil } if p.CurrentPool[workerID] == nil { fmt.Printf("Creating new Pool") p.CurrentPool[workerID], err = p.AcquireNewOrRecoverPool() err := os.WriteFile(filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", p.CurrentPool[workerID].PoolID)), []byte(workerID), os.ModePerm) if err != nil { return pool, err } fmt.Printf("... got [%s]\n", p.CurrentPool[workerID].PoolID) } return p.CurrentPool[workerID], nil } func RestorePoolFromFolder(folderPath string) (pool *Pool, err error) { pool = &Pool{} pool.PoolID = path.Base(folderPath) entries, err := os.ReadDir(folderPath) if err != nil { return pool, err } for _, e := range entries { if !e.IsDir() { pool.items = append(pool.items, e.Name()) pool.itemCount++ } } pool.ReadOnly = pool.itemCount >= PoolMaxItems pool.Finalized = false // we are still local return pool, err } func (p *PoolMaster) ScanForLocalPools() (err error) { fmt.Printf("Aquiring Lock for ScanForLocalPools\n") p.lock.Lock() defer p.lock.Unlock() defer fmt.Printf("unlock ScanForLocalPools\n") fmt.Printf("Aquired Lock success for ScanForLocalPools\n") entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool")) if err != nil { return err } for _, e := range entries { if e.IsDir() { fmt.Printf("Scanning For Local Pools, found %s:", e.Name()) tarFinalPath := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", e.Name())) _, err = os.Stat(tarFinalPath) finalPathExists := false if err != nil { if !errors.Is(err, os.ErrNotExist) { return err } } dboChunkExists, err := colChunk.DocumentExists(arangoCTX, e.Name()) if err != nil { return err } if dboChunkExists { var dboChunk common.DB_Chunk _, err := colChunk.ReadDocument(arangoCTX, e.Name(), &dboChunk) if err != nil { return err } finalPathExists = dboChunk.Finalized && dboChunk.ReadOnly && !dboChunk.NotReady fmt.Printf("is in DB readonly %v finalized %v notready %v itemCount=%d size=%d hash=%s\n", dboChunk.ReadOnly, dboChunk.Finalized, dboChunk.NotReady, dboChunk.FileCount, dboChunk.Size, dboChunk.Hash) if finalPathExists { fmt.Println("skipping") } } if finalPathExists { continue } poolDirPath := filepath.Join(p.cachePath, "pool", e.Name()) restoredPool, err := RestorePoolFromFolder(poolDirPath) if err != nil { return err } workerCacheFileName := filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", e.Name())) fmt.Printf("is readonly %v itemCount=%d\n", restoredPool.ReadOnly, restoredPool.itemCount) if restoredPool.itemCount == 500 { p.LocalPools = append(p.LocalPools, restoredPool) } else { _, err = os.Stat(workerCacheFileName) //if we have a worker assingment file if err != nil { if !errors.Is(err, os.ErrNotExist) { return err } // if not exists p.LocalPools = append(p.LocalPools, restoredPool) } else { workerBytes, err := os.ReadFile(workerCacheFileName) if err != nil { return err } p.CurrentPool[string(workerBytes)] = restoredPool fmt.Println(string(workerBytes)) } } } } return nil } // Pool Packing func (p *PoolMaster) ImportPoolPackResult(packResult PoolPackResult) (err error) { startTime := time.Now() dboChunk := common.DB_Chunk{ ID: packResult.PoolID, Hash: packResult.Hash, Size: packResult.Size, FileCount: packResult.FileCount, Created: time.Now(), NotReady: true, ReadOnly: true, Finalized: true, } var dboChunk2File []common.DB_File2Chunk for _, prFile := range packResult.Files { dboChunk2File = append(dboChunk2File, common.DB_File2Chunk{ ID: prFile, File: fmt.Sprintf("file/%s", prFile), Chunk: fmt.Sprintf("chunk/%s", dboChunk.ID), }) } _, err = colChunk.CreateDocument(arangoCTX, dboChunk) if err != nil { return err } chunkSize := 500 for { if len(dboChunk2File) == 0 { break } // necessary check to avoid slicing beyond // slice capacity if len(dboChunk2File) < chunkSize { chunkSize = len(dboChunk2File) } _, errorSlice, _ := colFile2Chunk.CreateDocuments(arangoCTX, dboChunk2File[0:chunkSize]) //metaSlice, errorSlice, _ := colFile2Chunk.CreateDocuments(arangoCTX, dboChunk2File[0:chunkSize]) //fmt.Println("Metaslice") //fmt.Println(metaSlice) /*for _, meta := range metaSlice { if !meta.ID.IsEmpty() { newUnknownFiles = append(newUnknownFiles, meta.Key) fileIDs = append(fileIDs, meta.Key) } }*/ for _, createError := range errorSlice { if createError != nil && strings.Contains(createError.Error(), "unique constraint violated - in index primary of type primary over '_key'") { } else if createError != nil { return createError } } dboChunk2File = dboChunk2File[chunkSize:] } fmt.Printf("ImportPool Duration %dms\n", time.Since(startTime).Milliseconds()) return nil } func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) { startTime := time.Now() targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", packResult.PoolID)) err = common.MoveFile(packResult.outputFileName, targetFileName) if err != nil { return err } tarFileCheck, err := os.Open(targetFileName) if err != nil { return err } defer tarFileCheck.Close() shaHasher := sha256.New() _, err = io.Copy(shaHasher, tarFileCheck) if err != nil { return err } wormHash := fmt.Sprintf("%x", shaHasher.Sum(nil)) fmt.Printf("WORMTarPool hash is %s , old is %s\n", wormHash, packResult.Hash) if wormHash != packResult.Hash { os.Remove(targetFileName) return err } fmt.Printf("MoveWORM Duration %dms\n", time.Since(startTime).Milliseconds()) return nil } func (p *PoolMaster) WriteRecoveryFile(packResult PoolPackResult, pool *Pool) (err error) { fileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.json", packResult.PoolID)) recoveryFile, err := os.Create(fileName) if err != nil { return err } defer recoveryFile.Close() poolRecoveryData := PoolRecoveryData{ PoolID: packResult.PoolID, Size: uint64(packResult.Size), Created: time.Now(), Hash: packResult.Hash, ItemCount: pool.itemCount, Items: pool.items, //RecoveryData, } //TODO: fetch RecoveryData from DB poolRecoveryData.RecoveryData = make([]common.DB_File, len(pool.items)) _, _, err = colFile.ReadDocuments(arangoCTX, pool.items, poolRecoveryData.RecoveryData) if err != nil { return fmt.Errorf("error @ReadDocuments %v", err) } json, err := json.MarshalIndent(poolRecoveryData, "", "\t") if err != nil { return fmt.Errorf("error @json.MarshalIndent %v", err) } _, err = recoveryFile.Write(json) if err != nil { return fmt.Errorf("error @recoveryFile.Write %v", err) } return nil } /* * Only call this in a locked Sate */ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err error) { startTime := time.Now() packResult.PoolID = poolID packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar.zst", poolID)) tarFile, err := os.Create(packResult.outputFileName) if err != nil { return packResult, fmt.Errorf("os.Create: %v", err) } defer tarFile.Close() compressor, err := zstd.NewWriter(tarFile, zstd.WithEncoderLevel(4)) if err != nil { return packResult, fmt.Errorf("zstd.NewWriter: %v", err) } defer compressor.Close() tw := tar.NewWriter(compressor) defer tw.Close() entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool", poolID)) if err != nil { return packResult, fmt.Errorf("os.ReadDir: %v", err) } //fmt.Printf("len(entries) == %d\n", len(entries)) if len(entries) != PoolMaxItems { return packResult, fmt.Errorf("Pool contains %d items, but there should be %d", len(entries), PoolMaxItems) } for _, e := range entries { originalPath := filepath.Join(p.cachePath, "pool", poolID, e.Name()) file, err := os.Open(originalPath) if err != nil { return packResult, fmt.Errorf("for os.Open: %v", err) } defer file.Close() info, err := file.Stat() if err != nil { return packResult, fmt.Errorf("for fs.Stat: %v", err) } tarFileHeader, err := tar.FileInfoHeader(info, info.Name()) if err != nil { return packResult, err } err = tw.WriteHeader(tarFileHeader) if err != nil { return packResult, err } _, err = io.Copy(tw, file) if err != nil { return packResult, err } packResult.FileCount++ packResult.Files = append(packResult.Files, e.Name()) fmt.Printf("*") } err = tw.Close() if err != nil { return packResult, err } err = compressor.Close() if err != nil { return packResult, err } err = tarFile.Close() if err != nil { return packResult, err } // re-open and check { tarFileCheck, err := os.Open(packResult.outputFileName) if err != nil { return packResult, err } defer tarFileCheck.Close() shaHasher := sha256.New() hashedBytes, err := io.Copy(shaHasher, tarFileCheck) if err != nil { return packResult, err } packResult.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil)) fmt.Printf("PackPoolTar hash is %s\n", packResult.Hash) tarFileCheck.Seek(0, 0) // validate written tar-chunk decompressor, err := zstd.NewReader(tarFileCheck, zstd.WithDecoderConcurrency(8)) if err != nil { panic(err) } defer decompressor.Close() tarFileCheckReader := tar.NewReader(decompressor) //filenamesReadBackList := []string{} for { header, err := tarFileCheckReader.Next() //header.PAXRecords if err == io.EOF { break } if err != nil { return packResult, err } hasher := sha256.New() hashedBytes, err := io.Copy(hasher, tarFileCheckReader) if err != nil { return packResult, err } readBackChecksum := fmt.Sprintf("%x", hasher.Sum(nil)) if hashedBytes != header.Size { return packResult, fmt.Errorf("validation on output archive, incorrect size file %s has %d should be %d", header.Name, hashedBytes, header.Size) } if header.Name != readBackChecksum { return packResult, fmt.Errorf("validation on output archive, incorrect checksum file %s has %s", header.Name, readBackChecksum) } //filenamesReadBackList = append(filenamesReadBackList, header.Name) } packFileStats, err := tarFileCheck.Stat() if err != nil { return packResult, err } packResult.Size = packFileStats.Size() if hashedBytes != packResult.Size { return packResult, fmt.Errorf("WORM Copy HashedBytes %d != FileSize %d", hashedBytes, packResult.Size) } fmt.Printf("PackPool Duration %dms\n", time.Since(startTime).Milliseconds()) } // TODO: write index json describing the files inside, pure hash list, aswell // as dictionary containing all the infos for restore/disaster-recovery into Arango return packResult, nil } func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) { // p.NewPool() for _, localPool := range p.LocalPools { if !localPool.ReadOnly && localPool.itemCount < PoolMaxItems { return localPool, nil } } return p.NewPool() } func (p *PoolMaster) Lookup(id string) (exists bool) { p.lock.Lock() defer p.lock.Unlock() for _, cp := range p.CurrentPool { if cp != nil { // CurrentPool for _, poolItem := range cp.items { if poolItem == id { return true } } } } for _, wormPool := range p.WORMPools { // WORM Pools for _, poolItem := range wormPool.items { if poolItem == id { return true } } } for _, fullPool := range p.FullPools { // Full Pools for _, poolItem := range fullPool.items { if poolItem == id { return true } } } for _, localPool := range p.LocalPools { // Local Pools for _, poolItem := range localPool.items { if poolItem == id { return true } } } dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id) if err != nil { return false } return dboFile2ChunkExists } func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writer) (err error) { fmt.Printf("FetchLoadWORM(chunkID %s, fileID %s, ...)\n", chunkID, fileID) // search within loaded worm-pools //fmt.Println("WormPool For Start") for wormID, wormPool := range p.WORMPools { //fmt.Printf("WORMPool[%s] for-iter\n", wormID) if wormID != chunkID { continue } for _, poolItem := range wormPool.items { if poolItem == fileID { //fmt.Printf("Fetch WORMPool %s file %s\n", wormID, fileID) return wormPool.Fetch(fileID, writer) } } break } // else load wormPool into disk-cache extract to "worm" // wormMode // TODO: every Method here should have locked the WORMLock!! /* fmt.Printf("Aquiring WORMLock for FetchLoadWORM\n") p.WORMLock.Lock() defer p.WORMLock.Unlock() 0 defer fmt.Printf("unlock WORMLock FetchLoadWORM\n") fmt.Printf("Aquired WORMLock success for FetchLoadWORM\n") */ fmt.Printf("Aquiring Lock for FetchLoadWORM\n") p.lock.Lock() defer p.lock.Unlock() defer fmt.Printf("unlock FetchLoadWORM\n") fmt.Printf("Aquired Lock success for FetchLoadWORM\n") var dboChunk common.DB_Chunk _, err = colChunk.ReadDocument(arangoCTX, chunkID, &dboChunk) if err != nil { return err } loadedWormPool := Pool{ PoolID: dboChunk.ID, Size: uint64(dboChunk.Size), ReadOnly: dboChunk.ReadOnly, Finalized: dboChunk.Finalized, LastTouchy: time.Now(), filePath: filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", dboChunk.ID)), } fmt.Printf("initialized loadedWormPool (%s), Opening tar...\n", dboChunk.ID) err = loadedWormPool.OpenTar() if err != nil { fmt.Println(err) return err } fmt.Printf("extracted and key = %s\n", loadedWormPool.PoolID) p.WORMPools[loadedWormPool.PoolID] = &loadedWormPool return loadedWormPool.Fetch(fileID, writer) //return nil } func (p *PoolMaster) CleanWORMTemp() (err error) { p.lock.Lock() //p.WORMLock.Lock() //defer p.WORMLock.Unlock() defer p.lock.Unlock() for _, wormPool := range p.WORMPools { if time.Since(wormPool.LastTouchy).Minutes() > 4 { wormPool.Unload() delete(p.WORMPools, wormPool.PoolID) //os.RemoveAll(filepath.Join(poolMaster.cachePath, "worm", wormPool.PoolID)) } } return nil } func (p *PoolMaster) PackFullPools() (err error) { if len(poolMaster.FullPools) >= 16 { poolMaster.lock.Lock() fmt.Printf("Aquiring WORMLock for Regular FullPool Pack\n") //poolMaster.WORMLock.Lock() //fmt.Printf("Aquired WORMLock success for Regular FullPool Pack\n") poolChannel := make(chan error) // Channel to receive the results var deletedPools []string var poolChannelCounter = 0 for _, fullPool := range poolMaster.FullPools { if time.Since(fullPool.LastTouchy).Minutes() < 5 { continue } // start parallel go func(fullPool *Pool) { packResult, err := poolMaster.PackPool(fullPool.PoolID) if err != nil { poolChannel <- fmt.Errorf("error @PackPool: %v", err) return } err = poolMaster.ImportPoolPackResult(packResult) if err != nil { poolChannel <- fmt.Errorf("error @ImportPoolPackResult: %v", err) return } err = poolMaster.MovePoolPackToWORM(packResult) if err != nil { poolChannel <- fmt.Errorf("error @MovePoolPackToWORM: %v", err) return } err = poolMaster.WriteRecoveryFile(packResult, fullPool) if err != nil { poolChannel <- fmt.Errorf("error @WriteRecoveryFile: %v", err) return } os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID)) deletedPools = append(deletedPools, fullPool.PoolID) _, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{ NotReady: false, Finalized: true, ReadOnly: true, Hash: packResult.Hash, Size: packResult.Size, }) if err != nil { poolChannel <- err return } poolChannel <- nil }(fullPool) // increment total parallel counter poolChannelCounter++ } // Waiting for them to finish for i := 0; i < poolChannelCounter; i++ { result := <-poolChannel if result != nil { fmt.Printf("PoolChannel Error: %v\n", result) } } // delete pools that have successfully packed for _, deletedPoolID := range deletedPools { for index, fullPool := range poolMaster.FullPools { if fullPool.PoolID == deletedPoolID { poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index) break } } } //fmt.Printf("unlock WORMLock Regular FullPool Pack\n") //poolMaster.WORMLock.Unlock() fmt.Printf("unlock lock Regular FullPool Pack\n") poolMaster.lock.Unlock() } return nil } func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { var unlocKOnce sync.Once p.lock.Lock() defer unlocKOnce.Do(p.lock.Unlock) for _, cp := range p.CurrentPool { if cp != nil { for _, poolItem := range cp.items { if poolItem == id { fmt.Printf("Fetch CurrentPool %s\n", id) poolLocalFilePath := filepath.Join(p.cachePath, "pool", cp.PoolID, id) //fmt.Println(poolLocalFilePath) //fmt.Printf("%s %s\n", p.CurrentPool.PoolID, poolItem) srcLocalFile, err := os.Open(poolLocalFilePath) if err != nil { return err } //fmt.Println("Closer") defer srcLocalFile.Close() //fmt.Println("io.Copy") if _, err = io.Copy(writer, srcLocalFile); err != nil { return err } return nil } } } } for _, wormPool := range p.WORMPools { for _, poolItem := range wormPool.items { if poolItem == id { fmt.Printf("Fetch WORMPool %s file %s\n", wormPool.PoolID, id) return wormPool.Fetch(id, writer) } } } unlocKOnce.Do(p.lock.Unlock) for _, fullPool := range p.FullPools { for _, poolItem := range fullPool.items { if poolItem == id { fmt.Printf("Fetch FullPool %s\n", id) fullPool.LastTouchy = time.Now() poolLocalFilePath := filepath.Join(p.cachePath, "pool", fullPool.PoolID, id) srcLocalFile, err := os.Open(poolLocalFilePath) if err != nil { return err } defer srcLocalFile.Close() if _, err = io.Copy(writer, srcLocalFile); err != nil { return err } return nil } } } for _, localPool := range p.LocalPools { for _, poolItem := range localPool.items { if poolItem == id { fmt.Printf("Fetch LocalPool %s\n", id) poolLocalFilePath := filepath.Join(p.cachePath, "pool", localPool.PoolID, id) srcLocalFile, err := os.Open(poolLocalFilePath) if err != nil { return err } defer srcLocalFile.Close() if _, err = io.Copy(writer, srcLocalFile); err != nil { return err } return nil } } } // ArangoDB dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id) if err != nil { return err } fmt.Printf("dboFile2ChunkExists %s = %v\n", id, dboFile2ChunkExists) if dboFile2ChunkExists { var dboFile2Chunk common.DB_File2Chunk _, err = colFile2Chunk.ReadDocument(arangoCTX, id, &dboFile2Chunk) if err != nil { return err } //FetchFromPoolPack apparentPoolID := dboFile2Chunk.Chunk[6:] fastCachePath := filepath.Join(WORMCachePath, "worm", apparentPoolID, id) if checkFileExists(fastCachePath) { fastCacheDir := filepath.Join(WORMCachePath, "worm", apparentPoolID) srcLocalFile, err := os.Open(fastCachePath) if err != nil { return err } defer srcLocalFile.Close() if _, err = io.Copy(writer, srcLocalFile); err != nil { return err } // at this point acquire lock til end p.lock.Lock() defer p.lock.Unlock() if _, ok := p.WORMPools[apparentPoolID]; !ok { // create virtual worm fastCachePool, err := RestorePoolFromFolder(fastCacheDir) if err != nil { return err } // fastCachePool p.WORMPools[fastCachePool.PoolID] = fastCachePool } return nil } //dboFile2Chunk.Chunk <- which chunk i need to find return p.FetchLoadWORM(dboFile2Chunk.Chunk[6:], id, writer) } return nil } func checkFileExists(filePath string) bool { _, error := os.Stat(filePath) //return !os.IsNotExist(err) return !errors.Is(error, os.ErrNotExist) } func (p *PoolMaster) Store(id string, workerID string, src io.Reader, targetSize int64) (err error) { pool, err := p.GetCurrentWriteablePool(workerID) if err != nil { return err } if pool.ReadOnly { return fmt.Errorf("WTF Pool %s is ReadOnly but GetCurrentWriteablePool returned it", pool.PoolID) } fmt.Printf("Store(%s)\n", id) fmt.Printf("Aquiring Lock for Store\n") p.lock.Lock() defer p.lock.Unlock() defer fmt.Printf("unlock Store\n") fmt.Printf("Aquired Lock success for Store\n") // figuring out paths poolFolder := filepath.Join(p.cachePath, "pool", pool.PoolID) destPath := filepath.Join(poolFolder, id) dst, err := os.Create(destPath) if err != nil { _ = os.Remove(destPath) return err } defer dst.Close() // copy from ioReader to file writtenBytes, err := io.Copy(dst, src) if err != nil { _ = os.Remove(destPath) return err } if writtenBytes != targetSize { _ = os.Remove(destPath) return err } // check transferred data dst.Seek(0, 0) shaHasher := sha256.New() if _, err := io.Copy(shaHasher, dst); err != nil { return err } outputHash := fmt.Sprintf("%x", shaHasher.Sum(nil)) if outputHash != id { return fmt.Errorf("Store() Sha256 Hash Mismatch") } pool.itemCount++ pool.items = append(pool.items, id) fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount) entries, err := os.ReadDir(poolFolder) if err != nil { return err } newItemCount := 0 for _, e := range entries { if !e.IsDir() { //pool.items = append(pool.items, e.Name()) newItemCount++ } } pool.itemCount = newItemCount //fmt.Printf("Current Pool %s, Recounted ItemCount = %d\n", pool.PoolID, pool.itemCount) if pool.itemCount >= PoolMaxItems { pool.ReadOnly = true } return nil } func removeFromSlice(slice []*Pool, s int) []*Pool { return append(slice[:s], slice[s+1:]...) } func main() { err := InitDatabase() if err != nil { panic(err) } poolMaster, err = NewPoolMaster(PoolPathFinal, PoolPathTemp) if err != nil { panic(err) } // Scan for local existing Pools err = poolMaster.ScanForLocalPools() if err != nil { panic(err) } go func() { for { poolMaster.PackFullPools() poolMaster.CleanWORMTemp() time.Sleep(time.Minute * 2) } }() { poolChannel := make(chan error) // Channel to receive the results var poolChannelCounter = 0 // Initial packing for _, localPool := range poolMaster.LocalPools { if localPool.ReadOnly { dboChunkExists, err := colChunk.DocumentExists(arangoCTX, localPool.PoolID) if err != nil { panic(err) } if !dboChunkExists { //spawn thread go func(localPool *Pool) { fmt.Printf("Packing Pool %s\n", localPool.PoolID) packResult, err := poolMaster.PackPool(localPool.PoolID) if err != nil { panic(err) } err = poolMaster.ImportPoolPackResult(packResult) if err != nil { panic(err) } err = poolMaster.MovePoolPackToWORM(packResult) if err != nil { panic(err) } err = poolMaster.WriteRecoveryFile(packResult, localPool) if err != nil { panic(err) } os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) poolChannel <- nil }(localPool) // increment total parallel counter poolChannelCounter++ } //os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) } // packResult.FileCount } // Waiting for them to finish for i := 0; i < poolChannelCounter; i++ { result := <-poolChannel if result != nil { fmt.Printf("PoolChannel Error: %v\n", result) } } } e := echo.New() e.Use(middleware.RecoverWithConfig(middleware.RecoverConfig{ StackSize: 1 << 10, // 1 KB })) //e.Use(middleware.Logger()) e.GET("/", func(c echo.Context) error { return c.String(http.StatusOK, "Hello, World!") }) e.GET("/pmdump", func(c echo.Context) error { return c.JSON(http.StatusOK, poolMaster) }) e.GET("/check/:id", func(c echo.Context) error { id := c.Param("id") fmt.Printf("/check/%s checking...\n", id) exists := poolMaster.Lookup(id) //fmt.Printf("%s exists = %s\n", id, strconv.FormatBool(exists)) if exists { return c.JSON(http.StatusAlreadyReported, exists) } return c.JSON(http.StatusOK, exists) }) e.GET("/fetch/:id", func(c echo.Context) error { id := c.Param("id") //fmt.Printf("/fetch/%s fetching...\n", id) exists := poolMaster.Lookup(id) if exists { c.Response().Header().Set(echo.HeaderContentType, echo.MIMEOctetStream) c.Response().WriteHeader(http.StatusOK) err = poolMaster.Fetch(id, c.Response()) if err != nil { fmt.Printf("%v", err) return c.String(http.StatusInternalServerError, err.Error()) } c.Response().Flush() } else { fmt.Printf("/fetch/%s does not exist\n", id) return c.String(http.StatusNotFound, "Not Found") } return nil }) e.POST("/stash/:id/:size", func(c echo.Context) error { id := c.Param("id") sizeStr := c.Param("size") sizeVal, err := strconv.ParseInt(sizeStr, 10, 64) if err != nil { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } workerID := c.FormValue("worker") infoJSON := c.FormValue("info") fileInfo := common.DB_File{} err = json.Unmarshal([]byte(infoJSON), &fileInfo) if err != nil { fmt.Println(err) return c.String(http.StatusBadRequest, "Error") } exists := poolMaster.Lookup(id) if exists { fmt.Printf("/stash/%s exists already\n", id) file, err := c.FormFile("file") if err != nil { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } formStream, err := file.Open() if err != nil { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } defer formStream.Close() if _, err = io.Copy(io.Discard, formStream); err != nil { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } return c.String(http.StatusAlreadyReported, "Exists already") } fmt.Printf("stashing %s", id) file, err := c.FormFile("file") if err != nil { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } formStream, err := file.Open() if err != nil { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } defer formStream.Close() err = poolMaster.Store(id, workerID, formStream, sizeVal) if err != nil { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } fmt.Println("...stashed") fmt.Println(fileInfo) fileInfo.Created = time.Now() _, err = colFile.CreateDocument(arangoCTX, fileInfo) if err != nil { fmt.Println(err) return c.String(http.StatusExpectationFailed, "Error") } return c.JSON(http.StatusOK, true) }) e.Logger.Fatal(e.Start(":13371")) }