parallelizing packing

master
cheetah 2 years ago
parent 0b00bac7df
commit 5ca50c3d49

3
.gitignore vendored

@ -6,4 +6,5 @@ main
storageserver/storageserver
gmad_linux
.vscode/
storageserver/test/
storageserver/test/
zstd-tar-test/

@ -11,6 +11,8 @@ github.com/djherbis/times v1.5.0/go.mod h1:5q7FDLvbNg1L/KaBmPcWlVR9NmoKo3+ucqUA3
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk=
github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/labstack/echo v3.3.10+incompatible h1:pGRcYk231ExFAyoAjAfD85kQzRJCRI8bbnE7CX5OEgg=
github.com/labstack/echo v3.3.10+incompatible/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s=
github.com/labstack/echo/v4 v4.10.2 h1:n1jAhnq/elIFTHr1EYpiYtyKgx4RW9ccVgkqByZaN2M=

@ -213,7 +213,7 @@ func (p *Pool) OpenTar() (err error) {
func (p *Pool) Fetch(id string, writer io.Writer) (err error) {
for _, poolItem := range p.items {
if poolItem == id {
fmt.Printf("Fetch WORMPool %s\n", id)
//fmt.Printf("Fetch WORMPool %s\n", id)
p.LastTouchy = time.Now()
poolLocalFilePath := filepath.Join(poolMaster.cachePath, "worm", p.PoolID, id)
srcLocalFile, err := os.Open(poolLocalFilePath)
@ -797,6 +797,84 @@ func (p *PoolMaster) CleanWORMTemp() (err error) {
}
return nil
}
func (p *PoolMaster) PackFullPools() (err error) {
if len(poolMaster.FullPools) > 0 {
poolMaster.lock.Lock()
fmt.Printf("Aquiring WORMLock for Regular FullPool Pack\n")
//poolMaster.WORMLock.Lock()
//fmt.Printf("Aquired WORMLock success for Regular FullPool Pack\n")
poolChannel := make(chan error) // Channel to receive the results
var deletedPools []string
var poolChannelCounter = 0
for _, fullPool := range poolMaster.FullPools {
if time.Since(fullPool.LastTouchy).Minutes() < 1 {
continue
}
// start parallel
go func(fullPool *Pool) {
packResult, err := poolMaster.PackPool(fullPool.PoolID)
if err != nil {
poolChannel <- fmt.Errorf("error @PackPool: %v", err)
return
}
err = poolMaster.ImportPoolPackResult(packResult)
if err != nil {
poolChannel <- fmt.Errorf("error @ImportPoolPackResult: %v", err)
return
}
err = poolMaster.MovePoolPackToWORM(packResult)
if err != nil {
poolChannel <- fmt.Errorf("error @MovePoolPackToWORM: %v", err)
return
}
err = poolMaster.WriteRecoveryFile(packResult, fullPool)
if err != nil {
poolChannel <- fmt.Errorf("error @WriteRecoveryFile: %v", err)
return
}
os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID))
deletedPools = append(deletedPools, fullPool.PoolID)
_, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{
NotReady: false,
Finalized: true,
ReadOnly: true,
Hash: packResult.Hash,
Size: packResult.Size,
})
if err != nil {
poolChannel <- err
return
}
poolChannel <- nil
}(fullPool)
// increment total parallel counter
poolChannelCounter++
}
// Waiting for them to finish
for i := 0; i < poolChannelCounter; i++ {
result := <-poolChannel
if result != nil {
fmt.Printf("PoolChannel Error: %v\n", result)
}
}
// delete pools that have successfully packed
for _, deletedPoolID := range deletedPools {
for index, fullPool := range poolMaster.FullPools {
if fullPool.PoolID == deletedPoolID {
poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index)
break
}
}
}
//fmt.Printf("unlock WORMLock Regular FullPool Pack\n")
//poolMaster.WORMLock.Unlock()
fmt.Printf("unlock lock Regular FullPool Pack\n")
poolMaster.lock.Unlock()
}
return nil
}
func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
for _, cp := range p.CurrentPool {
@ -978,98 +1056,62 @@ func main() {
go func() {
for {
if len(poolMaster.FullPools) > 0 {
poolMaster.lock.Lock()
fmt.Printf("Aquiring WORMLock for Regular FullPool Pack\n")
//poolMaster.WORMLock.Lock()
//fmt.Printf("Aquired WORMLock success for Regular FullPool Pack\n")
var deletedPools []string
for _, fullPool := range poolMaster.FullPools {
if time.Since(fullPool.LastTouchy).Minutes() < 1 {
continue
}
packResult, err := poolMaster.PackPool(fullPool.PoolID)
if err != nil {
panic(fmt.Errorf("error @PackPool: %v", err))
}
err = poolMaster.ImportPoolPackResult(packResult)
if err != nil {
panic(fmt.Errorf("error @ImportPoolPackResult: %v", err))
}
err = poolMaster.MovePoolPackToWORM(packResult)
if err != nil {
panic(fmt.Errorf("error @MovePoolPackToWORM: %v", err))
}
err = poolMaster.WriteRecoveryFile(packResult, fullPool)
if err != nil {
panic(fmt.Errorf("error @WriteRecoveryFile: %v", err))
}
os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID))
deletedPools = append(deletedPools, fullPool.PoolID)
_, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{
NotReady: false,
Finalized: true,
ReadOnly: true,
Hash: packResult.Hash,
Size: packResult.Size,
})
if err != nil {
panic(err)
}
}
for _, deletedPoolID := range deletedPools {
for index, fullPool := range poolMaster.FullPools {
if fullPool.PoolID == deletedPoolID {
poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index)
break
}
}
}
//fmt.Printf("unlock WORMLock Regular FullPool Pack\n")
//poolMaster.WORMLock.Unlock()
fmt.Printf("unlock lock Regular FullPool Pack\n")
poolMaster.lock.Unlock()
}
//
poolMaster.PackFullPools()
poolMaster.CleanWORMTemp()
time.Sleep(time.Second * 10)
time.Sleep(time.Minute * 2)
}
}()
// Initial packing
for _, localPool := range poolMaster.LocalPools {
if localPool.ReadOnly {
{
poolChannel := make(chan error) // Channel to receive the results
var poolChannelCounter = 0
// Initial packing
for _, localPool := range poolMaster.LocalPools {
if localPool.ReadOnly {
dboChunkExists, err := colChunk.DocumentExists(arangoCTX, localPool.PoolID)
if err != nil {
panic(err)
}
if !dboChunkExists {
fmt.Printf("Packing Pool %s\n", localPool.PoolID)
packResult, err := poolMaster.PackPool(localPool.PoolID)
dboChunkExists, err := colChunk.DocumentExists(arangoCTX, localPool.PoolID)
if err != nil {
panic(err)
}
err = poolMaster.ImportPoolPackResult(packResult)
if err != nil {
panic(err)
}
err = poolMaster.MovePoolPackToWORM(packResult)
if err != nil {
panic(err)
}
err = poolMaster.WriteRecoveryFile(packResult, localPool)
if err != nil {
panic(err)
if !dboChunkExists {
//spawn thread
go func(localPool *Pool) {
fmt.Printf("Packing Pool %s\n", localPool.PoolID)
packResult, err := poolMaster.PackPool(localPool.PoolID)
if err != nil {
panic(err)
}
err = poolMaster.ImportPoolPackResult(packResult)
if err != nil {
panic(err)
}
err = poolMaster.MovePoolPackToWORM(packResult)
if err != nil {
panic(err)
}
err = poolMaster.WriteRecoveryFile(packResult, localPool)
if err != nil {
panic(err)
}
os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
poolChannel <- nil
}(localPool)
// increment total parallel counter
poolChannelCounter++
}
os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
//os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
}
// packResult.FileCount
}
// Waiting for them to finish
for i := 0; i < poolChannelCounter; i++ {
result := <-poolChannel
if result != nil {
fmt.Printf("PoolChannel Error: %v\n", result)
}
//os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
}
// packResult.FileCount
}
e := echo.New()
e.Use(middleware.RecoverWithConfig(middleware.RecoverConfig{
@ -1088,7 +1130,7 @@ func main() {
id := c.Param("id")
fmt.Printf("/check/%s checking...\n", id)
exists := poolMaster.Lookup(id)
fmt.Printf("%s exists = %s\n", id, strconv.FormatBool(exists))
//fmt.Printf("%s exists = %s\n", id, strconv.FormatBool(exists))
if exists {
return c.JSON(http.StatusAlreadyReported, exists)
}
@ -1097,7 +1139,7 @@ func main() {
e.GET("/fetch/:id", func(c echo.Context) error {
id := c.Param("id")
fmt.Printf("/fetch/%s fetching...\n", id)
//fmt.Printf("/fetch/%s fetching...\n", id)
exists := poolMaster.Lookup(id)
if exists {
c.Response().Header().Set(echo.HeaderContentType, echo.MIMEOctetStream)

Loading…
Cancel
Save