commit before server-exitus lefuckalitis

master
cheetah 1 year ago
parent 72082994e4
commit 4d6606d392

@ -40,6 +40,7 @@ type DB_File struct {
Size int64 `json:"size"` Size int64 `json:"size"`
CRC uint32 `json:"crc"` CRC uint32 `json:"crc"`
Hash string `json:"hash"` Hash string `json:"hash"`
G2FRef string `json:"-"`
} }
type DB_GMA2File struct { type DB_GMA2File struct {
@ -79,6 +80,11 @@ type DB_File2Chunk struct {
File string `json:"_from"` File string `json:"_from"`
} }
type JSON_GMARecovery struct {
GMA DB_GMA `json:"gma"`
Refs []DB_GMA2File `json:"refs"`
}
func MultipartUpload(client *http.Client, url string, path string, jsonBytes []byte, workerID string) (err error) { func MultipartUpload(client *http.Client, url string, path string, jsonBytes []byte, workerID string) (err error) {
//fmt.Printf("\nMultipartUpload(%s, %s)\n", url, path) //fmt.Printf("\nMultipartUpload(%s, %s)\n", url, path)
file, err := os.Open(path) file, err := os.Open(path)

@ -119,7 +119,7 @@ func bla() error {
sem := common.NewSemaphore(ConcurrencyLimit) sem := common.NewSemaphore(ConcurrencyLimit)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
entries, err := os.ReadDir("/mnt/SC9000/storagePools/") entries, err := os.ReadDir("/mnt/SC9000/storagePool2/")
if err != nil { if err != nil {
return err return err
} }
@ -156,6 +156,7 @@ func bla() error {
}() }()
for _, chunkName := range chunkNames { for _, chunkName := range chunkNames {
wg.Add(1) wg.Add(1)
TotalTaskCount <- 1 TotalTaskCount <- 1
go func(job string, wg *sync.WaitGroup) (err error) { go func(job string, wg *sync.WaitGroup) (err error) {
sem.Acquire() // Wait for worker to have slot open sem.Acquire() // Wait for worker to have slot open
@ -164,16 +165,15 @@ func bla() error {
defer func() { defer func() {
DoneTaskCount <- 1 DoneTaskCount <- 1
}() }()
//fmt.Printf("Scanning For Local Pools, found %s:", job) //fmt.Printf("Scanning For Local Pools, found %s:", job)
tarFinalPath := filepath.Join("/mnt/SC9000/storagePools/", job) tarFinalPath := filepath.Join("/mnt/SC9000/storagePool2/", job)
_, err = os.Stat(tarFinalPath) _, err = os.Stat(tarFinalPath)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return err return err
} }
parts := strings.Split(job, ".") parts := strings.Split(job, ".")
jsonPath := filepath.Join("/mnt/SC9000/storagePools/", fmt.Sprintf("%s.json", parts[0])) jsonPath := filepath.Join("/mnt/SC9000/storagePool2/", fmt.Sprintf("%s.json", parts[0]))
_, err = os.Stat(jsonPath) _, err = os.Stat(jsonPath)
if err != nil { if err != nil {
log.Println(err) log.Println(err)

@ -149,9 +149,71 @@ func main() {
modeIngress(*folderPathP, *skipNameP) modeIngress(*folderPathP, *skipNameP)
case "rebuild": case "rebuild":
flag.Parse() flag.Parse()
modeRebuild(*rebuildIDP) err = modeRebuild(*rebuildIDP)
if err != nil {
panic(err)
}
case "test":
modeTest()
}
}
func modeTest() (err error) {
filePath := "/mnt/worksucc/san1/gma/2/5/0/0/2500735732.1623884796.gma"
gmaReader, err := gma.NewReader(filePath)
if err != nil {
return err
}
defer gmaReader.Close()
hash, err := gmaReader.GetSHA256()
if err != nil {
return err
}
fmt.Printf("GMA Hash: %s\n", hash)
gmaReader.FileHandle.Seek(0, 0)
header, err := gmaReader.ReadHeader()
if err != nil {
return err
}
log.Printf("Name=%s\n", header.Title)
log.Printf("Desc=%s\n", header.Description)
log.Printf("AddonVersion=%d\n", header.AddonVersion)
log.Printf("FormatVersion=%d\n", header.FormatVersion)
log.Printf("FormatVersionDiscardByte=%d\n", header.FormatVersionDiscardByte)
firstType, files, err := gmaReader.ReadFiles()
if err != nil {
return err
} }
fmt.Printf("firstType = %d\n", firstType)
for _, file := range files {
if file.FileSize < 0 { // Something is fucked
return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
}
//fmt.Printf("%s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", file.FileName, file.CRC, file.Offset, file.FileSize, file.NextType, file.FileNumber)
if file.NextType > uint32(file.FileNumber+10) { // Something is fucked
/*log.Printf("Current Cursor %d", gmaReader.GetOffset())
for _, otherFile := range files[file.FileNumber:] {
log.Printf("OTHERFILE %s CRC: %d Offset: %d Size: %d NextType: %d FileNumber: %d\n", otherFile.FileName, otherFile.CRC, otherFile.Offset, otherFile.FileSize, otherFile.NextType, otherFile.FileNumber)
}*/
return fmt.Errorf("GMA Header corrupted, NextType %d, FileNumber %d", file.NextType, file.FileNumber)
} }
extractMeta, err := gmaReader.ExtractFileTo(file, io.Discard)
if err != nil {
return err
}
if extractMeta.ExtractedCRC != extractMeta.OriginalMeta.CRC {
fmt.Printf("gma(%s) checksum in meta (%d) differs from read (%d) [%s]\n", filePath, extractMeta.OriginalMeta.CRC, extractMeta.ExtractedCRC, extractMeta.OriginalMeta.FileName)
}
fmt.Println("#%d = %s -%d bytes [%s]\n", extractMeta.OriginalMeta.FileNumber, extractMeta.OriginalMeta.FileName, extractMeta.OriginalMeta.FileSize, extractMeta.ExtractedSHA256)
//fmt.Printf("Extra
}
return nil
}
func modeRebuild(id string) (err error) { func modeRebuild(id string) (err error) {
var ( var (
dboGMA common.DB_GMA dboGMA common.DB_GMA
@ -545,9 +607,10 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
Size: file.FileSize, Size: file.FileSize,
Hash: extractMeta.ExtractedSHA256, Hash: extractMeta.ExtractedSHA256,
Extension: filepath.Ext(file.FileName), Extension: filepath.Ext(file.FileName),
G2FRef: fmt.Sprintf("%s_%s_%d", dboGMA.ID, extractMeta.ExtractedSHA256, extractMeta.OriginalMeta.FileNumber), // reference for the GMA2File Thing so that we can find it agian in the list
} }
dboGMA2File := common.DB_GMA2File{ dboGMA2File := common.DB_GMA2File{
ID: fmt.Sprintf("%s_%s", dboGMA.ID, extractMeta.ExtractedSHA256), ID: dboFile.G2FRef,
BatchID: dboGMA.BatchID, BatchID: dboGMA.BatchID,
File: fmt.Sprintf("file/%s", extractMeta.ExtractedSHA256), File: fmt.Sprintf("file/%s", extractMeta.ExtractedSHA256),
GMA: fmt.Sprintf("gma/%s", dboGMA.ID), GMA: fmt.Sprintf("gma/%s", dboGMA.ID),
@ -562,7 +625,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
LocalFileName: destPath, LocalFileName: destPath,
UploadID: extractMeta.ExtractedSHA256, UploadID: extractMeta.ExtractedSHA256,
} }
//fmt.Println(dboFile) //fmt.Println(dboGMA2File)
// Add fileIDs from new unknowns // Add fileIDs from new unknowns
dboFiles = append(dboFiles, dboFile) dboFiles = append(dboFiles, dboFile)
//fmt.Println(dboGMA2File) //fmt.Println(dboGMA2File)
@ -638,7 +701,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
trackerUpload.UpdateMessage(fmt.Sprintf("Skipping %s", niceName)) trackerUpload.UpdateMessage(fmt.Sprintf("Skipping %s", niceName))
for _, dboGMA2File := range dboGMA2Files { for _, dboGMA2File := range dboGMA2Files {
if dboFileID == dboGMA2File.File { if dboFile.G2FRef == dboGMA2File.ID {
// Create File and dboGMA2File Object // Create File and dboGMA2File Object
exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID) exists, err := colGMA2File.DocumentExists(arangoCTX, dboGMA2File.ID)
if err != nil { if err != nil {
@ -657,6 +720,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
trackerUpload.MarkAsErrored() trackerUpload.MarkAsErrored()
return err return err
} }
//} else {
//log.Println("already exists... weird")
} }
break break
} }
@ -685,6 +750,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
log.Println(err) log.Println(err)
if strings.Contains(err.Error(), "cannot assign requested address") { if strings.Contains(err.Error(), "cannot assign requested address") {
uploadSuccess = false uploadSuccess = false
} else if strings.Contains(err.Error(), "refused") {
panic(err)
} else { } else {
log.Println("oopsie") log.Println("oopsie")
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
@ -711,6 +778,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
trackerUpload.MarkAsErrored() trackerUpload.MarkAsErrored()
return err return err
} }
//} else {
//log.Println("already exists... weird")
} }
trackerUpload.Increment(1) trackerUpload.Increment(1)
break break
@ -732,6 +801,32 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
// TODO : write new gma from arangoinfo // TODO : write new gma from arangoinfo
// TODO : compare hashes // TODO : compare hashes
{ {
var (
rw_dboGMA2Files []common.DB_GMA2File
)
cursor, err := arangoDB.Query(arangoCTX, fmt.Sprintf("FOR gf IN gma_file_map FILTER gf._from == 'gma/%s' RETURN gf", dboGMA.ID), nil)
if err != nil {
return err
}
defer cursor.Close()
if cursor.Count() > 0 || cursor.HasMore() {
for {
gma2File := common.DB_GMA2File{}
_, err = cursor.ReadDocument(arangoCTX, &gma2File)
if driver.IsNoMoreDocuments(err) {
break
} else if err != nil {
return err
}
gma2File.UploadID = gma2File.File[5:]
rw_dboGMA2Files = append(rw_dboGMA2Files, gma2File)
}
} else {
return fmt.Errorf("no files for gma available")
}
trackerRewriteDoneMarker := sync.Once{} trackerRewriteDoneMarker := sync.Once{}
trackerRewrite := progress.Tracker{Message: fmt.Sprintf("Rewriting %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault} trackerRewrite := progress.Tracker{Message: fmt.Sprintf("Rewriting %s", niceName), Total: int64(len(dboFiles)), Units: progress.UnitsDefault}
pw.AppendTracker(&trackerRewrite) pw.AppendTracker(&trackerRewrite)
@ -739,7 +834,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
destPath := filepath.Join(gmaTempPath, "rewrite.gma") destPath := filepath.Join(gmaTempPath, "rewrite.gma")
dir := filepath.Dir(destPath) dir := filepath.Dir(destPath)
err := os.MkdirAll(dir, os.ModePerm) err = os.MkdirAll(dir, os.ModePerm)
if err != nil { if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err return err
@ -764,8 +859,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
return err return err
} }
sort.SliceStable(dboGMA2Files, func(i, j int) bool { return dboGMA2Files[i].FileNumber < dboGMA2Files[j].FileNumber }) sort.SliceStable(rw_dboGMA2Files, func(i, j int) bool { return rw_dboGMA2Files[i].FileNumber < rw_dboGMA2Files[j].FileNumber })
for _, dboGMA2File := range dboGMA2Files { for _, dboGMA2File := range rw_dboGMA2Files {
//fmt.Printf("WriteFileIndex for %s number %d\n", dboGMA2File.FileName, dboGMA2File.FileNumber) //fmt.Printf("WriteFileIndex for %s number %d\n", dboGMA2File.FileName, dboGMA2File.FileNumber)
err = gmaWriter.WriteFileIndex(dboGMA2File.FileName, dboGMA2File.FileSize, dboGMA2File.CRC, dboGMA2File.NextType) err = gmaWriter.WriteFileIndex(dboGMA2File.FileName, dboGMA2File.FileSize, dboGMA2File.CRC, dboGMA2File.NextType)
if err != nil { if err != nil {
@ -777,7 +872,7 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
Timeout: 15 * time.Minute, Timeout: 15 * time.Minute,
} }
for _, dboGMA2File := range dboGMA2Files { for _, dboGMA2File := range rw_dboGMA2Files {
//fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID) //fmt.Printf("WriteFile for %s number %d = %s\n", dboGMA2File.FileName, dboGMA2File.FileNumber, dboGMA2File.UploadID)
resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID)) resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:13371/fetch/%s", dboGMA2File.UploadID))
if err != nil { if err != nil {
@ -814,6 +909,8 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
writeSize := writeStat.Size() writeSize := writeStat.Size()
if writeSize != dboGMA.GMASize { if writeSize != dboGMA.GMASize {
//fail //fail
//createDebugInformation
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs) undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
trackerRewrite.MarkAsErrored() trackerRewrite.MarkAsErrored()
return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize) return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
@ -825,6 +922,33 @@ func ProcessGMA(pw progress.Writer, filePath string) (err error) {
return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize) return fmt.Errorf("RewriteCheck failed, original=%s (%d bytes), rewrite=%s (%d bytes)", dboGMA.GMAHash, dboGMA.GMASize, writeHash, writeSize)
} }
trackerRewriteDoneMarker.Do(trackerRewrite.MarkAsDone) trackerRewriteDoneMarker.Do(trackerRewrite.MarkAsDone)
recoveryData := common.JSON_GMARecovery{
GMA: dboGMA,
Refs: rw_dboGMA2Files,
}
recoveryBytes, err := json.MarshalIndent(recoveryData, "", "\t")
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
recoveryPath := filepath.Join("/mnt/SC9000/gmaRecovery", fmt.Sprintf("%s.json", dboGMA.OriginalPath))
err = os.MkdirAll(filepath.Dir(recoveryPath), os.ModePerm)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
recoveryFile, err := os.Create(recoveryPath)
if err != nil {
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
return err
}
_, err = recoveryFile.Write(recoveryBytes)
if err != nil {
return fmt.Errorf("error @recoveryFile.Write %v", err)
}
} }
// TODO: 4... profit? // TODO: 4... profit?

@ -29,8 +29,12 @@ import (
) )
var ( var (
FastCacheEnabled = true
FastCachePath = "/mnt/SC9000/fastCache2"
WORMCachePath = FastCachePath
PoolMaxItems = 500 PoolMaxItems = 500
PoolPathFinal = "/mnt/SC9000/storagePools" PoolPathFinal = "/mnt/SC9000/storagePool2"
PoolPathTemp = "/mnt/ramfs/" PoolPathTemp = "/mnt/ramfs/"
) )
@ -165,7 +169,7 @@ func InitDatabase() (err error) {
func (p *Pool) OpenTar() (err error) { func (p *Pool) OpenTar() (err error) {
p.wormMode = true p.wormMode = true
outputDir := filepath.Join(poolMaster.cachePath, "worm", p.PoolID) outputDir := filepath.Join(WORMCachePath, "worm", p.PoolID)
err = os.MkdirAll(outputDir, os.ModePerm) err = os.MkdirAll(outputDir, os.ModePerm)
if err != nil { if err != nil {
return err return err
@ -215,7 +219,7 @@ func (p *Pool) Fetch(id string, writer io.Writer) (err error) {
if poolItem == id { if poolItem == id {
//fmt.Printf("Fetch WORMPool %s\n", id) //fmt.Printf("Fetch WORMPool %s\n", id)
p.LastTouchy = time.Now() p.LastTouchy = time.Now()
poolLocalFilePath := filepath.Join(poolMaster.cachePath, "worm", p.PoolID, id) poolLocalFilePath := filepath.Join(WORMCachePath, "worm", p.PoolID, id)
srcLocalFile, err := os.Open(poolLocalFilePath) srcLocalFile, err := os.Open(poolLocalFilePath)
if err != nil { if err != nil {
return err return err
@ -715,6 +719,7 @@ func (p *PoolMaster) Lookup(id string) (exists bool) {
} }
} }
} }
dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id) dboFile2ChunkExists, err := colFile2Chunk.DocumentExists(arangoCTX, id)
if err != nil { if err != nil {
return false return false
@ -962,11 +967,44 @@ func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
} }
//FetchFromPoolPack //FetchFromPoolPack
apparentPoolID := dboFile2Chunk.Chunk[6:]
fastCachePath := filepath.Join(WORMCachePath, "worm", apparentPoolID, id)
if checkFileExists(fastCachePath) {
fastCacheDir := filepath.Join(WORMCachePath, "worm", apparentPoolID)
srcLocalFile, err := os.Open(fastCachePath)
if err != nil {
return err
}
defer srcLocalFile.Close()
if _, err = io.Copy(writer, srcLocalFile); err != nil {
return err
}
// at this point acquire lock til end
p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.WORMPools[apparentPoolID]; !ok {
// create virtual worm
fastCachePool, err := RestorePoolFromFolder(fastCacheDir)
if err != nil {
return err
}
// fastCachePool
p.WORMPools[fastCachePool.PoolID] = fastCachePool
}
return nil
}
//dboFile2Chunk.Chunk <- which chunk i need to find //dboFile2Chunk.Chunk <- which chunk i need to find
return p.FetchLoadWORM(dboFile2Chunk.Chunk[6:], id, writer) return p.FetchLoadWORM(dboFile2Chunk.Chunk[6:], id, writer)
} }
return nil return nil
} }
func checkFileExists(filePath string) bool {
_, error := os.Stat(filePath)
//return !os.IsNotExist(err)
return !errors.Is(error, os.ErrNotExist)
}
func (p *PoolMaster) Store(id string, workerID string, src io.Reader, targetSize int64) (err error) { func (p *PoolMaster) Store(id string, workerID string, src io.Reader, targetSize int64) (err error) {
pool, err := p.GetCurrentWriteablePool(workerID) pool, err := p.GetCurrentWriteablePool(workerID)
if err != nil { if err != nil {
@ -1058,7 +1096,7 @@ func main() {
go func() { go func() {
for { for {
poolMaster.PackFullPools() poolMaster.PackFullPools()
poolMaster.CleanWORMTemp() //poolMaster.CleanWORMTemp()
time.Sleep(time.Minute * 2) time.Sleep(time.Minute * 2)
} }
}() }()

Loading…
Cancel
Save