From c0145d7f31fa07123818f192c8d0beadde8e1be2 Mon Sep 17 00:00:00 2001 From: cheetah Date: Thu, 6 Jul 2023 07:59:18 -0500 Subject: [PATCH] updated some code :-) --- common/common.go | 1 + main.go | 52 ++++++++++++++++++++++++------------------------ 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/common/common.go b/common/common.go index 6f4c28f..3233a84 100644 --- a/common/common.go +++ b/common/common.go @@ -28,6 +28,7 @@ type DB_GMA struct { Header gma.GMAHeader `json:"header"` FirstType int32 `json:"firstType"` Success bool `json:"success"` + RetryCounter int `json:"retries"` } type DB_File struct { diff --git a/main.go b/main.go index 2575322..bae0626 100644 --- a/main.go +++ b/main.go @@ -328,17 +328,23 @@ func modeIngress(folderPath string, skipName string) { pw.Style().Visibility.Speed = true pw.Style().Visibility.SpeedOverall = false pw.Style().Visibility.Time = true - pw.Style().Visibility.TrackerOverall = true + pw.Style().Visibility.TrackerOverall = false pw.Style().Visibility.Value = true pw.Style().Visibility.Pinned = true // call Render() in async mode; yes we don't have any trackers at the moment go pw.Render() + trackerDoneMarker := sync.Once{} + tracker := progress.Tracker{Message: fmt.Sprintf("Working %d GMAs", len(WorkerJobPool)), Total: int64(len(WorkerJobPool)), Units: progress.UnitsDefault} + pw.AppendTracker(&tracker) + defer trackerDoneMarker.Do(tracker.MarkAsDone) + for _, jobFile := range WorkerJobPool { wg.Add(1) go func(jobFile string, wg *sync.WaitGroup) { defer wg.Done() + defer tracker.Increment(1) err = ProcessGMA(pw, jobFile) if err != nil { log.Printf("\nERROR: %v\n", err) @@ -351,7 +357,7 @@ func modeIngress(folderPath string, skipName string) { wg.Wait() } func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) { - log.Printf("undoBatch(%v, %s)\n", undoBatch, gmaID) + //log.Printf("undoBatch(%v, %s)\n", undoBatch, gmaID) /* _, err = colGMA.RemoveDocument(arangoCTX, gmaID) if err != nil { @@ -375,11 +381,11 @@ func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []str func ProcessGMA(pw progress.Writer, filePath string) (err error) { var unlockOnce sync.Once - fmt.Println("trying to acquire global write lock") + //fmt.Println("trying to acquire global write lock") GlobalWriteLock.Lock() // Wait for worker to have slot open - fmt.Println("aquired global write lock") + //fmt.Println("aquired global write lock") defer unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway - defer fmt.Println("unlocking GlobalWriteLock") + //defer fmt.Println("unlocking GlobalWriteLock") //time.Sleep(5 * time.Second) var ( @@ -427,7 +433,9 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { return fmt.Errorf("GMA File too small, skipping") } niceName := filepath.Base(filePath) + trackerProcessDoneMarker := sync.Once{} trackerProcess := progress.Tracker{Message: fmt.Sprintf("Extracting %s", niceName), Total: 0, Units: progress.UnitsDefault} + defer trackerProcessDoneMarker.Do(trackerProcess.MarkAsDone) pw.AppendTracker(&trackerProcess) //log.Printf("Opening %s\n", filePath) @@ -476,6 +484,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { if err != nil { return err } + trackerProcess.UpdateTotal(int64(len(files))) dboGMA.FirstType = firstType //fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset()) @@ -515,7 +524,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { } if extractMeta.ExtractedCRC != extractMeta.OriginalMeta.CRC { - log.Printf("gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n", filePath, extractMeta.OriginalMeta.CRC, extractMeta.ExtractedCRC, extractMeta.OriginalMeta.FileName) + pw.Log(fmt.Sprintf("gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n", filePath, extractMeta.OriginalMeta.CRC, extractMeta.ExtractedCRC, extractMeta.OriginalMeta.FileName)) } //fmt.Printf("ExtractedMeta %s CRC: %d SHA256: %s\n", file.FileName, extractMeta.ExtractedCRC, extractMeta.ExtractedSHA256) dboFile := common.DB_File{ @@ -549,6 +558,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { //fmt.Println(dboGMA2File) gma2FileIDs = append(gma2FileIDs, dboGMA2File.ID) dboGMA2Files = append(dboGMA2Files, dboGMA2File) + trackerProcess.Increment(1) } lastFile := files[len(files)-1] @@ -584,14 +594,16 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { } dboExistFile2GMA[dboGMA2File.ID] = exists } - trackerProcess.MarkAsDone() + trackerProcessDoneMarker.Do(trackerProcess.MarkAsDone) // TODO: upload all unknownNewFiles to StorageServer http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200 var httpClient *http.Client = http.DefaultClient + trackerUploadDoneMarker := sync.Once{} trackerUpload := progress.Tracker{Message: fmt.Sprintf("Uploading %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault} pw.AppendTracker(&trackerUpload) + defer trackerUploadDoneMarker.Do(trackerUpload.MarkAsDone) for _, dboFile := range dboFiles { dboFileID := fmt.Sprintf("file/%s", dboFile.ID) @@ -610,6 +622,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { //body, _ := ioutil.ReadAll(res.Body) //fmt.Printf("res.StatusCode = %d\n", res.StatusCode) if res.StatusCode == http.StatusAlreadyReported { + trackerUpload.UpdateMessage(fmt.Sprintf("Skipping %s", niceName)) trackerUpload.Increment(1) continue } @@ -619,21 +632,6 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { //log.Println("Found dboFileID == dboGMA2File.Ref ID") uploadSuccess := true for { - //log.Printf("Uploading %s to Storage\n", dboGMA2File.UploadID) - // TODO: move file management to storageserver - /*existsFile, err := colFile.DocumentExists(arangoCTX, dboFile.ID) - if err != nil { - log.Println("err @colFile.DocumentExist") - return err - } - if !existsFile { - _, err := colFile.CreateDocument(arangoCTX, dboFile) - if err != nil { - // TODO: error handling - log.Println("err @colFile.CreateDocument") - return err - } - }*/ fileInfoJSON, err := json.Marshal(dboFile) if err != nil { log.Println("err @json.Marshal dboFile") @@ -641,6 +639,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { } //uploadBar.Describe("Uploading") + trackerUpload.UpdateMessage(fmt.Sprintf("Uploading %s", niceName)) err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON, workerID) if err != nil { log.Println("err @common.MultipartUpload") @@ -685,7 +684,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { } } } - trackerUpload.MarkAsDone() + trackerUploadDoneMarker.Do(trackerUpload.MarkAsDone) // at this point we can release the write semaphore unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway //fmt.Println("unlocking GlobalWriteLock") @@ -694,9 +693,10 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { // TODO : write new gma from arangoinfo // TODO : compare hashes { - log.Println("rewriting gma") + trackerRewriteDoneMarker := sync.Once{} trackerRewrite := progress.Tracker{Message: fmt.Sprintf("Rewriting %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault} pw.AppendTracker(&trackerRewrite) + defer trackerRewriteDoneMarker.Do(trackerRewrite.MarkAsDone) destPath := filepath.Join(gmaTempPath, "rewrite.gma") dir := filepath.Dir(destPath) @@ -754,7 +754,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { trackerRewrite.Increment(1) } gmaWriter.FileHandle.Seek(0, 2) - log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC) + //log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC) gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC) // TODO: maybe use io.MultiWriter ?? @@ -785,7 +785,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) { trackerRewrite.MarkAsErrored() return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize) } - trackerRewrite.MarkAsDone() + trackerRewriteDoneMarker.Do(trackerRewrite.MarkAsDone) } // TODO: 4... profit?