From 3d9b5feba9d00a41d4dd9bccca9efc9cd09869e6 Mon Sep 17 00:00:00 2001 From: cheetah Date: Sat, 24 Jun 2023 20:08:40 -0500 Subject: [PATCH] first release --- main.go | 25 ++++++++++---- storageserver/storageserver.go | 61 +++++++++++++++++++++++++--------- 2 files changed, 64 insertions(+), 22 deletions(-) diff --git a/main.go b/main.go index c341367..8c438ad 100644 --- a/main.go +++ b/main.go @@ -420,17 +420,30 @@ func ProcessGMA(filePath string) (err error) { fmt.Printf("Import Duration %dms\n", time.Since(importStartTime).Milliseconds()) fmt.Println() // TODO: upload all unknownNewFiles to StorageServer + http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200 var httpClient *http.Client = http.DefaultClient + for _, unknownFile := range newUnknownFiles { unknownFileID := fmt.Sprintf("file/%s", unknownFile) for _, dboGMA2File := range dboGMA2Files { if unknownFileID == dboGMA2File.File { - fmt.Printf("Uploading %s (local %s) to Storage\n", dboGMA2File.UploadID, dboGMA2File.LocalFileName) - err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName) - if err != nil { - fmt.Println("oopsie") - undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) - return err + uploadSuccess := true + for { + fmt.Printf("Uploading %s (local %s) to Storage\n", dboGMA2File.UploadID, dboGMA2File.LocalFileName) + err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName) + if err != nil { + if strings.Contains(err.Error(), "cannot assign requested address") { + uploadSuccess = false + } else { + fmt.Println("oopsie") + undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) + return err + } + } + if uploadSuccess { + break + } + time.Sleep(10 * time.Second) } } } diff --git a/storageserver/storageserver.go b/storageserver/storageserver.go index b9aab1c..65e98c4 100644 --- a/storageserver/storageserver.go +++ b/storageserver/storageserver.go @@ -32,11 +32,11 @@ var ( ) type Pool struct { - PoolID string `json:"_key"` - Finalized bool `json:"finalized"` - ReadOnly bool `json:"readOnly"` - Size uint64 `json:"size"` - //folder string `json:"-"` + PoolID string `json:"_key"` + Finalized bool `json:"finalized"` + ReadOnly bool `json:"readOnly"` + Size uint64 `json:"size"` + LastTouchy time.Time `json:"-"` itemCount int items []string @@ -88,7 +88,7 @@ func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDataba // Retry Loop for Failed Connections for i := 0; i < 6; i++ { if i == 5 { - return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i) + return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i) } else if i > 0 { time.Sleep(30 * time.Second) } @@ -128,6 +128,9 @@ func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDataba } func InitDatabase() (err error) { arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator") + if err != nil { + return err + } colChunk, err = arangoDB.Collection(arangoCTX, "chunk") if err != nil { @@ -189,6 +192,7 @@ func (p *Pool) Fetch(id string, writer io.Writer) (err error) { for _, poolItem := range p.items { if poolItem == id { fmt.Printf("Fetch WORMPool %s\n", id) + p.LastTouchy = time.Now() poolLocalFilePath := filepath.Join(poolMaster.cachePath, "worm", p.PoolID, id) srcLocalFile, err := os.Open(poolLocalFilePath) if err != nil { @@ -203,6 +207,10 @@ func (p *Pool) Fetch(id string, writer io.Writer) (err error) { } return fmt.Errorf("%s not found", id) } +func (p *Pool) Unload() { + log.Printf("Unloading WORMPool [%s]\n", p.PoolID) + p.file.Close() +} func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) { poolMaster.finalPath = finalPath @@ -213,20 +221,20 @@ func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, e destPath := filepath.Join(poolMaster.cachePath, "pool") err = os.MkdirAll(destPath, os.ModePerm) if err != nil { - return poolMaster, err + return PoolMaster{}, err } destPath = filepath.Join(poolMaster.cachePath, "worm") err = os.MkdirAll(destPath, os.ModePerm) if err != nil { - return poolMaster, err + return PoolMaster{}, err } err = os.MkdirAll(poolMaster.finalPath, os.ModePerm) if err != nil { - return poolMaster, err + return PoolMaster{}, err } - return poolMaster, nil + return } func (p *PoolMaster) NewPool() (*Pool, error) { var err error @@ -244,7 +252,6 @@ func (p *PoolMaster) NewPool() (*Pool, error) { return &pool, nil } - func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) { //fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount) if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems { @@ -536,7 +543,7 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) { // p.NewPool() for _, localPool := range p.LocalPools { - if !localPool.ReadOnly && localPool.itemCount < 500 { + if !localPool.ReadOnly && localPool.itemCount < PoolMaxItems { return localPool, nil } } @@ -612,10 +619,11 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ return err } loadedWormPool := Pool{ - PoolID: dboChunk.ID, - Size: uint64(dboChunk.Size), - ReadOnly: dboChunk.ReadOnly, - Finalized: dboChunk.Finalized, + PoolID: dboChunk.ID, + Size: uint64(dboChunk.Size), + ReadOnly: dboChunk.ReadOnly, + Finalized: dboChunk.Finalized, + LastTouchy: time.Now(), filePath: filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", dboChunk.ID)), } @@ -629,6 +637,19 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ return loadedWormPool.Fetch(fileID, writer) //return nil } +func (p *PoolMaster) CleanWORMTemp() (err error) { + p.lock.Lock() + defer p.lock.Unlock() + + for _, wormPool := range p.WORMPools { + if time.Since(wormPool.LastTouchy).Minutes() > 2 { + wormPool.Unload() + delete(p.WORMPools, wormPool.PoolID) + os.RemoveAll(filepath.Join(poolMaster.cachePath, "worm", wormPool.PoolID)) + } + } + return nil +} func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { if p.CurrentPool != nil { @@ -823,6 +844,12 @@ func main() { } os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID)) deletedPools = append(deletedPools, fullPool.PoolID) + _, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{ + NotReady: false, + }) + if err != nil { + panic(err) + } poolMaster.lock.Unlock() } for _, deletedPoolID := range deletedPools { @@ -833,6 +860,8 @@ func main() { } } } + // + poolMaster.CleanWORMTemp() time.Sleep(time.Second * 10) } }()