branch master · commit 34207a199c (parent 79cad442e2) · cheetah · 2 years ago

@@ -62,9 +62,11 @@ type DB_GMA2File struct {
 type DB_Chunk struct {
 	ID        string `json:"_key"`
+	NotReady  bool   `json:"notReady"`
 	Finalized bool   `json:"finalized"`
 	ReadOnly  bool   `json:"readOnly"`
+	FileCount int    `json:"fileCount"`
 	Size      int64  `json:"size"`
 	Hash      string `json:"hash"`
 }

@@ -138,36 +138,47 @@ func (r *GMAReader) readFileMetadata() (GMAFileMetadata, error) {
 	if err != nil {
 		return metadata, err
 	}
-	fileName = fileName[:len(fileName)-1] // remove nullbyte
+	//fmt.Printf("bufio ReadString(byte(0)) = len(%d) data=%x\n", len(fileName), fileName)
+	/*if len(fileName) == 1 { // retry on suspiciously short read
+		fileName, err := r.gmaStreamReader.ReadString(byte(0))
+		if err != nil {
+			return metadata, err
+		}
+		fmt.Printf("RETRY!! bufio ReadString(byte(0)) = len(%d) data=%x\n", len(fileName), fileName)
+	}*/
+	fileName = fileName[:len(fileName)-1] // remove the trailing null byte, which confuses Go strings
 	r.cursorOffset += uint32(len(fileName) + 1) // Add name length + null byte
 	metadata.FileName = fileName
 	// Read the file size
 	fileSizeBytes := make([]byte, 8)
-	_, err = r.gmaStreamReader.Read(fileSizeBytes)
+	_, err = io.ReadFull(r.gmaStreamReader, fileSizeBytes)
 	if err != nil {
 		return metadata, err
 	}
 	r.cursorOffset += 8
+	//fmt.Printf("bufio Read([]byte(4)]) fileSizeBytes = bytesRead(%d) data=%x\n", bytesRead, fileSizeBytes)
 	metadata.FileSize = int64(binary.LittleEndian.Uint64(fileSizeBytes))
 	// Read the file crc
 	crcBytes := make([]byte, 4)
-	_, err = r.gmaStreamReader.Read(crcBytes)
+	_, err = io.ReadFull(r.gmaStreamReader, crcBytes)
 	if err != nil {
 		return metadata, err
 	}
 	r.cursorOffset += 4
+	//fmt.Printf("bufio Read([]byte(4)]) crcBytes = bytesRead(%d) data=%x\n", bytesRead, crcBytes)
 	metadata.CRC = binary.LittleEndian.Uint32(crcBytes)
 	// Read the next type
 	nextTypeBytes := make([]byte, 4)
-	_, err = r.gmaStreamReader.Read(nextTypeBytes)
+	_, err = io.ReadFull(r.gmaStreamReader, nextTypeBytes)
 	if err != nil {
 		return metadata, err
 	}
 	r.cursorOffset += 4
 	metadata.NextType = binary.LittleEndian.Uint32(nextTypeBytes)
+	//fmt.Printf("bufio Read([]byte(4)]) nextTypeBytes = bytesRead(%d) data=%x\n", bytesRead, nextTypeBytes)
 	return metadata, nil
 }

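The Read → io.ReadFull changes in this hunk guard against short reads: bufio.Reader.Read may legally return fewer than len(buf) bytes with a nil error, which would silently corrupt the fixed-width size/CRC fields decoded right after. A minimal self-contained sketch of the guarantee (illustrative, not repo code):

	package main

	import (
		"bufio"
		"bytes"
		"encoding/binary"
		"fmt"
		"io"
	)

	func main() {
		r := bufio.NewReader(bytes.NewReader([]byte{42, 0, 0, 0, 0, 0, 0, 0}))
		buf := make([]byte, 8)
		// Read may return n < len(buf) with err == nil; decoding a partially
		// filled buffer with binary.LittleEndian.Uint64 yields garbage.
		// io.ReadFull keeps reading until buf is full, or fails with
		// io.ErrUnexpectedEOF, so a short read can never go unnoticed.
		if _, err := io.ReadFull(r, buf); err != nil {
			panic(err)
		}
		fmt.Println(binary.LittleEndian.Uint64(buf)) // 42
	}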
@@ -114,6 +114,7 @@ func main() {
 		panic(err)
 	}
+	/*
 	err = colGMA.Truncate(arangoCTX)
 	if err != nil {
 		panic(err)
@@ -125,13 +126,14 @@ func main() {
 	err = colGMA2File.Truncate(arangoCTX)
 	if err != nil {
 		panic(err)
-	}
+	}*/
 	// /mnt/worksucc/san2/gma/2/1/2/3/2123406190.1591573904.gma
 	//fileHandle, err := os.Open("2143898000.1593250551.bin.gma") //2143898000.1593250551.bin")
 	//gma, err := gma.NewReader("2143898000.1593250551.bin.gma")
-	folderPath := "/mnt/SC9000/TemporaryTestingShit2/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
+	folderPath := "/mnt/SC9000/TemporaryTestingShit/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
+	folderPathTarget := "/mnt/SC9000/ProcessedGMATest/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
 	//0
 	entries, err := os.ReadDir(folderPath)
 	if err != nil {
@@ -140,22 +142,26 @@ func main() {
 	skipBla := false
 	for _, e := range entries {
 		if !e.IsDir() && skipBla == true {
-			if e.Name() == "2548863549.1626463997.gma" {
+			if e.Name() == "2547463094.1626322945.gma" {
 				skipBla = false
-			}
+			} else {
 				continue
 			}
+		}
 		if !e.IsDir() {
 			err = ProcessGMA(filepath.Join(folderPath, e.Name()))
 			if err != nil {
 				fmt.Printf("\nERROR: %v\n", err)
 				//panic(err)
+				continue
 			}
+			os.Rename(filepath.Join(folderPath, e.Name()), filepath.Join(folderPathTarget, e.Name()))
 		}
 	}
 }
-func undoBatch(gmaID string, fileIDs []string, gma2FileIDs []string) (err error) {
+func undoBatch(undoBatch *bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) {
+	fmt.Printf("undoBatch(%x, %s)\n", undoBatch, gmaID)
 	_, err = colGMA.RemoveDocument(arangoCTX, gmaID)
 	if err != nil {
 		return err
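One caveat with the new signature: the first parameter is itself named undoBatch, which shadows the function name inside its own body, and the %x verb prints the pointer's address rather than the flag's value. A hedged sketch of the same idea with a clearer name (the rename and the gating are illustrative, assuming the flag is meant to decide the rollback as the defer in ProcessGMA suggests — this is not what the commit itself does):

	// shouldUndo is dereferenced at call time; the caller registers this
	// with defer and flips the flag to false once processing succeeded.
	func undoBatch(shouldUndo *bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) {
		fmt.Printf("undoBatch(%v, %s)\n", *shouldUndo, gmaID)
		if !*shouldUndo {
			return nil // success path: nothing to roll back
		}
		// ... remove the GMA, file, and gma2file documents created so far ...
		return nil
	}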
@@ -174,6 +180,7 @@ func ProcessGMA(filePath string) (err error) {
 	var (
 		fileIDs     []string
 		gma2FileIDs []string
+		failedProcessing = true
 	)
 	dboGMA := common.DB_GMA{}
 	dboGMA.BatchID = uuid.NewV4().String() // use this for rapid unscheduled disassembly
@@ -187,6 +194,9 @@ func ProcessGMA(filePath string) (err error) {
 	dboGMA.StatModTime = fileStat.ModTime()
 	dboGMA.GMASize = fileStat.Size()
+	if dboGMA.GMASize < 200 {
+		return fmt.Errorf("GMA File too small, skipping")
+	}
 	fmt.Printf("Opening %s\n", filePath)
 	gmaReader, err := gma.NewReader(filePath)
 	if err != nil {
@@ -217,6 +227,9 @@ func ProcessGMA(filePath string) (err error) {
 		return err
 	}
 	dboGMA.Header = header
+	fmt.Printf("AddonVersion=%d\n", header.AddonVersion)
+	fmt.Printf("FormatVersion=%d\n", header.FormatVersion)
+	fmt.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte)
 	//fmt.Printf("r.cursorOffset = %d\n", gmaReader.GetOffset())
 	firstType, files, err := gmaReader.ReadFiles()
 	if err != nil {
@@ -233,6 +246,9 @@ func ProcessGMA(filePath string) (err error) {
 		fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber)
 		if file.NextType > uint32(file.FileNumber+10) { // something is seriously wrong here
 			fmt.Printf("Current Cursor %d", gmaReader.GetOffset())
+			for _, otherFile := range files[file.FileNumber:] {
+				fmt.Printf("OTHERFILE %s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", otherFile.FileName, otherFile.CRC, otherFile.Offset, otherFile.FileSize, otherFile.NextType, otherFile.FileNumber)
+			}
 			return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
 		}
 		destPath := filepath.Join(gmaTempPath, "contents", file.FileName)
@@ -298,6 +314,8 @@ func ProcessGMA(filePath string) (err error) {
 	dboGMA.ProcessingEnd = time.Now()
 	dboGMA.ProcessingDuration = dboGMA.ProcessingEnd.Sub(dboGMA.ProcessingStart).Milliseconds()
+	// if anything fails from here on, undo the documents we imported
+	defer undoBatch(&failedProcessing, dboGMA.ID, fileIDs, gma2FileIDs)
 	// TODO: Calculate dboGMA.OptimizedSize
 	dboGMA.OptimizedSize = 0
 	_, err = colGMA.CreateDocument(arangoCTX, dboGMA)
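The failedProcessing = true / defer undoBatch(&failedProcessing, …) / failedProcessing = false trio forms a simple transaction-like rollback: the deferred call dereferences the pointer only when ProcessGMA returns, so every early error return leaves the flag true and triggers cleanup, while the success path flips it off at the very end. The shape of the pattern in isolation (names illustrative):

	package main

	import "fmt"

	func process() (err error) {
		failed := true
		defer func() {
			if failed {
				fmt.Println("rolling back imported documents")
			}
		}()
		// ... any early `return err` here leaves failed == true ...
		failed = false // only reached on the success path
		return nil
	}

	func main() { _ = process() }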
@@ -420,9 +438,12 @@ func ProcessGMA(filePath string) (err error) {
 			return err
 		}
 	}
-	var httpClient *http.Client = http.DefaultClient
+	var httpClient *http.Client = &http.Client{
+		Timeout: 15 * time.Minute,
+	}
 	for _, dboGMA2File := range dboGMA2Files {
-		//fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID)
+		fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID)
 		resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID))
 		if err != nil {
 			return err
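http.DefaultClient carries no timeout at all, so a single stalled fetch against the local upload service could hang ProcessGMA indefinitely; the dedicated client above caps every request at 15 minutes. If per-request deadlines were ever preferred over a per-client one, a context-based variant would look roughly like this (sketch reusing the loop's own identifiers, not code from the commit):

	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID), nil)
	if err != nil {
		return err
	}
	resp, err := httpClient.Do(req) // honors the ctx deadline as well as the client timeout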
@@ -434,7 +455,7 @@ func ProcessGMA(filePath string) (err error) {
 		}
 	}
 	gmaWriter.FileHandle.Seek(0, 2)
-	//fmt.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC)
+	fmt.Printf("Writing Footer CRC %d\n\n", dboGMA.FooterAddonCRC)
 	gmaWriter.WriteFooterCRC(dboGMA.FooterAddonCRC)
 	gmaWriter.FileHandle.Seek(0, 0)
@@ -471,5 +492,6 @@ func ProcessGMA(filePath string) (err error) {
 		return err
 	}
+	failedProcessing = false
 	return nil
 }

@@ -0,0 +1 @@
+package main

@@ -3,7 +3,9 @@ package main
 import (
 	"archive/tar"
 	"context"
+	"crypto/sha256"
 	"crypto/tls"
+	"errors"
 	"fmt"
 	"io"
 	"log"
@@ -12,9 +14,11 @@ import (
 	"path"
 	"path/filepath"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
+	"git.cheetah.cat/worksucc/gma-puzzles/common"
 	adriver "github.com/arangodb/go-driver"
 	ahttp "github.com/arangodb/go-driver/http"
 	"github.com/labstack/echo/v4"
@@ -32,13 +36,15 @@ type Pool struct {
 	Finalized bool   `json:"finalized"`
 	ReadOnly  bool   `json:"readOnly"`
 	Size      uint64 `json:"size"`
-	folder    string `json:"-"`
+	//folder    string `json:"-"`
 	itemCount int
 	items     []string
+	wormMode  bool
+	filePath  string
 	file      *os.File
-	tarWriter *tar.Writer
+	//tarWriter *tar.Writer
 	tarReader *tar.Reader
 }
type PoolFile struct { type PoolFile struct {
@@ -54,11 +60,154 @@ type PoolMaster struct {
 	LocalPools []Pool
 	FullPools  []Pool
+	WORMPools  map[string]Pool
+}
+
+type PoolPackResult struct {
+	PoolID         string
+	Files          []string
+	FileCount      int
+	Size           int64
+	Hash           string
+	outputFileName string
+}
+
+var (
+	arangoDB  adriver.Database
+	arangoCTX context.Context
+
+	colChunk      adriver.Collection
+	colFile       adriver.Collection
+	colFile2Chunk adriver.Collection
+	// PoolMaster
+	poolMaster PoolMaster
+)
+
+func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
+	log.Println("connectDB:", "Starting Connection Process...")
+
+	// Retry Loop for Failed Connections
+	for i := 0; i < 6; i++ {
+		if i == 5 {
+			return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i)
+		} else if i > 0 {
+			time.Sleep(30 * time.Second)
+		}
+
+		// Connect to ArangoDB URL
+		conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
+			Endpoints: []string{baseURL},
+			TLSConfig: &tls.Config{ /*...*/ },
+		})
+		if err != nil {
+			log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
+			continue
+		}
+
+		// Connect Driver to User
+		client, err := adriver.NewClient(adriver.ClientConfig{
+			Connection:     conn,
+			Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
+		})
+		if err != nil {
+			log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
+			continue
+		}
+
+		// Create Context for Database Access
+		ctx = context.Background()
+		driver, err = client.Database(ctx, arangoDatabase)
+		if err != nil {
+			log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
+			continue
+		}
+		log.Println("connectDB:", "Connection Successful!")
+		return driver, ctx, nil
+	}
+	return driver, ctx, fmt.Errorf("connectDB: retry loop exited unexpectedly")
+}
+
+func InitDatabase() (err error) {
+	arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator")
+
+	colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
+	if err != nil {
+		return err
+	}
+	colFile, err = arangoDB.Collection(arangoCTX, "file")
+	if err != nil {
+		return err
+	}
+	colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (p *Pool) OpenTar() (err error) {
+	p.wormMode = true
+	outputDir := filepath.Join(poolMaster.cachePath, "worm", p.PoolID)
+	err = os.MkdirAll(outputDir, os.ModePerm)
+	if err != nil {
+		return err
+	}
+
+	p.file, err = os.Open(p.filePath)
+	if err != nil {
+		return err
+	}
+	p.items = []string{}
+	p.tarReader = tar.NewReader(p.file)
+	for {
+		header, err := p.tarReader.Next()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		path := filepath.Join(outputDir, header.Name)
+		info := header.FileInfo()
+
+		file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode())
+		if err != nil {
+			return err
+		}
+		defer file.Close()
+		_, err = io.Copy(file, p.tarReader)
+		if err != nil {
+			return err
+		}
+		p.items = append(p.items, header.Name)
+		fmt.Print(".")
+	}
+	return nil
+}
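OpenTar joins header.Name straight onto outputDir; a hostile or corrupted tar entry containing ../ or an absolute path could then escape the worm cache (the classic "zip slip" pattern). These archives are self-produced, so the risk is low, but a defensive variant of the join is cheap (sketch, not part of the commit):

	cleanName := filepath.Clean(header.Name)
	if filepath.IsAbs(cleanName) || cleanName == ".." || strings.HasPrefix(cleanName, ".."+string(os.PathSeparator)) {
		return fmt.Errorf("tar entry escapes output dir: %q", header.Name)
	}
	path := filepath.Join(outputDir, cleanName)

Closing each extracted file explicitly at the end of the loop body, instead of defer file.Close() inside the loop, would also avoid holding every descriptor open until OpenTar returns.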
+
+func (p *Pool) Fetch(id string, writer io.Writer) (err error) {
+	for _, poolItem := range p.items {
+		if poolItem == id {
+			fmt.Printf("Fetch WORMPool %s\n", id)
+			poolLocalFilePath := filepath.Join(poolMaster.cachePath, "worm", p.PoolID, id)
+			srcLocalFile, err := os.Open(poolLocalFilePath)
+			if err != nil {
+				return err
+			}
+			defer srcLocalFile.Close()
+			if _, err = io.Copy(writer, srcLocalFile); err != nil {
+				return err
+			}
+			return nil
+		}
+	}
+	return fmt.Errorf("%s not found", id)
 }
 func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) {
 	poolMaster.finalPath = finalPath
 	poolMaster.cachePath = cachePath
+	poolMaster.WORMPools = make(map[string]Pool)
 	//poolMaster.lock = sync.Mutex{}
destPath := filepath.Join(poolMaster.cachePath, "pool") destPath := filepath.Join(poolMaster.cachePath, "pool")
@@ -66,6 +215,13 @@ func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) {
 	if err != nil {
 		return poolMaster, err
 	}
+
+	destPath = filepath.Join(poolMaster.cachePath, "worm")
+	err = os.MkdirAll(destPath, os.ModePerm)
+	if err != nil {
+		return poolMaster, err
+	}
+
 	err = os.MkdirAll(poolMaster.finalPath, os.ModePerm)
 	if err != nil {
 		return poolMaster, err
@@ -89,12 +245,14 @@ func (p *PoolMaster) NewPool() (pool *Pool, err error) {
 }
 
 func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) {
+	//fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount)
 	if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems {
 		p.lock.Lock()
 		defer p.lock.Unlock()
 		p.CurrentPool.ReadOnly = true
 		p.FullPools = append(p.FullPools, *p.CurrentPool)
 		// queue for compression
+		fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool.PoolID, p.CurrentPool.itemCount)
 		p.CurrentPool = nil
 	}
 	if p.CurrentPool == nil {
@@ -135,7 +293,37 @@ func (p *PoolMaster) ScanForLocalPools() (err error) {
 	}
 	for _, e := range entries {
 		if e.IsDir() {
-			fmt.Printf("Scanning For Local Pools, found %s:\n", e.Name())
+			fmt.Printf("Scanning For Local Pools, found %s:", e.Name())
+
+			tarFinalPath := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", e.Name()))
+			_, err = os.Stat(tarFinalPath)
+			finalPathExists := false
+			if err != nil {
+				if !errors.Is(err, os.ErrNotExist) {
+					return err
+				}
+			}
+			dboChunkExists, err := colChunk.DocumentExists(arangoCTX, e.Name())
+			if err != nil {
+				return err
+			}
+			if dboChunkExists {
+				var dboChunk common.DB_Chunk
+				_, err := colChunk.ReadDocument(arangoCTX, e.Name(), &dboChunk)
+				if err != nil {
+					return err
+				}
+				finalPathExists = dboChunk.Finalized && dboChunk.ReadOnly && !dboChunk.NotReady
+				fmt.Printf("is in DB readonly %v finalized %v notready %v itemCount=%d size=%d hash=%s\n", dboChunk.ReadOnly, dboChunk.Finalized, dboChunk.NotReady, dboChunk.FileCount, dboChunk.Size, dboChunk.Hash)
+				if finalPathExists {
+					fmt.Println("skipping")
+				}
+			}
+			if finalPathExists {
+				continue
+			}
+
 			poolDirPath := filepath.Join(p.cachePath, "pool", e.Name())
 			restoredPool, err := RestorePoolFromFolder(poolDirPath)
 			if err != nil {
@@ -147,6 +335,189 @@ func (p *PoolMaster) ScanForLocalPools() (err error) {
 	}
 	return nil
 }
+
+// Pool Packing
+func (p *PoolMaster) ImportPoolPackResult(packResult PoolPackResult) (err error) {
+	startTime := time.Now()
+	dboChunk := common.DB_Chunk{
+		ID:        packResult.PoolID,
+		Hash:      packResult.Hash,
+		Size:      packResult.Size,
+		FileCount: packResult.FileCount,
+		NotReady:  true,
+		ReadOnly:  true,
+		Finalized: true,
+	}
+
+	var dboChunk2File []common.DB_File2Chunk
+	for _, prFile := range packResult.Files {
+		dboChunk2File = append(dboChunk2File, common.DB_File2Chunk{
+			ID:    prFile,
+			File:  fmt.Sprintf("file/%s", prFile),
+			Chunk: fmt.Sprintf("chunk/%s", dboChunk.ID),
+		})
+	}
+
+	_, err = colChunk.CreateDocument(arangoCTX, dboChunk)
+	if err != nil {
+		return err
+	}
+
+	chunkSize := 500
+	for {
+		if len(dboChunk2File) == 0 {
+			break
+		}
+		// necessary check to avoid slicing beyond
+		// slice capacity
+		if len(dboChunk2File) < chunkSize {
+			chunkSize = len(dboChunk2File)
+		}
+
+		_, errorSlice, _ := colFile2Chunk.CreateDocuments(arangoCTX, dboChunk2File[0:chunkSize])
+		//metaSlice, errorSlice, _ := colFile2Chunk.CreateDocuments(arangoCTX, dboChunk2File[0:chunkSize])
+		//fmt.Println("Metaslice")
+		//fmt.Println(metaSlice)
+		/*for _, meta := range metaSlice {
+			if !meta.ID.IsEmpty() {
+				newUnknownFiles = append(newUnknownFiles, meta.Key)
+				fileIDs = append(fileIDs, meta.Key)
+			}
+		}*/
+		for _, createError := range errorSlice {
+			if createError != nil && strings.Contains(createError.Error(), "unique constraint violated - in index primary of type primary over '_key'") {
+			} else if createError != nil {
+				return createError
+			}
+		}
+
+		dboChunk2File = dboChunk2File[chunkSize:]
+	}
+	fmt.Printf("ImportPool Duration %dms\n", time.Since(startTime).Milliseconds())
+	return nil
+}
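The insert loop above walks dboChunk2File in windows of at most 500 documents, shrinking the window on the final partial batch and tolerating unique-constraint errors as edges that were imported earlier. The slicing idiom generalizes; a compact helper in the same spirit (hypothetical, assumes Go 1.18+ generics, not part of the repo):

	// chunked invokes fn on successive windows of at most size elements.
	func chunked[T any](items []T, size int, fn func(batch []T) error) error {
		for len(items) > 0 {
			n := size
			if n > len(items) {
				n = len(items)
			}
			if err := fn(items[:n]); err != nil {
				return err
			}
			items = items[n:]
		}
		return nil
	}

With such a helper, the loop body would reduce to chunked(dboChunk2File, 500, func(batch []common.DB_File2Chunk) error { ... }).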
+
+func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) {
+	startTime := time.Now()
+	targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", packResult.PoolID))
+	os.Rename(packResult.outputFileName, targetFileName)
+	tarFileCheck, err := os.Open(targetFileName)
+	if err != nil {
+		return err
+	}
+	defer tarFileCheck.Close()
+
+	shaHasher := sha256.New()
+	_, err = io.Copy(shaHasher, tarFileCheck)
+	if err != nil {
+		return err
+	}
+	wormHash := fmt.Sprintf("%x", shaHasher.Sum(nil))
+	fmt.Printf("WORMTarPool hash is %s , old is %s\n", wormHash, packResult.Hash)
+	if wormHash != packResult.Hash {
+		os.Remove(targetFileName)
+		return fmt.Errorf("hash mismatch after move: got %s, want %s", wormHash, packResult.Hash)
+	}
+	fmt.Printf("MoveWORM Duration %dms\n", time.Since(startTime).Milliseconds())
+	return nil
+}
+
+func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err error) {
+	startTime := time.Now()
+	packResult.PoolID = poolID
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar", poolID))
+	tarFile, err := os.Create(packResult.outputFileName)
+	if err != nil {
+		return packResult, err
+	}
+	defer tarFile.Close()
+
+	tw := tar.NewWriter(tarFile)
+	defer tw.Close()
+	entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool", poolID))
+	if err != nil {
+		return packResult, err
+	}
+	//fmt.Printf("len(entries) == %d\n", len(entries))
+	if len(entries) != PoolMaxItems {
+		return packResult, fmt.Errorf("pool contains %d items, but there should be %d", len(entries), PoolMaxItems)
+	}
+	for _, e := range entries {
+		originalPath := filepath.Join(p.cachePath, "pool", poolID, e.Name())
+		file, err := os.Open(originalPath)
+		if err != nil {
+			return packResult, err
+		}
+		defer file.Close()
+
+		info, err := file.Stat()
+		if err != nil {
+			return packResult, err
+		}
+		tarFileHeader, err := tar.FileInfoHeader(info, info.Name())
+		if err != nil {
+			return packResult, err
+		}
+		err = tw.WriteHeader(tarFileHeader)
+		if err != nil {
+			return packResult, err
+		}
+		_, err = io.Copy(tw, file)
+		if err != nil {
+			return packResult, err
+		}
+
+		packResult.FileCount++
+		packResult.Files = append(packResult.Files, e.Name())
+	}
+	err = tw.Flush()
+	if err != nil {
+		return packResult, err
+	}
+	err = tarFile.Close()
+	if err != nil {
+		return packResult, err
+	}
+
+	// re-open and check
+	tarFileCheck, err := os.Open(packResult.outputFileName)
+	if err != nil {
+		return packResult, err
+	}
+	defer tarFileCheck.Close()
+
+	shaHasher := sha256.New()
+	hashedBytes, err := io.Copy(shaHasher, tarFileCheck)
+	if err != nil {
+		return packResult, err
+	}
+	packResult.Hash = fmt.Sprintf("%x", shaHasher.Sum(nil))
+	fmt.Printf("PackPoolTar hash is %s\n", packResult.Hash)
+
+	packFileStats, err := tarFileCheck.Stat()
+	if err != nil {
+		return packResult, err
+	}
+	packResult.Size = packFileStats.Size()
+	if hashedBytes != packResult.Size {
+		return packResult, fmt.Errorf("WORM copy hashed %d bytes but file size is %d", hashedBytes, packResult.Size)
+	}
+	fmt.Printf("PackPool Duration %dms\n", time.Since(startTime).Milliseconds())
+	return packResult, nil
+}
 func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) {
 	// p.NewPool()
 	for _, localPool := range p.LocalPools {
@@ -156,22 +527,31 @@ func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) {
 	}
 	return p.NewPool()
 }
 func (p *PoolMaster) Lookup(id string) (exists bool) {
-	if p.CurrentPool != nil {
+	// TODO: DB check
+	if p.CurrentPool != nil { // CurrentPool
 		for _, poolItem := range p.CurrentPool.items {
 			if poolItem == id {
 				return true
 			}
 		}
 	}
-	for _, fullPool := range p.FullPools {
+	for _, wormPool := range p.WORMPools { // WORM Pools
+		for _, poolItem := range wormPool.items {
+			if poolItem == id {
+				return true
+			}
+		}
+	}
+	for _, fullPool := range p.FullPools { // Full Pools
 		for _, poolItem := range fullPool.items {
 			if poolItem == id {
 				return true
 			}
 		}
 	}
-	for _, localPool := range p.LocalPools {
+	for _, localPool := range p.LocalPools { // Local Pools
 		for _, poolItem := range localPool.items {
 			if poolItem == id {
 				return true
@@ -179,8 +559,59 @@ func (p *PoolMaster) Lookup(id string) (exists bool) {
 			}
 		}
 	}
 	// TODO : DB Check
+	// ArangoDB
+	dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id)
+	if err != nil {
 		return false
 	}
+	return dboFile2ChunkExists
+}
+
+func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writer) (err error) {
+	fmt.Printf("FetchLoadWORM(chunkID %s, fileID %s, ...)\n", chunkID, fileID)
+	// search within loaded worm-pools
+	for wormID, wormPool := range p.WORMPools {
+		if wormID != chunkID {
+			continue
+		}
+		for _, poolItem := range wormPool.items {
+			if poolItem == fileID {
+				fmt.Printf("Fetch WORMPool %s file %s\n", wormID, fileID)
+				return wormPool.Fetch(fileID, writer)
+			}
+		}
+		break
+	}
+
+	// else load wormPool into disk-cache, extract to "worm"
+	// wormMode
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	var dboChunk common.DB_Chunk
+	_, err = colChunk.ReadDocument(arangoCTX, chunkID, &dboChunk)
+	if err != nil {
+		return err
+	}
+	loadedWormPool := Pool{
+		PoolID:    dboChunk.ID,
+		Size:      uint64(dboChunk.Size),
+		ReadOnly:  dboChunk.ReadOnly,
+		Finalized: dboChunk.Finalized,
+		filePath:  filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", dboChunk.ID)),
+	}
+	fmt.Println("initialized loadedWormPool, Opening tar...")
+	err = loadedWormPool.OpenTar()
+	if err != nil {
+		return err
+	}
+	fmt.Println("extracted")
+	p.WORMPools[loadedWormPool.PoolID] = loadedWormPool
+	return loadedWormPool.Fetch(fileID, writer)
+	//return nil
+}
 func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
 	if p.CurrentPool != nil {
 		for _, poolItem := range p.CurrentPool.items {
@@ -203,20 +634,24 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
 			}
 		}
 	}
+	for _, wormPool := range p.WORMPools {
+		for _, poolItem := range wormPool.items {
+			if poolItem == id {
+				fmt.Printf("Fetch WORMPool %s file %s\n", wormPool.PoolID, id)
+				return wormPool.Fetch(id, writer)
+			}
+		}
+	}
 	for _, fullPool := range p.FullPools {
 		for _, poolItem := range fullPool.items {
 			if poolItem == id {
 				fmt.Printf("Fetch FullPool %s\n", id)
 				poolLocalFilePath := filepath.Join(p.cachePath, "pool", fullPool.PoolID, id)
-				//fmt.Println(poolLocalFilePath)
-				//fmt.Printf("%s %s\n", fullPool.PoolID, poolItem)
 				srcLocalFile, err := os.Open(poolLocalFilePath)
 				if err != nil {
 					return err
 				}
-				//fmt.Println("Closer")
 				defer srcLocalFile.Close()
-				//fmt.Println("io.Copy")
 				if _, err = io.Copy(writer, srcLocalFile); err != nil {
 					return err
 				}
@@ -229,15 +664,11 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
 			if poolItem == id {
 				fmt.Printf("Fetch LocalPool %s\n", id)
 				poolLocalFilePath := filepath.Join(p.cachePath, "pool", localPool.PoolID, id)
-				//fmt.Println(poolLocalFilePath)
-				//fmt.Printf("%s %s\n", localPool.PoolID, poolItem)
 				srcLocalFile, err := os.Open(poolLocalFilePath)
 				if err != nil {
 					return err
 				}
-				//fmt.Println("Closer")
 				defer srcLocalFile.Close()
-				//fmt.Println("io.Copy")
 				if _, err = io.Copy(writer, srcLocalFile); err != nil {
 					return err
 				}
@@ -245,6 +676,24 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
 			}
 		}
 	}
+	// ArangoDB
+	dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id)
+	if err != nil {
+		return err
+	}
+	fmt.Printf("dboFile2ChunkExists %s = %v\n", id, dboFile2ChunkExists)
+	if dboFile2ChunkExists {
+		var dboFile2Chunk common.DB_File2Chunk
+		_, err = colFile2Chunk.ReadDocument(arangoCTX, id, &dboFile2Chunk)
+		if err != nil {
+			return err
+		}
+		//FetchFromPoolPack
+		//dboFile2Chunk.Chunk <- which chunk i need to find
+		return p.FetchLoadWORM(dboFile2Chunk.Chunk[6:], id, writer)
+	}
 	return nil
 }
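The Chunk[6:] slice strips the "chunk/" collection prefix from the edge document's _id to recover the bare chunk key for FetchLoadWORM. Hard-coding the prefix length is brittle if the collection is ever renamed; strings.TrimPrefix (already imported in this file) states the intent directly (sketch):

	chunkKey := strings.TrimPrefix(dboFile2Chunk.Chunk, "chunk/")
	return p.FetchLoadWORM(chunkKey, id, writer)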
 func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) {
@@ -253,8 +702,15 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) {
 	if err != nil {
 		return err
 	}
+	if pool.ReadOnly {
+		return fmt.Errorf("pool %s is ReadOnly but GetCurrentWriteablePool returned it", pool.PoolID)
+	}
+	fmt.Printf("Store(%s)\n", id)
+
 	p.lock.Lock()
 	defer p.lock.Unlock()
+
+	// figure out paths
 	poolFolder := filepath.Join(p.cachePath, "pool", pool.PoolID)
 	destPath := filepath.Join(poolFolder, id)
 	dst, err := os.Create(destPath)
@@ -263,7 +719,7 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) {
 		return err
 	}
 	defer dst.Close()
+	// copy from the io.Reader into the pool file
 	writtenBytes, err := io.Copy(dst, src)
 	if err != nil {
 		_ = os.Remove(destPath)
@@ -273,6 +729,18 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) {
 		_ = os.Remove(destPath)
 		return err
 	}
+	// verify the transferred data against its id (content address)
+	dst.Seek(0, 0)
+	shaHasher := sha256.New()
+	if _, err := io.Copy(shaHasher, dst); err != nil {
+		return err
+	}
+	outputHash := fmt.Sprintf("%x", shaHasher.Sum(nil))
+	if outputHash != id {
+		return fmt.Errorf("Store() sha256 hash mismatch: got %s, want %s", outputHash, id)
+	}
+
 	pool.itemCount++
 	pool.items = append(pool.items, id)
 	fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount)
@@ -297,99 +765,40 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) {
 	return nil
 }
 
-var (
-	arangoDB  adriver.Database
-	arangoCTX context.Context
-
-	colChunk      adriver.Collection
-	colFile       adriver.Collection
-	colFile2Chunk adriver.Collection
-)
-
-func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
-	log.Println("connectDB:", "Starting Connection Process...")
-
-	// Retry Loop for Failed Connections
-	for i := 0; i < 6; i++ {
-		if i == 5 {
-			return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i)
-		} else if i > 0 {
-			time.Sleep(30 * time.Second)
-		}
-
-		// Connect to ArangoDB URL
-		conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
-			Endpoints: []string{baseURL},
-			TLSConfig: &tls.Config{ /*...*/ },
-		})
-		if err != nil {
-			log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
-			continue
-		}
-
-		// Connect Driver to User
-		client, err := adriver.NewClient(adriver.ClientConfig{
-			Connection:     conn,
-			Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
-		})
-		if err != nil {
-			log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
-			continue
-		}
-
-		// Create Context for Database Access
-		ctx = context.Background()
-		driver, err = client.Database(ctx, arangoDatabase)
-		if err != nil {
-			log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
-			continue
-		}
-		log.Println("connectDB:", "Connection Successful!")
-		return driver, ctx, nil
-	}
-	return driver, ctx, fmt.Errorf("connectDB: retry loop exited unexpectedly")
-}
-
-func InitDatabase() (err error) {
-	arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator")
-
-	colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
-	if err != nil {
-		return err
-	}
-	colFile, err = arangoDB.Collection(arangoCTX, "file")
-	if err != nil {
-		return err
-	}
-	colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-var (
-	poolMaster PoolMaster
-	//poolFiles = []PoolFile{}
-	//seq = 1
-	//lock = sync.Mutex{}
-)
-
 func main() {
 	err := InitDatabase()
 	if err != nil {
 		panic(err)
 	}
 
 	poolMaster, err = NewPoolMaster(PoolPathFinal, PoolPathTemp)
 	if err != nil {
 		panic(err)
 	}
 	// Scan for local existing Pools
 	err = poolMaster.ScanForLocalPools()
 	if err != nil {
 		panic(err)
 	}
+
+	for _, localPool := range poolMaster.LocalPools {
+		if localPool.ReadOnly {
+			fmt.Printf("Packing Pool %s\n", localPool.PoolID)
+			packResult, err := poolMaster.PackPool(localPool.PoolID)
+			if err != nil {
+				panic(err)
+			}
+			err = poolMaster.ImportPoolPackResult(packResult)
+			if err != nil {
+				panic(err)
+			}
+			err = poolMaster.MovePoolPackToWORM(packResult)
+			if err != nil {
+				panic(err)
+			}
+		}
+		// packResult.FileCount
+	}
+
 	e := echo.New()
 	//e.Use(middleware.Logger())
