package main

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"path/filepath"
	"runtime/debug"
	"sort"
	"strings"
	"sync"
	"time"

	"git.cheetah.cat/worksucc/gma-puzzles/common"
	"git.cheetah.cat/worksucc/gma-puzzles/gma"
	adriver "github.com/arangodb/go-driver"
	ahttp "github.com/arangodb/go-driver/http"
	"github.com/schollz/progressbar/v3"
	"github.com/twinj/uuid"
)
var (
	arangoDB  adriver.Database
	arangoCTX context.Context
	//colChunk adriver.Collection
	colFile adriver.Collection
	//colFile2Chunk adriver.Collection
	colGMA      adriver.Collection
	colGMA2File adriver.Collection
)
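
// ConnectDB establishes a connection to the ArangoDB endpoint, retrying up to
// five times with a 30-second pause between attempts, and returns a handle to
// the requested database together with the context used for all further calls.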
func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
	log.Println("connectDB:", "Starting Connection Process...")

	// Retry Loop for Failed Connections
	for i := 0; i < 6; i++ {
		if i == 5 {
			return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i)
		} else if i > 0 {
			time.Sleep(30 * time.Second)
		}

		// Connect to ArangoDB URL
		conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
			Endpoints: []string{baseURL},
			TLSConfig: &tls.Config{ /*...*/ },
		})
		if err != nil {
			log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
			continue
		}

		// Connect Driver to User
		client, err := adriver.NewClient(adriver.ClientConfig{
			Connection:     conn,
			Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
		})
		if err != nil {
			log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
			continue
		}

		// Create Context for Database Access
		ctx = context.Background()
		driver, err = client.Database(ctx, arangoDatabase)
		if err != nil {
			log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
			continue
		}

		log.Println("connectDB:", "Connection Successful!")
		return driver, ctx, nil
	}

	return driver, ctx, fmt.Errorf("connectDB: unreachable retry loop exit")
}
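
// InitDatabase connects to ArangoDB and opens the collections this importer
// writes to (file, gma, gma_file_map); the chunk-related collections are
// currently commented out.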
func InitDatabase() (err error) {
	arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator")
	if err != nil {
		return err
	}

	/*colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
	if err != nil {
		return err
	}*/

	colFile, err = arangoDB.Collection(arangoCTX, "file")
	if err != nil {
		return err
	}

	colGMA, err = arangoDB.Collection(arangoCTX, "gma")
	if err != nil {
		return err
	}

	/*colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
	if err != nil {
		return err
	}*/

	colGMA2File, err = arangoDB.Collection(arangoCTX, "gma_file_map")
	if err != nil {
		return err
	}

	return nil
}
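
// Tunables and shared state for the worker pool. Note that the current main()
// builds a local job slice instead of using the WorkerJobPool channel, and
// GlobalWriteSemaphore is created but never acquired; serialization happens
// through GlobalWriteLock inside ProcessGMA.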
var JobPoolSize int = 5
var ConcurrencyLimit int = 1
var WorkerJobPool chan string
var GlobalWriteSemaphore common.Semaphore
var GlobalWriteLock sync.Mutex
func main() {
	folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "path to the folder containing GMA files")
	skipNameP := flag.String("skip", "", "skip until this name")
	debug.SetMemoryLimit(6e9)
	flag.Parse()

	folderPath := *folderPathP
	skipName := *skipNameP
	skipNameEnabled := len(skipName) > 0

	go func() {
		log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
	}()

	err := InitDatabase()
	if err != nil {
		panic(err)
	}
	/*
		err = colGMA.Truncate(arangoCTX)
		if err != nil {
			panic(err)
		}
		err = colFile.Truncate(arangoCTX)
		if err != nil {
			panic(err)
		}
		err = colGMA2File.Truncate(arangoCTX)
		if err != nil {
			panic(err)
		}
	*/

	// /mnt/worksucc/san2/gma/2/1/2/3/2123406190.1591573904.gma
	//fileHandle, err := os.Open("2143898000.1593250551.bin.gma") //2143898000.1593250551.bin")
	//gma, err := gma.NewReader("2143898000.1593250551.bin.gma")
	//folderPath := "/mnt/SC9000/TemporaryTestingShit2/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
	//folderPathTarget := "/mnt/SC9000/ProcessedGMATest/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
	//
	entries, err := os.ReadDir(folderPath)
	if err != nil {
		panic(err)
	}

	var WorkerJobPool []string
	for _, e := range entries {
		if !e.IsDir() && skipNameEnabled {
			if e.Name() == skipName {
				skipNameEnabled = false
			} else {
				continue
			}
		}
		if !e.IsDir() {
			WorkerJobPool = append(WorkerJobPool, filepath.Join(folderPath, e.Name()))
		}
	}

	GlobalWriteSemaphore = common.NewSemaphore(ConcurrencyLimit)
	wg := sync.WaitGroup{}
	for _, jobFile := range WorkerJobPool {
		wg.Add(1)
		go func(jobFile string, wg *sync.WaitGroup) {
			// Use a local err so concurrent workers do not race on a shared variable.
			if err := ProcessGMA(jobFile); err != nil {
				log.Printf("\nERROR: %v\n", err)
			}
			wg.Done()
		}(jobFile, &wg)
	}
	// Wait for all jobs to finish
	wg.Wait()
}
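
// undoBatch is meant to roll back the documents created for a failed batch.
// All of the actual RemoveDocument(s) calls are currently commented out, so
// for now it only logs the batch that would have been undone.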
func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) {
	log.Printf("undoBatch(%v, %s)\n", undoBatch, gmaID)
	/*
		_, err = colGMA.RemoveDocument(arangoCTX, gmaID)
		if err != nil {
			return err
		}
	*/
	/*
		_, _, err = colFile.RemoveDocuments(arangoCTX, fileIDs)
		if err != nil {
			return err
		}
	*/
	/*
		_, _, err = colGMA2File.RemoveDocuments(arangoCTX, gma2FileIDs)
		if err != nil {
			return err
		}
	*/
	return nil
}
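
// ProcessGMA runs the full pipeline for a single .gma archive: it hashes the
// file, extracts every contained file to a temp directory, records file and
// gma_file_map documents in ArangoDB, uploads the extracted files to the
// storage server, and finally rewrites the archive from the uploaded parts to
// verify that size and SHA256 match the original.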
func ProcessGMA(filePath string) (err error) {
	var unlockOnce sync.Once

	fmt.Println("trying to acquire global write lock")
	GlobalWriteLock.Lock() // Wait for worker to have slot open
	fmt.Println("acquired global write lock")
	defer unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway
	defer fmt.Println("unlocking GlobalWriteLock")

	time.Sleep(5 * time.Second)

	var (
		fileIDs     []string
		gma2FileIDs []string
	)

	dboGMA := common.DB_GMA{}
	dboGMA.BatchID = uuid.NewV4().String() // use this for rapid unscheduled disassembly
	dboGMA.OriginalPath = filePath
	dboGMA.ProcessingStart = time.Now()

	fileStat, err := os.Stat(filePath)
	if err != nil {
		return err
	}
	dboGMA.StatModTime = fileStat.ModTime()
	dboGMA.GMASize = fileStat.Size()

	if dboGMA.GMASize < 200 {
		return fmt.Errorf("GMA File too small, skipping")
	}

	log.Printf("Opening %s\n", filePath)
	gmaReader, err := gma.NewReader(filePath)
	if err != nil {
		return err
	}
	defer gmaReader.Close()

	dboGMA.GMAHash, err = gmaReader.GetSHA256()
	if err != nil {
		return err
	}
	dboGMA.ID = dboGMA.GMAHash
	gmaReader.FileHandle.Seek(0, io.SeekStart)

	gmaTempPath := filepath.Join("/mnt/ramfs/gma-extr-temp", dboGMA.ID)
	defer os.RemoveAll(gmaTempPath) // clean up under any circumstances

	dboIDExists, err := colGMA.DocumentExists(arangoCTX, dboGMA.ID)
	if err != nil {
		return err
	}
	if dboIDExists {
		return fmt.Errorf("GMA with ID %s exists", dboGMA.ID)
	}
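
	// Parse the GMA header (title, description, format version) and the file
	// index so every contained file can be extracted and catalogued.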
	header, err := gmaReader.ReadHeader()
	if err != nil {
		return err
	}
	dboGMA.Header = header

	log.Printf("Name=%s\n", header.Title)
	log.Printf("Desc=%s\n", header.Description)
	// log.Printf("AddonVersion=%d\n", header.AddonVersion)
	// log.Printf("FormatVersion=%d\n", header.FormatVersion)
	// log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte)
	// //fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset())

	firstType, files, err := gmaReader.ReadFiles()
	if err != nil {
		return err
	}
	dboGMA.FirstType = firstType
	//fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset())

	var (
		dboGMA2Files []common.DB_GMA2File
		dboFiles     []common.DB_File
	)
	// Convert GMA Files into DB Metadata
	for _, file := range files {
		if file.FileSize < 0 { // a negative size means the index is corrupted
			return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
		}
		//fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber)
		if file.NextType > uint32(file.FileNumber+10) { // NextType running far ahead of FileNumber also means corruption
			/*log.Printf("Current Cursor %d", gmaReader.GetOffset())
			for _, otherFile := range files[file.FileNumber:] {
				log.Printf("OTHERFILE %s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", otherFile.FileName, otherFile.CRC, otherFile.Offset, otherFile.FileSize, otherFile.NextType, otherFile.FileNumber)
			}*/
			return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
		}

		destPath := filepath.Join(gmaTempPath, "contents", file.FileName)
		dir := filepath.Dir(destPath)
		err := os.MkdirAll(dir, os.ModePerm)
		if err != nil {
			return err
		}
		destFile, err := os.Create(destPath)
		if err != nil {
			return err
		}
		extractMeta, err := gmaReader.ExtractFileTo(file, destFile)
		// Close the file right away instead of deferring inside the loop,
		// so handles are not held open until ProcessGMA returns.
		destFile.Close()
		if err != nil {
			return err
		}
		if extractMeta.ExtractedCRC != extractMeta.OriginalMeta.CRC {
			log.Printf("gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n", filePath, extractMeta.OriginalMeta.CRC, extractMeta.ExtractedCRC, extractMeta.OriginalMeta.FileName)
		}
		//fmt.Printf("ExtractedMeta %s CRC: %d SHA256: %s\n", file.FileName, extractMeta.ExtractedCRC, extractMeta.ExtractedSHA256)

		dboFile := common.DB_File{
			ID:          extractMeta.ExtractedSHA256, // ID is the SHA256, i guess that is good enough?
			BatchID:     dboGMA.BatchID,
			InitialPath: file.FileName,
			CRC:         file.CRC,
			Size:        file.FileSize,
			Hash:        extractMeta.ExtractedSHA256,
			Extension:   filepath.Ext(file.FileName),
		}
		dboGMA2File := common.DB_GMA2File{
			ID:            fmt.Sprintf("%s_%s", dboGMA.ID, extractMeta.ExtractedSHA256),
			BatchID:       dboGMA.BatchID,
			File:          fmt.Sprintf("file/%s", extractMeta.ExtractedSHA256),
			GMA:           fmt.Sprintf("gma/%s", dboGMA.ID),
			FileNumber:    extractMeta.OriginalMeta.FileNumber,
			FileName:      extractMeta.OriginalMeta.FileName,
			Offset:        extractMeta.OriginalMeta.Offset,
			FileSize:      extractMeta.OriginalMeta.FileSize,
			CRCMatch:      extractMeta.ExtractedCRC == extractMeta.OriginalMeta.CRC,
			CRC:           extractMeta.OriginalMeta.CRC,
			NextType:      extractMeta.OriginalMeta.NextType,
			LocalFileName: destPath,
			UploadID:      extractMeta.ExtractedSHA256,
		}
		//fmt.Println(dboFile)
		// Add fileIDs from new unknowns
		dboFiles = append(dboFiles, dboFile)
		//fmt.Println(dboGMA2File)
		gma2FileIDs = append(gma2FileIDs, dboGMA2File.ID)
		dboGMA2Files = append(dboGMA2Files, dboGMA2File)
	}
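
	// Read the addon CRC that follows the last file's data so the rewrite step
	// can reproduce the footer, then record how long extraction took.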
	lastFile := files[len(files)-1]
	dboGMA.FooterAddonCRC, err = gmaReader.ReadAddonCRC(lastFile.Offset + lastFile.FileSize)
	if err != nil {
		return err
	}

	dboGMA.ProcessingEnd = time.Now()
	dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds()

	// TODO: Calculate dboGMA.OptimizedSize
	dboGMA.OptimizedSize = 0

	_, err = colGMA.CreateDocument(arangoCTX, dboGMA)
	if err != nil {
		undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
		return err
	}

	// TODO: Check all dboFiles and dboGMA2Files if they exist, if something is odd, queue reupload
	dboExistFile := map[string]bool{}
	dboExistFile2GMA := map[string]bool{}
	for _, dboFile := range dboFiles {
		exists, err := colFile.DocumentExists(arangoCTX, dboFile.ID)
		if err != nil {
			return err
		}
		dboExistFile[dboFile.ID] = exists
	}
	for _, dboGMA2File := range dboGMA2Files {
		exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID)
		if err != nil {
			return err
		}
		dboExistFile2GMA[dboGMA2File.ID] = exists
	}

	// TODO: upload all unknownNewFiles to StorageServer
	http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200
	var httpClient *http.Client = http.DefaultClient
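
	// For every extracted file, ask the storage server (/check/<sha256>) whether it
	// already holds the content; 208 Already Reported means it can be skipped.
	// Otherwise the file is streamed to /stash/<sha256>/<size> together with its
	// metadata, retrying while the upload fails with "cannot assign requested address".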
	uploadBar := progressbar.Default(int64(len(dboFiles)), "Uploading to StorageServer")
	for _, dboFile := range dboFiles {
		dboFileID := fmt.Sprintf("file/%s", dboFile.ID)
		//fmt.Printf("Line 460: %s checking if we need to store this on the server", dboFileID)
		//dboFile2ChunkID := fmt.Sprintf("file_chunk_map/%s", dboFile.ID)

		// TODO: Check against Storage backend
		res, err := http.Get(fmt.Sprintf("http://127.0.0.1:13371/check/%s", dboFile.ID))
		if err != nil {
			return err
		}
		// Drain and close the body immediately; a defer inside the loop would keep
		// every response open until ProcessGMA returns.
		_, copyErr := io.Copy(io.Discard, res.Body)
		res.Body.Close()
		if copyErr != nil {
			return copyErr
		}
		//body, _ := ioutil.ReadAll(res.Body)
		//fmt.Printf("res.StatusCode = %d\n", res.StatusCode)
		if res.StatusCode == http.StatusAlreadyReported {
			uploadBar.Describe("Skipping")
			uploadBar.Add(1)
			continue
		}

		for _, dboGMA2File := range dboGMA2Files {
			if dboFileID == dboGMA2File.File { // find corresponding dboGMA2File
				//log.Println("Found dboFileID == dboGMA2File.Ref ID")
				uploadSuccess := true
				for {
					// Reset for this attempt; otherwise one failed attempt would keep
					// the loop retrying forever even after a later upload succeeds.
					uploadSuccess = true
					//log.Printf("Uploading %s to Storage\n", dboGMA2File.UploadID)
					// TODO: move file management to storageserver
					/*existsFile, err := colFile.DocumentExists(arangoCTX, dboFile.ID)
					if err != nil {
						log.Println("err @colFile.DocumentExist")
						return err
					}
					if !existsFile {
						_, err := colFile.CreateDocument(arangoCTX, dboFile)
						if err != nil {
							// TODO: error handling
							log.Println("err @colFile.CreateDocument")
							return err
						}
					}*/
					fileInfoJSON, err := json.Marshal(dboFile)
					if err != nil {
						log.Println("err @json.Marshal dboFile")
						return err
					}

					uploadBar.Describe("Uploading")
					err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON)
					if err != nil {
						log.Println("err @common.MultipartUpload")
						log.Println(err)
						if strings.Contains(err.Error(), "cannot assign requested address") {
							uploadSuccess = false
						} else {
							log.Println("rolling back batch")
							undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
							return err
						}
					}
					if uploadSuccess {
						// Create File and dboGMA2File Object
						exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID)
						if err != nil {
							log.Println("err @colGMA2File.DocumentExists")
							log.Println("rolling back batch")
							undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
							return err
						}
						if !exists {
							_, err = colGMA2File.CreateDocument(arangoCTX, dboGMA2File)
							if err != nil {
								log.Println("err @colGMA2File.CreateDocument")
								log.Println("rolling back batch")
								undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
								return err
							}
						}
						uploadBar.Add(1)
						break
					}
					time.Sleep(10 * time.Second)
				}
				if uploadSuccess {
					break
				}
			}
		}
	}
	// Uploads are finished, so the global write lock can be released here.
	unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway
	fmt.Println("unlocking GlobalWriteLock")

	// Verification: fetch all files back from the storage server, write a new GMA
	// from the ArangoDB metadata, and compare its size and hash to the original.
	{
		log.Println("rewriting gma")
		rewriteBar := progressbar.Default(int64(len(dboGMA2Files)), "Rewriting GMA")

		destPath := filepath.Join(gmaTempPath, "rewrite.gma")
		dir := filepath.Dir(destPath)
		err := os.MkdirAll(dir, os.ModePerm)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		gmaWriter, err := gma.NewWriter(destPath)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		defer gmaWriter.Close()

		//fmt.Printf("Writing Header with FormatVersion: %d\n", dboGMA.Header.FormatVersion)
		err = gmaWriter.WriteHeader(dboGMA.Header)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		err = gmaWriter.WriteFirstType(dboGMA.FirstType)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}

		// The file index has to be written in the original order.
		sort.SliceStable(dboGMA2Files, func(i, j int) bool { return dboGMA2Files[i].FileNumber < dboGMA2Files[j].FileNumber })
		for _, dboGMA2File := range dboGMA2Files {
			//fmt.Printf("WriteFileIndex for %s number %d\n", dboGMA2File.FileName, dboGMA2File.FileNumber)
			err = gmaWriter.WriteFileIndex(dboGMA2File.FileName, dboGMA2File.FileSize, dboGMA2File.CRC, dboGMA2File.NextType)
			if err != nil {
				undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
				return err
			}
		}

		var httpClient *http.Client = &http.Client{
			Timeout: 15 * time.Minute,
		}
		for _, dboGMA2File := range dboGMA2Files {
			//fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID)
			resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID))
			if err != nil {
				undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
				return err
			}
			err = gmaWriter.WriteFile(resp.Body)
			// Close the body right away instead of deferring inside the loop.
			resp.Body.Close()
			if err != nil {
				undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
				return err
			}
			rewriteBar.Add(1)
		}

		gmaWriter.FileHandle.Seek(0, io.SeekEnd)
		log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC)
		gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC)

		// TODO: maybe use io.MultiWriter ??
		gmaWriter.FileHandle.Seek(0, io.SeekStart)
		writeHash, err := gmaWriter.GetSHA256()
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		log.Printf("Rewrite Hash is %s %s\n", writeHash, destPath)
		log.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath)
		log.Println()

		writeStat, err := os.Stat(destPath)
		if err != nil {
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return err
		}
		writeSize := writeStat.Size()
		if writeSize != dboGMA.GMASize {
			// fail: size mismatch
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
		}
		if writeHash != dboGMA.GMAHash {
			// fail: hash mismatch
			undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
			return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
		}
	}
	// Mark the GMA as successfully processed and persist the final timings.
	dboGMA.ProcessingEnd = time.Now()
	dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds()
	dboGMA.Success = true

	_, err = colGMA.UpdateDocument(arangoCTX, dboGMA.ID, dboGMA)
	if err != nil {
		undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
		return err
	}
	return nil
}