@ -328,17 +328,23 @@ func modeIngress(folderPath string, skipName string) {
pw . Style ( ) . Visibility . Speed = true
pw . Style ( ) . Visibility . Speed = true
pw . Style ( ) . Visibility . SpeedOverall = false
pw . Style ( ) . Visibility . SpeedOverall = false
pw . Style ( ) . Visibility . Time = true
pw . Style ( ) . Visibility . Time = true
pw . Style ( ) . Visibility . TrackerOverall = tru e
pw . Style ( ) . Visibility . TrackerOverall = fals e
pw . Style ( ) . Visibility . Value = true
pw . Style ( ) . Visibility . Value = true
pw . Style ( ) . Visibility . Pinned = true
pw . Style ( ) . Visibility . Pinned = true
// call Render() in async mode; yes we don't have any trackers at the moment
// call Render() in async mode; yes we don't have any trackers at the moment
go pw . Render ( )
go pw . Render ( )
trackerDoneMarker := sync . Once { }
tracker := progress . Tracker { Message : fmt . Sprintf ( "Working %d GMAs" , len ( WorkerJobPool ) ) , Total : int64 ( len ( WorkerJobPool ) ) , Units : progress . UnitsDefault }
pw . AppendTracker ( & tracker )
defer trackerDoneMarker . Do ( tracker . MarkAsDone )
for _ , jobFile := range WorkerJobPool {
for _ , jobFile := range WorkerJobPool {
wg . Add ( 1 )
wg . Add ( 1 )
go func ( jobFile string , wg * sync . WaitGroup ) {
go func ( jobFile string , wg * sync . WaitGroup ) {
defer wg . Done ( )
defer wg . Done ( )
defer tracker . Increment ( 1 )
err = ProcessGMA ( pw , jobFile )
err = ProcessGMA ( pw , jobFile )
if err != nil {
if err != nil {
log . Printf ( "\nERROR: %v\n" , err )
log . Printf ( "\nERROR: %v\n" , err )
@ -351,7 +357,7 @@ func modeIngress(folderPath string, skipName string) {
wg . Wait ( )
wg . Wait ( )
}
}
func undoBatch ( undoBatch bool , gmaID string , fileIDs [ ] string , gma2FileIDs [ ] string ) ( err error ) {
func undoBatch ( undoBatch bool , gmaID string , fileIDs [ ] string , gma2FileIDs [ ] string ) ( err error ) {
log . Printf ( "undoBatch(%v, %s)\n" , undoBatch , gmaID )
//log.Printf("undoBatch(%v, %s)\n", undoBatch, gmaID)
/ *
/ *
_ , err = colGMA . RemoveDocument ( arangoCTX , gmaID )
_ , err = colGMA . RemoveDocument ( arangoCTX , gmaID )
if err != nil {
if err != nil {
@ -375,11 +381,11 @@ func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []str
func ProcessGMA ( pw progress . Writer , filePath string ) ( err error ) {
func ProcessGMA ( pw progress . Writer , filePath string ) ( err error ) {
var unlockOnce sync . Once
var unlockOnce sync . Once
fmt . Println ( "trying to acquire global write lock" )
//fmt.Println("trying to acquire global write lock")
GlobalWriteLock . Lock ( ) // Wait for worker to have slot open
GlobalWriteLock . Lock ( ) // Wait for worker to have slot open
fmt . Println ( "aquired global write lock" )
//fmt.Println("aquired global write lock")
defer unlockOnce . Do ( GlobalWriteLock . Unlock ) // release anyway
defer unlockOnce . Do ( GlobalWriteLock . Unlock ) // release anyway
defer fmt . Println ( "unlocking GlobalWriteLock" )
//defer fmt.Println("unlocking GlobalWriteLock")
//time.Sleep(5 * time.Second)
//time.Sleep(5 * time.Second)
var (
var (
@ -427,7 +433,9 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
return fmt . Errorf ( "GMA File too small, skipping" )
return fmt . Errorf ( "GMA File too small, skipping" )
}
}
niceName := filepath . Base ( filePath )
niceName := filepath . Base ( filePath )
trackerProcessDoneMarker := sync . Once { }
trackerProcess := progress . Tracker { Message : fmt . Sprintf ( "Extracting %s" , niceName ) , Total : 0 , Units : progress . UnitsDefault }
trackerProcess := progress . Tracker { Message : fmt . Sprintf ( "Extracting %s" , niceName ) , Total : 0 , Units : progress . UnitsDefault }
defer trackerProcessDoneMarker . Do ( trackerProcess . MarkAsDone )
pw . AppendTracker ( & trackerProcess )
pw . AppendTracker ( & trackerProcess )
//log.Printf("Opening %s\n", filePath)
//log.Printf("Opening %s\n", filePath)
@ -476,6 +484,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
if err != nil {
if err != nil {
return err
return err
}
}
trackerProcess . UpdateTotal ( int64 ( len ( files ) ) )
dboGMA . FirstType = firstType
dboGMA . FirstType = firstType
//fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset())
//fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset())
@ -515,7 +524,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
}
}
if extractMeta . ExtractedCRC != extractMeta . OriginalMeta . CRC {
if extractMeta . ExtractedCRC != extractMeta . OriginalMeta . CRC {
log. P rintf( "gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n" , filePath , extractMeta . OriginalMeta . CRC , extractMeta . ExtractedCRC , extractMeta . OriginalMeta . FileName )
pw. Log ( fmt . Sp rintf( "gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n" , filePath , extractMeta . OriginalMeta . CRC , extractMeta . ExtractedCRC , extractMeta . OriginalMeta . FileName ) )
}
}
//fmt.Printf("ExtractedMeta %s CRC: %d SHA256: %s\n", file.FileName, extractMeta.ExtractedCRC, extractMeta.ExtractedSHA256)
//fmt.Printf("ExtractedMeta %s CRC: %d SHA256: %s\n", file.FileName, extractMeta.ExtractedCRC, extractMeta.ExtractedSHA256)
dboFile := common . DB_File {
dboFile := common . DB_File {
@ -549,6 +558,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
//fmt.Println(dboGMA2File)
//fmt.Println(dboGMA2File)
gma2FileIDs = append ( gma2FileIDs , dboGMA2File . ID )
gma2FileIDs = append ( gma2FileIDs , dboGMA2File . ID )
dboGMA2Files = append ( dboGMA2Files , dboGMA2File )
dboGMA2Files = append ( dboGMA2Files , dboGMA2File )
trackerProcess . Increment ( 1 )
}
}
lastFile := files [ len ( files ) - 1 ]
lastFile := files [ len ( files ) - 1 ]
@ -584,14 +594,16 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
}
}
dboExistFile2GMA [ dboGMA2File . ID ] = exists
dboExistFile2GMA [ dboGMA2File . ID ] = exists
}
}
trackerProcess . MarkAsDone( )
trackerProcess DoneMarker . Do( trackerProcess . MarkAsDone)
// TODO: upload all unknownNewFiles to StorageServer
// TODO: upload all unknownNewFiles to StorageServer
http . DefaultTransport . ( * http . Transport ) . MaxIdleConnsPerHost = 200
http . DefaultTransport . ( * http . Transport ) . MaxIdleConnsPerHost = 200
var httpClient * http . Client = http . DefaultClient
var httpClient * http . Client = http . DefaultClient
trackerUploadDoneMarker := sync . Once { }
trackerUpload := progress . Tracker { Message : fmt . Sprintf ( "Uploading %s" , niceName ) , Total : int64 ( len ( dboFiles ) ) , Units : progress . UnitsDefault }
trackerUpload := progress . Tracker { Message : fmt . Sprintf ( "Uploading %s" , niceName ) , Total : int64 ( len ( dboFiles ) ) , Units : progress . UnitsDefault }
pw . AppendTracker ( & trackerUpload )
pw . AppendTracker ( & trackerUpload )
defer trackerUploadDoneMarker . Do ( trackerUpload . MarkAsDone )
for _ , dboFile := range dboFiles {
for _ , dboFile := range dboFiles {
dboFileID := fmt . Sprintf ( "file/%s" , dboFile . ID )
dboFileID := fmt . Sprintf ( "file/%s" , dboFile . ID )
@ -610,6 +622,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
//body, _ := ioutil.ReadAll(res.Body)
//body, _ := ioutil.ReadAll(res.Body)
//fmt.Printf("res.StatusCode = %d\n", res.StatusCode)
//fmt.Printf("res.StatusCode = %d\n", res.StatusCode)
if res . StatusCode == http . StatusAlreadyReported {
if res . StatusCode == http . StatusAlreadyReported {
trackerUpload . UpdateMessage ( fmt . Sprintf ( "Skipping %s" , niceName ) )
trackerUpload . Increment ( 1 )
trackerUpload . Increment ( 1 )
continue
continue
}
}
@ -619,21 +632,6 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
//log.Println("Found dboFileID == dboGMA2File.Ref ID")
//log.Println("Found dboFileID == dboGMA2File.Ref ID")
uploadSuccess := true
uploadSuccess := true
for {
for {
//log.Printf("Uploading %s to Storage\n", dboGMA2File.UploadID)
// TODO: move file management to storageserver
/ * existsFile , err := colFile . DocumentExists ( arangoCTX , dboFile . ID )
if err != nil {
log . Println ( "err @colFile.DocumentExist" )
return err
}
if ! existsFile {
_ , err := colFile . CreateDocument ( arangoCTX , dboFile )
if err != nil {
// TODO: error handling
log . Println ( "err @colFile.CreateDocument" )
return err
}
} * /
fileInfoJSON , err := json . Marshal ( dboFile )
fileInfoJSON , err := json . Marshal ( dboFile )
if err != nil {
if err != nil {
log . Println ( "err @json.Marshal dboFile" )
log . Println ( "err @json.Marshal dboFile" )
@ -641,6 +639,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
}
}
//uploadBar.Describe("Uploading")
//uploadBar.Describe("Uploading")
trackerUpload . UpdateMessage ( fmt . Sprintf ( "Uploading %s" , niceName ) )
err = common . MultipartUpload ( httpClient , fmt . Sprintf ( "http://127.0.0.1:13371/stash/%s/%d" , dboGMA2File . UploadID , dboGMA2File . FileSize ) , dboGMA2File . LocalFileName , fileInfoJSON , workerID )
err = common . MultipartUpload ( httpClient , fmt . Sprintf ( "http://127.0.0.1:13371/stash/%s/%d" , dboGMA2File . UploadID , dboGMA2File . FileSize ) , dboGMA2File . LocalFileName , fileInfoJSON , workerID )
if err != nil {
if err != nil {
log . Println ( "err @common.MultipartUpload" )
log . Println ( "err @common.MultipartUpload" )
@ -685,7 +684,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
}
}
}
}
}
}
trackerUpload . MarkAsDone( )
trackerUpload DoneMarker . Do( trackerUpload . MarkAsDone)
// at this point we can release the write semaphore
// at this point we can release the write semaphore
unlockOnce . Do ( GlobalWriteLock . Unlock ) // release anyway
unlockOnce . Do ( GlobalWriteLock . Unlock ) // release anyway
//fmt.Println("unlocking GlobalWriteLock")
//fmt.Println("unlocking GlobalWriteLock")
@ -694,9 +693,10 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
// TODO : write new gma from arangoinfo
// TODO : write new gma from arangoinfo
// TODO : compare hashes
// TODO : compare hashes
{
{
log. Println ( "rewriting gma" )
trackerRewriteDoneMarker := sync . Once { }
trackerRewrite := progress . Tracker { Message : fmt . Sprintf ( "Rewriting %s" , niceName ) , Total : int64 ( len ( dboFiles ) ) , Units : progress . UnitsDefault }
trackerRewrite := progress . Tracker { Message : fmt . Sprintf ( "Rewriting %s" , niceName ) , Total : int64 ( len ( dboFiles ) ) , Units : progress . UnitsDefault }
pw . AppendTracker ( & trackerRewrite )
pw . AppendTracker ( & trackerRewrite )
defer trackerRewriteDoneMarker . Do ( trackerRewrite . MarkAsDone )
destPath := filepath . Join ( gmaTempPath , "rewrite.gma" )
destPath := filepath . Join ( gmaTempPath , "rewrite.gma" )
dir := filepath . Dir ( destPath )
dir := filepath . Dir ( destPath )
@ -754,7 +754,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
trackerRewrite . Increment ( 1 )
trackerRewrite . Increment ( 1 )
}
}
gmaWriter . FileHandle . Seek ( 0 , 2 )
gmaWriter . FileHandle . Seek ( 0 , 2 )
log . Printf ( "Writing Footer CRC %d\n\n" , dboGMA . FooterAddonCRC )
//log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC)
gmaWriter . WriteFooterCRC ( dboGMA . FooterAddonCRC )
gmaWriter . WriteFooterCRC ( dboGMA . FooterAddonCRC )
// TODO: maybe use io.MultiWriter ??
// TODO: maybe use io.MultiWriter ??
@ -785,7 +785,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
trackerRewrite . MarkAsErrored ( )
trackerRewrite . MarkAsErrored ( )
return fmt . Errorf ( "RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)" , dboGMA . GMAHash , dboGMA . GMASize , writeHash , writeSize )
return fmt . Errorf ( "RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)" , dboGMA . GMAHash , dboGMA . GMASize , writeHash , writeSize )
}
}
trackerRewrite . MarkAsDone( )
trackerRewrite DoneMarker . Do( trackerRewrite . MarkAsDone)
}
}
// TODO: 4... profit?
// TODO: 4... profit?