package main
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"runtime/debug"
"sort"
"strings"
"sync"
"time"
"git.cheetah.cat/worksucc/gma-puzzles/common"
"git.cheetah.cat/worksucc/gma-puzzles/gma"
"github.com/arangodb/go-driver"
adriver "github.com/arangodb/go-driver"
ahttp "github.com/arangodb/go-driver/http"
"github.com/schollz/progressbar/v3"
"github.com/twinj/uuid"
"github.com/jedib0t/go-pretty/v6/progress"
_ "net/http/pprof"
)
var (
arangoDB adriver.Database
arangoCTX context.Context
//colChunk adriver.Collection
colFile adriver.Collection
//colFile2Chunk adriver.Collection
colGMA adriver.Collection
colGMA2File adriver.Collection
colAliases adriver.Collection
workerID string
)
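// ConnectDB establishes a connection to ArangoDB, retrying up to five times
// with a 30-second backoff before giving up. It returns the database handle
// together with the context used for all subsequent operations.
//
// A minimal usage sketch (endpoint and credentials are placeholders):
//
//	db, ctx, err := ConnectDB("http://localhost:8529", "root", "pass", "gma-inator")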
func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
log.Println("connectDB:", "Starting Connection Process...")
// Retry Loop for Failed Connections
for i := 0; i < 6; i++ {
if i == 5 {
return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i)
} else if i > 0 {
time.Sleep(30 * time.Second)
}
// Connect to ArangoDB URL
conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
Endpoints: []string{baseURL},
TLSConfig: &tls.Config{ /*...*/ },
})
if err != nil {
log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
continue
}
// Connect Driver to User
client, err := adriver.NewClient(adriver.ClientConfig{
Connection: conn,
Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
})
if err != nil {
log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
continue
}
// Create Context for Database Access
ctx = context.Background()
driver, err = client.Database(ctx, arangoDatabase)
if err != nil {
log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
continue
}
log.Println("connectDB:", "Connection Sucessful!")
return driver, ctx, nil
}
return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?")
}
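// InitDatabase connects to ArangoDB using the credentials from the common
// package and resolves the collection handles used below
// (file, gma, gma_file_map, aliases).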
func InitDatabase() (err error) {
arangoDB, arangoCTX, err = ConnectDB(common.ArangoHost, common.ArangoUser, common.ArangoPass, common.ArangoDatabase)
if err != nil {
return err
}
/*colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
if err != nil {
return err
}*/
colFile, err = arangoDB.Collection(arangoCTX, "file")
if err != nil {
return err
}
colGMA, err = arangoDB.Collection(arangoCTX, "gma")
if err != nil {
return err
}
/*colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
if err != nil {
return err
}*/
colGMA2File, err = arangoDB.Collection(arangoCTX, "gma_file_map")
if err != nil {
return err
}
colAliases, err = arangoDB.Collection(arangoCTX, "aliases")
if err != nil {
return err
}
return nil
}
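// Tuning knobs and global locks. GlobalWriteLock serializes the extract and
// upload phase of ProcessGMA; GlobalDeleteLock serializes the safety checks
// in DeleteIfSafeGMA. JobPoolSize and ConcurrencyLimit appear unused here.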
var JobPoolSize int = 5
var ConcurrencyLimit int = 1
var WorkerJobPool chan string
var GlobalWriteLock sync.Mutex
var GlobalDeleteLock sync.Mutex
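// main parses the CLI flags and dispatches one of the worker modes:
// ingress (extract and upload GMAs), delete (remove safely ingested GMAs),
// rebuild (reassemble a GMA from database metadata) and test (parse and
// verify a single GMA).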
func main() {
workerModeP := flag.String("mode", "", "mode (ingress, rebuild)")
debugEnabled := flag.Bool("debug", false, "enables debug")
folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "a string")
skipNameP := flag.Int("skip", -1, "skip n addons")
workerNameP := flag.String("worker", "def", "worker name")
rebuildIDP := flag.String("id", "", "id to rebuild")
flag.Parse()
workerMode := *workerModeP
if *debugEnabled {
debug.SetMemoryLimit(6e9)
go func() {
log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
}()
}
err := InitDatabase()
if err != nil {
panic(err)
}
switch workerMode {
case "ingress":
workerID = *workerNameP
modeIngress(*folderPathP, *skipNameP)
case "delete":
workerID = *workerNameP
modeDelete(*folderPathP)
case "rebuild":
err = modeRebuild(*rebuildIDP)
if err != nil {
panic(err)
}
case "test":
modeTest()
}
}
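// modeTest parses a single hard-coded GMA file, prints its header and file
// index, and verifies each entry's CRC by extracting it to io.Discard.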
func modeTest() (err error) {
filePath := "/mnt/worksucc/san1/gma/2/5/0/0/2500735732.1623884796.gma"
gmaReader, err := gma.NewReader(filePath)
if err != nil {
return err
}
defer gmaReader.Close()
hash, err := gmaReader.GetSHA256()
if err != nil {
return err
}
fmt.Printf("GMA Hash: %s\n", hash)
gmaReader.FileHandle.Seek(0, io.SeekStart)
header, err := gmaReader.ReadHeader()
if err != nil {
return err
}
log.Printf("Name=%s\n", header.Title)
log.Printf("Desc=%s\n", header.Description)
log.Printf("AddonVersion=%d\n", header.AddonVersion)
log.Printf("FormatVersion=%d\n", header.FormatVersion)
log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte)
firstType, files, err := gmaReader.ReadFiles()
if err != nil {
return err
}
fmt.Printf("firstType = %d\n", firstType)
for _, file := range files {
if file.FileSize < 0 { // negative size indicates a corrupt header
return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
}
//fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber)
if file.NextType > uint32(file.FileNumber+10) { // implausible index jump indicates a corrupt header
/*log.Printf("Current Cursor %d", gmaReader.GetOffset())
for _, otherFile := range files[file.FileNumber:] {
log.Printf("OTHERFILE %s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", otherFile.FileName, otherFile.CRC, otherFile.Offset, otherFile.FileSize, otherFile.NextType, otherFile.FileNumber)
}*/
return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
}
extractMeta, err := gmaReader.ExtractFileTo(file, io.Discard)
if err != nil {
return err
}
if extractMeta.ExtractedCRC != extractMeta.OriginalMeta.CRC {
fmt.Printf("gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n", filePath, extractMeta.OriginalMeta.CRC, extractMeta.ExtractedCRC, extractMeta.OriginalMeta.FileName)
}
fmt.Println("#%d = %s -%d bytes [%s]\n", extractMeta.OriginalMeta.FileNumber, extractMeta.OriginalMeta.FileName, extractMeta.OriginalMeta.FileSize, extractMeta.ExtractedSHA256)
//fmt.Printf("Extra
}
return nil
}
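// modeRebuild reassembles the GMA with the given ID from database metadata,
// fetching each file body from the storage server and verifying that the
// rewritten archive matches the original size and hash.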
func modeRebuild(id string) (err error) {
var (
dboGMA common.DB_GMA
dboGMA2Files []common.DB_GMA2File
)
_, err = colGMA.ReadDocument(arangoCTX, id, &dboGMA)
if err != nil {
return err
}
cursor, err := arangoDB.Query(arangoCTX, fmt.Sprintf("FOR gf IN gma_file_map FILTER gf._from == 'gma/%s' RETURN gf", dboGMA.ID), nil)
if err != nil {
return err
}
defer cursor.Close()
if cursor.Count() > 0 || cursor.HasMore() {
for {
gma2File := common.DB_GMA2File{}
_, err = cursor.ReadDocument(arangoCTX, &gma2File)
if adriver.IsNoMoreDocuments(err) {
break
} else if err != nil {
return err
}
gma2File.UploadID = gma2File.File[5:]
dboGMA2Files = append(dboGMA2Files, gma2File)
}
} else {
return fmt.Errorf("no files for gma available")
}
{
log.Println("rewriting gma")
rewriteBar := progressbar.Default(int64(len(dboGMA2Files)), "Rewriting GMA")
destPath := fmt.Sprintf("./%s.gma", id) //filepath.Join(gmaTempPath, "rewrite.gma")
dir := filepath.Dir(destPath)
err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
return err
}
gmaWriter, err := gma.NewWriter(destPath)
if err != nil {
return err
}
defer gmaWriter.Close()
//fmt.Printf("Writing Header with FormatVersion: %d\n", dboGMA.Header.FormatVersion)
err = gmaWriter.WriteHeader(dboGMA.Header)
if err != nil {
return err
}
err = gmaWriter.WriteFirstType(dboGMA.FirstType)
if err != nil {
return err
}
sort.SliceStable(dboGMA2Files, func(i, j int) bool { return dboGMA2Files[i].FileNumber < dboGMA2Files[j].FileNumber })
for _, dboGMA2File := range dboGMA2Files {
//fmt.Printf("WriteFileIndex for %s number %d\n", dboGMA2File.FileName, dboGMA2File.FileNumber)
err = gmaWriter.WriteFileIndex(dboGMA2File.FileName, dboGMA2File.FileSize, dboGMA2File.CRC, dboGMA2File.NextType)
if err != nil {
return err
}
}
var httpClient *http.Client = &http.Client{
Timeout: 15 * time.Minute,
}
for _, dboGMA2File := range dboGMA2Files {
//fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID)
resp, err := httpClient.Get(fmt.Sprintf("http://192.168.133.118:13371/fetch/%s", dboGMA2File.UploadID))
if err != nil {
return err
}
err = gmaWriter.WriteFile(resp.Body)
resp.Body.Close() // close immediately; a defer inside this loop would keep every body open until the function returns
if err != nil {
return err
}
rewriteBar.Add(1)
}
gmaWriter.FileHandle.Seek(0, io.SeekEnd)
log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC)
gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC)
// TODO: maybe use io.MultiWriter ??
gmaWriter.FileHandle.Seek(0, io.SeekStart)
writeHash, err := gmaWriter.GetSHA256()
if err != nil {
return err
}
log.Printf("Rewrite Hash is %s %s\n", writeHash, destPath)
log.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath)
log.Println()
writeStat, err := os.Stat(destPath)
if err != nil {
return err
}
writeSize := writeStat.Size()
if writeSize != dboGMA.GMASize {
return fmt.Errorf("rewrite size check failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
}
if writeHash != dboGMA.GMAHash {
return fmt.Errorf("rewrite hash check failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
}
}
return nil
}
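// recursive walks folderPath depth-first and appends every plain .gma file
// (skipping .lzma variants) to jobs.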
func recursive(jobs []string, folderPath string) []string {
entries, err := os.ReadDir(folderPath)
if err != nil {
panic(err)
}
for _, e := range entries {
fullPath := filepath.Join(folderPath, e.Name())
if e.IsDir() {
jobs = recursive(jobs, fullPath)
}
fmt.Println(e.Name())
if strings.HasSuffix(e.Name(), ".gma") && !strings.Contains(e.Name(), ".lzma") {
if !e.IsDir() {
jobs = append(jobs, filepath.Join(folderPath, e.Name()))
}
}
}
return jobs
}
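// modeDelete collects all .gma files under folderPath and deletes each one,
// four at a time, once DeleteIfSafeGMA confirms the ingest was successful.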
func modeDelete(folderPath string) {
// skipNameEnabled := skipN > 0
entries, err := os.ReadDir(folderPath)
if err != nil {
panic(err)
}
var WorkerJobPool []string
for _, e := range entries {
fullPath := filepath.Join(folderPath, e.Name())
if e.IsDir() {
WorkerJobPool = recursive(WorkerJobPool, fullPath)
}
fmt.Println(e.Name())
if strings.HasSuffix(e.Name(), ".gma") && !strings.Contains(e.Name(), ".lzma") {
if !e.IsDir() {
WorkerJobPool = append(WorkerJobPool, filepath.Join(folderPath, e.Name()))
}
}
}
wg := sync.WaitGroup{}
deleteSem := common.NewSemaphore(4)
pw := progress.NewWriter()
pw.SetAutoStop(true)
pw.SetTrackerLength(40)
pw.SetMessageWidth(40)
//pw.SetNumTrackersExpected(*flagNumTrackers)
pw.SetSortBy(progress.SortByPercentDsc)
pw.SetStyle(progress.StyleDefault)
pw.SetTrackerPosition(progress.PositionRight)
pw.SetUpdateFrequency(time.Millisecond * 100)
pw.Style().Colors = progress.StyleColorsExample
pw.Style().Options.PercentFormat = "%4.1f%%"
pw.Style().Options.ETAPrecision = time.Second
pw.Style().Options.TimeDonePrecision = time.Second
pw.Style().Options.TimeInProgressPrecision = time.Second
pw.Style().Options.TimeOverallPrecision = time.Second
pw.Style().Visibility.ETA = true
pw.Style().Visibility.ETAOverall = true
pw.Style().Visibility.Percentage = false
pw.Style().Visibility.Speed = true
pw.Style().Visibility.SpeedOverall = false
pw.Style().Visibility.Time = true
pw.Style().Visibility.TrackerOverall = false
pw.Style().Visibility.Value = true
pw.Style().Visibility.Pinned = true
// call Render() in async mode; yes we don't have any trackers at the moment
go pw.Render()
trackerDoneMarker := sync.Once{}
tracker := progress.Tracker{Message: fmt.Sprintf("Deleting successfull %d GMAs", len(WorkerJobPool)), Total: int64(len(WorkerJobPool)), Units: progress.UnitsDefault}
pw.AppendTracker(&tracker)
defer trackerDoneMarker.Do(tracker.MarkAsDone)
for _, jobFile := range WorkerJobPool {
wg.Add(1)
go func(jobFile string, wg *sync.WaitGroup) {
deleteSem.Acquire()
defer deleteSem.Release()
defer wg.Done()
defer tracker.Increment(1)
err := DeleteIfSafeGMA(pw, jobFile) // declare locally; assigning the shared err from goroutines is a data race
if err != nil {
pw.Log(fmt.Sprintf("\nERROR: %v\n", err))
}
}(jobFile, &wg)
}
// Wait for all jobs to finish
wg.Wait()
}
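// modeIngress collects all .gma files under folderPath (optionally skipping
// the first skipN), runs ProcessGMA on each and deletes the original
// afterwards when it is safe to do so.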
func modeIngress(folderPath string, skipN int) {
// skipNameEnabled := skipN > 0
entries, err := os.ReadDir(folderPath)
if err != nil {
panic(err)
}
var WorkerJobPool []string
for _, e := range entries {
fullPath := filepath.Join(folderPath, e.Name())
if e.IsDir() {
WorkerJobPool = recursive(WorkerJobPool, fullPath)
}
fmt.Println(e.Name())
if strings.HasSuffix(e.Name(), ".gma") && !strings.Contains(e.Name(), ".lzma") {
if !e.IsDir() {
WorkerJobPool = append(WorkerJobPool, filepath.Join(folderPath, e.Name()))
}
}
}
if skipN > 0 {
if skipN > len(WorkerJobPool) {
skipN = len(WorkerJobPool)
}
WorkerJobPool = WorkerJobPool[skipN:]
}
wg := sync.WaitGroup{}
pw := progress.NewWriter()
pw.SetAutoStop(true)
pw.SetTrackerLength(40)
pw.SetMessageWidth(40)
//pw.SetNumTrackersExpected(*flagNumTrackers)
pw.SetSortBy(progress.SortByPercentDsc)
pw.SetStyle(progress.StyleDefault)
pw.SetTrackerPosition(progress.PositionRight)
pw.SetUpdateFrequency(time.Millisecond * 100)
pw.Style().Colors = progress.StyleColorsExample
pw.Style().Options.PercentFormat = "%4.1f%%"
pw.Style().Options.ETAPrecision = time.Second
pw.Style().Options.TimeDonePrecision = time.Second
pw.Style().Options.TimeInProgressPrecision = time.Second
pw.Style().Options.TimeOverallPrecision = time.Second
pw.Style().Visibility.ETA = true
pw.Style().Visibility.ETAOverall = true
pw.Style().Visibility.Percentage = false
pw.Style().Visibility.Speed = true
pw.Style().Visibility.SpeedOverall = false
pw.Style().Visibility.Time = true
pw.Style().Visibility.TrackerOverall = false
pw.Style().Visibility.Value = true
pw.Style().Visibility.Pinned = true
// call Render() in async mode; yes we don't have any trackers at the moment
go pw.Render()
trackerDoneMarker := sync.Once{}
tracker := progress.Tracker{Message: fmt.Sprintf("Working %d GMAs", len(WorkerJobPool)), Total: int64(len(WorkerJobPool)), Units: progress.UnitsDefault}
pw.AppendTracker(&tracker)
defer trackerDoneMarker.Do(tracker.MarkAsDone)
for _, jobFile := range WorkerJobPool {
wg.Add(1)
go func(jobFile string, wg *sync.WaitGroup) {
defer tracker.Increment(1)
defer wg.Done()
err := ProcessGMA(pw, jobFile) // declare locally; assigning the shared err from goroutines is a data race
if err != nil {
pw.Log(fmt.Sprintf("\nERROR: %v\n", err))
if strings.Contains(err.Error(), "refused") {
panic(err)
}
}
err = DeleteIfSafeGMA(pw, jobFile)
if err != nil {
pw.Log(fmt.Sprintf("\nERROR: %v\n", err))
}
}(jobFile, &wg)
}
// Wait for all jobs to finish
wg.Wait()
}
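// undoBatch is meant to roll back the documents created for a failed batch;
// all removal logic is currently commented out, so it is effectively a no-op.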
func undoBatch(undo bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) {
//log.Printf("undoBatch(%v, %s)\n", undo, gmaID)
/*
_, err = colGMA.RemoveDocument(arangoCTX, gmaID)
if err != nil {
return err
}
*/
/*
_, _, err = colFile.RemoveDocuments(arangoCTX, fileIDs)
if err != nil {
return err
}
*/
/*
_, _, err = colGMA2File.RemoveDocuments(arangoCTX, gma2FileIDs)
if err != nil {
return err
}
*/
return nil
}
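// DeleteIfSafeGMA deletes the GMA at filePath (and any .lzma sibling) only if
// the database marks it as successfully ingested and a recovery JSON exists.
// A duplicate whose canonical copy was already deleted is recorded as an
// alias document instead.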
func DeleteIfSafeGMA(pw progress.Writer, filePath string) (err error) {
var unlockOnce sync.Once
GlobalDeleteLock.Lock() // Wait for worker to have slot open
defer unlockOnce.Do(GlobalDeleteLock.Unlock) // release anyway
dboGMA := common.DB_GMA{}
dboGMA.OriginalPath = filePath
cursor, err := arangoDB.Query(arangoCTX, fmt.Sprintf("FOR g IN gma FILTER g.originalPath == '%s' RETURN g", dboGMA.OriginalPath), nil)
if err != nil {
return fmt.Errorf("545: %v", err)
}
defer cursor.Close()
initialRetryCounter := 0
if cursor.Count() > 0 || cursor.HasMore() {
for {
gma := common.DB_GMA{}
_, err = cursor.ReadDocument(arangoCTX, &gma)
if adriver.IsNoMoreDocuments(err) {
break
} else if err != nil {
return fmt.Errorf("557: %v", err)
}
if !gma.Success {
return fmt.Errorf("GMA with ID %s was not successfull ", gma.ID)
}
}
//} else {
// return fmt.Errorf("no gmas found with path = %s", dboGMA.OriginalPath)
}
pw.Log(fmt.Sprintf("%+v", dboGMA))
fileStat, err := os.Stat(filePath)
if err != nil {
return err
}
dboGMA.StatModTime = fileStat.ModTime()
dboGMA.GMASize = fileStat.Size()
dboGMA.RetryCounter = initialRetryCounter + 1
if dboGMA.GMASize < 200 {
return fmt.Errorf("GMA File too small, skipping")
}
//niceName := filepath.Base(filePath)
//trackerProcessDoneMarker := sync.Once{}
//trackerProcess := progress.Tracker{Message: fmt.Sprintf("Deleting %s", niceName), Total: 0, Units: progress.UnitsDefault}
//defer trackerProcessDoneMarker.Do(trackerProcess.MarkAsDone)
//pw.AppendTracker(&trackerProcess)
gmaReader, err := gma.NewReader(filePath)
if err != nil {
return err
}
defer gmaReader.Close()
unlockOnce.Do(GlobalDeleteLock.Unlock) // release early; hashing below can run concurrently
dboGMA.GMAHash, err = gmaReader.GetSHA256()
if err != nil {
return err
}
dboGMA.ID = dboGMA.GMAHash
gmaReader.FileHandle.Seek(0, io.SeekStart)
dboIDExists, err := colGMA.DocumentExists(arangoCTX, dboGMA.ID)
if err != nil {
return err
}
dboGMA_sameHash := common.DB_GMA{}
if !dboIDExists {
return fmt.Errorf("GMA with ID %s does not exists", dboGMA.ID)
} else {
_, err := colGMA.ReadDocument(arangoCTX, dboGMA.ID, &dboGMA_sameHash)
if err != nil {
return fmt.Errorf("609-: %v", err)
}
if dboGMA_sameHash.OriginalPath != filePath {
pw.Log("originalpath differs for this hash")
// DB_GMA_Alias
dboGMA.OriginalPath = dboGMA_sameHash.OriginalPath
}
}
if dboGMA.OriginalPath == filePath {
{
recoveryPath := filepath.Join("/zpool0/cheetah/workshop/garrysmod/gma-inator/recovery-tree", fmt.Sprintf("%s.json", dboGMA.OriginalPath))
if checkFileExists(recoveryPath) {
err = os.Remove(dboGMA.OriginalPath)
if err != nil {
return err
}
pw.Log("deleted %s", dboGMA.OriginalPath)
} else {
pw.Log("%s recoveryPath does not exist %s", filePath, recoveryPath)
}
lzmaFile := fmt.Sprintf("%s.lzma", dboGMA.OriginalPath)
if checkFileExists(lzmaFile) {
err = os.Remove(lzmaFile)
if err != nil {
return err
}
pw.Log("deleted %s", lzmaFile)
}
}
{
_, err = colGMA.UpdateDocument(arangoCTX, dboGMA.ID, common.DB_GMADeletionData{
Deleted: true,
})
if err != nil {
return err
}
}
} else {
originalDeletionData := common.DB_GMADeletionData{}
_, err := colGMA.ReadDocument(arangoCTX, dboGMA.ID, &originalDeletionData)
if err != nil {
return fmt.Errorf("650-: %v", err)
}
if originalDeletionData.Deleted {
pw.Log(filepath.Base(filePath))
pw.Log(filepath.Base(dboGMA_sameHash.OriginalPath))
pw.Log("")
aliasData := common.DB_GMA_Alias{
Path: filePath,
Hash: dboGMA.GMAHash,
Deleted: true,
MigrationID: common.CreateMigrationID(filePath),
}
err = os.Remove(aliasData.Path)
if err != nil {
return err
}
pw.Log("deleted %s", aliasData.Path)
lzmaFile := fmt.Sprintf("%s.lzma", aliasData.Path)
if checkFileExists(lzmaFile) {
err = os.Remove(lzmaFile)
if err != nil {
return err
}
pw.Log("deleted %s", lzmaFile)
}
_, err = colAliases.CreateDocument(arangoCTX, aliasData)
if err != nil {
return err
}
} else {
return fmt.Errorf("originals have not been deleted")
}
}
return nil
}
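// checkFileExists reports whether filePath exists according to os.Stat.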
func checkFileExists(filePath string) bool {
_, err := os.Stat(filePath)
return !errors.Is(err, os.ErrNotExist)
}
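// ProcessGMA ingests a single GMA archive: it hashes the file, extracts every
// entry into a ramfs staging directory, records file and gma_file_map
// documents in ArangoDB, uploads new blobs to the storage server, then
// rewrites the archive from the stored data and verifies size and hash before
// marking the GMA as successful.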
func ProcessGMA(pw progress.Writer, filePath string) (err error) {
var unlockOnce sync.Once
//fmt.Println("trying to acquire global write lock")
GlobalWriteLock.Lock() // Wait for worker to have slot open
//fmt.Println("aquired global write lock")
defer unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway
//defer fmt.Println("unlocking GlobalWriteLock")
//time.Sleep(5 * time.Second)
var (
fileIDs []string
gma2FileIDs []string
)
dboGMA := common.DB_GMA{}
dboGMA.BatchID = uuid.NewV4().String() // use this for rapid unscheduled disassembly
dboGMA.OriginalPath = filePath
dboGMA.ProcessingStart = time.Now()
cursor, err := arangoDB.Query(arangoCTX, fmt.Sprintf("FOR g IN gma FILTER g.originalPath == '%s' RETURN g", dboGMA.OriginalPath), nil)
if err != nil {
return err
}
defer cursor.Close()
initialRetryCounter := 0
skipHashCheck := false
if cursor.Count() > 0 || cursor.HasMore() {
for {
gma := common.DB_GMA{}
_, err = cursor.ReadDocument(arangoCTX, &gma)
if adriver.IsNoMoreDocuments(err) {
break
} else if err != nil {
return err
}
if gma.Success {
return fmt.Errorf("GMA with ID %s was successful at %v", gma.ID, gma.ProcessingEnd)
} else {
if gma.RetryCounter > 3 {
return fmt.Errorf("GMA with ID %s failed too many times", gma.ID)
}
skipHashCheck = true
initialRetryCounter = gma.RetryCounter + 1
_, err = colGMA.UpdateDocument(arangoCTX, gma.ID, &gma)
if err != nil {
return err
}
}
}
}
fileStat, err := os.Stat(filePath)
if err != nil {
return err
}
dboGMA.StatModTime = fileStat.ModTime()
dboGMA.GMASize = fileStat.Size()
dboGMA.RetryCounter = initialRetryCounter + 1
if dboGMA.GMASize < 200 {
return fmt.Errorf("GMA File too small, skipping")
}
niceName := filepath.Base(filePath)
trackerProcessDoneMarker := sync.Once{}
trackerProcess := progress.Tracker{Message: fmt.Sprintf("Extracting %s", niceName), Total: 0, Units: progress.UnitsDefault}
defer trackerProcessDoneMarker.Do(trackerProcess.MarkAsDone)
pw.AppendTracker(&trackerProcess)
//log.Printf("Opening %s\n", filePath)
gmaReader, err := gma.NewReader(filePath)
if err != nil {
return err
}
defer gmaReader.Close()
dboGMA.GMAHash, err = gmaReader.GetSHA256()
if err != nil {
return err
}
dboGMA.ID = dboGMA.GMAHash
gmaReader.FileHandle.Seek(0, io.SeekStart)
gmaTempPath := filepath.Join("/mnt/ramfs/gma-extr-temp", dboGMA.ID)
defer os.RemoveAll(gmaTempPath) // clean up under any circumstances
dboIDExists, err := colGMA.DocumentExists(arangoCTX, dboGMA.ID)
if err != nil {
return err
}
if dboIDExists && !skipHashCheck {
return fmt.Errorf("GMA with ID %s exists", dboGMA.ID)
}
if dboIDExists && skipHashCheck {
_, err = colGMA.RemoveDocument(arangoCTX, dboGMA.ID)
if err != nil {
return err
}
}
header, err := gmaReader.ReadHeader()
if err != nil {
return err
}
dboGMA.Header = header
//log.Printf("Name=%s\n", header.Title)
//log.Printf("Desc=%s\n", header.Description)
// log.Printf("AddonVersion=%d\n", header.AddonVersion)
// log.Printf("FormatVersion=%d\n", header.FormatVersion)
// log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte)
// //fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset())
firstType, files, err := gmaReader.ReadFiles()
if err != nil {
return err
}
trackerProcess.UpdateTotal(int64(len(files)))
dboGMA.FirstType = firstType
//fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset())
var (
dboGMA2Files []common.DB_GMA2File
dboFiles []common.DB_File
)
// Convert GMA Files into DB Metadata
for _, file := range files {
if file.FileSize < 0 { // negative size indicates a corrupt header
return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
}
//fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber)
if file.NextType > uint32(file.FileNumber+10) { // implausible index jump indicates a corrupt header
/*log.Printf("Current Cursor %d", gmaReader.GetOffset())
for _, otherFile := range files[file.FileNumber:] {
log.Printf("OTHERFILE %s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", otherFile.FileName, otherFile.CRC, otherFile.Offset, otherFile.FileSize, otherFile.NextType, otherFile.FileNumber)
}*/
return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
}
destPath := filepath.Join(gmaTempPath, "contents", file.FileName)
dir := filepath.Dir(destPath)
err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
return err
}
destFile, err := os.Create(destPath)
if err != nil {
return err
}
defer destFile.Close()
extractMeta, err := gmaReader.ExtractFileTo(file, destFile)
if err != nil {
return err
}
if extractMeta.ExtractedCRC != extractMeta.OriginalMeta.CRC {
pw.Log(fmt.Sprintf("gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n", filePath, extractMeta.OriginalMeta.CRC, extractMeta.ExtractedCRC, extractMeta.OriginalMeta.FileName))
}
//fmt.Printf("ExtractedMeta %s CRC: %d SHA256: %s\n", file.FileName, extractMeta.ExtractedCRC, extractMeta.ExtractedSHA256)
dboFile := common.DB_File{
ID: extractMeta.ExtractedSHA256, // content-addressed: the SHA-256 doubles as the document key
BatchID: dboGMA.BatchID,
InitialPath: file.FileName,
CRC: file.CRC,
Size: file.FileSize,
Hash: extractMeta.ExtractedSHA256,
Extension: filepath.Ext(file.FileName),
G2FRef: fmt.Sprintf("%s_%s_%d", dboGMA.ID, extractMeta.ExtractedSHA256, extractMeta.OriginalMeta.FileNumber), // reference to the GMA2File edge so that we can find it again in the list
}
dboGMA2File := common.DB_GMA2File{
ID: dboFile.G2FRef,
BatchID: dboGMA.BatchID,
File: fmt.Sprintf("file/%s", extractMeta.ExtractedSHA256),
GMA: fmt.Sprintf("gma/%s", dboGMA.ID),
FileNumber: extractMeta.OriginalMeta.FileNumber,
FileName: extractMeta.OriginalMeta.FileName,
Offset: extractMeta.OriginalMeta.Offset,
FileSize: extractMeta.OriginalMeta.FileSize,
CRCMatch: extractMeta.ExtractedCRC == extractMeta.OriginalMeta.CRC,
CRC: extractMeta.OriginalMeta.CRC,
NextType: extractMeta.OriginalMeta.NextType,
LocalFileName: destPath,
UploadID: extractMeta.ExtractedSHA256,
}
//fmt.Println(dboGMA2File)
// Add fileIDs from new unknowns
dboFiles = append(dboFiles, dboFile)
//fmt.Println(dboGMA2File)
gma2FileIDs = append(gma2FileIDs, dboGMA2File.ID)
dboGMA2Files = append(dboGMA2Files, dboGMA2File)
trackerProcess.Increment(1)
}
if len(files) < 1 {
return fmt.Errorf("GMA %s empty, len=%d", dboGMA.OriginalPath, len(files))
}
lastFile := files[len(files)-1]
dboGMA.FooterAddonCRC, err = gmaReader.ReadAddonCRC(lastFile.Offset + lastFile.FileSize)
if err != nil {
return err
}
dboGMA.ProcessingEnd = time.Now()
dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds()
// TODO: Calculate dboGMA.OptimizedSize
dboGMA.OptimizedSize = 0
_, err = colGMA.CreateDocument(arangoCTX, dboGMA)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
// TODO: Check all dboFiles and dboGMA2Files if they exist, if something is odd, queue reupload
dboExistFile := map[string]bool{}
dboExistFile2GMA := map[string]bool{}
for _, dboFile := range dboFiles {
exists, err := colFile.DocumentExists(arangoCTX, dboFile.ID)
if err != nil {
return err
}
dboExistFile[dboFile.ID] = exists
}
for _, dboGMA2File := range dboGMA2Files {
exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID)
if err != nil {
return err
}
dboExistFile2GMA[dboGMA2File.ID] = exists
}
trackerProcessDoneMarker.Do(trackerProcess.MarkAsDone)
// TODO: upload all unknownNewFiles to StorageServer
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200
httpClient := http.DefaultClient
trackerUploadDoneMarker := sync.Once{}
trackerUpload := progress.Tracker{Message: fmt.Sprintf("Uploading %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault}
pw.AppendTracker(&trackerUpload)
defer trackerUploadDoneMarker.Do(trackerUpload.MarkAsDone)
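// For each extracted file, ask the storage server whether it already has the
// blob: /check answers 208 Already Reported for known hashes, in which case
// only the edge document is ensured; otherwise the blob is uploaded via
// /stash and fetched back later through /fetch during the rewrite pass.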
for _, dboFile := range dboFiles {
dboFileID := fmt.Sprintf("file/%s", dboFile.ID)
//fmt.Printf("Line 460: %s checking if we need to store this on the server", dboFileID)
//dboFile2ChunkID := fmt.Sprintf("file_chunk_map/%s", dboFile.ID)
// TODO: Check against Storage backend
res, err := httpClient.Get(fmt.Sprintf("http://192.168.133.118:13371/check/%s", dboFile.ID))
if err != nil {
return err
}
if _, err = io.Copy(io.Discard, res.Body); err != nil {
res.Body.Close()
return err
}
res.Body.Close() // drain and close right away so the keep-alive connection can be reused
//body, _ := ioutil.ReadAll(res.Body)
//fmt.Printf("res.StatusCode = %d\n", res.StatusCode)
if res.StatusCode == http.StatusAlreadyReported {
trackerUpload.UpdateMessage(fmt.Sprintf("Skipping %s", niceName))
for _, dboGMA2File := range dboGMA2Files {
if dboFile.G2FRef == dboGMA2File.ID {
// Create File and dboGMA2File Object
exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID)
if err != nil {
log.Println("err @colGMA2File.DocumentExists")
log.Println("oopsie")
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerUpload.MarkAsErrored()
return err
}
if !exists {
_, err = colGMA2File.CreateDocument(arangoCTX, dboGMA2File)
if err != nil {
log.Println("err @colGMA2File.CreateDocument")
log.Println("oopsie")
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerUpload.MarkAsErrored()
return err
}
//} else {
//log.Println("already exists... weird")
}
break
}
}
trackerUpload.Increment(1)
continue
}
for _, dboGMA2File := range dboGMA2Files {
if dboFileID == dboGMA2File.File { // find corresponding dboGMA2File
//log.Println("Found dboFileID == dboGMA2File.Ref ID")
var uploadSuccess bool
for {
uploadSuccess = true // reset each attempt; otherwise a retry that succeeds could never break the loop
fileInfoJSON, err := json.Marshal(dboFile)
if err != nil {
log.Println("err @json.Marshal dboFile")
return err
}
//uploadBar.Describe("Uploading")
trackerUpload.UpdateMessage(fmt.Sprintf("Uploading %s", niceName))
err = common.MultipartUpload(httpClient, fmt.Sprintf("http://192.168.133.118:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON, workerID)
if err != nil {
log.Println("err @common.MultipartUpload")
log.Println(err)
if strings.Contains(err.Error(), "cannot assign requested address") {
uploadSuccess = false
} else if strings.Contains(err.Error(), "refused") {
panic(err)
} else {
log.Println("oopsie")
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerUpload.MarkAsErrored()
return err
}
}
if uploadSuccess {
// Create File and dboGMA2File Object
exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID)
if err != nil {
log.Println("err @colGMA2File.DocumentExists")
log.Println("oopsie")
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerUpload.MarkAsErrored()
return err
}
if !exists {
_, err = colGMA2File.CreateDocument(arangoCTX, dboGMA2File)
if err != nil {
log.Println("err @colGMA2File.CreateDocument")
log.Println("oopsie")
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerUpload.MarkAsErrored()
return err
}
//} else {
//log.Println("already exists... weird")
}
trackerUpload.Increment(1)
break
}
time.Sleep(10 * time.Second) // back off before retrying the upload
}
if uploadSuccess {
break
}
}
}
}
trackerUploadDoneMarker.Do(trackerUpload.MarkAsDone)
// at this point we can release the write semaphore
unlockOnce.Do(GlobalWriteLock.Unlock)
//fmt.Println("unlocking GlobalWriteLock")
// TODO : fetch all files from storageServer
// TODO : write new gma from arangoinfo
// TODO : compare hashes
{
var (
rw_dboGMA2Files []common.DB_GMA2File
)
cursor, err := arangoDB.Query(arangoCTX, fmt.Sprintf("FOR gf IN gma_file_map FILTER gf._from == 'gma/%s' RETURN gf", dboGMA.ID), nil)
if err != nil {
return err
}
defer cursor.Close()
if cursor.Count() > 0 || cursor.HasMore() {
for {
gma2File := common.DB_GMA2File{}
_, err = cursor.ReadDocument(arangoCTX, &gma2File)
if adriver.IsNoMoreDocuments(err) {
break
} else if err != nil {
return err
}
gma2File.UploadID = gma2File.File[5:]
rw_dboGMA2Files = append(rw_dboGMA2Files, gma2File)
}
} else {
return fmt.Errorf("no files for gma available")
}
trackerRewriteDoneMarker := sync.Once{}
trackerRewrite := progress.Tracker{Message: fmt.Sprintf("Rewriting %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault}
pw.AppendTracker(&trackerRewrite)
defer trackerRewriteDoneMarker.Do(trackerRewrite.MarkAsDone)
destPath := filepath.Join(gmaTempPath, "rewrite.gma")
dir := filepath.Dir(destPath)
err = os.MkdirAll(dir, os.ModePerm)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
gmaWriter, err := gma.NewWriter(destPath)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
defer gmaWriter.Close()
//fmt.Printf("Writing Header with FormatVersion: %d\n", dboGMA.Header.FormatVersion)
err = gmaWriter.WriteHeader(dboGMA.Header)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
err = gmaWriter.WriteFirstType(dboGMA.FirstType)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
sort.SliceStable(rw_dboGMA2Files, func(i, j int) bool { return rw_dboGMA2Files[i].FileNumber < rw_dboGMA2Files[j].FileNumber })
for _, dboGMA2File := range rw_dboGMA2Files {
//fmt.Printf("WriteFileIndex for %s number %d\n", dboGMA2File.FileName, dboGMA2File.FileNumber)
err = gmaWriter.WriteFileIndex(dboGMA2File.FileName, dboGMA2File.FileSize, dboGMA2File.CRC, dboGMA2File.NextType)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
}
var httpClient *http.Client = &http.Client{
Timeout: 15 * time.Minute,
}
for _, dboGMA2File := range rw_dboGMA2Files {
//fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID)
resp, err := httpClient.Get(fmt.Sprintf("http://192.168.133.118:13371/fetch/%s", dboGMA2File.UploadID))
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
err = gmaWriter.WriteFile(resp.Body)
resp.Body.Close() // close immediately; a defer inside this loop would keep every body open until the function returns
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
trackerRewrite.Increment(1)
}
gmaWriter.FileHandle.Seek(0, io.SeekEnd)
//log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC)
gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC)
// TODO: maybe use io.MultiWriter ??
gmaWriter.FileHandle.Seek(0, io.SeekStart)
writeHash, err := gmaWriter.GetSHA256()
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
//log.Printf("Rewrite Hash is %s %s\n", writeHash, destPath)
//log.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath)
//log.Println()
writeStat, err := os.Stat(destPath)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
writeSize := writeStat.Size()
if writeSize != dboGMA.GMASize {
//createDebugInformation
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerRewrite.MarkAsErrored()
return fmt.Errorf("rewrite size check failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
}
if writeHash != dboGMA.GMAHash {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerRewrite.MarkAsErrored()
return fmt.Errorf("rewrite hash check failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
}
trackerRewriteDoneMarker.Do(trackerRewrite.MarkAsDone)
recoveryData := common.JSON_GMARecovery{
GMA: dboGMA,
Refs: rw_dboGMA2Files,
}
recoveryBytes, err := json.MarshalIndent(recoveryData, "", "\t")
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
recoveryPath := filepath.Join("/zpool0/cheetah/workshop/garrysmod/gma-inator/recovery-tree", fmt.Sprintf("%s.json", dboGMA.OriginalPath))
err = os.MkdirAll(filepath.Dir(recoveryPath), os.ModePerm)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
recoveryFile, err := os.Create(recoveryPath)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
defer recoveryFile.Close() // the handle was previously leaked
_, err = recoveryFile.Write(recoveryBytes)
if err != nil {
return fmt.Errorf("error @recoveryFile.Write: %v", err)
}
}
// TODO: 4... profit?
dboGMA.ProcessingEnd = time.Now()
dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds()
dboGMA.Success = true
dboGMA.MigrationID = common.CreateMigrationID(dboGMA.OriginalPath)
_, err = colGMA.UpdateDocument(arangoCTX, dboGMA.ID, dboGMA)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
return nil
}