package main
import (
"archive/tar"
"context"
"crypto/sha256"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"git.cheetah.cat/worksucc/gma-puzzles/common"
adriver "github.com/arangodb/go-driver"
ahttp "github.com/arangodb/go-driver/http"
"github.com/klauspost/compress/zstd"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/twinj/uuid"
)
var (
	// FastCacheEnabled — not referenced in this file; presumably gates the
	// extracted fast-cache elsewhere. TODO(review): confirm against callers.
	FastCacheEnabled = true
	// FastCachePath is the root of the fast (extracted) cache on disk.
	FastCachePath = "/zpool0/cheetah/workshop/garrysmod/gma-inator/cache"
	// WORMCachePath is where packed WORM archives are extracted for reading.
	WORMCachePath = FastCachePath
	// PoolMaxItems is how many files a pool holds before it is packed.
	PoolMaxItems = 500
	// PoolPathFinal is the destination directory for finished tar.zst chunks.
	PoolPathFinal = "/zpool0/cheetah/workshop/garrysmod/gma-inator/chunks/"
	// PoolPathTemp is the staging/cache directory for pools being filled.
	PoolPathTemp = "/zpool0/cheetah/workshop/garrysmod/gma-inator/temp"
)
// PoolRecoveryData is the JSON payload written next to each WORM archive
// ("<poolID>.json", see WriteRecoveryFile); it carries enough metadata to
// re-import the chunk and its files into ArangoDB after a database loss.
type PoolRecoveryData struct {
	PoolID       string           `json:"_key"`
	Size         uint64           `json:"size"`
	Created      time.Time        `json:"date"`
	Hash         string           `json:"hash"` // sha256 (hex) of the packed tar.zst
	ItemCount    int              `json:"itemCount"`
	Items        []string         `json:"items"`        // file IDs stored in the pool
	RecoveryData []common.DB_File `json:"recoveryData"` // full DB documents for each item
}
// Pool is one bundle of up to PoolMaxItems content-addressed files. While
// writeable it lives as a staging directory under the cache path; once packed
// it becomes a tar.zst archive that can be re-extracted read-only ("WORM"
// mode) for serving.
type Pool struct {
	PoolID     string    `json:"_key"`
	Finalized  bool      `json:"finalized"` // chunk fully persisted (DB + WORM storage)
	ReadOnly   bool      `json:"readOnly"`  // no further Store()s accepted
	Size       uint64    `json:"size"`
	LastTouchy time.Time `json:"-"` // last read/write; drives idle unloading

	itemCount int      // number of files currently in the pool
	items     []string // IDs of the files contained in this pool
	wormMode  bool     // true once OpenTar has extracted the archive
	filePath  string   // path of the packed tar.zst (WORM mode)
	file      *os.File // open handle on the archive while loaded
	//tarWriter *tar.Writer
	tarReader *tar.Reader // reader over the decompressed archive during extraction
}
// PoolFile describes a single file stored in a pool (ID plus byte size).
// NOTE(review): not referenced anywhere in this file — confirm external use
// before removing.
type PoolFile struct {
	FileID string
	Size   uint64
}
// PoolMaster coordinates every pool tier: per-worker writeable pools, full
// pools queued for packing, read-only WORM pools loaded back from archives,
// and local pools restored from a previous run.
type PoolMaster struct {
	cachePath string // staging area ("pool" and "worm" subdirectories)
	finalPath string // destination for packed tar.zst chunks

	CurrentPool map[string]*Pool // workerID -> pool currently being filled
	lock        sync.Mutex       // guards the maps/slices below
	WORMLock    sync.Mutex       // NOTE(review): declared but its uses are commented out
	LocalPools  []*Pool          // pools restored from disk, available for reuse
	FullPools   []*Pool          // full pools awaiting packing
	WORMPools   map[string]*Pool // chunkID -> extracted read-only pool
}
// PoolPackResult summarizes one PackPool run: what was packed and the
// resulting archive's size and hash.
type PoolPackResult struct {
	PoolID    string
	Files     []string // IDs of the packed files
	FileCount int
	Size      int64  // size of the tar.zst in bytes
	Hash      string // sha256 (hex) of the tar.zst

	outputFileName string // archive path inside the cache before the WORM move
}
var (
	// Shared ArangoDB handle and the context used for every driver call;
	// both populated by InitDatabase.
	arangoDB  adriver.Database
	arangoCTX context.Context
	// Collection handles resolved by InitDatabase.
	colChunk      adriver.Collection
	colFile       adriver.Collection
	colFile2Chunk adriver.Collection
	// PoolMaster singleton, initialized in main.
	poolMaster PoolMaster
)
// ConnectDB establishes a connection to an ArangoDB server and opens the
// named database, retrying up to five times with a 30-second pause between
// attempts.
//
// Parameters:
//   baseURL        - ArangoDB endpoint, e.g. "http://host:8529"
//   arangoUser     - user name for basic authentication
//   arangoPWD      - password for basic authentication
//   arangoDatabase - name of the database to open
//
// Returns the database handle, a background context for subsequent driver
// calls, and an error if every attempt fails.
func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
	log.Println("connectDB:", "Starting Connection Process...")

	// BUGFIX: the original loop hid the failure return inside an `i == 5`
	// branch and ended in an unreachable (and unprofessional) final return.
	const maxAttempts = 5
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if attempt > 1 {
			// Back off before every retry.
			time.Sleep(30 * time.Second)
		}
		// Connect to the ArangoDB endpoint.
		conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
			Endpoints: []string{baseURL},
			TLSConfig: &tls.Config{ /*...*/ },
		})
		if err != nil {
			log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
			continue
		}
		// Authenticate the driver client.
		client, err := adriver.NewClient(adriver.ClientConfig{
			Connection:     conn,
			Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
		})
		if err != nil {
			log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
			continue
		}
		// Open the target database.
		ctx = context.Background()
		driver, err = client.Database(ctx, arangoDatabase)
		if err != nil {
			log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
			continue
		}
		log.Println("connectDB:", "Connection Successful!")
		return driver, ctx, nil
	}
	return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", maxAttempts)
}
// InitDatabase connects to ArangoDB and resolves every collection handle the
// service needs, populating the package-level globals.
func InitDatabase() (err error) {
	arangoDB, arangoCTX, err = ConnectDB("http://192.168.133.6:8529", "gma-inator", "gma-inator", "gma-inator")
	if err != nil {
		return err
	}

	// Resolve the collections we operate on; fail on the first missing one.
	handles := []struct {
		name   string
		target *adriver.Collection
	}{
		{"chunk", &colChunk},
		{"file", &colFile},
		{"file_chunk_map", &colFile2Chunk},
	}
	for _, h := range handles {
		if *h.target, err = arangoDB.Collection(arangoCTX, h.name); err != nil {
			return err
		}
	}
	return nil
}
// OpenTar switches the pool into WORM (read-only) mode and extracts the
// pool's zstd-compressed tar archive (p.filePath) into the local WORM cache
// directory, recording every extracted file name in p.items.
//
// The pool's backing file handle (p.file) stays open; callers release it via
// Unload.
func (p *Pool) OpenTar() (err error) {
	p.wormMode = true

	outputDir := filepath.Join(WORMCachePath, "worm", p.PoolID)
	err = os.MkdirAll(outputDir, os.ModePerm)
	if err != nil {
		return err
	}

	p.file, err = os.Open(p.filePath)
	if err != nil {
		return err
	}
	p.items = []string{}

	decompressor, err := zstd.NewReader(p.file, zstd.WithDecoderConcurrency(8))
	if err != nil {
		// BUGFIX: was panic(err); a broken archive should surface as an error.
		return err
	}
	defer decompressor.Close()

	p.tarReader = tar.NewReader(decompressor)
	for {
		header, err := p.tarReader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		if err := p.extractEntry(outputDir, header); err != nil {
			return err
		}
		p.items = append(p.items, header.Name)
		fmt.Print(".")
	}
	return nil
}

// extractEntry writes a single tar entry to disk. Split out so the output
// file is closed per entry — the original `defer file.Close()` inside the
// loop kept every extracted file open until OpenTar returned.
func (p *Pool) extractEntry(outputDir string, header *tar.Header) error {
	// NOTE(review): header.Name is joined unsanitized; a hostile archive with
	// ".." entries could escape outputDir. Archives here are self-produced,
	// but consider filepath.Clean + prefix check if that ever changes.
	path := filepath.Join(outputDir, header.Name)
	info := header.FileInfo()
	file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode())
	if err != nil {
		return err
	}
	defer file.Close()
	_, err = io.Copy(file, p.tarReader)
	return err
}
// Fetch streams the cached file identified by id from this WORM pool's
// on-disk extraction directory into writer, refreshing LastTouchy so the
// pool is not unloaded while in active use. Returns an error when id is not
// part of this pool or the cached file cannot be read.
func (p *Pool) Fetch(id string, writer io.Writer) (err error) {
	found := false
	for _, item := range p.items {
		if item == id {
			found = true
			break
		}
	}
	if !found {
		return fmt.Errorf("%s not found", id)
	}

	//fmt.Printf("Fetch WORMPool %s\n", id)
	p.LastTouchy = time.Now()
	localPath := filepath.Join(WORMCachePath, "worm", p.PoolID, id)
	src, err := os.Open(localPath)
	if err != nil {
		return err
	}
	defer src.Close()
	_, err = io.Copy(writer, src)
	return err
}
// Unload releases the pool's underlying archive file handle. Called when a
// loaded WORM pool has been idle long enough to be evicted.
func (p *Pool) Unload() {
	log.Printf("Unloading WORMPool [%s]\n", p.PoolID)
	_ = p.file.Close()
}
// NewPoolMaster builds a PoolMaster rooted at the given final (WORM) and
// cache directories, creating the on-disk layout ("pool" and "worm"
// subdirectories plus the final path) if missing.
func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) {
	poolMaster = PoolMaster{
		finalPath:   finalPath,
		cachePath:   cachePath,
		CurrentPool: make(map[string]*Pool),
		WORMPools:   make(map[string]*Pool),
	}
	//poolMaster.lock = sync.Mutex{}

	// Make sure every directory the master writes to exists up front.
	for _, dir := range []string{
		filepath.Join(cachePath, "pool"),
		filepath.Join(cachePath, "worm"),
		finalPath,
	} {
		if err = os.MkdirAll(dir, os.ModePerm); err != nil {
			return PoolMaster{}, err
		}
	}
	return poolMaster, nil
}
// NewPool allocates a fresh writeable pool with a random UUID and creates
// its staging directory under the cache path.
func (p *PoolMaster) NewPool() (*Pool, error) {
	pool := &Pool{
		PoolID:    uuid.NewV4().String(),
		Finalized: false,
		ReadOnly:  false,
	}
	//TODO : Sync to DB
	stagingDir := filepath.Join(p.cachePath, "pool", pool.PoolID)
	if err := os.MkdirAll(stagingDir, os.ModePerm); err != nil {
		return pool, err
	}
	return pool, nil
}
// GetCurrentWriteablePool returns the pool the given worker should write to.
// A full current pool (>= PoolMaxItems) is retired into FullPools for later
// packing and a fresh or recovered pool takes its place; the worker→pool
// assignment is persisted next to the pool directory so it survives restarts.
func (p *PoolMaster) GetCurrentWriteablePool(workerID string) (pool *Pool, err error) {
	if p.CurrentPool[workerID] != nil && p.CurrentPool[workerID].itemCount >= PoolMaxItems {
		fmt.Printf("Aquiring Lock for GetCurrentWriteablepool\n")
		p.lock.Lock()
		defer p.lock.Unlock()
		defer fmt.Printf("unlock GetCurrentWriteablepool\n")
		fmt.Printf("Aquired Lock success for GetCurrentWriteablepool\n")
		// Retire the full pool: mark read-only and queue it for compression.
		p.CurrentPool[workerID].ReadOnly = true
		p.CurrentPool[workerID].LastTouchy = time.Now()
		p.FullPools = append(p.FullPools, p.CurrentPool[workerID])
		fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool[workerID].PoolID, p.CurrentPool[workerID].itemCount)
		p.CurrentPool[workerID] = nil
	}
	if p.CurrentPool[workerID] == nil {
		fmt.Printf("Creating new Pool")
		p.CurrentPool[workerID], err = p.AcquireNewOrRecoverPool()
		// BUGFIX: this error was previously shadowed by the next `err :=` and
		// never checked — a failed acquire nil-dereferenced one line later.
		if err != nil {
			return nil, err
		}
		// Persist the worker assignment for crash recovery (ScanForLocalPools
		// reads it back).
		err = os.WriteFile(filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", p.CurrentPool[workerID].PoolID)), []byte(workerID), os.ModePerm)
		if err != nil {
			return pool, err
		}
		fmt.Printf("... got [%s]\n", p.CurrentPool[workerID].PoolID)
	}
	return p.CurrentPool[workerID], nil
}
// RestorePoolFromFolder rebuilds an in-memory Pool from a staging directory
// left over from a previous run. The directory name doubles as the pool ID;
// every regular file inside counts as one stored item. Pools at or above
// PoolMaxItems come back read-only; restored pools are never finalized.
func RestorePoolFromFolder(folderPath string) (pool *Pool, err error) {
	pool = &Pool{PoolID: path.Base(folderPath)}

	entries, err := os.ReadDir(folderPath)
	if err != nil {
		return pool, err
	}
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		pool.items = append(pool.items, entry.Name())
		pool.itemCount++
	}

	pool.ReadOnly = pool.itemCount >= PoolMaxItems
	pool.Finalized = false // we are still local
	return pool, err
}
// ScanForLocalPools walks the local pool cache directory and re-registers
// every pool directory found there: pools whose chunk document says they are
// fully persisted are skipped, complete pools (PoolMaxItems items) are queued
// in LocalPools for packing, and partially-filled pools are either reattached
// to their recorded worker (via the "<poolID>.worker" file) or offered up as
// generic local pools.
func (p *PoolMaster) ScanForLocalPools() (err error) {
	fmt.Printf("Aquiring Lock for ScanForLocalPools\n")
	p.lock.Lock()
	defer p.lock.Unlock()
	defer fmt.Printf("unlock ScanForLocalPools\n")
	fmt.Printf("Aquired Lock success for ScanForLocalPools\n")

	entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool"))
	if err != nil {
		return err
	}
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		fmt.Printf("Scanning For Local Pools, found %s:", e.Name())
		// Probe the final tar location; only unexpected stat errors are fatal.
		tarFinalPath := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", e.Name()))
		if _, err = os.Stat(tarFinalPath); err != nil && !errors.Is(err, os.ErrNotExist) {
			return err
		}
		// A pool only counts as done when its chunk document says so.
		finalPathExists := false
		dboChunkExists, err := colChunk.DocumentExists(arangoCTX, e.Name())
		if err != nil {
			return err
		}
		if dboChunkExists {
			var dboChunk common.DB_Chunk
			if _, err := colChunk.ReadDocument(arangoCTX, e.Name(), &dboChunk); err != nil {
				return err
			}
			finalPathExists = dboChunk.Finalized && dboChunk.ReadOnly && !dboChunk.NotReady
			fmt.Printf("is in DB readonly %v finalized %v notready %v itemCount=%d size=%d hash=%s\n", dboChunk.ReadOnly, dboChunk.Finalized, dboChunk.NotReady, dboChunk.FileCount, dboChunk.Size, dboChunk.Hash)
			if finalPathExists {
				fmt.Println("skipping")
			}
		}
		if finalPathExists {
			continue
		}

		restoredPool, err := RestorePoolFromFolder(filepath.Join(p.cachePath, "pool", e.Name()))
		if err != nil {
			return err
		}
		fmt.Printf("is readonly %v itemCount=%d\n", restoredPool.ReadOnly, restoredPool.itemCount)

		// BUGFIX: compare against PoolMaxItems instead of a hard-coded 500.
		if restoredPool.itemCount == PoolMaxItems {
			p.LocalPools = append(p.LocalPools, restoredPool)
			continue
		}
		// Partially-filled pool: reattach to its worker when an assignment
		// file exists, otherwise make it available as a generic local pool.
		workerCacheFileName := filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.worker", e.Name()))
		if _, err = os.Stat(workerCacheFileName); err != nil {
			if !errors.Is(err, os.ErrNotExist) {
				return err
			}
			// no worker assignment file — generic local pool
			p.LocalPools = append(p.LocalPools, restoredPool)
			continue
		}
		workerBytes, err := os.ReadFile(workerCacheFileName)
		if err != nil {
			return err
		}
		p.CurrentPool[string(workerBytes)] = restoredPool
		fmt.Println(string(workerBytes))
	}
	return nil
}
// Pool Packing
func ( p * PoolMaster ) ImportPoolPackResult ( packResult PoolPackResult ) ( err error ) {
startTime := time . Now ( )
dboChunk := common . DB_Chunk {
ID : packResult . PoolID ,
Hash : packResult . Hash ,
Size : packResult . Size ,
FileCount : packResult . FileCount ,
Created : time . Now ( ) ,
NotReady : true ,
ReadOnly : true ,
Finalized : true ,
}
var dboChunk2File [ ] common . DB_File2Chunk
for _ , prFile := range packResult . Files {
dboChunk2File = append ( dboChunk2File , common . DB_File2Chunk {
ID : prFile ,
File : fmt . Sprintf ( "file/%s" , prFile ) ,
Chunk : fmt . Sprintf ( "chunk/%s" , dboChunk . ID ) ,
} )
}
_ , err = colChunk . CreateDocument ( arangoCTX , dboChunk )
if err != nil {
return err
}
chunkSize := 500
for {
if len ( dboChunk2File ) == 0 {
break
}
// necessary check to avoid slicing beyond
// slice capacity
if len ( dboChunk2File ) < chunkSize {
chunkSize = len ( dboChunk2File )
}
_ , errorSlice , _ := colFile2Chunk . CreateDocuments ( arangoCTX , dboChunk2File [ 0 : chunkSize ] )
//metaSlice, errorSlice, _ := colFile2Chunk.CreateDocuments(arangoCTX, dboChunk2File[0:chunkSize])
//fmt.Println("Metaslice")
//fmt.Println(metaSlice)
/ * for _ , meta := range metaSlice {
if ! meta . ID . IsEmpty ( ) {
newUnknownFiles = append ( newUnknownFiles , meta . Key )
fileIDs = append ( fileIDs , meta . Key )
}
} * /
for _ , createError := range errorSlice {
if createError != nil && strings . Contains ( createError . Error ( ) , "unique constraint violated - in index primary of type primary over '_key'" ) {
} else if createError != nil {
return createError
}
}
dboChunk2File = dboChunk2File [ chunkSize : ]
}
fmt . Printf ( "ImportPool Duration %dms\n" , time . Since ( startTime ) . Milliseconds ( ) )
return nil
}
// MovePoolPackToWORM moves a finished pool archive from the cache into the
// final WORM directory and verifies the copy by re-hashing it; a mismatching
// hash deletes the moved file and fails the operation.
func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) {
	startTime := time.Now()

	targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar.zst", packResult.PoolID))
	err = common.MoveFile(packResult.outputFileName, targetFileName)
	if err != nil {
		return err
	}

	// Verify integrity of the moved archive before trusting it.
	tarFileCheck, err := os.Open(targetFileName)
	if err != nil {
		return err
	}
	defer tarFileCheck.Close()

	shaHasher := sha256.New()
	_, err = io.Copy(shaHasher, tarFileCheck)
	if err != nil {
		return err
	}
	wormHash := fmt.Sprintf("%x", shaHasher.Sum(nil))
	fmt.Printf("WORMTarPool hash is %s , old is %s\n", wormHash, packResult.Hash)
	if wormHash != packResult.Hash {
		os.Remove(targetFileName)
		// BUGFIX: previously `return err` here returned nil, reporting
		// success for a corrupted copy.
		return fmt.Errorf("MovePoolPackToWORM: hash mismatch after move: got %s, want %s", wormHash, packResult.Hash)
	}

	fmt.Printf("MoveWORM Duration %dms\n", time.Since(startTime).Milliseconds())
	return nil
}
// WriteRecoveryFile writes "<poolID>.json" next to the WORM archive. The
// file holds everything needed for disaster recovery: the pool's hash/size
// metadata, the item list, and the full DB_File documents fetched from the
// database.
func (p *PoolMaster) WriteRecoveryFile(packResult PoolPackResult, pool *Pool) (err error) {
	fileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.json", packResult.PoolID))
	recoveryFile, err := os.Create(fileName)
	if err != nil {
		return err
	}
	defer recoveryFile.Close()

	poolRecoveryData := PoolRecoveryData{
		PoolID:    packResult.PoolID,
		Size:      uint64(packResult.Size),
		Created:   time.Now(),
		Hash:      packResult.Hash,
		ItemCount: pool.itemCount,
		Items:     pool.items,
	}
	// Fetch the full file documents so the pool can be re-imported even if
	// the database is lost.
	poolRecoveryData.RecoveryData = make([]common.DB_File, len(pool.items))
	_, _, err = colFile.ReadDocuments(arangoCTX, pool.items, poolRecoveryData.RecoveryData)
	if err != nil {
		return fmt.Errorf("error @ReadDocuments %v", err)
	}

	// FIX: renamed from `json`, which shadowed the encoding/json package.
	payload, err := json.MarshalIndent(poolRecoveryData, "", "\t")
	if err != nil {
		return fmt.Errorf("error @json.MarshalIndent %v", err)
	}
	if _, err = recoveryFile.Write(payload); err != nil {
		return fmt.Errorf("error @recoveryFile.Write %v", err)
	}
	return nil
}
/ *
* Only call this in a locked Sate
* /
func ( p * PoolMaster ) PackPool ( poolID string ) ( packResult PoolPackResult , err error ) {
startTime := time . Now ( )
packResult . PoolID = poolID
packResult . outputFileName = filepath . Join ( p . cachePath , "pool" , fmt . Sprintf ( "%s.tar.zst" , poolID ) )
tarFile , err := os . Create ( packResult . outputFileName )
if err != nil {
return packResult , fmt . Errorf ( "os.Create: %v" , err )
}
defer tarFile . Close ( )
compressor , err := zstd . NewWriter ( tarFile , zstd . WithEncoderLevel ( 4 ) )
if err != nil {
return packResult , fmt . Errorf ( "zstd.NewWriter: %v" , err )
}
defer compressor . Close ( )
tw := tar . NewWriter ( compressor )
defer tw . Close ( )
entries , err := os . ReadDir ( filepath . Join ( p . cachePath , "pool" , poolID ) )
if err != nil {
return packResult , fmt . Errorf ( "os.ReadDir: %v" , err )
}
//fmt.Printf("len(entries) == %d\n", len(entries))
if len ( entries ) != PoolMaxItems {
return packResult , fmt . Errorf ( "Pool contains %d items, but there should be %d" , len ( entries ) , PoolMaxItems )
}
for _ , e := range entries {
originalPath := filepath . Join ( p . cachePath , "pool" , poolID , e . Name ( ) )
file , err := os . Open ( originalPath )
if err != nil {
return packResult , fmt . Errorf ( "for os.Open: %v" , err )
}
defer file . Close ( )
info , err := file . Stat ( )
if err != nil {
return packResult , fmt . Errorf ( "for fs.Stat: %v" , err )
}
tarFileHeader , err := tar . FileInfoHeader ( info , info . Name ( ) )
if err != nil {
return packResult , err
}
err = tw . WriteHeader ( tarFileHeader )
if err != nil {
return packResult , err
}
_ , err = io . Copy ( tw , file )
if err != nil {
return packResult , err
}
packResult . FileCount ++
packResult . Files = append ( packResult . Files , e . Name ( ) )
fmt . Printf ( "*" )
}
err = tw . Close ( )
if err != nil {
return packResult , err
}
err = compressor . Close ( )
if err != nil {
return packResult , err
}
err = tarFile . Close ( )
if err != nil {
return packResult , err
}
// re-open and check
{
tarFileCheck , err := os . Open ( packResult . outputFileName )
if err != nil {
return packResult , err
}
defer tarFileCheck . Close ( )
shaHasher := sha256 . New ( )
hashedBytes , err := io . Copy ( shaHasher , tarFileCheck )
if err != nil {
return packResult , err
}
packResult . Hash = fmt . Sprintf ( "%x" , shaHasher . Sum ( nil ) )
fmt . Printf ( "PackPoolTar hash is %s\n" , packResult . Hash )
tarFileCheck . Seek ( 0 , 0 )
// validate written tar-chunk
decompressor , err := zstd . NewReader ( tarFileCheck , zstd . WithDecoderConcurrency ( 8 ) )
if err != nil {
panic ( err )
}
defer decompressor . Close ( )
tarFileCheckReader := tar . NewReader ( decompressor )
//filenamesReadBackList := []string{}
for {
header , err := tarFileCheckReader . Next ( )
//header.PAXRecords
if err == io . EOF {
break
}
if err != nil {
return packResult , err
}
hasher := sha256 . New ( )
hashedBytes , err := io . Copy ( hasher , tarFileCheckReader )
if err != nil {
return packResult , err
}
readBackChecksum := fmt . Sprintf ( "%x" , hasher . Sum ( nil ) )
if hashedBytes != header . Size {
return packResult , fmt . Errorf ( "validation on output archive, incorrect size file %s has %d should be %d" , header . Name , hashedBytes , header . Size )
}
if header . Name != readBackChecksum {
return packResult , fmt . Errorf ( "validation on output archive, incorrect checksum file %s has %s" , header . Name , readBackChecksum )
}
//filenamesReadBackList = append(filenamesReadBackList, header.Name)
}
packFileStats , err := tarFileCheck . Stat ( )
if err != nil {
return packResult , err
}
packResult . Size = packFileStats . Size ( )
if hashedBytes != packResult . Size {
return packResult , fmt . Errorf ( "WORM Copy HashedBytes %d != FileSize %d" , hashedBytes , packResult . Size )
}
fmt . Printf ( "PackPool Duration %dms\n" , time . Since ( startTime ) . Milliseconds ( ) )
}
// TODO: write index json describing the files inside, pure hash list, aswell
// as dictionary containing all the infos for restore/disaster-recovery into Arango
return packResult , nil
}
// AcquireNewOrRecoverPool hands back a writeable pool: the first local pool
// that is neither read-only nor full is reused, otherwise a brand-new pool
// is created.
func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) {
	for _, candidate := range p.LocalPools {
		if candidate.ReadOnly || candidate.itemCount >= PoolMaxItems {
			continue
		}
		return candidate, nil
	}
	return p.NewPool()
}
// Lookup reports whether the file id is known anywhere: in any worker's
// current pool, a loaded WORM pool, a full (to-be-packed) pool, a restored
// local pool, or — as a last resort — in the file→chunk mapping in ArangoDB.
// A database error is treated as "not found".
func (p *PoolMaster) Lookup(id string) (exists bool) {
	p.lock.Lock()
	defer p.lock.Unlock()

	contains := func(items []string) bool {
		for _, item := range items {
			if item == id {
				return true
			}
		}
		return false
	}

	for _, cp := range p.CurrentPool { // CurrentPool
		if cp != nil && contains(cp.items) {
			return true
		}
	}
	for _, wormPool := range p.WORMPools { // WORM Pools
		if contains(wormPool.items) {
			return true
		}
	}
	for _, fullPool := range p.FullPools { // Full Pools
		if contains(fullPool.items) {
			return true
		}
	}
	for _, localPool := range p.LocalPools { // Local Pools
		if contains(localPool.items) {
			return true
		}
	}

	dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id)
	if err != nil {
		return false
	}
	return dboFile2ChunkExists
}
func ( p * PoolMaster ) FetchLoadWORM ( chunkID string , fileID string , writer io . Writer ) ( err error ) {
fmt . Printf ( "FetchLoadWORM(chunkID %s, fileID %s, ...)\n" , chunkID , fileID )
// search within loaded worm-pools
//fmt.Println("WormPool For Start")
for wormID , wormPool := range p . WORMPools {
//fmt.Printf("WORMPool[%s] for-iter\n", wormID)
if wormID != chunkID {
continue
}
for _ , poolItem := range wormPool . items {
if poolItem == fileID {
//fmt.Printf("Fetch WORMPool %s file %s\n", wormID, fileID)
return wormPool . Fetch ( fileID , writer )
}
}
break
}
// else load wormPool into disk-cache extract to "worm"
// wormMode
// TODO: every Method here should have locked the WORMLock!!
/ *
fmt . Printf ( "Aquiring WORMLock for FetchLoadWORM\n" )
p . WORMLock . Lock ( )
defer p . WORMLock . Unlock ( )
0 defer fmt . Printf ( "unlock WORMLock FetchLoadWORM\n" )
fmt . Printf ( "Aquired WORMLock success for FetchLoadWORM\n" )
* /
fmt . Printf ( "Aquiring Lock for FetchLoadWORM\n" )
p . lock . Lock ( )
defer p . lock . Unlock ( )
defer fmt . Printf ( "unlock FetchLoadWORM\n" )
fmt . Printf ( "Aquired Lock success for FetchLoadWORM\n" )
var dboChunk common . DB_Chunk
_ , err = colChunk . ReadDocument ( arangoCTX , chunkID , & dboChunk )
if err != nil {
return err
}
loadedWormPool := Pool {
PoolID : dboChunk . ID ,
Size : uint64 ( dboChunk . Size ) ,
ReadOnly : dboChunk . ReadOnly ,
Finalized : dboChunk . Finalized ,
LastTouchy : time . Now ( ) ,
filePath : filepath . Join ( p . finalPath , fmt . Sprintf ( "%s.tar.zst" , dboChunk . ID ) ) ,
}
fmt . Printf ( "initialized loadedWormPool (%s), Opening tar...\n" , dboChunk . ID )
err = loadedWormPool . OpenTar ( )
if err != nil {
fmt . Println ( err )
return err
}
fmt . Printf ( "extracted and key = %s\n" , loadedWormPool . PoolID )
p . WORMPools [ loadedWormPool . PoolID ] = & loadedWormPool
return loadedWormPool . Fetch ( fileID , writer )
//return nil
}
// CleanWORMTemp unloads every cached WORM pool that has not served a file
// for more than four minutes, releasing its file handle and dropping it
// from the map.
func (p *PoolMaster) CleanWORMTemp() (err error) {
	p.lock.Lock()
	//p.WORMLock.Lock()
	//defer p.WORMLock.Unlock()
	defer p.lock.Unlock()

	for id, wormPool := range p.WORMPools {
		if time.Since(wormPool.LastTouchy).Minutes() <= 4 {
			continue
		}
		wormPool.Unload()
		delete(p.WORMPools, id)
		//os.RemoveAll(filepath.Join(poolMaster.cachePath, "worm", wormPool.PoolID))
	}
	return nil
}
// PackFullPools drains the FullPools queue: once at least 16 pools are
// waiting, every pool untouched for five minutes is packed into a tar.zst,
// imported into the database, moved to WORM storage, recorded in a recovery
// file and marked ready — all pools in parallel. Pools whose staging
// directory was removed are dropped from the queue; other failures stay
// queued for the next run.
func (p *PoolMaster) PackFullPools() (err error) {
	if len(p.FullPools) < 16 {
		return nil
	}
	p.lock.Lock()
	defer p.lock.Unlock()
	fmt.Printf("Aquiring WORMLock for Regular FullPool Pack\n")
	//p.WORMLock.Lock()
	//fmt.Printf("Aquired WORMLock success for Regular FullPool Pack\n")

	// packOutcome reports one pool's result back to the coordinator.
	// BUGFIX: pool IDs now travel through the channel instead of multiple
	// goroutines appending to a shared slice without synchronization (race).
	type packOutcome struct {
		poolID  string
		deleted bool // staging dir removed → drop the pool from the queue
		err     error
	}
	outcomes := make(chan packOutcome) // Channel to receive the results
	launched := 0
	for _, fullPool := range p.FullPools {
		if time.Since(fullPool.LastTouchy).Minutes() < 5 {
			continue
		}
		// start parallel
		go func(fullPool *Pool) {
			deleted, packErr := p.packOnePool(fullPool)
			outcomes <- packOutcome{poolID: fullPool.PoolID, deleted: deleted, err: packErr}
		}(fullPool)
		launched++
	}

	// Waiting for them to finish.
	var packedPools []string
	for i := 0; i < launched; i++ {
		outcome := <-outcomes
		if outcome.err != nil {
			fmt.Printf("PoolChannel Error: %v\n", outcome.err)
		}
		if outcome.deleted {
			packedPools = append(packedPools, outcome.poolID)
		}
	}

	// Drop successfully packed pools from the queue.
	for _, packedPoolID := range packedPools {
		for index, fullPool := range p.FullPools {
			if fullPool.PoolID == packedPoolID {
				p.FullPools = removeFromSlice(p.FullPools, index)
				break
			}
		}
	}
	//fmt.Printf("unlock WORMLock Regular FullPool Pack\n")
	//p.WORMLock.Unlock()
	fmt.Printf("unlock lock Regular FullPool Pack\n")
	return nil
}

// packOnePool runs the full pack→import→move→recover→mark-ready pipeline for
// a single full pool. It reports deleted=true once the staging directory has
// been removed (matching the original behavior of dequeuing the pool even
// when the final ready-flag update fails).
func (p *PoolMaster) packOnePool(fullPool *Pool) (deleted bool, err error) {
	packResult, err := p.PackPool(fullPool.PoolID)
	if err != nil {
		return false, fmt.Errorf("error @PackPool: %v", err)
	}
	if err = p.ImportPoolPackResult(packResult); err != nil {
		return false, fmt.Errorf("error @ImportPoolPackResult: %v", err)
	}
	if err = p.MovePoolPackToWORM(packResult); err != nil {
		return false, fmt.Errorf("error @MovePoolPackToWORM: %v", err)
	}
	if err = p.WriteRecoveryFile(packResult, fullPool); err != nil {
		return false, fmt.Errorf("error @WriteRecoveryFile: %v", err)
	}
	os.RemoveAll(filepath.Join(p.cachePath, "pool", fullPool.PoolID))
	// Mark the chunk as ready now that the archive is safely in WORM storage.
	_, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{
		NotReady:  false,
		Finalized: true,
		ReadOnly:  true,
		Hash:      packResult.Hash,
		Size:      packResult.Size,
	})
	return true, err
}
// Fetch streams the file identified by id into writer, searching the storage
// tiers in order: per-worker current pools, loaded WORM pools, full (not yet
// packed) pools, restored local pools, the already-extracted fast cache, and
// finally loading the owning WORM chunk from final storage via the ArangoDB
// file→chunk mapping. Returns nil without writing anything when the id is
// unknown everywhere.
func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
	// The lock must be released exactly once, on whichever path runs first:
	// the explicit early release before the slower tiers, or the defer.
	var unlocKOnce sync.Once
	p.lock.Lock()
	defer unlocKOnce.Do(p.lock.Unlock)
	// Tier 1: per-worker writeable pools (files still in staging dirs).
	for _, cp := range p.CurrentPool {
		if cp != nil {
			for _, poolItem := range cp.items {
				if poolItem == id {
					fmt.Printf("Fetch CurrentPool %s\n", id)
					poolLocalFilePath := filepath.Join(p.cachePath, "pool", cp.PoolID, id)
					//fmt.Println(poolLocalFilePath)
					//fmt.Printf("%s %s\n", p.CurrentPool.PoolID, poolItem)
					srcLocalFile, err := os.Open(poolLocalFilePath)
					if err != nil {
						return err
					}
					//fmt.Println("Closer")
					defer srcLocalFile.Close()
					//fmt.Println("io.Copy")
					if _, err = io.Copy(writer, srcLocalFile); err != nil {
						return err
					}
					return nil
				}
			}
		}
	}
	// Tier 2: WORM pools already extracted into the local cache.
	for _, wormPool := range p.WORMPools {
		for _, poolItem := range wormPool.items {
			if poolItem == id {
				fmt.Printf("Fetch WORMPool %s file %s\n", wormPool.PoolID, id)
				return wormPool.Fetch(id, writer)
			}
		}
	}
	// Release the lock before the slower tiers below.
	unlocKOnce.Do(p.lock.Unlock)
	// Tier 3: full pools awaiting packing (still on disk in staging dirs).
	for _, fullPool := range p.FullPools {
		for _, poolItem := range fullPool.items {
			if poolItem == id {
				fmt.Printf("Fetch FullPool %s\n", id)
				fullPool.LastTouchy = time.Now()
				poolLocalFilePath := filepath.Join(p.cachePath, "pool", fullPool.PoolID, id)
				srcLocalFile, err := os.Open(poolLocalFilePath)
				if err != nil {
					return err
				}
				defer srcLocalFile.Close()
				if _, err = io.Copy(writer, srcLocalFile); err != nil {
					return err
				}
				return nil
			}
		}
	}
	// Tier 4: pools restored from a previous run.
	for _, localPool := range p.LocalPools {
		for _, poolItem := range localPool.items {
			if poolItem == id {
				fmt.Printf("Fetch LocalPool %s\n", id)
				poolLocalFilePath := filepath.Join(p.cachePath, "pool", localPool.PoolID, id)
				srcLocalFile, err := os.Open(poolLocalFilePath)
				if err != nil {
					return err
				}
				defer srcLocalFile.Close()
				if _, err = io.Copy(writer, srcLocalFile); err != nil {
					return err
				}
				return nil
			}
		}
	}
	// ArangoDB: resolve the owning chunk through the file→chunk mapping.
	dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id)
	if err != nil {
		return err
	}
	fmt.Printf("dboFile2ChunkExists %s = %v\n", id, dboFile2ChunkExists)
	if dboFile2ChunkExists {
		var dboFile2Chunk common.DB_File2Chunk
		_, err = colFile2Chunk.ReadDocument(arangoCTX, id, &dboFile2Chunk)
		if err != nil {
			return err
		}
		//FetchFromPoolPack
		// dboFile2Chunk.Chunk is "chunk/<poolID>"; strip the collection prefix.
		apparentPoolID := dboFile2Chunk.Chunk[6:]
		// Fast cache: the chunk may already be extracted on disk from an
		// earlier run even though it is not registered in WORMPools.
		fastCachePath := filepath.Join(WORMCachePath, "worm", apparentPoolID, id)
		if checkFileExists(fastCachePath) {
			fastCacheDir := filepath.Join(WORMCachePath, "worm", apparentPoolID)
			srcLocalFile, err := os.Open(fastCachePath)
			if err != nil {
				return err
			}
			defer srcLocalFile.Close()
			if _, err = io.Copy(writer, srcLocalFile); err != nil {
				return err
			}
			// at this point acquire lock til end
			p.lock.Lock()
			defer p.lock.Unlock()
			if _, ok := p.WORMPools[apparentPoolID]; !ok {
				// Register the extracted folder as a "virtual" WORM pool so
				// later fetches hit tier 2 directly.
				fastCachePool, err := RestorePoolFromFolder(fastCacheDir)
				if err != nil {
					return err
				}
				// fastCachePool
				p.WORMPools[fastCachePool.PoolID] = fastCachePool
			}
			return nil
		}
		//dboFile2Chunk.Chunk <- which chunk i need to find
		return p.FetchLoadWORM(dboFile2Chunk.Chunk[6:], id, writer)
	}
	return nil
}
func checkFileExists ( filePath string ) bool {
_ , error := os . Stat ( filePath )
//return !os.IsNotExist(err)
return ! errors . Is ( error , os . ErrNotExist )
}
// Store writes the uploaded file id (the sha256 hex of its content) into the
// worker's current writeable pool, verifying both the byte count against
// targetSize and the content hash against id before counting the item.
func (p *PoolMaster) Store(id string, workerID string, src io.Reader, targetSize int64) (err error) {
	pool, err := p.GetCurrentWriteablePool(workerID)
	if err != nil {
		return err
	}
	if pool.ReadOnly {
		return fmt.Errorf("WTF Pool %s is ReadOnly but GetCurrentWriteablePool returned it", pool.PoolID)
	}
	fmt.Printf("Store(%s)\n", id)
	fmt.Printf("Aquiring Lock for Store\n")
	p.lock.Lock()
	defer p.lock.Unlock()
	defer fmt.Printf("unlock Store\n")
	fmt.Printf("Aquired Lock success for Store\n")

	// figuring out paths
	poolFolder := filepath.Join(p.cachePath, "pool", pool.PoolID)
	destPath := filepath.Join(poolFolder, id)
	dst, err := os.Create(destPath)
	if err != nil {
		_ = os.Remove(destPath)
		return err
	}
	defer dst.Close()

	// copy from ioReader to file
	writtenBytes, err := io.Copy(dst, src)
	if err != nil {
		_ = os.Remove(destPath)
		return err
	}
	if writtenBytes != targetSize {
		_ = os.Remove(destPath)
		// BUGFIX: previously returned the (nil) err here, silently reporting
		// success for a truncated upload.
		return fmt.Errorf("Store() size mismatch: wrote %d bytes, expected %d", writtenBytes, targetSize)
	}

	// check transferred data
	if _, err := dst.Seek(0, 0); err != nil {
		return err
	}
	shaHasher := sha256.New()
	if _, err := io.Copy(shaHasher, dst); err != nil {
		return err
	}
	outputHash := fmt.Sprintf("%x", shaHasher.Sum(nil))
	if outputHash != id {
		// BUGFIX: remove the corrupt file so a later rescan of the pool
		// directory does not count it as a valid item.
		_ = os.Remove(destPath)
		return fmt.Errorf("Store() Sha256 Hash Mismatch")
	}

	pool.itemCount++
	pool.items = append(pool.items, id)
	fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount)

	// Recount actual on-disk items — the authoritative fill level.
	entries, err := os.ReadDir(poolFolder)
	if err != nil {
		return err
	}
	newItemCount := 0
	for _, e := range entries {
		if !e.IsDir() {
			newItemCount++
		}
	}
	pool.itemCount = newItemCount
	//fmt.Printf("Current Pool %s, Recounted ItemCount = %d\n", pool.PoolID, pool.itemCount)
	if pool.itemCount >= PoolMaxItems {
		pool.ReadOnly = true
	}
	return nil
}
// removeFromSlice drops the element at index s, preserving order. Like the
// append-based idiom it replaces, it reuses (and mutates) the input slice's
// backing array and returns the shortened slice.
func removeFromSlice(slice []*Pool, s int) []*Pool {
	copy(slice[s:], slice[s+1:])
	return slice[:len(slice)-1]
}
// main wires everything together: connects to ArangoDB, restores local pool
// state from disk, starts the background pack/clean loop, packs any
// already-complete local pools, then serves the HTTP API on :13371.
func main() {
	err := InitDatabase()
	if err != nil {
		panic(err)
	}
	poolMaster, err = NewPoolMaster(PoolPathFinal, PoolPathTemp)
	if err != nil {
		panic(err)
	}
	// Scan for local existing Pools
	err = poolMaster.ScanForLocalPools()
	if err != nil {
		panic(err)
	}

	// Background maintenance: pack queued full pools and evict idle WORM
	// pools every two minutes.
	go func() {
		for {
			poolMaster.PackFullPools()
			poolMaster.CleanWORMTemp()
			time.Sleep(time.Minute * 2)
		}
	}()

	{
		poolChannel := make(chan error) // Channel to receive the results
		var poolChannelCounter = 0
		// Initial packing: every restored pool that is already full but has
		// no chunk document yet gets packed in its own goroutine.
		for _, localPool := range poolMaster.LocalPools {
			if localPool.ReadOnly {
				dboChunkExists, err := colChunk.DocumentExists(arangoCTX, localPool.PoolID)
				if err != nil {
					panic(err)
				}
				if !dboChunkExists {
					//spawn thread
					go func(localPool *Pool) {
						fmt.Printf("Packing Pool %s\n", localPool.PoolID)
						packResult, err := poolMaster.PackPool(localPool.PoolID)
						if err != nil {
							panic(err)
						}
						err = poolMaster.ImportPoolPackResult(packResult)
						if err != nil {
							panic(err)
						}
						err = poolMaster.MovePoolPackToWORM(packResult)
						if err != nil {
							panic(err)
						}
						err = poolMaster.WriteRecoveryFile(packResult, localPool)
						if err != nil {
							panic(err)
						}
						os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
						poolChannel <- nil
					}(localPool)
					// increment total parallel counter
					poolChannelCounter++
				}
				//os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
			}
			// packResult.FileCount
		}
		// Waiting for them to finish
		for i := 0; i < poolChannelCounter; i++ {
			result := <-poolChannel
			if result != nil {
				fmt.Printf("PoolChannel Error: %v\n", result)
			}
		}
	}

	// HTTP API.
	e := echo.New()
	e.Use(middleware.RecoverWithConfig(middleware.RecoverConfig{
		StackSize: 1 << 10, // 1 KB
	}))
	//e.Use(middleware.Logger())

	// Liveness probe.
	e.GET("/", func(c echo.Context) error {
		return c.String(http.StatusOK, "Hello, World!")
	})
	// Debug dump of the pool-master state.
	e.GET("/pmdump", func(c echo.Context) error {
		return c.JSON(http.StatusOK, poolMaster)
	})
	// Existence check: 208 Already Reported when stored, 200 otherwise.
	e.GET("/check/:id", func(c echo.Context) error {
		id := c.Param("id")
		fmt.Printf("/check/%s checking...\n", id)
		exists := poolMaster.Lookup(id)
		//fmt.Printf("%s exists = %s\n", id, strconv.FormatBool(exists))
		if exists {
			return c.JSON(http.StatusAlreadyReported, exists)
		}
		return c.JSON(http.StatusOK, exists)
	})
	// Stream a stored file back as an octet stream; 404 when unknown.
	e.GET("/fetch/:id", func(c echo.Context) error {
		id := c.Param("id")
		//fmt.Printf("/fetch/%s fetching...\n", id)
		exists := poolMaster.Lookup(id)
		if exists {
			c.Response().Header().Set(echo.HeaderContentType, echo.MIMEOctetStream)
			c.Response().WriteHeader(http.StatusOK)
			err = poolMaster.Fetch(id, c.Response())
			if err != nil {
				fmt.Printf("%v", err)
				return c.String(http.StatusInternalServerError, err.Error())
			}
			c.Response().Flush()
		} else {
			fmt.Printf("/fetch/%s does not exist\n", id)
			return c.String(http.StatusNotFound, "Not Found")
		}
		return nil
	})
	// Upload endpoint: multipart form with "file", "worker" and "info" (a
	// JSON-encoded common.DB_File); :id is the content hash, :size the byte
	// count used to validate the transfer.
	e.POST("/stash/:id/:size", func(c echo.Context) error {
		id := c.Param("id")
		sizeStr := c.Param("size")
		sizeVal, err := strconv.ParseInt(sizeStr, 10, 64)
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		workerID := c.FormValue("worker")
		infoJSON := c.FormValue("info")
		fileInfo := common.DB_File{}
		err = json.Unmarshal([]byte(infoJSON), &fileInfo)
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusBadRequest, "Error")
		}
		exists := poolMaster.Lookup(id)
		if exists {
			// Already stored: drain the upload stream so the client can
			// finish cleanly, then report 208.
			fmt.Printf("/stash/%s exists already\n", id)
			file, err := c.FormFile("file")
			if err != nil {
				fmt.Println(err)
				return c.String(http.StatusExpectationFailed, "Error")
			}
			formStream, err := file.Open()
			if err != nil {
				fmt.Println(err)
				return c.String(http.StatusExpectationFailed, "Error")
			}
			defer formStream.Close()
			if _, err = io.Copy(io.Discard, formStream); err != nil {
				fmt.Println(err)
				return c.String(http.StatusExpectationFailed, "Error")
			}
			return c.String(http.StatusAlreadyReported, "Exists already")
		}
		fmt.Printf("stashing %s", id)
		file, err := c.FormFile("file")
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		formStream, err := file.Open()
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		defer formStream.Close()
		err = poolMaster.Store(id, workerID, formStream, sizeVal)
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		fmt.Println("...stashed")
		fmt.Println(fileInfo)
		// Record the file document only after the bytes are safely stored.
		fileInfo.Created = time.Now()
		_, err = colFile.CreateDocument(arangoCTX, fileInfo)
		if err != nil {
			fmt.Println(err)
			return c.String(http.StatusExpectationFailed, "Error")
		}
		return c.JSON(http.StatusOK, true)
	})
	e.Logger.Fatal(e.Start(":13371"))
}