added rebuild feature
This commit is contained in:
parent
01b5a21ebd
commit
1d644e083f
1 changed files with 144 additions and 29 deletions
173
main.go
173
main.go
|
@ -118,46 +118,163 @@ var WorkerJobPool chan string
|
|||
var GlobalWriteLock sync.Mutex
|
||||
|
||||
func main() {
|
||||
workerModeP := flag.String("mode", "", "mode (ingress, rebuild)")
|
||||
debugEnabled := flag.Bool("debug", false, "enables debug")
|
||||
folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "a string")
|
||||
skipNameP := flag.String("skip", "", "skip until this name")
|
||||
workerNameP := flag.String("worker", "def", "worker name")
|
||||
rebuildIDP := flag.String("id", "", "id to rebuild")
|
||||
|
||||
debug.SetMemoryLimit(6e9)
|
||||
flag.Parse()
|
||||
folderPath := *folderPathP
|
||||
skipName := *skipNameP
|
||||
skipNameEnabled := len(skipName) > 0
|
||||
workerID = *workerNameP
|
||||
go func() {
|
||||
log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
|
||||
}()
|
||||
|
||||
workerMode := *workerModeP
|
||||
|
||||
if *debugEnabled {
|
||||
debug.SetMemoryLimit(6e9)
|
||||
go func() {
|
||||
log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
|
||||
}()
|
||||
}
|
||||
|
||||
err := InitDatabase()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
/*
|
||||
err = colGMA.Truncate(arangoCTX)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = colFile.Truncate(arangoCTX)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = colGMA2File.Truncate(arangoCTX)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}*/
|
||||
switch workerMode {
|
||||
case "ingress":
|
||||
/*
|
||||
folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "a string")
|
||||
skipNameP := flag.String("skip", "", "skip until this name")
|
||||
workerNameP := flag.String("worker", "def", "worker name")
|
||||
flag.Parse()
|
||||
*/
|
||||
workerID = *workerNameP
|
||||
modeIngress(*folderPathP, *skipNameP)
|
||||
case "rebuild":
|
||||
// rebuildIDP := flag.String("id", "", "id to rebuild")
|
||||
flag.Parse()
|
||||
modeRebuild(*rebuildIDP)
|
||||
}
|
||||
}
|
||||
func modeRebuild(id string) (err error) {
|
||||
var (
|
||||
dboGMA common.DB_GMA
|
||||
dboGMA2Files []common.DB_GMA2File
|
||||
)
|
||||
|
||||
// /mnt/worksucc/san2/gma/2/1/2/3/2123406190.1591573904.gma
|
||||
//fileHandle, err := os.Open("2143898000.1593250551.bin.gma") //2143898000.1593250551.bin")
|
||||
//gma, err := gma.NewReader("2143898000.1593250551.bin.gma")
|
||||
_, err = colGMA.ReadDocument(arangoCTX, id, &dboGMA)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//folderPath := "/mnt/SC9000/TemporaryTestingShit2/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
|
||||
//folderPathTarget := "/mnt/SC9000/ProcessedGMATest/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
|
||||
//
|
||||
cursor, err := arangoDB.Query(arangoCTX, fmt.Sprintf("FOR gf IN gma_file_map FILTER gf._from == 'gma/%s' RETURN gf", dboGMA.ID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer cursor.Close()
|
||||
if cursor.Count() > 0 || cursor.HasMore() {
|
||||
for {
|
||||
gma2File := common.DB_GMA2File{}
|
||||
_, err = cursor.ReadDocument(arangoCTX, &gma2File)
|
||||
if driver.IsNoMoreDocuments(err) {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
gma2File.UploadID = gma2File.File[5:]
|
||||
dboGMA2Files = append(dboGMA2Files, gma2File)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("no files for gma available")
|
||||
}
|
||||
|
||||
{
|
||||
log.Println("rewriting gma")
|
||||
rewriteBar := progressbar.Default(int64(len(dboGMA2Files)), "Rewriting GMA")
|
||||
destPath := fmt.Sprintf("./%s.gma", id) //filepath.Join(gmaTempPath, "rewrite.gma")
|
||||
dir := filepath.Dir(destPath)
|
||||
|
||||
err := os.MkdirAll(dir, os.ModePerm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gmaWriter, err := gma.NewWriter(destPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer gmaWriter.Close()
|
||||
|
||||
//fmt.Printf("Writing Header with FormatVersion: %d\n", dboGMA.Header.FormatVersion)
|
||||
err = gmaWriter.WriteHeader(dboGMA.Header)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = gmaWriter.WriteFirstType(dboGMA.FirstType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sort.SliceStable(dboGMA2Files, func(i, j int) bool { return dboGMA2Files[i].FileNumber < dboGMA2Files[j].FileNumber })
|
||||
for _, dboGMA2File := range dboGMA2Files {
|
||||
//fmt.Printf("WriteFileIndex for %s number %d\n", dboGMA2File.FileName, dboGMA2File.FileNumber)
|
||||
err = gmaWriter.WriteFileIndex(dboGMA2File.FileName, dboGMA2File.FileSize, dboGMA2File.CRC, dboGMA2File.NextType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
var httpClient *http.Client = &http.Client{
|
||||
Timeout: 15 * time.Minute,
|
||||
}
|
||||
|
||||
for _, dboGMA2File := range dboGMA2Files {
|
||||
//fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID)
|
||||
resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
err = gmaWriter.WriteFile(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rewriteBar.Add(1)
|
||||
}
|
||||
gmaWriter.FileHandle.Seek(0, 2)
|
||||
log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC)
|
||||
gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC)
|
||||
|
||||
// TODO: maybe use io.MultiWriter ??
|
||||
gmaWriter.FileHandle.Seek(0, 0)
|
||||
writeHash, err := gmaWriter.GetSHA256()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("Rewrite Hash is %s %s\n", writeHash, destPath)
|
||||
log.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath)
|
||||
log.Println()
|
||||
writeStat, err := os.Stat(destPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
writeSize := writeStat.Size()
|
||||
if writeSize != dboGMA.GMASize {
|
||||
//fail
|
||||
return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
|
||||
}
|
||||
if writeHash != dboGMA.GMAHash {
|
||||
//fail
|
||||
return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
func modeIngress(folderPath string, skipName string) {
|
||||
skipNameEnabled := len(skipName) > 0
|
||||
entries, err := os.ReadDir(folderPath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -192,9 +309,7 @@ func main() {
|
|||
|
||||
// Wait for all jobs to finish
|
||||
wg.Wait()
|
||||
|
||||
}
|
||||
|
||||
func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) {
|
||||
log.Printf("undoBatch(%v, %s)\n", undoBatch, gmaID)
|
||||
/*
|
||||
|
|
Loading…
Add table
Reference in a new issue