From e19548a0b6165c277c37116489d8cbe3f1d82c9a Mon Sep 17 00:00:00 2001 From: cheetah Date: Sat, 24 Jun 2023 15:56:25 -0500 Subject: [PATCH] deadlooooooock --- common/common.go | 52 +++++++++++++++ main.go | 82 ++++++++++++++++------- storageserver/storageserver.go | 117 ++++++++++++++++++++++++--------- 3 files changed, 195 insertions(+), 56 deletions(-) diff --git a/common/common.go b/common/common.go index ef2c1cc..32ea67b 100644 --- a/common/common.go +++ b/common/common.go @@ -136,3 +136,55 @@ func MultipartUpload(client *http.Client, url string, path string) (err error) { } return } + +func MoveFile(sourcePath, destPath string) error { + inputFile, err := os.Open(sourcePath) + if err != nil { + return fmt.Errorf("couldn't open source file: %s", err) + } + outputFile, err := os.Create(destPath) + if err != nil { + inputFile.Close() + return fmt.Errorf("couldn't open dest file: %s", err) + } + defer outputFile.Close() + _, err = io.Copy(outputFile, inputFile) + inputFile.Close() + if err != nil { + return fmt.Errorf("writing to output file failed: %s", err) + } + // The copy was successful, so now delete the original file + err = os.Remove(sourcePath) + if err != nil { + return fmt.Errorf("failed removing original file: %s", err) + } + return nil +} + +type Semaphore interface { + Acquire() + Release() + Close() +} + +type semaphore struct { + semC chan struct{} +} + +func NewSemaphore(maxConcurrency int) Semaphore { + return &semaphore{ + semC: make(chan struct{}, maxConcurrency), + } +} + +func (s *semaphore) Acquire() { + s.semC <- struct{}{} +} + +func (s *semaphore) Release() { + <-s.semC +} + +func (s *semaphore) Close() { + close(s.semC) +} diff --git a/main.go b/main.go index fa9a41f..c341367 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "crypto/tls" + "flag" "fmt" "log" "net/http" @@ -23,13 +24,13 @@ import ( ) var ( - arangoDB adriver.Database - arangoCTX context.Context - colChunk adriver.Collection - colFile adriver.Collection - colFile2Chunk adriver.Collection - colGMA adriver.Collection - colGMA2File adriver.Collection + arangoDB adriver.Database + arangoCTX context.Context + //colChunk adriver.Collection + colFile adriver.Collection + //colFile2Chunk adriver.Collection + colGMA adriver.Collection + colGMA2File adriver.Collection ) func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) { @@ -38,7 +39,7 @@ func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDataba // Retry Loop for Failed Connections for i := 0; i < 6; i++ { if i == 5 { - return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i) + return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times!", i) } else if i > 0 { time.Sleep(30 * time.Second) } @@ -79,10 +80,10 @@ func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDataba func InitDatabase() (err error) { arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator") - colChunk, err = arangoDB.Collection(arangoCTX, "chunk") + /*colChunk, err = arangoDB.Collection(arangoCTX, "chunk") if err != nil { return err - } + }*/ colFile, err = arangoDB.Collection(arangoCTX, "file") if err != nil { return err @@ -91,10 +92,10 @@ func InitDatabase() (err error) { if err != nil { return err } - colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map") + /*colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map") if err != nil { return err - } + }*/ colGMA2File, err = arangoDB.Collection(arangoCTX, "gma_file_map") if err != nil { return err @@ -102,8 +103,15 @@ func InitDatabase() (err error) { return nil } +var JobPoolSize int = 5 +var ConcurrencyLimit int = 5 +var WorkerJobPool chan string + func main() { + folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "a string") debug.SetMemoryLimit(6e9) + flag.Parse() + folderPath := *folderPathP go func() { log.Println(http.ListenAndServe("0.0.0.0:6060", nil)) @@ -132,14 +140,17 @@ func main() { //fileHandle, err := os.Open("2143898000.1593250551.bin.gma") //2143898000.1593250551.bin") //gma, err := gma.NewReader("2143898000.1593250551.bin.gma") - folderPath := "/mnt/SC9000/TemporaryTestingShit/" //"/mnt/worksucc/san1/gma/2/5/4/8/" + //folderPath := "/mnt/SC9000/TemporaryTestingShit2/" //"/mnt/worksucc/san1/gma/2/5/4/8/" folderPathTarget := "/mnt/SC9000/ProcessedGMATest/" //"/mnt/worksucc/san1/gma/2/5/4/8/" - //0 + // entries, err := os.ReadDir(folderPath) if err != nil { panic(err) } skipBla := false + + var WorkerJobPool []string + for _, e := range entries { if !e.IsDir() && skipBla == true { if e.Name() == "2547463094.1626322945.gma" { @@ -149,15 +160,35 @@ func main() { } } if !e.IsDir() { - err = ProcessGMA(filepath.Join(folderPath, e.Name())) - if err != nil { - fmt.Printf("\nERROR: %v\n", err) - //panic(err) - continue - } - os.Rename(filepath.Join(folderPath, e.Name()), filepath.Join(folderPathTarget, e.Name())) + WorkerJobPool = append(WorkerJobPool, filepath.Join(folderPath, e.Name())) } } + + /* + sem := common.NewSemaphore(ConcurrencyLimit) + wg := sync.WaitGroup{} + */ + for _, jobFile := range WorkerJobPool { + //wg.Add(1) + //go func(jobFile string, wg *sync.WaitGroup) { + // sem.Acquire() // Wait for worker to have slot open + + err = ProcessGMA(jobFile) + if err != nil { + fmt.Printf("\nERROR: %v\n", err) + //panic(err) + continue + } + os.Rename(jobFile, filepath.Join(folderPathTarget, filepath.Base(jobFile))) + + // sem.Release() // Release the slot + // wg.Done() // Finish job + //}(job, &wg) + } + + // Wait for all jobs to finish + //wg.Wait() + } func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) { @@ -339,8 +370,8 @@ func ProcessGMA(filePath string) (err error) { // process and work withj metaSlice, errorSlice, _ := colFile.CreateDocuments(arangoCTX, dboFiles[0:chunkSize]) - //fmt.Println("Metaslice") - //fmt.Println(metaSlice) + fmt.Println("Metaslice") + fmt.Println(metaSlice) for _, meta := range metaSlice { if !meta.ID.IsEmpty() { newUnknownFiles = append(newUnknownFiles, meta.Key) @@ -394,9 +425,10 @@ func ProcessGMA(filePath string) (err error) { unknownFileID := fmt.Sprintf("file/%s", unknownFile) for _, dboGMA2File := range dboGMA2Files { if unknownFileID == dboGMA2File.File { - //fmt.Printf("Uploading %s (local %s) to Storage\n", dboGMA2File.UploadID, dboGMA2File.LocalFileName) + fmt.Printf("Uploading %s (local %s) to Storage\n", dboGMA2File.UploadID, dboGMA2File.LocalFileName) err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName) if err != nil { + fmt.Println("oopsie") undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err } @@ -407,6 +439,7 @@ func ProcessGMA(filePath string) (err error) { // TODO : write new gma from arangoinfo // TODO : compare hashes { + fmt.Println("rewriting gma") destPath := filepath.Join(gmaTempPath, "rewrite.gma") dir := filepath.Dir(destPath) @@ -504,6 +537,5 @@ func ProcessGMA(filePath string) (err error) { undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err } - 0 return nil } diff --git a/storageserver/storageserver.go b/storageserver/storageserver.go index c4af500..e0e880a 100644 --- a/storageserver/storageserver.go +++ b/storageserver/storageserver.go @@ -26,9 +26,9 @@ import ( ) var ( - PoolMaxItems = 2500 + PoolMaxItems = 500 PoolPathFinal = "/mnt/SC9000/storagePools" - PoolPathTemp = "/mnt/SC9000/storageTemp" + PoolPathTemp = "/mnt/ramfs/" ) type Pool struct { @@ -58,9 +58,9 @@ type PoolMaster struct { CurrentPool *Pool lock sync.Mutex - LocalPools []Pool - FullPools []Pool - WORMPools map[string]Pool + LocalPools []*Pool + FullPools []*Pool + WORMPools map[string]*Pool } type PoolPackResult struct { PoolID string @@ -207,7 +207,7 @@ func (p *Pool) Fetch(id string, writer io.Writer) (err error) { func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) { poolMaster.finalPath = finalPath poolMaster.cachePath = cachePath - poolMaster.WORMPools = make(map[string]Pool) + poolMaster.WORMPools = make(map[string]*Pool) //poolMaster.lock = sync.Mutex{} destPath := filepath.Join(poolMaster.cachePath, "pool") @@ -228,8 +228,9 @@ func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, e } return poolMaster, nil } -func (p *PoolMaster) NewPool() (pool *Pool, err error) { - pool = &Pool{} +func (p *PoolMaster) NewPool() (*Pool, error) { + var err error + pool := Pool{} pool.PoolID = uuid.NewV4().String() pool.Finalized = false pool.ReadOnly = false @@ -238,35 +239,39 @@ func (p *PoolMaster) NewPool() (pool *Pool, err error) { //dir := filepath.Dir(destPath) err = os.MkdirAll(destPath, os.ModePerm) if err != nil { - return pool, err + return &pool, err } - return pool, nil + return &pool, nil } func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) { //fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount) if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems { + fmt.Printf("Aquiring Lock for GetCurrentWriteablepool\n") p.lock.Lock() defer p.lock.Unlock() + defer fmt.Printf("unlock GetCurrentWriteablepool\n") + fmt.Printf("Aquired Lock success for GetCurrentWriteablepool\n") p.CurrentPool.ReadOnly = true - p.FullPools = append(p.FullPools, *p.CurrentPool) + p.FullPools = append(p.FullPools, p.CurrentPool) // queue for compression - fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one", p.CurrentPool.PoolID, p.CurrentPool.itemCount) + fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool.PoolID, p.CurrentPool.itemCount) p.CurrentPool = nil } if p.CurrentPool == nil { - pool, err = p.AcquireNewOrRecoverPool() + fmt.Printf("Creating new Pool") + p.CurrentPool, err = p.AcquireNewOrRecoverPool() + fmt.Printf("... got [%s]\n", p.CurrentPool.PoolID) if err != nil { return pool, err } - p.CurrentPool = pool - return pool, nil } return p.CurrentPool, nil } -func RestorePoolFromFolder(folderPath string) (pool Pool, err error) { +func RestorePoolFromFolder(folderPath string) (pool *Pool, err error) { + pool = &Pool{} pool.PoolID = path.Base(folderPath) entries, err := os.ReadDir(folderPath) @@ -285,8 +290,11 @@ func RestorePoolFromFolder(folderPath string) (pool Pool, err error) { return pool, err } func (p *PoolMaster) ScanForLocalPools() (err error) { + fmt.Printf("Aquiring Lock for ScanForLocalPools\n") p.lock.Lock() defer p.lock.Unlock() + defer fmt.Printf("unlock ScanForLocalPools\n") + fmt.Printf("Aquired Lock success for ScanForLocalPools\n") entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool")) if err != nil { return err @@ -401,7 +409,10 @@ func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) { startTime := time.Now() targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", packResult.PoolID)) - os.Rename(packResult.outputFileName, targetFileName) + err = common.MoveFile(packResult.outputFileName, targetFileName) + if err != nil { + return err + } tarFileCheck, err := os.Open(targetFileName) if err != nil { @@ -427,8 +438,11 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err startTime := time.Now() packResult.PoolID = poolID + fmt.Printf("Aquiring Lock for PackPool(%s)\n", poolID) p.lock.Lock() defer p.lock.Unlock() + defer fmt.Printf("unlock PackPool\n") + fmt.Printf("Aquired Lock success for PackPool(%s)\n", poolID) packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar", poolID)) tarFile, err := os.Create(packResult.outputFileName) @@ -521,8 +535,8 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) { // p.NewPool() for _, localPool := range p.LocalPools { - if !localPool.ReadOnly { - return &localPool, nil + if !localPool.ReadOnly && localPool.itemCount < 500 { + return localPool, nil } } return p.NewPool() @@ -585,8 +599,11 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ // else load wormPool into disk-cache extract to "worm" // wormMode + fmt.Printf("Aquiring Lock for FetchLoadWORM\n") p.lock.Lock() defer p.lock.Unlock() + defer fmt.Printf("unlock FetchLoadWORM\n") + fmt.Printf("Aquired Lock success for FetchLoadWORM\n") var dboChunk common.DB_Chunk _, err = colChunk.ReadDocument(arangoCTX, chunkID, &dboChunk) @@ -607,7 +624,7 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ return err } fmt.Println("extracted") - p.WORMPools[loadedWormPool.PoolID] = loadedWormPool + p.WORMPools[loadedWormPool.PoolID] = &loadedWormPool return loadedWormPool.Fetch(fileID, writer) //return nil } @@ -707,9 +724,11 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro } fmt.Printf("Store(%s)\n", id) - + fmt.Printf("Aquiring Lock for Store\n") p.lock.Lock() defer p.lock.Unlock() + defer fmt.Printf("unlock Store\n") + fmt.Printf("Aquired Lock success for Store\n") // figuring out paths poolFolder := filepath.Join(p.cachePath, "pool", pool.PoolID) destPath := filepath.Join(poolFolder, id) @@ -764,7 +783,9 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro return nil } - +func removeFromSlice(slice []*Pool, s int) []*Pool { + return append(slice[:s], slice[s+1:]...) +} func main() { err := InitDatabase() if err != nil { @@ -781,21 +802,55 @@ func main() { panic(err) } + go func() { + for { + for index, fullPool := range poolMaster.FullPools { + poolMaster.lock.Lock() + + packResult, err := poolMaster.PackPool(fullPool.PoolID) + if err != nil { + panic(err) + } + err = poolMaster.ImportPoolPackResult(packResult) + if err != nil { + panic(err) + } + err = poolMaster.MovePoolPackToWORM(packResult) + if err != nil { + panic(err) + } + os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID)) + poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index) + + poolMaster.lock.Unlock() + } + time.Sleep(time.Second * 10) + } + }() for _, localPool := range poolMaster.LocalPools { if localPool.ReadOnly { - fmt.Printf("Packing Pool %s\n", localPool.PoolID) - packResult, err := poolMaster.PackPool(localPool.PoolID) - if err != nil { - panic(err) - } - err = poolMaster.ImportPoolPackResult(packResult) + + dboChunkExists, err := colChunk.DocumentExists(arangoCTX, localPool.PoolID) if err != nil { panic(err) } - err = poolMaster.MovePoolPackToWORM(packResult) - if err != nil { - panic(err) + if !dboChunkExists { + fmt.Printf("Packing Pool %s\n", localPool.PoolID) + packResult, err := poolMaster.PackPool(localPool.PoolID) + if err != nil { + panic(err) + } + err = poolMaster.ImportPoolPackResult(packResult) + if err != nil { + panic(err) + } + err = poolMaster.MovePoolPackToWORM(packResult) + if err != nil { + panic(err) + } + os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) } + //os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) } // packResult.FileCount }