added recursiveness and new progbar

master
cheetah 2 years ago
parent f859235f45
commit 7973a6fc33

3
.gitignore vendored

@ -7,4 +7,5 @@ storageserver/storageserver
gmad_linux
.vscode/
storageserver/test/
zstd-tar-test/
zstd-tar-test/
gma-puzzles

@ -0,0 +1,138 @@
package chunk
import (
"archive/tar"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"os"
"time"
"git.cheetah.cat/worksucc/gma-puzzles/common"
"github.com/klauspost/compress/zstd"
)
type PoolRecoveryData struct {
PoolID string `json:"_key"`
Size uint64 `json:"size"`
Created time.Time `json:"date"`
Hash string `json:"hash"`
ItemCount int `json:"itemCount"`
Items []string `json:"items"`
RecoveryData []common.DB_File `json:"recoveryData"`
}
type ChunkReader struct {
FileHandle *os.File
ExpectedHash *string
ExpectedSize *uint64
}
func NewChunkReader(fileName string) (_ ChunkReader, err error) {
return ChunkReader{}.NewReader(fileName)
}
func (r ChunkReader) NewReader(fileName string) (_ ChunkReader, err error) {
r.FileHandle, err = os.Open(fileName)
if err != nil {
return r, err
}
return r, nil
}
func (r ChunkReader) NewReaderFrom(fileHandle *os.File) (_ ChunkReader, err error) {
r.FileHandle = fileHandle
return r, nil
}
func (r *ChunkReader) LoadRecoveryFile(fileName string) (err error) {
var poolRecoveryData PoolRecoveryData
readJSONFile, err := os.Open(fileName)
if err != nil {
return err
}
defer readJSONFile.Close()
readBytes, err := io.ReadAll(readJSONFile)
if err != nil {
return err
}
err = json.Unmarshal(readBytes, &poolRecoveryData)
if err != nil {
return err
}
r.ExpectedHash = &poolRecoveryData.Hash
r.ExpectedSize = &poolRecoveryData.Size
return nil
}
func (r *ChunkReader) CheckIntegrity() (err error) {
// re-open and check
r.FileHandle.Seek(0, 0)
shaHasher := sha256.New()
hashedBytes, err := io.Copy(shaHasher, r.FileHandle)
if err != nil {
return err
}
readHash := fmt.Sprintf("%x", shaHasher.Sum(nil))
//fmt.Printf("PackPoolTar hash is %s\n", readHash)
if readHash != *r.ExpectedHash {
return fmt.Errorf("WORM Hash %s != Hash %s", readHash, *r.ExpectedHash)
}
packFileStats, err := r.FileHandle.Stat()
if err != nil {
return err
}
readSize := packFileStats.Size()
if readSize != int64(*r.ExpectedSize) {
return fmt.Errorf("WORM Copy FileSize %d != FileSize %d", readSize, *r.ExpectedSize)
}
// validate written tar-chunk
_, err = r.FileHandle.Seek(0, 0)
if err != nil {
return err
}
decompressor, err := zstd.NewReader(r.FileHandle, zstd.WithDecoderConcurrency(8))
if err != nil {
return err
}
defer decompressor.Close()
tarFileCheckReader := tar.NewReader(decompressor)
//filenamesReadBackList := []string{}
for {
header, err := tarFileCheckReader.Next()
//header.PAXRecords
if err == io.EOF {
break
}
if err != nil {
return err
}
hasher := sha256.New()
hashedBytes, err := io.Copy(hasher, tarFileCheckReader)
if err != nil {
return err
}
readBackChecksum := fmt.Sprintf("%x", hasher.Sum(nil))
if hashedBytes != header.Size {
return fmt.Errorf("validation on output archive, incorrect size file %s has %d should be %d", header.Name, hashedBytes, header.Size)
}
if header.Name != readBackChecksum {
return fmt.Errorf("validation on output archive, incorrect checksum file %s has %s", header.Name, readBackChecksum)
}
//filenamesReadBackList = append(filenamesReadBackList, header.Name)
}
if hashedBytes != int64(*r.ExpectedSize) {
return fmt.Errorf("WORM Copy HashedBytes %d != FileSize %d", hashedBytes, *r.ExpectedSize)
}
return nil
}
func (r *ChunkReader) Close() {
r.FileHandle.Close()
}

@ -15,6 +15,7 @@ import (
"strings"
"time"
"git.cheetah.cat/worksucc/gma-puzzles/chunk"
"git.cheetah.cat/worksucc/gma-puzzles/common"
adriver "github.com/arangodb/go-driver"
ahttp "github.com/arangodb/go-driver/http"
@ -94,23 +95,19 @@ func InitDatabase() (err error) {
return nil
}
type PoolRecoveryData struct {
PoolID string `json:"_key"`
Size uint64 `json:"size"`
Created time.Time `json:"date"`
Hash string `json:"hash"`
ItemCount int `json:"itemCount"`
Items []string `json:"items"`
RecoveryData []common.DB_File `json:"recoveryData"`
}
func bla() error {
entries, err := os.ReadDir("/mnt/SC9000/storagePools/")
if err != nil {
return err
}
for _, e := range entries {
if strings.Contains(e.Name(), ".json") {
continue
}
if !strings.Contains(e.Name(), "78d8553f-d716-47e3-ad21-c94f00c5e19c") {
continue
}
if !e.IsDir() {
fmt.Printf("Scanning For Local Pools, found %s:", e.Name())
@ -120,15 +117,15 @@ func bla() error {
return err
}
parts := strings.Split(e.Name(), ".")
var chunk common.DB_Chunk
_, err = colChunk.ReadDocument(arangoCTX, parts[0], &chunk)
var dboChunk common.DB_Chunk
_, err = colChunk.ReadDocument(arangoCTX, parts[0], &dboChunk)
if err != nil {
return err
}
chunk.Finalized = true
chunk.NotReady = false
chunk.ReadOnly = true
chunk.Size = stats.Size()
dboChunk.Finalized = true
dboChunk.NotReady = false
dboChunk.ReadOnly = true
dboChunk.Size = stats.Size()
zstFile, err := os.Open(tarFinalPath)
if err != nil {
@ -141,11 +138,11 @@ func bla() error {
}
jsonPath := filepath.Join("/mnt/SC9000/storagePools/", fmt.Sprintf("%s.json", parts[0]))
fmt.Print(jsonPath)
_, err = os.Stat(jsonPath)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
if errors.Is(err, os.ErrNotExist) {
fmt.Println("json 404")
// rewrite json from db
zstFile.Seek(0, 0)
@ -168,16 +165,17 @@ func bla() error {
items = append(items, header.Name)
}
poolRecoveryData := PoolRecoveryData{
poolRecoveryData := chunk.PoolRecoveryData{
PoolID: parts[0],
Size: uint64(stats.Size()),
Created: time.Now(),
Hash: fmt.Sprintf("%x", shaHasher.Sum(nil)),
ItemCount: 500,
ItemCount: len(items),
Items: items,
//RecoveryData,
}
chunk.Hash = poolRecoveryData.Hash
dboChunk.Hash = poolRecoveryData.Hash
dboChunk.FileCount = len(items)
//TODO: fetch RecoveryData from DB
poolRecoveryData.RecoveryData = make([]common.DB_File, len(items))
@ -201,7 +199,7 @@ func bla() error {
}
}
} else {
var poolRecoveryData PoolRecoveryData
var poolRecoveryData chunk.PoolRecoveryData
readJSONFile, err := os.Open(jsonPath)
if err != nil {
@ -221,8 +219,7 @@ func bla() error {
poolRecoveryData.Size = uint64(stats.Size())
poolRecoveryData.Created = time.Now()
poolRecoveryData.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil))
chunk.Hash = poolRecoveryData.Hash
dboChunk.Hash = poolRecoveryData.Hash
json, err := json.MarshalIndent(poolRecoveryData, "", "\t")
if err != nil {
return fmt.Errorf("error @json.MarshalIndent %v", err)
@ -236,13 +233,14 @@ func bla() error {
return fmt.Errorf("error @recoveryFile.Write %v", err)
}
}
_, err = colChunk.UpdateDocument(arangoCTX, parts[0], &chunk)
_, err = colChunk.UpdateDocument(arangoCTX, parts[0], &dboChunk)
if err != nil {
return err
}
fmt.Printf(":%d\n", dboChunk.FileCount)
}
}
return nil
}
func main() {

@ -8,6 +8,10 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/djherbis/times v1.5.0 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/gosuri/uilive v0.0.4 // indirect
github.com/gosuri/uiprogress v0.0.1 // indirect
github.com/jedib0t/go-pretty v4.3.0+incompatible // indirect
github.com/jedib0t/go-pretty/v6 v6.4.6 // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/labstack/echo v3.3.10+incompatible // indirect
github.com/labstack/echo/v4 v4.10.2 // indirect

@ -10,6 +10,14 @@ github.com/djherbis/times v1.5.0 h1:79myA211VwPhFTqUk8xehWrsEO+zcIZj0zT8mXPVARU=
github.com/djherbis/times v1.5.0/go.mod h1:5q7FDLvbNg1L/KaBmPcWlVR9NmoKo3+ucqUA3ijQhA0=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY=
github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI=
github.com/gosuri/uiprogress v0.0.1 h1:0kpv/XY/qTmFWl/SkaJykZXrBBzwwadmW8fRb7RJSxw=
github.com/gosuri/uiprogress v0.0.1/go.mod h1:C1RTYn4Sc7iEyf6j8ft5dyoZ4212h8G1ol9QQluh5+0=
github.com/jedib0t/go-pretty v4.3.0+incompatible h1:CGs8AVhEKg/n9YbUenWmNStRW2PHJzaeDodcfvRAbIo=
github.com/jedib0t/go-pretty v4.3.0+incompatible/go.mod h1:XemHduiw8R651AF9Pt4FwCTKeG3oo7hrHJAoznj9nag=
github.com/jedib0t/go-pretty/v6 v6.4.6 h1:v6aG9h6Uby3IusSSEjHaZNXpHFhzqMmjXcPq1Rjl9Jw=
github.com/jedib0t/go-pretty/v6 v6.4.6/go.mod h1:Ndk3ase2CkQbXLLNf5QDHoYb6J9WtVfmHZu9n8rk2xs=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk=
github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
@ -26,12 +34,14 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
@ -39,9 +49,12 @@ github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc
github.com/schollz/progressbar/v3 v3.13.1 h1:o8rySDYiQ59Mwzy2FELeHY5ZARXZTVJC7iHD6PEFUiE=
github.com/schollz/progressbar/v3 v3.13.1/go.mod h1:xvrbki8kfT1fzWzBT/UZd9L6GA+jdL7HAgq2RFnO6fQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/twinj/uuid v1.0.0 h1:fzz7COZnDrXGTAOHGuUGYd6sG+JMq+AoE7+Jlu0przk=
github.com/twinj/uuid v1.0.0/go.mod h1:mMgcE1RHFUFqe5AfiwlINXisXfDGro23fWdPUfOMjRY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@ -57,6 +70,7 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -73,3 +87,4 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -0,0 +1,228 @@
package main
import (
"context"
"crypto/tls"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"git.cheetah.cat/worksucc/gma-puzzles/chunk"
"git.cheetah.cat/worksucc/gma-puzzles/common"
adriver "github.com/arangodb/go-driver"
ahttp "github.com/arangodb/go-driver/http"
"github.com/schollz/progressbar/v3"
)
var (
arangoDB adriver.Database
arangoCTX context.Context
colChunk adriver.Collection
colFile adriver.Collection
colFile2Chunk adriver.Collection
)
func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
log.Println("connectDB:", "Starting Connection Process...")
// Retry Loop for Failed Connections
for i := 0; i < 6; i++ {
if i == 5 {
return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i)
} else if i > 0 {
time.Sleep(30 * time.Second)
}
// Connect to ArangoDB URL
conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
Endpoints: []string{baseURL},
TLSConfig: &tls.Config{ /*...*/ },
})
if err != nil {
log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
continue
}
// Connect Driver to User
client, err := adriver.NewClient(adriver.ClientConfig{
Connection: conn,
Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
})
if err != nil {
log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
continue
}
// Create Context for Database Access
ctx = context.Background()
driver, err = client.Database(ctx, arangoDatabase)
if err != nil {
log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
continue
}
log.Println("connectDB:", "Connection Sucessful!")
return driver, ctx, nil
}
return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?")
}
func InitDatabase() (err error) {
arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator")
if err != nil {
return err
}
colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
if err != nil {
return err
}
colFile, err = arangoDB.Collection(arangoCTX, "file")
if err != nil {
return err
}
colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
if err != nil {
return err
}
return nil
}
func CheckIntegrity(tarPath string) (err error) {
return nil
}
type PoolRecoveryData struct {
PoolID string `json:"_key"`
Size uint64 `json:"size"`
Created time.Time `json:"date"`
Hash string `json:"hash"`
ItemCount int `json:"itemCount"`
Items []string `json:"items"`
RecoveryData []common.DB_File `json:"recoveryData"`
}
var DoneTaskCount chan int
var TotalTaskCount chan int
var DoneTaskCountV int
var TotalTaskCountV int
var ConcurrencyLimit int = 12
var WorkerJobPool chan string
func bla() error {
WorkerJobPool = make(chan string)
sem := common.NewSemaphore(ConcurrencyLimit)
wg := sync.WaitGroup{}
entries, err := os.ReadDir("/mnt/SC9000/storagePools/")
if err != nil {
return err
}
chunkNames := []string{}
for _, e := range entries {
if strings.Contains(e.Name(), ".json") {
continue
}
if !e.IsDir() {
chunkNames = append(chunkNames, e.Name())
}
}
//
TotalTaskCount = make(chan int)
DoneTaskCount = make(chan int)
validationBar := progressbar.Default(int64(len(chunkNames)), "Validating Chunks")
go func() {
for {
select {
case <-TotalTaskCount:
TotalTaskCountV++
case <-DoneTaskCount:
DoneTaskCountV++
validationBar.Add(1)
if TotalTaskCountV == DoneTaskCountV {
fmt.Println("APPARENTLY WE are done")
close(TotalTaskCount)
close(DoneTaskCount)
return
}
}
}
}()
for _, chunkName := range chunkNames {
wg.Add(1)
TotalTaskCount <- 1
go func(job string, wg *sync.WaitGroup) (err error) {
sem.Acquire() // Wait for worker to have slot open
defer sem.Release() // Release the slot
defer wg.Done() // Finish job
defer func() {
DoneTaskCount <- 1
}()
//fmt.Printf("Scanning For Local Pools, found %s:", job)
tarFinalPath := filepath.Join("/mnt/SC9000/storagePools/", job)
_, err = os.Stat(tarFinalPath)
if err != nil {
log.Println(err)
return err
}
parts := strings.Split(job, ".")
jsonPath := filepath.Join("/mnt/SC9000/storagePools/", fmt.Sprintf("%s.json", parts[0]))
_, err = os.Stat(jsonPath)
if err != nil {
log.Println(err)
return err
}
var dboChunk common.DB_Chunk
_, err = colChunk.ReadDocument(arangoCTX, parts[0], &dboChunk)
if err != nil {
log.Println(err)
log.Printf("Chunk %s does exist on disk but not in database\n", job)
}
chunkReader, err := chunk.NewChunkReader(tarFinalPath)
if err != nil {
log.Println(err)
return err
}
err = chunkReader.LoadRecoveryFile(jsonPath)
if err != nil {
log.Println(err)
return err
}
err = chunkReader.CheckIntegrity()
if err != nil {
log.Println(err)
return err
}
return nil
}(chunkName, &wg)
if err != nil {
return err
}
}
// Wait for all jobs to finish
wg.Wait()
return nil
}
func main() {
err := InitDatabase()
if err != nil {
panic(err)
}
err = bla()
if err != nil {
panic(err)
}
}

@ -25,6 +25,8 @@ import (
"github.com/schollz/progressbar/v3"
"github.com/twinj/uuid"
"github.com/jedib0t/go-pretty/v6/progress"
_ "net/http/pprof"
)
@ -266,6 +268,23 @@ func modeRebuild(id string) (err error) {
return nil
}
func recursive(jobs []string, folderPath string) (WorkerJobPool []string) {
entries, err := os.ReadDir(folderPath)
if err != nil {
panic(err)
}
for _, e := range entries {
fullPath := filepath.Join(folderPath, e.Name())
if e.IsDir() {
jobs = recursive(jobs, fullPath)
}
if !e.IsDir() {
jobs = append(jobs, filepath.Join(folderPath, e.Name()))
}
}
return jobs
}
func modeIngress(folderPath string, skipName string) {
skipNameEnabled := len(skipName) > 0
entries, err := os.ReadDir(folderPath)
@ -275,6 +294,10 @@ func modeIngress(folderPath string, skipName string) {
var WorkerJobPool []string
for _, e := range entries {
fullPath := filepath.Join(folderPath, e.Name())
if e.IsDir() {
WorkerJobPool = recursive(WorkerJobPool, fullPath)
}
if !e.IsDir() && skipNameEnabled {
if e.Name() == skipName {
skipNameEnabled = false
@ -286,17 +309,41 @@ func modeIngress(folderPath string, skipName string) {
WorkerJobPool = append(WorkerJobPool, filepath.Join(folderPath, e.Name()))
}
}
wg := sync.WaitGroup{}
pw := progress.NewWriter()
pw.SetAutoStop(true)
pw.SetTrackerLength(40)
pw.SetMessageWidth(40)
//pw.SetNumTrackersExpected(*flagNumTrackers)
pw.SetSortBy(progress.SortByPercentDsc)
pw.SetStyle(progress.StyleDefault)
pw.SetTrackerPosition(progress.PositionRight)
pw.SetUpdateFrequency(time.Millisecond * 100)
pw.Style().Colors = progress.StyleColorsExample
pw.Style().Options.PercentFormat = "%4.1f%%"
pw.Style().Visibility.ETA = true
pw.Style().Visibility.ETAOverall = true
pw.Style().Visibility.Percentage = false
pw.Style().Visibility.Speed = true
pw.Style().Visibility.SpeedOverall = false
pw.Style().Visibility.Time = true
pw.Style().Visibility.TrackerOverall = true
pw.Style().Visibility.Value = true
pw.Style().Visibility.Pinned = true
// call Render() in async mode; yes we don't have any trackers at the moment
go pw.Render()
for _, jobFile := range WorkerJobPool {
wg.Add(1)
go func(jobFile string, wg *sync.WaitGroup) {
err = ProcessGMA(jobFile)
defer wg.Done()
err = ProcessGMA(pw, jobFile)
if err != nil {
log.Printf("\nERROR: %v\n", err)
}
wg.Done()
}(jobFile, &wg)
}
@ -325,7 +372,7 @@ func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []str
*/
return nil
}
func ProcessGMA(filePath string) (err error) {
func ProcessGMA(pw progress.Writer, filePath string) (err error) {
var unlockOnce sync.Once
fmt.Println("trying to acquire global write lock")
@ -379,8 +426,11 @@ func ProcessGMA(filePath string) (err error) {
if dboGMA.GMASize < 200 {
return fmt.Errorf("GMA File too small, skipping")
}
niceName := filepath.Base(filePath)
trackerProcess := progress.Tracker{Message: fmt.Sprintf("Extracting %s", niceName), Total: 0, Units: progress.UnitsDefault}
pw.AppendTracker(&trackerProcess)
log.Printf("Opening %s\n", filePath)
//log.Printf("Opening %s\n", filePath)
gmaReader, err := gma.NewReader(filePath)
if err != nil {
return err
@ -416,8 +466,8 @@ func ProcessGMA(filePath string) (err error) {
return err
}
dboGMA.Header = header
log.Printf("Name=%s\n", header.Title)
log.Printf("Desc=%s\n", header.Description)
//log.Printf("Name=%s\n", header.Title)
//log.Printf("Desc=%s\n", header.Description)
// log.Printf("AddonVersion=%d\n", header.AddonVersion)
// log.Printf("FormatVersion=%d\n", header.FormatVersion)
// log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte)
@ -534,12 +584,15 @@ func ProcessGMA(filePath string) (err error) {
}
dboExistFile2GMA[dboGMA2File.ID] = exists
}
trackerProcess.MarkAsDone()
// TODO: upload all unknownNewFiles to StorageServer
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200
var httpClient *http.Client = http.DefaultClient
uploadBar := progressbar.Default(int64(len(dboFiles)), "Uploading to StorageServer")
trackerUpload := progress.Tracker{Message: fmt.Sprintf("Uploading %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault}
pw.AppendTracker(&trackerUpload)
for _, dboFile := range dboFiles {
dboFileID := fmt.Sprintf("file/%s", dboFile.ID)
//fmt.Printf("Line 460: %s checking if we need to store this on the server", dboFileID)
@ -557,8 +610,7 @@ func ProcessGMA(filePath string) (err error) {
//body, _ := ioutil.ReadAll(res.Body)
//fmt.Printf("res.StatusCode = %d\n", res.StatusCode)
if res.StatusCode == http.StatusAlreadyReported {
uploadBar.Describe("Skipping")
uploadBar.Add(1)
trackerUpload.Increment(1)
continue
}
@ -588,7 +640,7 @@ func ProcessGMA(filePath string) (err error) {
return err
}
uploadBar.Describe("Uploading")
//uploadBar.Describe("Uploading")
err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName, fileInfoJSON, workerID)
if err != nil {
log.Println("err @common.MultipartUpload")
@ -598,6 +650,7 @@ func ProcessGMA(filePath string) (err error) {
} else {
log.Println("oopsie")
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerUpload.MarkAsErrored()
return err
}
}
@ -608,6 +661,7 @@ func ProcessGMA(filePath string) (err error) {
log.Println("err @colGMA2File.DocumentExists")
log.Println("oopsie")
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerUpload.MarkAsErrored()
return err
}
if !exists {
@ -616,10 +670,11 @@ func ProcessGMA(filePath string) (err error) {
log.Println("err @colGMA2File.CreateDocument")
log.Println("oopsie")
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerUpload.MarkAsErrored()
return err
}
}
uploadBar.Add(1)
trackerUpload.Increment(1)
break
}
time.Sleep(10 * time.Second)
@ -630,17 +685,18 @@ func ProcessGMA(filePath string) (err error) {
}
}
}
trackerUpload.MarkAsDone()
// at this point we can release the write semaphore
unlockOnce.Do(GlobalWriteLock.Unlock) // release anyway
fmt.Println("unlocking GlobalWriteLock")
//fmt.Println("unlocking GlobalWriteLock")
// TODO : fetch all files from storageServer
// TODO : write new gma from arangoinfo
// TODO : compare hashes
{
log.Println("rewriting gma")
rewriteBar := progressbar.Default(int64(len(dboGMA2Files)), "Rewriting GMA")
trackerRewrite := progress.Tracker{Message: fmt.Sprintf("Rewriting %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault}
pw.AppendTracker(&trackerRewrite)
destPath := filepath.Join(gmaTempPath, "rewrite.gma")
dir := filepath.Dir(destPath)
@ -695,7 +751,7 @@ func ProcessGMA(filePath string) (err error) {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
rewriteBar.Add(1)
trackerRewrite.Increment(1)
}
gmaWriter.FileHandle.Seek(0, 2)
log.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC)
@ -708,10 +764,9 @@ func ProcessGMA(filePath string) (err error) {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
log.Printf("Rewrite Hash is %s %s\n", writeHash, destPath)
log.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath)
log.Println()
//log.Printf("Rewrite Hash is %s %s\n", writeHash, destPath)
//log.Printf("Original Hash is %s %s\n", dboGMA.GMAHash, dboGMA.OriginalPath)
//log.Println()
writeStat, err := os.Stat(destPath)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
@ -721,13 +776,16 @@ func ProcessGMA(filePath string) (err error) {
if writeSize != dboGMA.GMASize {
//fail
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerRewrite.MarkAsErrored()
return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
}
if writeHash != dboGMA.GMAHash {
//fail
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerRewrite.MarkAsErrored()
return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
}
trackerRewrite.MarkAsDone()
}
// TODO: 4... profit?

Loading…
Cancel
Save