|
|
|
@ -5,6 +5,7 @@ import (
|
|
|
|
|
"context"
|
|
|
|
|
"crypto/sha256"
|
|
|
|
|
"crypto/tls"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
@ -32,6 +33,17 @@ var (
|
|
|
|
|
PoolPathTemp = "/mnt/ramfs/"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// TODO: write Recovery Data after Packing
|
|
|
|
|
type PoolRecoveryData struct {
|
|
|
|
|
PoolID string `json:"_key"`
|
|
|
|
|
Size uint64 `json:"size"`
|
|
|
|
|
Created time.Time `json:"date"`
|
|
|
|
|
Hash string `json:"hash"`
|
|
|
|
|
|
|
|
|
|
ItemCount int `json:"itemCount"`
|
|
|
|
|
Items []string `json:"items"`
|
|
|
|
|
RecoveryData []common.DB_File `json:"recoveryData"`
|
|
|
|
|
}
|
|
|
|
|
type Pool struct {
|
|
|
|
|
PoolID string `json:"_key"`
|
|
|
|
|
Finalized bool `json:"finalized"`
|
|
|
|
@ -58,6 +70,7 @@ type PoolMaster struct {
|
|
|
|
|
finalPath string
|
|
|
|
|
CurrentPool *Pool
|
|
|
|
|
lock sync.Mutex
|
|
|
|
|
WORMLock sync.Mutex
|
|
|
|
|
|
|
|
|
|
LocalPools []*Pool
|
|
|
|
|
FullPools []*Pool
|
|
|
|
@ -442,17 +455,52 @@ func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) {
|
|
|
|
|
fmt.Printf("MoveWORM Duration %dms\n", time.Since(startTime).Milliseconds())
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *PoolMaster) WriteRecoveryFile(packResult PoolPackResult, pool *Pool) (err error) {
|
|
|
|
|
fileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.json", packResult.PoolID))
|
|
|
|
|
|
|
|
|
|
recoveryFile, err := os.Create(fileName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer recoveryFile.Close()
|
|
|
|
|
|
|
|
|
|
poolRecoveryData := PoolRecoveryData{
|
|
|
|
|
PoolID: packResult.PoolID,
|
|
|
|
|
Size: uint64(packResult.Size),
|
|
|
|
|
Created: time.Now(),
|
|
|
|
|
Hash: packResult.Hash,
|
|
|
|
|
ItemCount: pool.itemCount,
|
|
|
|
|
Items: pool.items,
|
|
|
|
|
//RecoveryData,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//TODO: fetch RecoveryData from DB
|
|
|
|
|
poolRecoveryData.RecoveryData = make([]common.DB_File, len(pool.items))
|
|
|
|
|
_, _, err = colFile.ReadDocuments(arangoCTX, pool.items, poolRecoveryData.RecoveryData)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error @ReadDocuments %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
json, err := json.MarshalIndent(poolRecoveryData, "", "\t")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error @json.MarshalIndent %v", err)
|
|
|
|
|
}
|
|
|
|
|
_, err = recoveryFile.Write(json)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error @recoveryFile.Write %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Only call this in a locked Sate
|
|
|
|
|
*/
|
|
|
|
|
func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err error) {
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
packResult.PoolID = poolID
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
fmt.Printf("Aquiring Lock for PackPool(%s)\n", poolID)
|
|
|
|
|
p.lock.Lock()
|
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
|
defer fmt.Printf("unlock PackPool\n")
|
|
|
|
|
fmt.Printf("Aquired Lock success for PackPool(%s)\n", poolID)
|
|
|
|
|
*/
|
|
|
|
|
packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar", poolID))
|
|
|
|
|
tarFile, err := os.Create(packResult.outputFileName)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -528,6 +576,9 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err
|
|
|
|
|
packResult.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil))
|
|
|
|
|
fmt.Printf("PackPoolTar hash is %s\n", packResult.Hash)
|
|
|
|
|
|
|
|
|
|
// TODO: write index json describing the files inside, pure hash list, aswell
|
|
|
|
|
// as dictionary containing all the infos for restore/disaster-recovery into Arango
|
|
|
|
|
|
|
|
|
|
packFileStats, err := tarFileCheck.Stat()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return packResult, err
|
|
|
|
@ -552,7 +603,6 @@ func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *PoolMaster) Lookup(id string) (exists bool) {
|
|
|
|
|
// TODO: DB check
|
|
|
|
|
if p.CurrentPool != nil { // CurrentPool
|
|
|
|
|
for _, poolItem := range p.CurrentPool.items {
|
|
|
|
|
if poolItem == id {
|
|
|
|
@ -560,6 +610,8 @@ func (p *PoolMaster) Lookup(id string) (exists bool) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
p.WORMLock.Lock()
|
|
|
|
|
defer p.WORMLock.Unlock()
|
|
|
|
|
for _, wormPool := range p.WORMPools { // WORM Pools
|
|
|
|
|
for _, poolItem := range wormPool.items {
|
|
|
|
|
if poolItem == id {
|
|
|
|
@ -581,8 +633,6 @@ func (p *PoolMaster) Lookup(id string) (exists bool) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// TODO : DB Check
|
|
|
|
|
// ArangoDB
|
|
|
|
|
dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false
|
|
|
|
@ -608,6 +658,15 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ
|
|
|
|
|
// else load wormPool into disk-cache extract to "worm"
|
|
|
|
|
// wormMode
|
|
|
|
|
|
|
|
|
|
// TODO: every Method here should have locked the WORMLock!!
|
|
|
|
|
/*
|
|
|
|
|
fmt.Printf("Aquiring WORMLock for FetchLoadWORM\n")
|
|
|
|
|
p.WORMLock.Lock()
|
|
|
|
|
defer p.WORMLock.Unlock()
|
|
|
|
|
defer fmt.Printf("unlock WORMLock FetchLoadWORM\n")
|
|
|
|
|
fmt.Printf("Aquired WORMLock success for FetchLoadWORM\n")
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
fmt.Printf("Aquiring Lock for FetchLoadWORM\n")
|
|
|
|
|
p.lock.Lock()
|
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
@ -640,6 +699,8 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ
|
|
|
|
|
}
|
|
|
|
|
func (p *PoolMaster) CleanWORMTemp() (err error) {
|
|
|
|
|
p.lock.Lock()
|
|
|
|
|
p.WORMLock.Lock()
|
|
|
|
|
defer p.WORMLock.Unlock()
|
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
|
|
|
|
|
|
for _, wormPool := range p.WORMPools {
|
|
|
|
@ -674,6 +735,8 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
p.WORMLock.Lock()
|
|
|
|
|
defer p.WORMLock.Unlock()
|
|
|
|
|
for _, wormPool := range p.WORMPools {
|
|
|
|
|
for _, poolItem := range wormPool.items {
|
|
|
|
|
if poolItem == id {
|
|
|
|
@ -794,7 +857,7 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro
|
|
|
|
|
newItemCount := 0
|
|
|
|
|
for _, e := range entries {
|
|
|
|
|
if !e.IsDir() {
|
|
|
|
|
pool.items = append(pool.items, e.Name())
|
|
|
|
|
//pool.items = append(pool.items, e.Name())
|
|
|
|
|
newItemCount++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -827,39 +890,51 @@ func main() {
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
var deletedPools []string
|
|
|
|
|
for _, fullPool := range poolMaster.FullPools {
|
|
|
|
|
if len(poolMaster.FullPools) > 0 {
|
|
|
|
|
poolMaster.lock.Lock()
|
|
|
|
|
|
|
|
|
|
packResult, err := poolMaster.PackPool(fullPool.PoolID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
err = poolMaster.ImportPoolPackResult(packResult)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
err = poolMaster.MovePoolPackToWORM(packResult)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID))
|
|
|
|
|
deletedPools = append(deletedPools, fullPool.PoolID)
|
|
|
|
|
_, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{
|
|
|
|
|
NotReady: false,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
fmt.Printf("Aquiring WORMLock for Regular FullPool Pack\n")
|
|
|
|
|
poolMaster.WORMLock.Lock()
|
|
|
|
|
fmt.Printf("Aquired WORMLock success for Regular FullPool Pack\n")
|
|
|
|
|
|
|
|
|
|
var deletedPools []string
|
|
|
|
|
for _, fullPool := range poolMaster.FullPools {
|
|
|
|
|
packResult, err := poolMaster.PackPool(fullPool.PoolID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
err = poolMaster.ImportPoolPackResult(packResult)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
err = poolMaster.MovePoolPackToWORM(packResult)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
err = poolMaster.WriteRecoveryFile(packResult, fullPool)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID))
|
|
|
|
|
deletedPools = append(deletedPools, fullPool.PoolID)
|
|
|
|
|
_, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{
|
|
|
|
|
NotReady: false,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
poolMaster.lock.Unlock()
|
|
|
|
|
}
|
|
|
|
|
for _, deletedPoolID := range deletedPools {
|
|
|
|
|
for index, fullPool := range poolMaster.FullPools {
|
|
|
|
|
if fullPool.PoolID == deletedPoolID {
|
|
|
|
|
poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index)
|
|
|
|
|
break
|
|
|
|
|
for _, deletedPoolID := range deletedPools {
|
|
|
|
|
for index, fullPool := range poolMaster.FullPools {
|
|
|
|
|
if fullPool.PoolID == deletedPoolID {
|
|
|
|
|
poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
poolMaster.lock.Unlock()
|
|
|
|
|
poolMaster.WORMLock.Unlock()
|
|
|
|
|
fmt.Printf("unlock WORMLock Regular FullPool Pack\n")
|
|
|
|
|
}
|
|
|
|
|
//
|
|
|
|
|
poolMaster.CleanWORMTemp()
|
|
|
|
@ -905,7 +980,9 @@ func main() {
|
|
|
|
|
|
|
|
|
|
e.GET("/check/:id", func(c echo.Context) error {
|
|
|
|
|
id := c.Param("id")
|
|
|
|
|
fmt.Printf("/check/%s checking...\n", id)
|
|
|
|
|
exists := poolMaster.Lookup(id)
|
|
|
|
|
fmt.Printf("%s exists = %s\n", id, strconv.FormatBool(exists))
|
|
|
|
|
if exists {
|
|
|
|
|
return c.JSON(http.StatusAlreadyReported, exists)
|
|
|
|
|
}
|
|
|
|
@ -914,6 +991,7 @@ func main() {
|
|
|
|
|
|
|
|
|
|
e.GET("/fetch/:id", func(c echo.Context) error {
|
|
|
|
|
id := c.Param("id")
|
|
|
|
|
fmt.Printf("/fetch/%s fetching...\n", id)
|
|
|
|
|
exists := poolMaster.Lookup(id)
|
|
|
|
|
if exists {
|
|
|
|
|
c.Response().Header().Set(echo.HeaderContentType, echo.MIMEOctetStream)
|
|
|
|
@ -928,9 +1006,7 @@ func main() {
|
|
|
|
|
fmt.Printf("/fetch/%s does not exist\n", id)
|
|
|
|
|
return c.String(http.StatusNotFound, "Not Found")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
//return c.Stream(200, "application/x-octet-stream", nil)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
e.POST("/stash/:id/:size", func(c echo.Context) error {
|
|
|
|
@ -941,7 +1017,13 @@ func main() {
|
|
|
|
|
fmt.Println(err)
|
|
|
|
|
return c.String(http.StatusExpectationFailed, "Error")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
infoJSON := c.FormValue("info")
|
|
|
|
|
fileInfo := common.DB_File{}
|
|
|
|
|
err = json.Unmarshal([]byte(infoJSON), &fileInfo)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println(err)
|
|
|
|
|
return c.String(http.StatusBadRequest, "Error")
|
|
|
|
|
}
|
|
|
|
|
exists := poolMaster.Lookup(id)
|
|
|
|
|
if exists {
|
|
|
|
|
fmt.Printf("/stash/%s exists already\n", id)
|
|
|
|
@ -981,6 +1063,12 @@ func main() {
|
|
|
|
|
return c.String(http.StatusExpectationFailed, "Error")
|
|
|
|
|
}
|
|
|
|
|
fmt.Println("...stashed")
|
|
|
|
|
fmt.Println(fileInfo)
|
|
|
|
|
_, err = colFile.CreateDocument(arangoCTX, fileInfo)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println(err)
|
|
|
|
|
return c.String(http.StatusExpectationFailed, "Error")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return c.JSON(http.StatusOK, true)
|
|
|
|
|
})
|
|
|
|
|