diff --git a/main.go b/main.go index babba1f..afdb0e7 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "runtime/debug" "sort" "strings" + "sync" "time" "git.cheetah.cat/worksucc/gma-puzzles/common" @@ -109,8 +110,11 @@ func InitDatabase() (err error) { } var JobPoolSize int = 5 -var ConcurrencyLimit int = 5 +var ConcurrencyLimit int = 1 var WorkerJobPool chan string +var GlobalWriteSemaphore common.Semaphore + +var GlobalWriteLock sync.Mutex func main() { folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "a string") @@ -170,30 +174,22 @@ func main() { } } - /* - sem := common.NewSemaphore(ConcurrencyLimit) - wg := sync.WaitGroup{} - */ - for _, jobFile := range WorkerJobPool { - //wg.Add(1) - //go func(jobFile string, wg *sync.WaitGroup) { - // sem.Acquire() // Wait for worker to have slot open + GlobalWriteSemaphore = common.NewSemaphore(ConcurrencyLimit) + wg := sync.WaitGroup{} - err = ProcessGMA(jobFile) - if err != nil { - log.Printf("\nERROR: %v\n", err) - //panic(err) - continue - } - //os.Rename(jobFile, filepath.Join(folderPathTarget, filepath.Base(jobFile))) - - // sem.Release() // Release the slot - // wg.Done() // Finish job - //}(job, &wg) + for _, jobFile := range WorkerJobPool { + wg.Add(1) + go func(jobFile string, wg *sync.WaitGroup) { + err = ProcessGMA(jobFile) + if err != nil { + log.Printf("\nERROR: %v\n", err) + } + wg.Done() + }(jobFile, &wg) } // Wait for all jobs to finish - //wg.Wait() + wg.Wait() } @@ -220,6 +216,15 @@ func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []str return nil } func ProcessGMA(filePath string) (err error) { + var unlockOnce sync.Once + + fmt.Println("trying to acquire global write lock") + GlobalWriteLock.Lock() // Wait for worker to have slot open + fmt.Println("aquired global write lock") + defer unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway + defer fmt.Println("unlocking GlobalWriteLock") + time.Sleep(5 * time.Second) + var ( fileIDs []string gma2FileIDs []string @@ -369,73 +374,6 @@ func ProcessGMA(filePath string) (err error) { undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err } - /* - importStartTime := time.Now() - - var newUnknownFiles []string - chunkSize := 5 - for { - if len(dboFiles) == 0 { - break - } - - // necessary check to avoid slicing beyond - // slice capacity - if len(dboFiles) < chunkSize { - chunkSize = len(dboFiles) - } - - // process and work withj - metaSlice, errorSlice, _ := colFile.CreateDocuments(arangoCTX, dboFiles[0:chunkSize]) - - for _, meta := range metaSlice { - if !meta.ID.IsEmpty() { - newUnknownFiles = append(newUnknownFiles, meta.Key) - fileIDs = append(fileIDs, meta.Key) - } - } - //fmt.Println("ErrorSlice") - //fmt.Println(errorSlice) - for _, createError := range errorSlice { - if createError != nil && strings.Contains(createError.Error(), "unique constraint violated - in index primary of type primary over '_key'") { - } else if createError != nil { - undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) - return createError - } - } - - dboFiles = dboFiles[chunkSize:] - } - log.Println() - log.Printf("Imported dboFiles into Arango and now we have %d new files from %d addon files\n", len(newUnknownFiles), len(files)) - deltaFileSize := int64(0) - for _, unknownFile := range newUnknownFiles { - unknownFileID := fmt.Sprintf("file/%s", unknownFile) - for _, dboGMA2File := range dboGMA2Files { - if unknownFileID == dboGMA2File.File { - deltaFileSize += dboGMA2File.FileSize - } - } - } - dboGMA.OptimizedSize = deltaFileSize - log.Printf("Delta Storage %d bytes\n", deltaFileSize) - */ - /* - _, err = colGMA2File.ImportDocuments(arangoCTX, dboGMA2Files, &adriver.ImportDocumentOptions{ - OnDuplicate: adriver.ImportOnDuplicateIgnore, - //FromPrefix: "gma/", - //ToPrefix: "file/", - Complete: true, // will make it fail if any error occurs (and hopefully reverse the trans-action) - }) - if err != nil { - undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) - return fmt.Errorf("ImportDocuments File fail: %v", err) - } - //fmt.Printf("Code: %d, Created: %d, Ignored: %d, Errors: %d", statsImportGMA2File.Code, statsImportGMA2File.Created, statsImportGMA2File.Ignored, statsImportGMA2File.Errors) - - log.Printf("Import Duration %dms\n", time.Since(importStartTime).Milliseconds()) - log.Println() - */ // TODO: Check all dboFiles and dboGMA2Files if they exist, if something is odd, queue reupload dboExistFile := map[string]bool{} @@ -550,6 +488,11 @@ func ProcessGMA(filePath string) (err error) { } } } + + // at this point we can release the write semaphore + unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway + fmt.Println("unlocking GlobalWriteLock") + // TODO : fetch all files from storageServer // TODO : write new gma from arangoinfo // TODO : compare hashes