From 5ca50c3d4930de04310e0fe475b65fad707c61d1 Mon Sep 17 00:00:00 2001 From: cheetah Date: Mon, 3 Jul 2023 12:19:39 -0500 Subject: [PATCH] parallelizing packing --- .gitignore | 3 +- go.sum | 2 + storageserver/storageserver.go | 210 ++++++++++++++++++++------------- 3 files changed, 130 insertions(+), 85 deletions(-) diff --git a/.gitignore b/.gitignore index 6efbedf..004aea6 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ main storageserver/storageserver gmad_linux .vscode/ -storageserver/test/ \ No newline at end of file +storageserver/test/ +zstd-tar-test/ \ No newline at end of file diff --git a/go.sum b/go.sum index 806dd6c..1dcaa4b 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/djherbis/times v1.5.0/go.mod h1:5q7FDLvbNg1L/KaBmPcWlVR9NmoKo3+ucqUA3 github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= +github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= +github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/labstack/echo v3.3.10+incompatible h1:pGRcYk231ExFAyoAjAfD85kQzRJCRI8bbnE7CX5OEgg= github.com/labstack/echo v3.3.10+incompatible/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s= github.com/labstack/echo/v4 v4.10.2 h1:n1jAhnq/elIFTHr1EYpiYtyKgx4RW9ccVgkqByZaN2M= diff --git a/storageserver/storageserver.go b/storageserver/storageserver.go index 8487ceb..3f3aaf7 100644 --- a/storageserver/storageserver.go +++ b/storageserver/storageserver.go @@ -213,7 +213,7 @@ func (p *Pool) OpenTar() (err error) { func (p *Pool) Fetch(id string, writer io.Writer) (err error) { for _, poolItem := range p.items { if poolItem == id { - fmt.Printf("Fetch WORMPool %s\n", id) + //fmt.Printf("Fetch WORMPool %s\n", id) p.LastTouchy = time.Now() poolLocalFilePath := filepath.Join(poolMaster.cachePath, "worm", p.PoolID, id) srcLocalFile, err := os.Open(poolLocalFilePath) @@ -797,6 +797,84 @@ func (p *PoolMaster) CleanWORMTemp() (err error) { } return nil } +func (p *PoolMaster) PackFullPools() (err error) { + if len(poolMaster.FullPools) > 0 { + poolMaster.lock.Lock() + fmt.Printf("Aquiring WORMLock for Regular FullPool Pack\n") + //poolMaster.WORMLock.Lock() + //fmt.Printf("Aquired WORMLock success for Regular FullPool Pack\n") + poolChannel := make(chan error) // Channel to receive the results + var deletedPools []string + var poolChannelCounter = 0 + for _, fullPool := range poolMaster.FullPools { + if time.Since(fullPool.LastTouchy).Minutes() < 1 { + continue + } + // start parallel + go func(fullPool *Pool) { + packResult, err := poolMaster.PackPool(fullPool.PoolID) + if err != nil { + poolChannel <- fmt.Errorf("error @PackPool: %v", err) + return + } + err = poolMaster.ImportPoolPackResult(packResult) + if err != nil { + poolChannel <- fmt.Errorf("error @ImportPoolPackResult: %v", err) + return + } + err = poolMaster.MovePoolPackToWORM(packResult) + if err != nil { + poolChannel <- fmt.Errorf("error @MovePoolPackToWORM: %v", err) + return + } + err = poolMaster.WriteRecoveryFile(packResult, fullPool) + if err != nil { + poolChannel <- fmt.Errorf("error @WriteRecoveryFile: %v", err) + return + } + os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID)) + deletedPools = append(deletedPools, fullPool.PoolID) + + _, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{ + NotReady: false, + Finalized: true, + ReadOnly: true, + Hash: packResult.Hash, + Size: packResult.Size, + }) + if err != nil { + poolChannel <- err + return + } + poolChannel <- nil + }(fullPool) + // increment total parallel counter + poolChannelCounter++ + } + // Waiting for them to finish + for i := 0; i < poolChannelCounter; i++ { + result := <-poolChannel + if result != nil { + fmt.Printf("PoolChannel Error: %v\n", result) + } + } + // delete pools that have successfully packed + for _, deletedPoolID := range deletedPools { + for index, fullPool := range poolMaster.FullPools { + if fullPool.PoolID == deletedPoolID { + poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index) + break + } + } + } + + //fmt.Printf("unlock WORMLock Regular FullPool Pack\n") + //poolMaster.WORMLock.Unlock() + fmt.Printf("unlock lock Regular FullPool Pack\n") + poolMaster.lock.Unlock() + } + return nil +} func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) { for _, cp := range p.CurrentPool { @@ -978,98 +1056,62 @@ func main() { go func() { for { - if len(poolMaster.FullPools) > 0 { - poolMaster.lock.Lock() - fmt.Printf("Aquiring WORMLock for Regular FullPool Pack\n") - //poolMaster.WORMLock.Lock() - //fmt.Printf("Aquired WORMLock success for Regular FullPool Pack\n") - - var deletedPools []string - for _, fullPool := range poolMaster.FullPools { - if time.Since(fullPool.LastTouchy).Minutes() < 1 { - continue - } - packResult, err := poolMaster.PackPool(fullPool.PoolID) - if err != nil { - panic(fmt.Errorf("error @PackPool: %v", err)) - } - err = poolMaster.ImportPoolPackResult(packResult) - if err != nil { - panic(fmt.Errorf("error @ImportPoolPackResult: %v", err)) - } - err = poolMaster.MovePoolPackToWORM(packResult) - if err != nil { - panic(fmt.Errorf("error @MovePoolPackToWORM: %v", err)) - } - err = poolMaster.WriteRecoveryFile(packResult, fullPool) - if err != nil { - panic(fmt.Errorf("error @WriteRecoveryFile: %v", err)) - } - os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID)) - deletedPools = append(deletedPools, fullPool.PoolID) - - _, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{ - NotReady: false, - Finalized: true, - ReadOnly: true, - Hash: packResult.Hash, - Size: packResult.Size, - }) - if err != nil { - panic(err) - } - } - for _, deletedPoolID := range deletedPools { - for index, fullPool := range poolMaster.FullPools { - if fullPool.PoolID == deletedPoolID { - poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index) - break - } - } - } - - //fmt.Printf("unlock WORMLock Regular FullPool Pack\n") - //poolMaster.WORMLock.Unlock() - fmt.Printf("unlock lock Regular FullPool Pack\n") - poolMaster.lock.Unlock() - } - // + poolMaster.PackFullPools() poolMaster.CleanWORMTemp() - time.Sleep(time.Second * 10) + time.Sleep(time.Minute * 2) } }() - // Initial packing - for _, localPool := range poolMaster.LocalPools { - if localPool.ReadOnly { + { + poolChannel := make(chan error) // Channel to receive the results + var poolChannelCounter = 0 + // Initial packing + for _, localPool := range poolMaster.LocalPools { + if localPool.ReadOnly { - dboChunkExists, err := colChunk.DocumentExists(arangoCTX, localPool.PoolID) - if err != nil { - panic(err) - } - if !dboChunkExists { - fmt.Printf("Packing Pool %s\n", localPool.PoolID) - packResult, err := poolMaster.PackPool(localPool.PoolID) + dboChunkExists, err := colChunk.DocumentExists(arangoCTX, localPool.PoolID) if err != nil { panic(err) } - err = poolMaster.ImportPoolPackResult(packResult) - if err != nil { - panic(err) - } - err = poolMaster.MovePoolPackToWORM(packResult) - if err != nil { - panic(err) - } - err = poolMaster.WriteRecoveryFile(packResult, localPool) - if err != nil { - panic(err) + if !dboChunkExists { + //spawn thread + go func(localPool *Pool) { + fmt.Printf("Packing Pool %s\n", localPool.PoolID) + packResult, err := poolMaster.PackPool(localPool.PoolID) + if err != nil { + panic(err) + } + err = poolMaster.ImportPoolPackResult(packResult) + if err != nil { + panic(err) + } + err = poolMaster.MovePoolPackToWORM(packResult) + if err != nil { + panic(err) + } + err = poolMaster.WriteRecoveryFile(packResult, localPool) + if err != nil { + panic(err) + } + os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) + + poolChannel <- nil + }(localPool) + // increment total parallel counter + poolChannelCounter++ } - os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) + //os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) + } + // packResult.FileCount + } + + // Waiting for them to finish + for i := 0; i < poolChannelCounter; i++ { + result := <-poolChannel + if result != nil { + fmt.Printf("PoolChannel Error: %v\n", result) } - //os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID)) } - // packResult.FileCount } e := echo.New() e.Use(middleware.RecoverWithConfig(middleware.RecoverConfig{ @@ -1088,7 +1130,7 @@ func main() { id := c.Param("id") fmt.Printf("/check/%s checking...\n", id) exists := poolMaster.Lookup(id) - fmt.Printf("%s exists = %s\n", id, strconv.FormatBool(exists)) + //fmt.Printf("%s exists = %s\n", id, strconv.FormatBool(exists)) if exists { return c.JSON(http.StatusAlreadyReported, exists) } @@ -1097,7 +1139,7 @@ func main() { e.GET("/fetch/:id", func(c echo.Context) error { id := c.Param("id") - fmt.Printf("/fetch/%s fetching...\n", id) + //fmt.Printf("/fetch/%s fetching...\n", id) exists := poolMaster.Lookup(id) if exists { c.Response().Header().Set(echo.HeaderContentType, echo.MIMEOctetStream)