diff --git a/.gitignore b/.gitignore index 004aea6..b2ddb69 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ storageserver/storageserver gmad_linux .vscode/ storageserver/test/ -zstd-tar-test/ \ No newline at end of file +zstd-tar-test/ +gma-puzzles diff --git a/chunk/chunk.go b/chunk/chunk.go new file mode 100644 index 0000000..7e2cd6f --- /dev/null +++ b/chunk/chunk.go @@ -0,0 +1,138 @@ +package chunk + +import ( + "archive/tar" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "os" + "time" + + "git.cheetah.cat/worksucc/gma-puzzles/common" + "github.com/klauspost/compress/zstd" +) + +type PoolRecoveryData struct { + PoolID string `json:"_key"` + Size uint64 `json:"size"` + Created time.Time `json:"date"` + Hash string `json:"hash"` + + ItemCount int `json:"itemCount"` + Items []string `json:"items"` + RecoveryData []common.DB_File `json:"recoveryData"` +} + +type ChunkReader struct { + FileHandle *os.File + ExpectedHash *string + ExpectedSize *uint64 +} + +func NewChunkReader(fileName string) (_ ChunkReader, err error) { + return ChunkReader{}.NewReader(fileName) +} +func (r ChunkReader) NewReader(fileName string) (_ ChunkReader, err error) { + r.FileHandle, err = os.Open(fileName) + if err != nil { + return r, err + } + + return r, nil +} +func (r ChunkReader) NewReaderFrom(fileHandle *os.File) (_ ChunkReader, err error) { + r.FileHandle = fileHandle + return r, nil +} +func (r *ChunkReader) LoadRecoveryFile(fileName string) (err error) { + var poolRecoveryData PoolRecoveryData + + readJSONFile, err := os.Open(fileName) + if err != nil { + return err + } + defer readJSONFile.Close() + + readBytes, err := io.ReadAll(readJSONFile) + if err != nil { + return err + } + err = json.Unmarshal(readBytes, &poolRecoveryData) + if err != nil { + return err + } + + r.ExpectedHash = &poolRecoveryData.Hash + r.ExpectedSize = &poolRecoveryData.Size + return nil +} + +func (r *ChunkReader) CheckIntegrity() (err error) { + // re-open and check + r.FileHandle.Seek(0, 0) + shaHasher := sha256.New() + hashedBytes, err := io.Copy(shaHasher, r.FileHandle) + if err != nil { + return err + } + readHash := fmt.Sprintf("%x", shaHasher.Sum(nil)) + //fmt.Printf("PackPoolTar hash is %s\n", readHash) + if readHash != *r.ExpectedHash { + return fmt.Errorf("WORM Hash %s != Hash %s", readHash, *r.ExpectedHash) + } + packFileStats, err := r.FileHandle.Stat() + if err != nil { + return err + } + readSize := packFileStats.Size() + if readSize != int64(*r.ExpectedSize) { + return fmt.Errorf("WORM Copy FileSize %d != FileSize %d", readSize, *r.ExpectedSize) + } + // validate written tar-chunk + _, err = r.FileHandle.Seek(0, 0) + if err != nil { + return err + } + + decompressor, err := zstd.NewReader(r.FileHandle, zstd.WithDecoderConcurrency(8)) + if err != nil { + return err + } + defer decompressor.Close() + tarFileCheckReader := tar.NewReader(decompressor) + + //filenamesReadBackList := []string{} + for { + header, err := tarFileCheckReader.Next() + //header.PAXRecords + if err == io.EOF { + break + } + if err != nil { + return err + } + hasher := sha256.New() + hashedBytes, err := io.Copy(hasher, tarFileCheckReader) + if err != nil { + return err + } + readBackChecksum := fmt.Sprintf("%x", hasher.Sum(nil)) + if hashedBytes != header.Size { + return fmt.Errorf("validation on output archive, incorrect size file %s has %d should be %d", header.Name, hashedBytes, header.Size) + } + if header.Name != readBackChecksum { + return fmt.Errorf("validation on output archive, incorrect checksum file %s has %s", header.Name, readBackChecksum) + } + //filenamesReadBackList = append(filenamesReadBackList, header.Name) + } + + if hashedBytes != int64(*r.ExpectedSize) { + return fmt.Errorf("WORM Copy HashedBytes %d != FileSize %d", hashedBytes, *r.ExpectedSize) + } + return nil +} + +func (r *ChunkReader) Close() { + r.FileHandle.Close() +} diff --git a/fix/fix.go b/fix/fix.go index 6879d91..bc02ac6 100644 --- a/fix/fix.go +++ b/fix/fix.go @@ -15,6 +15,7 @@ import ( "strings" "time" + "git.cheetah.cat/worksucc/gma-puzzles/chunk" "git.cheetah.cat/worksucc/gma-puzzles/common" adriver "github.com/arangodb/go-driver" ahttp "github.com/arangodb/go-driver/http" @@ -94,23 +95,19 @@ func InitDatabase() (err error) { return nil } -type PoolRecoveryData struct { - PoolID string `json:"_key"` - Size uint64 `json:"size"` - Created time.Time `json:"date"` - Hash string `json:"hash"` - - ItemCount int `json:"itemCount"` - Items []string `json:"items"` - RecoveryData []common.DB_File `json:"recoveryData"` -} - func bla() error { entries, err := os.ReadDir("/mnt/SC9000/storagePools/") if err != nil { return err } for _, e := range entries { + if strings.Contains(e.Name(), ".json") { + continue + } + if !strings.Contains(e.Name(), "78d8553f-d716-47e3-ad21-c94f00c5e19c") { + continue + } + if !e.IsDir() { fmt.Printf("Scanning For Local Pools, found %s:", e.Name()) @@ -120,15 +117,15 @@ func bla() error { return err } parts := strings.Split(e.Name(), ".") - var chunk common.DB_Chunk - _, err = colChunk.ReadDocument(arangoCTX, parts[0], &chunk) + var dboChunk common.DB_Chunk + _, err = colChunk.ReadDocument(arangoCTX, parts[0], &dboChunk) if err != nil { return err } - chunk.Finalized = true - chunk.NotReady = false - chunk.ReadOnly = true - chunk.Size = stats.Size() + dboChunk.Finalized = true + dboChunk.NotReady = false + dboChunk.ReadOnly = true + dboChunk.Size = stats.Size() zstFile, err := os.Open(tarFinalPath) if err != nil { @@ -141,11 +138,11 @@ func bla() error { } jsonPath := filepath.Join("/mnt/SC9000/storagePools/", fmt.Sprintf("%s.json", parts[0])) - + fmt.Print(jsonPath) _, err = os.Stat(jsonPath) if err != nil { - if !errors.Is(err, os.ErrNotExist) { - + if errors.Is(err, os.ErrNotExist) { + fmt.Println("json 404") // rewrite json from db zstFile.Seek(0, 0) @@ -168,16 +165,17 @@ func bla() error { items = append(items, header.Name) } - poolRecoveryData := PoolRecoveryData{ + poolRecoveryData := chunk.PoolRecoveryData{ PoolID: parts[0], Size: uint64(stats.Size()), Created: time.Now(), Hash: fmt.Sprintf("%x", shaHasher.Sum(nil)), - ItemCount: 500, + ItemCount: len(items), Items: items, //RecoveryData, } - chunk.Hash = poolRecoveryData.Hash + dboChunk.Hash = poolRecoveryData.Hash + dboChunk.FileCount = len(items) //TODO: fetch RecoveryData from DB poolRecoveryData.RecoveryData = make([]common.DB_File, len(items)) @@ -201,7 +199,7 @@ func bla() error { } } } else { - var poolRecoveryData PoolRecoveryData + var poolRecoveryData chunk.PoolRecoveryData readJSONFile, err := os.Open(jsonPath) if err != nil { @@ -221,8 +219,7 @@ func bla() error { poolRecoveryData.Size = uint64(stats.Size()) poolRecoveryData.Created = time.Now() poolRecoveryData.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil)) - chunk.Hash = poolRecoveryData.Hash - + dboChunk.Hash = poolRecoveryData.Hash json, err := json.MarshalIndent(poolRecoveryData, "", "\t") if err != nil { return fmt.Errorf("error @json.MarshalIndent %v", err) @@ -236,13 +233,14 @@ func bla() error { return fmt.Errorf("error @recoveryFile.Write %v", err) } } - _, err = colChunk.UpdateDocument(arangoCTX, parts[0], &chunk) + _, err = colChunk.UpdateDocument(arangoCTX, parts[0], &dboChunk) if err != nil { return err } - + fmt.Printf(":%d\n", dboChunk.FileCount) } } + return nil } func main() { diff --git a/go.mod b/go.mod index 020e1c7..f3c6bbc 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,10 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/djherbis/times v1.5.0 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect + github.com/gosuri/uilive v0.0.4 // indirect + github.com/gosuri/uiprogress v0.0.1 // indirect + github.com/jedib0t/go-pretty v4.3.0+incompatible // indirect + github.com/jedib0t/go-pretty/v6 v6.4.6 // indirect github.com/klauspost/compress v1.16.6 // indirect github.com/labstack/echo v3.3.10+incompatible // indirect github.com/labstack/echo/v4 v4.10.2 // indirect diff --git a/go.sum b/go.sum index 1dcaa4b..11559d3 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,14 @@ github.com/djherbis/times v1.5.0 h1:79myA211VwPhFTqUk8xehWrsEO+zcIZj0zT8mXPVARU= github.com/djherbis/times v1.5.0/go.mod h1:5q7FDLvbNg1L/KaBmPcWlVR9NmoKo3+ucqUA3ijQhA0= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY= +github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI= +github.com/gosuri/uiprogress v0.0.1 h1:0kpv/XY/qTmFWl/SkaJykZXrBBzwwadmW8fRb7RJSxw= +github.com/gosuri/uiprogress v0.0.1/go.mod h1:C1RTYn4Sc7iEyf6j8ft5dyoZ4212h8G1ol9QQluh5+0= +github.com/jedib0t/go-pretty v4.3.0+incompatible h1:CGs8AVhEKg/n9YbUenWmNStRW2PHJzaeDodcfvRAbIo= +github.com/jedib0t/go-pretty v4.3.0+incompatible/go.mod h1:XemHduiw8R651AF9Pt4FwCTKeG3oo7hrHJAoznj9nag= +github.com/jedib0t/go-pretty/v6 v6.4.6 h1:v6aG9h6Uby3IusSSEjHaZNXpHFhzqMmjXcPq1Rjl9Jw= +github.com/jedib0t/go-pretty/v6 v6.4.6/go.mod h1:Ndk3ase2CkQbXLLNf5QDHoYb6J9WtVfmHZu9n8rk2xs= github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= @@ -26,12 +34,14 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= @@ -39,9 +49,12 @@ github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc github.com/schollz/progressbar/v3 v3.13.1 h1:o8rySDYiQ59Mwzy2FELeHY5ZARXZTVJC7iHD6PEFUiE= github.com/schollz/progressbar/v3 v3.13.1/go.mod h1:xvrbki8kfT1fzWzBT/UZd9L6GA+jdL7HAgq2RFnO6fQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/twinj/uuid v1.0.0 h1:fzz7COZnDrXGTAOHGuUGYd6sG+JMq+AoE7+Jlu0przk= github.com/twinj/uuid v1.0.0/go.mod h1:mMgcE1RHFUFqe5AfiwlINXisXfDGro23fWdPUfOMjRY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -57,6 +70,7 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -73,3 +87,4 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/integritycheck/integritycheck.go b/integritycheck/integritycheck.go new file mode 100644 index 0000000..1595149 --- /dev/null +++ b/integritycheck/integritycheck.go @@ -0,0 +1,228 @@ +package main + +import ( + "context" + "crypto/tls" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "git.cheetah.cat/worksucc/gma-puzzles/chunk" + "git.cheetah.cat/worksucc/gma-puzzles/common" + adriver "github.com/arangodb/go-driver" + ahttp "github.com/arangodb/go-driver/http" + "github.com/schollz/progressbar/v3" +) + +var ( + arangoDB adriver.Database + arangoCTX context.Context + colChunk adriver.Collection + colFile adriver.Collection + colFile2Chunk adriver.Collection +) + +func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) { + log.Println("connectDB:", "Starting Connection Process...") + + // Retry Loop for Failed Connections + for i := 0; i < 6; i++ { + if i == 5 { + return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i) + } else if i > 0 { + time.Sleep(30 * time.Second) + } + + // Connect to ArangoDB URL + conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{ + Endpoints: []string{baseURL}, + TLSConfig: &tls.Config{ /*...*/ }, + }) + if err != nil { + log.Println("connectDB:", "Cannot Connect to ArangoDB!", err) + continue + } + + // Connect Driver to User + client, err := adriver.NewClient(adriver.ClientConfig{ + Connection: conn, + Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD), + }) + if err != nil { + log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err) + continue + } + + // Create Context for Database Access + ctx = context.Background() + driver, err = client.Database(ctx, arangoDatabase) + if err != nil { + log.Println("connectDB:", "Cannot Load ArangoDB Database!", err) + continue + } + log.Println("connectDB:", "Connection Sucessful!") + return driver, ctx, nil + } + + return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?") +} +func InitDatabase() (err error) { + arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator") + if err != nil { + return err + } + + colChunk, err = arangoDB.Collection(arangoCTX, "chunk") + if err != nil { + return err + } + colFile, err = arangoDB.Collection(arangoCTX, "file") + if err != nil { + return err + } + colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map") + if err != nil { + return err + } + return nil +} + +func CheckIntegrity(tarPath string) (err error) { + + return nil +} + +type PoolRecoveryData struct { + PoolID string `json:"_key"` + Size uint64 `json:"size"` + Created time.Time `json:"date"` + Hash string `json:"hash"` + + ItemCount int `json:"itemCount"` + Items []string `json:"items"` + RecoveryData []common.DB_File `json:"recoveryData"` +} + +var DoneTaskCount chan int +var TotalTaskCount chan int +var DoneTaskCountV int +var TotalTaskCountV int +var ConcurrencyLimit int = 12 +var WorkerJobPool chan string + +func bla() error { + WorkerJobPool = make(chan string) + sem := common.NewSemaphore(ConcurrencyLimit) + wg := sync.WaitGroup{} + + entries, err := os.ReadDir("/mnt/SC9000/storagePools/") + if err != nil { + return err + } + chunkNames := []string{} + for _, e := range entries { + if strings.Contains(e.Name(), ".json") { + continue + } + if !e.IsDir() { + chunkNames = append(chunkNames, e.Name()) + } + } + + // + TotalTaskCount = make(chan int) + DoneTaskCount = make(chan int) + validationBar := progressbar.Default(int64(len(chunkNames)), "Validating Chunks") + go func() { + for { + select { + case <-TotalTaskCount: + TotalTaskCountV++ + case <-DoneTaskCount: + DoneTaskCountV++ + validationBar.Add(1) + if TotalTaskCountV == DoneTaskCountV { + fmt.Println("APPARENTLY WE are done") + close(TotalTaskCount) + close(DoneTaskCount) + return + } + } + } + }() + for _, chunkName := range chunkNames { + wg.Add(1) + TotalTaskCount <- 1 + go func(job string, wg *sync.WaitGroup) (err error) { + sem.Acquire() // Wait for worker to have slot open + defer sem.Release() // Release the slot + defer wg.Done() // Finish job + defer func() { + DoneTaskCount <- 1 + }() + + //fmt.Printf("Scanning For Local Pools, found %s:", job) + tarFinalPath := filepath.Join("/mnt/SC9000/storagePools/", job) + _, err = os.Stat(tarFinalPath) + if err != nil { + log.Println(err) + return err + } + parts := strings.Split(job, ".") + jsonPath := filepath.Join("/mnt/SC9000/storagePools/", fmt.Sprintf("%s.json", parts[0])) + _, err = os.Stat(jsonPath) + if err != nil { + log.Println(err) + return err + } + var dboChunk common.DB_Chunk + _, err = colChunk.ReadDocument(arangoCTX, parts[0], &dboChunk) + if err != nil { + log.Println(err) + log.Printf("Chunk %s does exist on disk but not in database\n", job) + } + + chunkReader, err := chunk.NewChunkReader(tarFinalPath) + if err != nil { + log.Println(err) + return err + } + err = chunkReader.LoadRecoveryFile(jsonPath) + if err != nil { + log.Println(err) + return err + } + + err = chunkReader.CheckIntegrity() + if err != nil { + log.Println(err) + return err + } + + return nil + + }(chunkName, &wg) + if err != nil { + return err + } + } + // Wait for all jobs to finish + wg.Wait() + + return nil +} +func main() { + err := InitDatabase() + if err != nil { + panic(err) + } + + err = bla() + if err != nil { + panic(err) + } +} diff --git a/main.go b/main.go index ea853f4..2575322 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,8 @@ import ( "github.com/schollz/progressbar/v3" "github.com/twinj/uuid" + "github.com/jedib0t/go-pretty/v6/progress" + _ "net/http/pprof" ) @@ -266,6 +268,23 @@ func modeRebuild(id string) (err error) { return nil } +func recursive(jobs []string, folderPath string) (WorkerJobPool []string) { + entries, err := os.ReadDir(folderPath) + if err != nil { + panic(err) + } + for _, e := range entries { + fullPath := filepath.Join(folderPath, e.Name()) + if e.IsDir() { + jobs = recursive(jobs, fullPath) + } + if !e.IsDir() { + jobs = append(jobs, filepath.Join(folderPath, e.Name())) + } + } + return jobs +} + func modeIngress(folderPath string, skipName string) { skipNameEnabled := len(skipName) > 0 entries, err := os.ReadDir(folderPath) @@ -275,6 +294,10 @@ func modeIngress(folderPath string, skipName string) { var WorkerJobPool []string for _, e := range entries { + fullPath := filepath.Join(folderPath, e.Name()) + if e.IsDir() { + WorkerJobPool = recursive(WorkerJobPool, fullPath) + } if !e.IsDir() && skipNameEnabled { if e.Name() == skipName { skipNameEnabled = false @@ -286,17 +309,41 @@ func modeIngress(folderPath string, skipName string) { WorkerJobPool = append(WorkerJobPool, filepath.Join(folderPath, e.Name())) } } - wg := sync.WaitGroup{} + pw := progress.NewWriter() + pw.SetAutoStop(true) + pw.SetTrackerLength(40) + pw.SetMessageWidth(40) + //pw.SetNumTrackersExpected(*flagNumTrackers) + pw.SetSortBy(progress.SortByPercentDsc) + pw.SetStyle(progress.StyleDefault) + pw.SetTrackerPosition(progress.PositionRight) + pw.SetUpdateFrequency(time.Millisecond * 100) + pw.Style().Colors = progress.StyleColorsExample + pw.Style().Options.PercentFormat = "%4.1f%%" + pw.Style().Visibility.ETA = true + pw.Style().Visibility.ETAOverall = true + pw.Style().Visibility.Percentage = false + pw.Style().Visibility.Speed = true + pw.Style().Visibility.SpeedOverall = false + pw.Style().Visibility.Time = true + pw.Style().Visibility.TrackerOverall = true + pw.Style().Visibility.Value = true + pw.Style().Visibility.Pinned = true + + // call Render() in async mode; yes we don't have any trackers at the moment + go pw.Render() + for _, jobFile := range WorkerJobPool { wg.Add(1) go func(jobFile string, wg *sync.WaitGroup) { - err = ProcessGMA(jobFile) + defer wg.Done() + err = ProcessGMA(pw, jobFile) if err != nil { log.Printf("\nERROR: %v\n", err) } - wg.Done() + }(jobFile, &wg) } @@ -325,7 +372,7 @@ func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []str */ return nil } -func ProcessGMA(filePath string) (err error) { +func ProcessGMA(pw progress.Writer, filePath string) (err error) { var unlockOnce sync.Once fmt.Println("trying to acquire global write lock") @@ -379,8 +426,11 @@ func ProcessGMA(filePath string) (err error) { if dboGMA.GMASize < 200 { return fmt.Errorf("GMA File too small, skipping") } + niceName := filepath.Base(filePath) + trackerProcess := progress.Tracker{Message: fmt.Sprintf("Extracting %s", niceName), Total: 0, Units: progress.UnitsDefault} + pw.AppendTracker(&trackerProcess) - log.Printf("Opening %s\n", filePath) + //log.Printf("Opening %s\n", filePath) gmaReader, err := gma.NewReader(filePath) if err != nil { return err @@ -416,8 +466,8 @@ func ProcessGMA(filePath string) (err error) { return err } dboGMA.Header = header - log.Printf("Name=%s\n", header.Title) - log.Printf("Desc=%s\n", header.Description) + //log.Printf("Name=%s\n", header.Title) + //log.Printf("Desc=%s\n", header.Description) // log.Printf("AddonVersion=%d\n", header.AddonVersion) // log.Printf("FormatVersion=%d\n", header.FormatVersion) // log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte) @@ -534,12 +584,15 @@ func ProcessGMA(filePath string) (err error) { } dboExistFile2GMA[dboGMA2File.ID] = exists } + trackerProcess.MarkAsDone() // TODO: upload all unknownNewFiles to StorageServer http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200 var httpClient *http.Client = http.DefaultClient - uploadBar := progressbar.Default(int64(len(dboFiles)), "Uploading to StorageServer") + trackerUpload := progress.Tracker{Message: fmt.Sprintf("Uploading %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault} + pw.AppendTracker(&trackerUpload) + for _, dboFile := range dboFiles { dboFileID := fmt.Sprintf("file/%s", dboFile.ID) //fmt.Printf("Line 460: %s checking if we need to store this on the server", dboFileID) @@ -557,8 +610,7 @@ func ProcessGMA(filePath string) (err error) { //body, _ := ioutil.ReadAll(res.Body) //fmt.Printf("res.StatusCode = %d\n", res.StatusCode) if res.StatusCode == http.StatusAlreadyReported { - uploadBar.Describe("Skipping") - uploadBar.Add(1) + trackerUpload.Increment(1) continue } @@ -588,7 +640,7 @@ func ProcessGMA(filePath string) (err error) { return err } - uploadBar.Describe("Uploading") + //uploadBar.Describe("Uploading") err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON, workerID) if err != nil { log.Println("err @common.MultipartUpload") @@ -598,6 +650,7 @@ func ProcessGMA(filePath string) (err error) { } else { log.Println("oopsie") undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + trackerUpload.MarkAsErrored() return err } } @@ -608,6 +661,7 @@ func ProcessGMA(filePath string) (err error) { log.Println("err @colGMA2File.DocumentExists") log.Println("oopsie") undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + trackerUpload.MarkAsErrored() return err } if !exists { @@ -616,10 +670,11 @@ func ProcessGMA(filePath string) (err error) { log.Println("err @colGMA2File.CreateDocument") log.Println("oopsie") undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + trackerUpload.MarkAsErrored() return err } } - uploadBar.Add(1) + trackerUpload.Increment(1) break } time.Sleep(10 * time.Second) @@ -630,17 +685,18 @@ func ProcessGMA(filePath string) (err error) { } } } - + trackerUpload.MarkAsDone() // at this point we can release the write semaphore unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway - fmt.Println("unlocking GlobalWriteLock") + //fmt.Println("unlocking GlobalWriteLock") // TODO : fetch all files from storageServer // TODO : write new gma from arangoinfo // TODO : compare hashes { log.Println("rewriting gma") - rewriteBar := progressbar.Default(int64(len(dboGMA2Files)), "Rewriting GMA") + trackerRewrite := progress.Tracker{Message: fmt.Sprintf("Rewriting %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault} + pw.AppendTracker(&trackerRewrite) destPath := filepath.Join(gmaTempPath, "rewrite.gma") dir := filepath.Dir(destPath) @@ -695,7 +751,7 @@ func ProcessGMA(filePath string) (err error) { undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err } - rewriteBar.Add(1) + trackerRewrite.Increment(1) } gmaWriter.FileHandle.Seek(0, 2) log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC) @@ -708,10 +764,9 @@ func ProcessGMA(filePath string) (err error) { undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) return err } - - log.Printf("Rewrite Hash is %s %s\n", writeHash, destPath) - log.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath) - log.Println() + //log.Printf("Rewrite Hash is %s %s\n", writeHash, destPath) + //log.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath) + //log.Println() writeStat, err := os.Stat(destPath) if err != nil { undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) @@ -721,13 +776,16 @@ func ProcessGMA(filePath string) (err error) { if writeSize != dboGMA.GMASize { //fail undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + trackerRewrite.MarkAsErrored() return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize) } if writeHash != dboGMA.GMAHash { //fail undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + trackerRewrite.MarkAsErrored() return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize) } + trackerRewrite.MarkAsDone() } // TODO: 4... profit?