You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1265 lines
34 KiB
Go

package main
import (
"archive/tar"
"context"
"crypto/sha256"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"git.cheetah.cat/worksucc/gma-puzzles/common"
adriver "github.com/arangodb/go-driver"
ahttp "github.com/arangodb/go-driver/http"
"github.com/klauspost/compress/zstd"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/twinj/uuid"
)
// Runtime configuration: cache locations and pool sizing.
var (
	// FastCacheEnabled toggles use of the local fast-cache directory.
	FastCacheEnabled = true
	FastCachePath    = "/zpool0/cheetah/workshop/garrysmod/gma-inator/cache"
	// WORMCachePath is where packed WORM (write-once-read-many) chunks are
	// extracted for reads; currently the same tree as the fast cache.
	WORMCachePath = FastCachePath
	// PoolMaxItems is how many files a pool collects before it is sealed
	// and packed into a tar.zst chunk.
	PoolMaxItems = 500
	// PoolPathFinal receives finished .tar.zst chunks; PoolPathTemp holds
	// pools that are still being filled.
	PoolPathFinal = "/zpool0/cheetah/workshop/garrysmod/gma-inator/chunks/"
	PoolPathTemp  = "/zpool0/cheetah/workshop/garrysmod/gma-inator/temp"
)

// PoolRecoveryData is the sidecar JSON document written next to each packed
// chunk (see PoolMaster.WriteRecoveryFile) so a chunk can be restored into
// the database after a disaster without the original DB contents.
type PoolRecoveryData struct {
	PoolID    string    `json:"_key"`
	Size      uint64    `json:"size"`
	Created   time.Time `json:"date"`
	Hash      string    `json:"hash"`
	ItemCount int       `json:"itemCount"`
	Items     []string  `json:"items"`
	// RecoveryData carries the full per-file documents read back from the DB.
	RecoveryData []common.DB_File `json:"recoveryData"`
}

// Pool is one bucket of stored files. A pool lives as a directory of loose
// files while writable, and as an extracted tar.zst archive once packed
// (wormMode). Exported fields are serialized for the /pmdump endpoint.
type Pool struct {
	PoolID    string `json:"_key"`
	Finalized bool   `json:"finalized"`
	ReadOnly  bool   `json:"readOnly"`
	Size      uint64 `json:"size"`
	// LastTouchy records the last read/write; used to decide when a pool
	// is idle enough to be packed or unloaded.
	LastTouchy time.Time `json:"-"`
	itemCount  int
	items      []string
	// wormMode marks a pool that was re-opened from a packed archive.
	wormMode bool
	filePath string
	file     *os.File
	//tarWriter *tar.Writer
	tarReader *tar.Reader
}

// PoolFile pairs a stored file's ID with its size.
type PoolFile struct {
	FileID string
	Size   uint64
}

// PoolMaster coordinates all pools: per-worker writable pools, full pools
// awaiting packing, loaded WORM pools, and locally recovered pools.
type PoolMaster struct {
	cachePath string
	finalPath string
	// CurrentPool maps workerID -> the pool that worker currently writes to.
	CurrentPool map[string]*Pool
	lock        sync.Mutex
	WORMLock    sync.Mutex
	LocalPools  []*Pool
	FullPools   []*Pool
	WORMPools   map[string]*Pool
}

// PoolPackResult summarizes one PackPool run: what was packed, the archive
// size and hash, and where the archive was written.
type PoolPackResult struct {
	PoolID         string
	Files          []string
	FileCount      int
	Size           int64
	Hash           string
	outputFileName string
}

// Shared database handles, initialized by InitDatabase.
var (
	arangoDB      adriver.Database
	arangoCTX     context.Context
	colChunk      adriver.Collection
	colFile       adriver.Collection
	colFile2Chunk adriver.Collection
	// PoolMaster
	poolMaster PoolMaster
)
// ConnectDB establishes a connection to an ArangoDB server and opens the
// named database, retrying failed attempts with a 30-second back-off.
//
// baseURL is the server's HTTP endpoint; arangoUser/arangoPWD are the
// credentials; arangoDatabase is the database to open. On success it
// returns the database handle and the context to use for later calls.
func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
	log.Println("connectDB:", "Starting Connection Process...")
	const maxAttempts = 5
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// back off before every retry (but not before the first attempt)
		if attempt > 0 {
			time.Sleep(30 * time.Second)
		}
		// Connect to ArangoDB URL
		conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
			Endpoints: []string{baseURL},
			TLSConfig: &tls.Config{ /*...*/ },
		})
		if err != nil {
			log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
			continue
		}
		// Authenticate the driver client
		client, err := adriver.NewClient(adriver.ClientConfig{
			Connection:     conn,
			Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
		})
		if err != nil {
			log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
			continue
		}
		// Create Context for Database Access
		ctx = context.Background()
		driver, err = client.Database(ctx, arangoDatabase)
		if err != nil {
			log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
			continue
		}
		log.Println("connectDB:", "Connection Successful!")
		return driver, ctx, nil
	}
	// BUG FIX: the old flow returned from inside the loop at i==5 and ended
	// in an unreachable, profane fallback return.
	return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", maxAttempts)
}
// InitDatabase connects to the ArangoDB instance and resolves the three
// collection handles this service works with (chunk, file, file_chunk_map),
// storing everything in the package-level globals.
func InitDatabase() (err error) {
	arangoDB, arangoCTX, err = ConnectDB("http://192.168.133.6:8529", "gma-inator", "gma-inator", "gma-inator")
	if err != nil {
		return err
	}
	if colChunk, err = arangoDB.Collection(arangoCTX, "chunk"); err != nil {
		return err
	}
	if colFile, err = arangoDB.Collection(arangoCTX, "file"); err != nil {
		return err
	}
	if colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map"); err != nil {
		return err
	}
	return nil
}
// OpenTar opens the pool's packed .tar.zst archive, extracts every entry
// into WORMCachePath/worm/<PoolID>, and records the entry names in p.items.
// The pool is switched to WORM (read-only) mode. p.file is kept open for
// the lifetime of the loaded pool; Unload closes it.
func (p *Pool) OpenTar() (err error) {
	p.wormMode = true
	outputDir := filepath.Join(WORMCachePath, "worm", p.PoolID)
	if err = os.MkdirAll(outputDir, os.ModePerm); err != nil {
		return err
	}
	p.file, err = os.Open(p.filePath)
	if err != nil {
		return err
	}
	p.items = []string{}
	decompressor, err := zstd.NewReader(p.file, zstd.WithDecoderConcurrency(8))
	if err != nil {
		// BUG FIX: previously panicked here; report to the caller instead.
		return err
	}
	defer decompressor.Close()
	p.tarReader = tar.NewReader(decompressor)
	for {
		header, err := p.tarReader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		// "entryPath" (was "path") avoids shadowing the stdlib path package.
		entryPath := filepath.Join(outputDir, header.Name)
		info := header.FileInfo()
		file, err := os.OpenFile(entryPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode())
		if err != nil {
			return err
		}
		// Copy then close immediately: the old defer-in-loop kept every
		// extracted file handle open until OpenTar returned.
		_, copyErr := io.Copy(file, p.tarReader)
		closeErr := file.Close()
		if copyErr != nil {
			return copyErr
		}
		if closeErr != nil {
			return closeErr
		}
		p.items = append(p.items, header.Name)
		fmt.Print(".")
	}
	return nil
}
// Fetch streams the file with the given id from this (WORM) pool's
// extracted cache directory into writer. Touches LastTouchy so the pool
// is not unloaded while in active use. Returns an error if id is not an
// item of this pool.
func (p *Pool) Fetch(id string, writer io.Writer) (err error) {
	for _, name := range p.items {
		if name != id {
			continue
		}
		p.LastTouchy = time.Now()
		src, err := os.Open(filepath.Join(WORMCachePath, "worm", p.PoolID, id))
		if err != nil {
			return err
		}
		defer src.Close()
		_, err = io.Copy(writer, src)
		return err
	}
	return fmt.Errorf("%s not found", id)
}
// Unload releases the file handle of a loaded WORM pool. The caller is
// responsible for removing the pool from PoolMaster.WORMPools.
func (p *Pool) Unload() {
	log.Printf("Unloading WORMPool [%s]\n", p.PoolID)
	// BUG FIX: the Close error was silently discarded; log it instead.
	if err := p.file.Close(); err != nil {
		log.Printf("Unload WORMPool [%s]: close error: %v\n", p.PoolID, err)
	}
}
// NewPoolMaster builds a PoolMaster rooted at the given final (WORM) and
// cache directories, creating the "pool" and "worm" cache subdirectories
// and the final directory if they do not exist yet.
func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) {
	poolMaster = PoolMaster{
		finalPath:   finalPath,
		cachePath:   cachePath,
		CurrentPool: make(map[string]*Pool),
		WORMPools:   make(map[string]*Pool),
	}
	requiredDirs := []string{
		filepath.Join(cachePath, "pool"),
		filepath.Join(cachePath, "worm"),
		finalPath,
	}
	for _, dir := range requiredDirs {
		if err = os.MkdirAll(dir, os.ModePerm); err != nil {
			return PoolMaster{}, err
		}
	}
	return poolMaster, nil
}
// NewPool creates a brand-new writable pool with a fresh UUID and its
// backing directory under the cache path.
func (p *PoolMaster) NewPool() (*Pool, error) {
	pool := &Pool{
		PoolID:    uuid.NewV4().String(),
		Finalized: false,
		ReadOnly:  false,
	}
	//TODO : Sync to DB
	poolDir := filepath.Join(p.cachePath, "pool", pool.PoolID)
	if err := os.MkdirAll(poolDir, os.ModePerm); err != nil {
		return pool, err
	}
	return pool, nil
}
// GetCurrentWriteablePool returns the pool the given worker should write
// into. When the worker's current pool reaches PoolMaxItems it is sealed,
// queued on FullPools for packing, and a new or recovered pool is assigned.
// A "<poolID>.worker" marker file records the assignment so it survives a
// restart (see ScanForLocalPools).
func (p *PoolMaster) GetCurrentWriteablePool(workerID string) (pool *Pool, err error) {
	if p.CurrentPool[workerID] != nil && p.CurrentPool[workerID].itemCount >= PoolMaxItems {
		fmt.Printf("Aquiring Lock for GetCurrentWriteablepool\n")
		p.lock.Lock()
		defer p.lock.Unlock()
		defer fmt.Printf("unlock GetCurrentWriteablepool\n")
		fmt.Printf("Aquired Lock success for GetCurrentWriteablepool\n")
		p.CurrentPool[workerID].ReadOnly = true
		p.CurrentPool[workerID].LastTouchy = time.Now()
		// queue for compression
		p.FullPools = append(p.FullPools, p.CurrentPool[workerID])
		fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool[workerID].PoolID, p.CurrentPool[workerID].itemCount)
		p.CurrentPool[workerID] = nil
	}
	if p.CurrentPool[workerID] == nil {
		fmt.Printf("Creating new Pool")
		newPool, err := p.AcquireNewOrRecoverPool()
		if err != nil {
			// BUG FIX: this error was previously ignored (shadowed by the
			// following os.WriteFile err), which could nil-deref below.
			return nil, err
		}
		p.CurrentPool[workerID] = newPool
		// persist the worker assignment so a restart can re-attach the pool
		markerPath := filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", newPool.PoolID))
		if err := os.WriteFile(markerPath, []byte(workerID), os.ModePerm); err != nil {
			return nil, err
		}
		fmt.Printf("... got [%s]\n", newPool.PoolID)
	}
	return p.CurrentPool[workerID], nil
}
// RestorePoolFromFolder rebuilds an in-memory Pool description from a pool
// cache directory on disk; the directory's base name doubles as the pool ID.
// The pool is marked read-only when it already holds PoolMaxItems files,
// and is never considered finalized (it is still local).
func RestorePoolFromFolder(folderPath string) (pool *Pool, err error) {
	pool = &Pool{PoolID: path.Base(folderPath)}
	entries, err := os.ReadDir(folderPath)
	if err != nil {
		return pool, err
	}
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		pool.items = append(pool.items, entry.Name())
		pool.itemCount++
	}
	pool.ReadOnly = pool.itemCount >= PoolMaxItems
	pool.Finalized = false // we are still local
	return pool, err
}
// ScanForLocalPools inspects the local pool cache directory after a restart
// and restores pool state: pools already finalized in the database are
// skipped; full pools go to LocalPools (to be packed); partially filled
// pools with a ".worker" marker file are re-attached to their worker,
// otherwise they are treated as unowned local pools.
func (p *PoolMaster) ScanForLocalPools() (err error) {
	fmt.Printf("Aquiring Lock for ScanForLocalPools\n")
	p.lock.Lock()
	defer p.lock.Unlock()
	defer fmt.Printf("unlock ScanForLocalPools\n")
	fmt.Printf("Aquired Lock success for ScanForLocalPools\n")
	entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool"))
	if err != nil {
		return err
	}
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		fmt.Printf("Scanning For Local Pools, found %s:", e.Name())
		// Probe the final WORM path; only unexpected stat errors abort the
		// scan (a missing tar is the normal case for an unpacked pool).
		tarFinalPath := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", e.Name()))
		if _, err = os.Stat(tarFinalPath); err != nil && !errors.Is(err, os.ErrNotExist) {
			return err
		}
		// The database is authoritative: a chunk counts as done only when
		// it is finalized, read-only and no longer marked not-ready.
		finalPathExists := false
		dboChunkExists, err := colChunk.DocumentExists(arangoCTX, e.Name())
		if err != nil {
			return err
		}
		if dboChunkExists {
			var dboChunk common.DB_Chunk
			if _, err := colChunk.ReadDocument(arangoCTX, e.Name(), &dboChunk); err != nil {
				return err
			}
			finalPathExists = dboChunk.Finalized && dboChunk.ReadOnly && !dboChunk.NotReady
			fmt.Printf("is in DB readonly %v finalized %v notready %v itemCount=%d size=%d hash=%s\n", dboChunk.ReadOnly, dboChunk.Finalized, dboChunk.NotReady, dboChunk.FileCount, dboChunk.Size, dboChunk.Hash)
			if finalPathExists {
				fmt.Println("skipping")
			}
		}
		if finalPathExists {
			continue
		}
		poolDirPath := filepath.Join(p.cachePath, "pool", e.Name())
		restoredPool, err := RestorePoolFromFolder(poolDirPath)
		if err != nil {
			return err
		}
		workerCacheFileName := filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", e.Name()))
		fmt.Printf("is readonly %v itemCount=%d\n", restoredPool.ReadOnly, restoredPool.itemCount)
		// BUG FIX: was a hard-coded 500; use the shared PoolMaxItems constant.
		if restoredPool.itemCount == PoolMaxItems {
			p.LocalPools = append(p.LocalPools, restoredPool)
		} else {
			_, err = os.Stat(workerCacheFileName) //if we have a worker assignment file
			if err != nil {
				if !errors.Is(err, os.ErrNotExist) {
					return err
				}
				// marker file does not exist: unowned local pool
				p.LocalPools = append(p.LocalPools, restoredPool)
			} else {
				workerBytes, err := os.ReadFile(workerCacheFileName)
				if err != nil {
					return err
				}
				p.CurrentPool[string(workerBytes)] = restoredPool
				fmt.Println(string(workerBytes))
			}
		}
	}
	return nil
}
// Pool Packing
// ImportPoolPackResult registers a packed pool in the database: one chunk
// document (initially marked not-ready) plus a file->chunk edge per packed
// file, inserted in batches of up to 500. Duplicate edges (already mapped
// files) are tolerated; any other insert error aborts the import.
func (p *PoolMaster) ImportPoolPackResult(packResult PoolPackResult) (err error) {
	startTime := time.Now()
	dboChunk := common.DB_Chunk{
		ID:        packResult.PoolID,
		Hash:      packResult.Hash,
		Size:      packResult.Size,
		FileCount: packResult.FileCount,
		Created:   time.Now(),
		NotReady:  true,
		ReadOnly:  true,
		Finalized: true,
	}
	mappings := make([]common.DB_File2Chunk, 0, len(packResult.Files))
	for _, prFile := range packResult.Files {
		mappings = append(mappings, common.DB_File2Chunk{
			ID:    prFile,
			File:  fmt.Sprintf("file/%s", prFile),
			Chunk: fmt.Sprintf("chunk/%s", dboChunk.ID),
		})
	}
	if _, err = colChunk.CreateDocument(arangoCTX, dboChunk); err != nil {
		return err
	}
	const batchLimit = 500
	for len(mappings) > 0 {
		// clamp the batch to what is left so we never slice out of range
		batch := batchLimit
		if len(mappings) < batch {
			batch = len(mappings)
		}
		_, errorSlice, _ := colFile2Chunk.CreateDocuments(arangoCTX, mappings[0:batch])
		for _, createError := range errorSlice {
			if createError == nil {
				continue
			}
			// an already-existing mapping is fine; anything else is fatal
			if strings.Contains(createError.Error(), "unique constraint violated - in index primary of type primary over '_key'") {
				continue
			}
			return createError
		}
		mappings = mappings[batch:]
	}
	fmt.Printf("ImportPool Duration %dms\n", time.Since(startTime).Milliseconds())
	return nil
}
// MovePoolPackToWORM moves a packed pool archive from the cache into the
// final WORM storage path and verifies the moved copy by re-hashing it
// against the hash computed at pack time. On mismatch the broken copy is
// deleted and an error is returned.
func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) {
	startTime := time.Now()
	targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", packResult.PoolID))
	err = common.MoveFile(packResult.outputFileName, targetFileName)
	if err != nil {
		return err
	}
	tarFileCheck, err := os.Open(targetFileName)
	if err != nil {
		return err
	}
	defer tarFileCheck.Close()
	shaHasher := sha256.New()
	if _, err = io.Copy(shaHasher, tarFileCheck); err != nil {
		return err
	}
	wormHash := fmt.Sprintf("%x", shaHasher.Sum(nil))
	fmt.Printf("WORMTarPool hash is %s , old is %s\n", wormHash, packResult.Hash)
	if wormHash != packResult.Hash {
		os.Remove(targetFileName)
		// BUG FIX: previously returned a nil err here, which made a
		// corrupted move look like success to the caller.
		return fmt.Errorf("MovePoolPackToWORM: hash mismatch after move: got %s, want %s", wormHash, packResult.Hash)
	}
	fmt.Printf("MoveWORM Duration %dms\n", time.Since(startTime).Milliseconds())
	return nil
}
// WriteRecoveryFile writes a "<poolID>.json" sidecar into the final path
// describing the packed chunk (size, hash, item list) together with the
// full per-file documents fetched from the database, so the chunk can be
// re-imported after a database loss.
func (p *PoolMaster) WriteRecoveryFile(packResult PoolPackResult, pool *Pool) (err error) {
	fileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.json", packResult.PoolID))
	recoveryFile, err := os.Create(fileName)
	if err != nil {
		return err
	}
	defer recoveryFile.Close()
	poolRecoveryData := PoolRecoveryData{
		PoolID:    packResult.PoolID,
		Size:      uint64(packResult.Size),
		Created:   time.Now(),
		Hash:      packResult.Hash,
		ItemCount: pool.itemCount,
		Items:     pool.items,
	}
	// fetch the full file documents from the DB for recovery purposes
	poolRecoveryData.RecoveryData = make([]common.DB_File, len(pool.items))
	_, _, err = colFile.ReadDocuments(arangoCTX, pool.items, poolRecoveryData.RecoveryData)
	if err != nil {
		return fmt.Errorf("error @ReadDocuments %v", err)
	}
	// FIX: renamed the local from "json", which shadowed the encoding/json package
	encoded, err := json.MarshalIndent(poolRecoveryData, "", "\t")
	if err != nil {
		return fmt.Errorf("error @json.MarshalIndent %v", err)
	}
	if _, err = recoveryFile.Write(encoded); err != nil {
		return fmt.Errorf("error @recoveryFile.Write %v", err)
	}
	return nil
}
/*
* Only call this in a locked Sate
*/
// PackPool packs the local pool directory into a zstd-compressed tar
// archive in the cache, then re-opens the archive and validates every
// entry. Entries are named by their sha256, so each entry's content must
// hash back to its own name; entry sizes and the overall archive size are
// also cross-checked.
//
// Must only be called while the PoolMaster lock is held.
func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err error) {
	startTime := time.Now()
	packResult.PoolID = poolID
	packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar.zst", poolID))
	tarFile, err := os.Create(packResult.outputFileName)
	if err != nil {
		return packResult, fmt.Errorf("os.Create: %v", err)
	}
	defer tarFile.Close()
	compressor, err := zstd.NewWriter(tarFile, zstd.WithEncoderLevel(4))
	if err != nil {
		return packResult, fmt.Errorf("zstd.NewWriter: %v", err)
	}
	defer compressor.Close()
	tw := tar.NewWriter(compressor)
	defer tw.Close()
	entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool", poolID))
	if err != nil {
		return packResult, fmt.Errorf("os.ReadDir: %v", err)
	}
	// only full pools may be packed
	if len(entries) != PoolMaxItems {
		return packResult, fmt.Errorf("Pool contains %d items, but there should be %d", len(entries), PoolMaxItems)
	}
	for _, e := range entries {
		originalPath := filepath.Join(p.cachePath, "pool", poolID, e.Name())
		file, err := os.Open(originalPath)
		if err != nil {
			return packResult, fmt.Errorf("for os.Open: %v", err)
		}
		// BUG FIX: copy then close per file via a closure; the old
		// defer-in-loop kept up to PoolMaxItems handles open at once.
		err = func() error {
			defer file.Close()
			info, err := file.Stat()
			if err != nil {
				return fmt.Errorf("for fs.Stat: %v", err)
			}
			tarFileHeader, err := tar.FileInfoHeader(info, info.Name())
			if err != nil {
				return err
			}
			if err = tw.WriteHeader(tarFileHeader); err != nil {
				return err
			}
			_, err = io.Copy(tw, file)
			return err
		}()
		if err != nil {
			return packResult, err
		}
		packResult.FileCount++
		packResult.Files = append(packResult.Files, e.Name())
		fmt.Printf("*")
	}
	// flush tar, compressor and file before re-reading for validation
	if err = tw.Close(); err != nil {
		return packResult, err
	}
	if err = compressor.Close(); err != nil {
		return packResult, err
	}
	if err = tarFile.Close(); err != nil {
		return packResult, err
	}
	// re-open and check
	{
		tarFileCheck, err := os.Open(packResult.outputFileName)
		if err != nil {
			return packResult, err
		}
		defer tarFileCheck.Close()
		shaHasher := sha256.New()
		hashedBytes, err := io.Copy(shaHasher, tarFileCheck)
		if err != nil {
			return packResult, err
		}
		packResult.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil))
		fmt.Printf("PackPoolTar hash is %s\n", packResult.Hash)
		tarFileCheck.Seek(0, 0)
		// validate written tar-chunk
		decompressor, err := zstd.NewReader(tarFileCheck, zstd.WithDecoderConcurrency(8))
		if err != nil {
			// BUG FIX: previously panicked here; report to the caller instead.
			return packResult, err
		}
		defer decompressor.Close()
		tarFileCheckReader := tar.NewReader(decompressor)
		for {
			header, err := tarFileCheckReader.Next()
			if err == io.EOF {
				break
			}
			if err != nil {
				return packResult, err
			}
			hasher := sha256.New()
			entryBytes, err := io.Copy(hasher, tarFileCheckReader)
			if err != nil {
				return packResult, err
			}
			readBackChecksum := fmt.Sprintf("%x", hasher.Sum(nil))
			if entryBytes != header.Size {
				return packResult, fmt.Errorf("validation on output archive, incorrect size file %s has %d should be %d", header.Name, entryBytes, header.Size)
			}
			// entries are named after their sha256 content hash
			if header.Name != readBackChecksum {
				return packResult, fmt.Errorf("validation on output archive, incorrect checksum file %s has %s", header.Name, readBackChecksum)
			}
		}
		packFileStats, err := tarFileCheck.Stat()
		if err != nil {
			return packResult, err
		}
		packResult.Size = packFileStats.Size()
		// hashedBytes counted every byte fed to the archive hasher above and
		// must therefore equal the on-disk archive size
		if hashedBytes != packResult.Size {
			return packResult, fmt.Errorf("WORM Copy HashedBytes %d != FileSize %d", hashedBytes, packResult.Size)
		}
		fmt.Printf("PackPool Duration %dms\n", time.Since(startTime).Milliseconds())
	}
	return packResult, nil
}
// AcquireNewOrRecoverPool hands out the first recovered local pool that is
// still writable (not read-only and below PoolMaxItems); if none qualifies
// it creates a brand-new pool.
func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) {
	for _, candidate := range p.LocalPools {
		if candidate.ReadOnly || candidate.itemCount >= PoolMaxItems {
			continue
		}
		return candidate, nil
	}
	return p.NewPool()
}
// Lookup reports whether the file id is known anywhere: in a worker's
// current pool, a loaded WORM pool, a full pool awaiting packing, a
// recovered local pool, or — failing all of those — in the database's
// file->chunk mapping. DB lookup errors are reported as "not found".
func (p *PoolMaster) Lookup(id string) (exists bool) {
	p.lock.Lock()
	defer p.lock.Unlock()
	// shared membership test over a pool's item list (nil-safe)
	contains := func(pool *Pool) bool {
		if pool == nil {
			return false
		}
		for _, item := range pool.items {
			if item == id {
				return true
			}
		}
		return false
	}
	for _, cp := range p.CurrentPool { // CurrentPool
		if contains(cp) {
			return true
		}
	}
	for _, wormPool := range p.WORMPools { // WORM Pools
		if contains(wormPool) {
			return true
		}
	}
	for _, fullPool := range p.FullPools { // Full Pools
		if contains(fullPool) {
			return true
		}
	}
	for _, localPool := range p.LocalPools { // Local Pools
		if contains(localPool) {
			return true
		}
	}
	found, err := colFile2Chunk.DocumentExists(arangoCTX, id)
	if err != nil {
		return false
	}
	return found
}
// FetchLoadWORM streams file fileID out of WORM chunk chunkID into writer.
// If the chunk is already loaded into the WORM cache it serves directly;
// otherwise it reads the chunk metadata from the DB, extracts the packed
// archive into the cache (OpenTar) and serves from there.
func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writer) (err error) {
	fmt.Printf("FetchLoadWORM(chunkID %s, fileID %s, ...)\n", chunkID, fileID)
	// fast path: search within already-loaded worm-pools
	for wormID, wormPool := range p.WORMPools {
		if wormID != chunkID {
			continue
		}
		for _, poolItem := range wormPool.items {
			if poolItem == fileID {
				return wormPool.Fetch(fileID, writer)
			}
		}
		break
	}
	// else load wormPool into disk-cache, extract to "worm"
	// TODO: every method here should have locked the WORMLock!!
	/*
		fmt.Printf("Aquiring WORMLock for FetchLoadWORM\n")
		p.WORMLock.Lock()
		defer p.WORMLock.Unlock()
		defer fmt.Printf("unlock WORMLock FetchLoadWORM\n")
		fmt.Printf("Aquired WORMLock success for FetchLoadWORM\n")
	*/
	fmt.Printf("Aquiring Lock for FetchLoadWORM\n")
	p.lock.Lock()
	defer p.lock.Unlock()
	defer fmt.Printf("unlock FetchLoadWORM\n")
	fmt.Printf("Aquired Lock success for FetchLoadWORM\n")
	// look up the chunk document to locate the packed archive on disk
	var dboChunk common.DB_Chunk
	_, err = colChunk.ReadDocument(arangoCTX, chunkID, &dboChunk)
	if err != nil {
		return err
	}
	loadedWormPool := Pool{
		PoolID:     dboChunk.ID,
		Size:       uint64(dboChunk.Size),
		ReadOnly:   dboChunk.ReadOnly,
		Finalized:  dboChunk.Finalized,
		LastTouchy: time.Now(),
		filePath:   filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", dboChunk.ID)),
	}
	fmt.Printf("initialized loadedWormPool (%s), Opening tar...\n", dboChunk.ID)
	// extract the archive into the WORM cache directory
	err = loadedWormPool.OpenTar()
	if err != nil {
		fmt.Println(err)
		return err
	}
	fmt.Printf("extracted and key = %s\n", loadedWormPool.PoolID)
	// register the loaded pool so subsequent fetches hit the fast path
	p.WORMPools[loadedWormPool.PoolID] = &loadedWormPool
	return loadedWormPool.Fetch(fileID, writer)
}
// CleanWORMTemp unloads WORM pools that have not been touched for more
// than four minutes, releasing their file handles and dropping them from
// the WORMPools map. The extracted files stay on disk.
func (p *PoolMaster) CleanWORMTemp() (err error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for key, wormPool := range p.WORMPools {
		if time.Since(wormPool.LastTouchy).Minutes() <= 4 {
			continue
		}
		wormPool.Unload()
		delete(p.WORMPools, key)
	}
	return nil
}
// PackFullPools packs queued full pools once at least 16 have accumulated.
// Every pool that has been idle for >=5 minutes is (in parallel) packed,
// imported into the DB, moved to WORM storage, written a recovery sidecar,
// and removed from the local cache; successfully packed pools are then
// dropped from the FullPools queue. Individual failures are logged but do
// not abort the batch.
func (p *PoolMaster) PackFullPools() (err error) {
	if len(p.FullPools) < 16 {
		return nil
	}
	p.lock.Lock()
	defer p.lock.Unlock()
	fmt.Printf("Aquiring WORMLock for Regular FullPool Pack\n")
	poolChannel := make(chan error) // one result per spawned worker
	var (
		// BUG FIX: deletedPools is appended to from multiple goroutines,
		// so the appends must be serialized.
		deletedPoolsMu sync.Mutex
		deletedPools   []string
	)
	poolChannelCounter := 0
	for _, fullPool := range p.FullPools {
		// leave recently used pools alone; they may still serve reads
		if time.Since(fullPool.LastTouchy).Minutes() < 5 {
			continue
		}
		// pack each eligible pool in its own goroutine
		go func(fullPool *Pool) {
			packResult, err := p.PackPool(fullPool.PoolID)
			if err != nil {
				poolChannel <- fmt.Errorf("error @PackPool: %v", err)
				return
			}
			if err = p.ImportPoolPackResult(packResult); err != nil {
				poolChannel <- fmt.Errorf("error @ImportPoolPackResult: %v", err)
				return
			}
			if err = p.MovePoolPackToWORM(packResult); err != nil {
				poolChannel <- fmt.Errorf("error @MovePoolPackToWORM: %v", err)
				return
			}
			if err = p.WriteRecoveryFile(packResult, fullPool); err != nil {
				poolChannel <- fmt.Errorf("error @WriteRecoveryFile: %v", err)
				return
			}
			os.RemoveAll(filepath.Join(p.cachePath, "pool", fullPool.PoolID))
			deletedPoolsMu.Lock()
			deletedPools = append(deletedPools, fullPool.PoolID)
			deletedPoolsMu.Unlock()
			// mark the chunk ready in the database
			_, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{
				NotReady:  false,
				Finalized: true,
				ReadOnly:  true,
				Hash:      packResult.Hash,
				Size:      packResult.Size,
			})
			if err != nil {
				poolChannel <- err
				return
			}
			poolChannel <- nil
		}(fullPool)
		poolChannelCounter++
	}
	// wait for all workers to finish
	for i := 0; i < poolChannelCounter; i++ {
		if result := <-poolChannel; result != nil {
			fmt.Printf("PoolChannel Error: %v\n", result)
		}
	}
	// drop pools that were packed successfully from the queue
	for _, deletedPoolID := range deletedPools {
		for index, fullPool := range p.FullPools {
			if fullPool.PoolID == deletedPoolID {
				p.FullPools = removeFromSlice(p.FullPools, index)
				break
			}
		}
	}
	fmt.Printf("unlock lock Regular FullPool Pack\n")
	return nil
}
// Fetch streams the file with the given id to writer, searching (in order):
// the workers' current pools, loaded WORM pools, full pools awaiting
// packing, recovered local pools, the fast on-disk WORM cache, and finally
// the packed archive referenced by the DB (via FetchLoadWORM).
//
// The master lock is held only for the first two phases; unlocKOnce makes
// the early release and the deferred release mutually exclusive.
func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
	var unlocKOnce sync.Once
	p.lock.Lock()
	defer unlocKOnce.Do(p.lock.Unlock)
	for _, cp := range p.CurrentPool {
		if cp != nil {
			for _, poolItem := range cp.items {
				if poolItem == id {
					fmt.Printf("Fetch CurrentPool %s\n", id)
					poolLocalFilePath := filepath.Join(p.cachePath, "pool", cp.PoolID, id)
					srcLocalFile, err := os.Open(poolLocalFilePath)
					if err != nil {
						return err
					}
					defer srcLocalFile.Close()
					if _, err = io.Copy(writer, srcLocalFile); err != nil {
						return err
					}
					return nil
				}
			}
		}
	}
	for _, wormPool := range p.WORMPools {
		for _, poolItem := range wormPool.items {
			if poolItem == id {
				fmt.Printf("Fetch WORMPool %s file %s\n", wormPool.PoolID, id)
				return wormPool.Fetch(id, writer)
			}
		}
	}
	// release the master lock early; the remaining lookups only read
	// pool item lists and the filesystem/DB
	unlocKOnce.Do(p.lock.Unlock)
	for _, fullPool := range p.FullPools {
		for _, poolItem := range fullPool.items {
			if poolItem == id {
				fmt.Printf("Fetch FullPool %s\n", id)
				// touch so PackFullPools leaves this pool alone for a while
				fullPool.LastTouchy = time.Now()
				poolLocalFilePath := filepath.Join(p.cachePath, "pool", fullPool.PoolID, id)
				srcLocalFile, err := os.Open(poolLocalFilePath)
				if err != nil {
					return err
				}
				defer srcLocalFile.Close()
				if _, err = io.Copy(writer, srcLocalFile); err != nil {
					return err
				}
				return nil
			}
		}
	}
	for _, localPool := range p.LocalPools {
		for _, poolItem := range localPool.items {
			if poolItem == id {
				fmt.Printf("Fetch LocalPool %s\n", id)
				poolLocalFilePath := filepath.Join(p.cachePath, "pool", localPool.PoolID, id)
				srcLocalFile, err := os.Open(poolLocalFilePath)
				if err != nil {
					return err
				}
				defer srcLocalFile.Close()
				if _, err = io.Copy(writer, srcLocalFile); err != nil {
					return err
				}
				return nil
			}
		}
	}
	// ArangoDB: resolve which chunk holds this file
	dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id)
	if err != nil {
		return err
	}
	fmt.Printf("dboFile2ChunkExists %s = %v\n", id, dboFile2ChunkExists)
	if dboFile2ChunkExists {
		var dboFile2Chunk common.DB_File2Chunk
		_, err = colFile2Chunk.ReadDocument(arangoCTX, id, &dboFile2Chunk)
		if err != nil {
			return err
		}
		//FetchFromPoolPack
		// dboFile2Chunk.Chunk has the form "chunk/<poolID>"; strip the prefix
		apparentPoolID := dboFile2Chunk.Chunk[6:]
		// fast path: the chunk may already be extracted on disk from an
		// earlier run even though it is not registered in WORMPools
		fastCachePath := filepath.Join(WORMCachePath, "worm", apparentPoolID, id)
		if checkFileExists(fastCachePath) {
			fastCacheDir := filepath.Join(WORMCachePath, "worm", apparentPoolID)
			srcLocalFile, err := os.Open(fastCachePath)
			if err != nil {
				return err
			}
			defer srcLocalFile.Close()
			if _, err = io.Copy(writer, srcLocalFile); err != nil {
				return err
			}
			// at this point acquire lock til end
			p.lock.Lock()
			defer p.lock.Unlock()
			if _, ok := p.WORMPools[apparentPoolID]; !ok {
				// register the extracted folder as a virtual WORM pool so
				// future fetches hit the in-memory path
				fastCachePool, err := RestorePoolFromFolder(fastCacheDir)
				if err != nil {
					return err
				}
				p.WORMPools[fastCachePool.PoolID] = fastCachePool
			}
			return nil
		}
		//dboFile2Chunk.Chunk <- which chunk i need to find
		return p.FetchLoadWORM(dboFile2Chunk.Chunk[6:], id, writer)
	}
	return nil
}
func checkFileExists(filePath string) bool {
_, error := os.Stat(filePath)
//return !os.IsNotExist(err)
return !errors.Is(error, os.ErrNotExist)
}
// Store writes the payload from src into the worker's current writable
// pool under id (expected to be the payload's sha256 hex digest). The
// byte count is checked against targetSize and the content is re-hashed;
// on any mismatch the partial file is removed and an error returned.
// After a successful store the pool's item count is recounted from disk
// and the pool sealed once it reaches PoolMaxItems.
func (p *PoolMaster) Store(id string, workerID string, src io.Reader, targetSize int64) (err error) {
	pool, err := p.GetCurrentWriteablePool(workerID)
	if err != nil {
		return err
	}
	if pool.ReadOnly {
		return fmt.Errorf("WTF Pool %s is ReadOnly but GetCurrentWriteablePool returned it", pool.PoolID)
	}
	fmt.Printf("Store(%s)\n", id)
	fmt.Printf("Aquiring Lock for Store\n")
	p.lock.Lock()
	defer p.lock.Unlock()
	defer fmt.Printf("unlock Store\n")
	fmt.Printf("Aquired Lock success for Store\n")
	// figuring out paths
	poolFolder := filepath.Join(p.cachePath, "pool", pool.PoolID)
	destPath := filepath.Join(poolFolder, id)
	dst, err := os.Create(destPath)
	if err != nil {
		_ = os.Remove(destPath)
		return err
	}
	defer dst.Close()
	// copy from ioReader to file
	writtenBytes, err := io.Copy(dst, src)
	if err != nil {
		_ = os.Remove(destPath)
		return err
	}
	if writtenBytes != targetSize {
		_ = os.Remove(destPath)
		// BUG FIX: previously returned a nil err here, so a truncated
		// upload was silently reported as success.
		return fmt.Errorf("Store() size mismatch: wrote %d bytes, expected %d", writtenBytes, targetSize)
	}
	// check transferred data: id must be the sha256 of the content
	dst.Seek(0, 0)
	shaHasher := sha256.New()
	if _, err := io.Copy(shaHasher, dst); err != nil {
		return err
	}
	outputHash := fmt.Sprintf("%x", shaHasher.Sum(nil))
	if outputHash != id {
		// BUG FIX: also remove the corrupt file so the disk recount below
		// (on a later Store) cannot count it as a valid item.
		_ = os.Remove(destPath)
		return fmt.Errorf("Store() Sha256 Hash Mismatch")
	}
	pool.itemCount++
	pool.items = append(pool.items, id)
	fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount)
	// recount from disk so itemCount stays authoritative
	entries, err := os.ReadDir(poolFolder)
	if err != nil {
		return err
	}
	newItemCount := 0
	for _, e := range entries {
		if !e.IsDir() {
			newItemCount++
		}
	}
	pool.itemCount = newItemCount
	if pool.itemCount >= PoolMaxItems {
		pool.ReadOnly = true
	}
	return nil
}
// removeFromSlice removes the element at index s and returns the shortened
// slice. NOTE: this mutates the backing array of the input (elements after
// s are shifted left), so callers must use the returned slice and discard
// the original.
func removeFromSlice(slice []*Pool, s int) []*Pool {
	return append(slice[:s], slice[s+1:]...)
}
// main wires everything together: connects to ArangoDB, builds the
// PoolMaster, recovers local pools, starts the background pack/clean loop,
// packs any recovered read-only pools, and serves the HTTP API on :13371
// (/check, /fetch, /stash, /pmdump).
func main() {
	err := InitDatabase()
	if err != nil {
		panic(err)
	}
	poolMaster, err = NewPoolMaster(PoolPathFinal, PoolPathTemp)
	if err != nil {
		panic(err)
	}
	// Scan for local existing Pools
	err = poolMaster.ScanForLocalPools()
	if err != nil {
		panic(err)
	}
	// background maintenance: pack full pools and unload idle WORM pools
	go func() {
		for {
			poolMaster.PackFullPools()
			poolMaster.CleanWORMTemp()
			time.Sleep(time.Minute * 2)
		}
	}()
	{
		poolChannel := make(chan error) // Channel to receive the results
		var poolChannelCounter = 0
		// Initial packing: recovered read-only pools that are not yet in
		// the DB get packed in parallel, one goroutine each
		for _, localPool := range poolMaster.LocalPools {
			if localPool.ReadOnly {
				dboChunkExists, err := colChunk.DocumentExists(arangoCTX, localPool.PoolID)
				if err != nil {
					panic(err)
				}
				if !dboChunkExists {
					//spawn thread
					go func(localPool *Pool) {
						fmt.Printf("Packing Pool %s\n", localPool.PoolID)
						packResult, err := poolMaster.PackPool(localPool.PoolID)
						if err != nil {
							panic(err)
						}
						err = poolMaster.ImportPoolPackResult(packResult)
						if err != nil {
							panic(err)
						}
						err = poolMaster.MovePoolPackToWORM(packResult)
						if err != nil {
							panic(err)
						}
						err = poolMaster.WriteRecoveryFile(packResult, localPool)
						if err != nil {
							panic(err)
						}
						os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
						poolChannel <- nil
					}(localPool)
					// increment total parallel counter
					poolChannelCounter++
				}
				//os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
			}
			// packResult.FileCount
		}
		// Waiting for them to finish
		for i := 0; i < poolChannelCounter; i++ {
			result := <-poolChannel
			if result != nil {
				fmt.Printf("PoolChannel Error: %v\n", result)
			}
		}
	}
	e := echo.New()
	e.Use(middleware.RecoverWithConfig(middleware.RecoverConfig{
		StackSize: 1 << 10, // 1 KB
	}))
	//e.Use(middleware.Logger())
	e.GET("/", func(c echo.Context) error {
		return c.String(http.StatusOK, "Hello, World!")
	})
	// debug endpoint: dump the PoolMaster's exported state as JSON
	e.GET("/pmdump", func(c echo.Context) error {
		return c.JSON(http.StatusOK, poolMaster)
	})
	// existence check: 208 if the file is already stored, 200 otherwise
	e.GET("/check/:id", func(c echo.Context) error {
		id := c.Param("id")
		fmt.Printf("/check/%s checking...\n", id)
		exists := poolMaster.Lookup(id)
		if exists {
			return c.JSON(http.StatusAlreadyReported, exists)
		}
		return c.JSON(http.StatusOK, exists)
	})
	// stream a stored file back as an octet stream
	e.GET("/fetch/:id", func(c echo.Context) error {
		id := c.Param("id")
		exists := poolMaster.Lookup(id)
		if exists {
			c.Response().Header().Set(echo.HeaderContentType, echo.MIMEOctetStream)
			c.Response().WriteHeader(http.StatusOK)
			err = poolMaster.Fetch(id, c.Response())
			if err != nil {
				fmt.Printf("%v", err)
				return c.String(http.StatusInternalServerError, err.Error())
			}
			c.Response().Flush()
		} else {
			fmt.Printf("/fetch/%s does not exist\n", id)
			return c.String(http.StatusNotFound, "Not Found")
		}
		return nil
	})
	// upload a file: multipart form with "file", "worker" and "info"
	// (a common.DB_File JSON document); :size is the expected byte count
	e.POST("/stash/:id/:size", func(c echo.Context) error {
		id := c.Param("id")
		sizeStr := c.Param("size")
		sizeVal, err := strconv.ParseInt(sizeStr, 10, 64)
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		workerID := c.FormValue("worker")
		infoJSON := c.FormValue("info")
		fileInfo := common.DB_File{}
		err = json.Unmarshal([]byte(infoJSON), &fileInfo)
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusBadRequest, "Error")
		}
		exists := poolMaster.Lookup(id)
		if exists {
			fmt.Printf("/stash/%s exists already\n", id)
			// drain the uploaded body anyway so the connection stays reusable
			file, err := c.FormFile("file")
			if err != nil {
				fmt.Println(err)
				return c.String(http.StatusExpectationFailed, "Error")
			}
			formStream, err := file.Open()
			if err != nil {
				fmt.Println(err)
				return c.String(http.StatusExpectationFailed, "Error")
			}
			defer formStream.Close()
			if _, err = io.Copy(io.Discard, formStream); err != nil {
				fmt.Println(err)
				return c.String(http.StatusExpectationFailed, "Error")
			}
			return c.String(http.StatusAlreadyReported, "Exists already")
		}
		fmt.Printf("stashing %s", id)
		file, err := c.FormFile("file")
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		formStream, err := file.Open()
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		defer formStream.Close()
		err = poolMaster.Store(id, workerID, formStream, sizeVal)
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		fmt.Println("...stashed")
		fmt.Println(fileInfo)
		// record the file document in the DB once the bytes are safe
		fileInfo.Created = time.Now()
		_, err = colFile.CreateDocument(arangoCTX, fileInfo)
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		return c.JSON(http.StatusOK, true)
	})
	e.Logger.Fatal(e.Start(":13371"))
}