deadlooooooock

master
cheetah 2 years ago
parent f004a8eb3e
commit e19548a0b6

@@ -136,3 +136,55 @@ func MultipartUpload(client *http.Client, url string, path string) (err error) {
 	}
 	return
 }
+
+func MoveFile(sourcePath, destPath string) error {
+	inputFile, err := os.Open(sourcePath)
+	if err != nil {
+		return fmt.Errorf("couldn't open source file: %s", err)
+	}
+	outputFile, err := os.Create(destPath)
+	if err != nil {
+		inputFile.Close()
+		return fmt.Errorf("couldn't open dest file: %s", err)
+	}
+	defer outputFile.Close()
+	_, err = io.Copy(outputFile, inputFile)
+	inputFile.Close()
+	if err != nil {
+		return fmt.Errorf("writing to output file failed: %s", err)
+	}
+	// The copy was successful, so now delete the original file
+	err = os.Remove(sourcePath)
+	if err != nil {
+		return fmt.Errorf("failed removing original file: %s", err)
+	}
+	return nil
+}
+
+type Semaphore interface {
+	Acquire()
+	Release()
+	Close()
+}
+
+type semaphore struct {
+	semC chan struct{}
+}
+
+func NewSemaphore(maxConcurrency int) Semaphore {
+	return &semaphore{
+		semC: make(chan struct{}, maxConcurrency),
+	}
+}
+
+func (s *semaphore) Acquire() {
+	s.semC <- struct{}{}
+}
+
+func (s *semaphore) Release() {
+	<-s.semC
+}
+
+func (s *semaphore) Close() {
+	close(s.semC)
+}
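Note (not part of the commit): the Semaphore added above mirrors the worker-pool scaffolding that is still commented out in main() further down. A minimal sketch of the intended bounded-concurrency pattern, assuming this package is imported as common, that fmt and sync are imported, and reusing the ConcurrencyLimit, WorkerJobPool and ProcessGMA names from the next file:

	sem := common.NewSemaphore(ConcurrencyLimit) // at most ConcurrencyLimit jobs in flight
	var wg sync.WaitGroup
	for _, jobFile := range WorkerJobPool {
		wg.Add(1)
		go func(jobFile string) {
			defer wg.Done()
			sem.Acquire()       // blocks until a slot is free
			defer sem.Release() // frees the slot when ProcessGMA returns
			if err := ProcessGMA(jobFile); err != nil {
				fmt.Printf("\nERROR: %v\n", err)
			}
		}(jobFile)
	}
	wg.Wait() // wait for all jobs to finish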

@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"crypto/tls"
+	"flag"
 	"fmt"
 	"log"
 	"net/http"
@@ -25,9 +26,9 @@ import (
 var (
 	arangoDB adriver.Database
 	arangoCTX context.Context
-	colChunk adriver.Collection
+	//colChunk adriver.Collection
 	colFile adriver.Collection
-	colFile2Chunk adriver.Collection
+	//colFile2Chunk adriver.Collection
 	colGMA adriver.Collection
 	colGMA2File adriver.Collection
 )
@@ -38,7 +39,7 @@ func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDataba
 	// Retry Loop for Failed Connections
 	for i := 0; i < 6; i++ {
 		if i == 5 {
-			return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i)
+			return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times!", i)
 		} else if i > 0 {
 			time.Sleep(30 * time.Second)
 		}
@@ -79,10 +80,10 @@ func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDataba
 func InitDatabase() (err error) {
 	arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator")
-	colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
+	/*colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
 	if err != nil {
 		return err
-	}
+	}*/
 	colFile, err = arangoDB.Collection(arangoCTX, "file")
 	if err != nil {
 		return err
@@ -91,10 +92,10 @@ func InitDatabase() (err error) {
 	if err != nil {
 		return err
 	}
-	colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
+	/*colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
 	if err != nil {
 		return err
-	}
+	}*/
 	colGMA2File, err = arangoDB.Collection(arangoCTX, "gma_file_map")
 	if err != nil {
 		return err
@@ -102,8 +103,15 @@ func InitDatabase() (err error) {
 	return nil
 }
 
+var JobPoolSize int = 5
+var ConcurrencyLimit int = 5
+var WorkerJobPool chan string
+
 func main() {
+	folderPathP := flag.String("path", "/mnt/SC9000/TemporaryTestingShit2", "a string")
 	debug.SetMemoryLimit(6e9)
+	flag.Parse()
+	folderPath := *folderPathP
 
 	go func() {
 		log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
@@ -132,14 +140,17 @@ func main() {
 	//fileHandle, err := os.Open("2143898000.1593250551.bin.gma") //2143898000.1593250551.bin")
 	//gma, err := gma.NewReader("2143898000.1593250551.bin.gma")
-	folderPath := "/mnt/SC9000/TemporaryTestingShit/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
+	//folderPath := "/mnt/SC9000/TemporaryTestingShit2/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
 	folderPathTarget := "/mnt/SC9000/ProcessedGMATest/" //"/mnt/worksucc/san1/gma/2/5/4/8/"
-	//0
+	//
 	entries, err := os.ReadDir(folderPath)
 	if err != nil {
 		panic(err)
 	}
 	skipBla := false
+	var WorkerJobPool []string
 	for _, e := range entries {
 		if !e.IsDir() && skipBla == true {
 			if e.Name() == "2547463094.1626322945.gma" {
@@ -149,15 +160,35 @@ func main() {
 			}
 		}
 		if !e.IsDir() {
-			err = ProcessGMA(filepath.Join(folderPath, e.Name()))
+			WorkerJobPool = append(WorkerJobPool, filepath.Join(folderPath, e.Name()))
+		}
+	}
+
+	/*
+		sem := common.NewSemaphore(ConcurrencyLimit)
+		wg := sync.WaitGroup{}
+	*/
+	for _, jobFile := range WorkerJobPool {
+		//wg.Add(1)
+		//go func(jobFile string, wg *sync.WaitGroup) {
+		// sem.Acquire() // Wait for worker to have slot open
+		err = ProcessGMA(jobFile)
 		if err != nil {
 			fmt.Printf("\nERROR: %v\n", err)
 			//panic(err)
 			continue
 		}
-		os.Rename(filepath.Join(folderPath, e.Name()), filepath.Join(folderPathTarget, e.Name()))
-		}
+		os.Rename(jobFile, filepath.Join(folderPathTarget, filepath.Base(jobFile)))
+
+		// sem.Release() // Release the slot
+		// wg.Done() // Finish job
+		//}(job, &wg)
 	}
+
+	// Wait for all jobs to finish
+	//wg.Wait()
 }
 
 func undoBatch(undoBatch bool, gmaID string, fileIDs []string, gma2FileIDs []string) (err error) {
@@ -339,8 +370,8 @@ func ProcessGMA(filePath string) (err error) {
 		// process and work withj
 		metaSlice, errorSlice, _ := colFile.CreateDocuments(arangoCTX, dboFiles[0:chunkSize])
-		//fmt.Println("Metaslice")
-		//fmt.Println(metaSlice)
+		fmt.Println("Metaslice")
+		fmt.Println(metaSlice)
 		for _, meta := range metaSlice {
 			if !meta.ID.IsEmpty() {
 				newUnknownFiles = append(newUnknownFiles, meta.Key)
@@ -394,9 +425,10 @@ func ProcessGMA(filePath string) (err error) {
 		unknownFileID := fmt.Sprintf("file/%s", unknownFile)
 		for _, dboGMA2File := range dboGMA2Files {
 			if unknownFileID == dboGMA2File.File {
-				//fmt.Printf("Uploading %s (local %s) to Storage\n", dboGMA2File.UploadID, dboGMA2File.LocalFileName)
+				fmt.Printf("Uploading %s (local %s) to Storage\n", dboGMA2File.UploadID, dboGMA2File.LocalFileName)
 				err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName)
 				if err != nil {
+					fmt.Println("oopsie")
 					undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
 					return err
 				}
@@ -407,6 +439,7 @@ func ProcessGMA(filePath string) (err error) {
 	// TODO : write new gma from arangoinfo
 	// TODO : compare hashes
 	{
+		fmt.Println("rewriting gma")
 		destPath := filepath.Join(gmaTempPath, "rewrite.gma")
 		dir := filepath.Dir(destPath)
@@ -504,6 +537,5 @@ func ProcessGMA(filePath string) (err error) {
 		undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
 		return err
 	}
-	0
 	return nil
 }
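Bridging note (not part of the commit): in the storage-server changes below, MovePoolPackToWORM switches from os.Rename to the common.MoveFile helper added in the first file. A likely reason is that PoolPathTemp now points at /mnt/ramfs/ while the finished pools live under /mnt/SC9000, and os.Rename cannot move a file across filesystems (it fails with a cross-device link error), whereas MoveFile copies and then deletes. A sketch of the general pattern, with a hypothetical helper name, assuming os and the common package are imported:

// moveAny is illustrative only: prefer a cheap rename on the same
// filesystem, fall back to copy+delete across mounts.
func moveAny(src, dst string) error {
	if err := os.Rename(src, dst); err == nil {
		return nil
	}
	// os.Rename across mounts (e.g. /mnt/ramfs -> /mnt/SC9000) returns
	// an *os.LinkError (EXDEV), so fall back to the copying helper.
	return common.MoveFile(src, dst)
}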

@@ -26,9 +26,9 @@ import (
 )
 
 var (
-	PoolMaxItems = 2500
+	PoolMaxItems = 500
 	PoolPathFinal = "/mnt/SC9000/storagePools"
-	PoolPathTemp = "/mnt/SC9000/storageTemp"
+	PoolPathTemp = "/mnt/ramfs/"
 )
 
 type Pool struct {
@@ -58,9 +58,9 @@ type PoolMaster struct {
 	CurrentPool *Pool
 	lock sync.Mutex
-	LocalPools []Pool
-	FullPools []Pool
-	WORMPools map[string]Pool
+	LocalPools []*Pool
+	FullPools []*Pool
+	WORMPools map[string]*Pool
 }
 
 type PoolPackResult struct {
 	PoolID string
@@ -207,7 +207,7 @@ func (p *Pool) Fetch(id string, writer io.Writer) (err error) {
 func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) {
 	poolMaster.finalPath = finalPath
 	poolMaster.cachePath = cachePath
-	poolMaster.WORMPools = make(map[string]Pool)
+	poolMaster.WORMPools = make(map[string]*Pool)
 	//poolMaster.lock = sync.Mutex{}
 
 	destPath := filepath.Join(poolMaster.cachePath, "pool")
@@ -228,8 +228,9 @@ func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, e
 	}
 	return poolMaster, nil
 }
-func (p *PoolMaster) NewPool() (pool *Pool, err error) {
-	pool = &Pool{}
+func (p *PoolMaster) NewPool() (*Pool, error) {
+	var err error
+	pool := Pool{}
 	pool.PoolID = uuid.NewV4().String()
 	pool.Finalized = false
 	pool.ReadOnly = false
@@ -238,35 +239,39 @@ func (p *PoolMaster) NewPool() (pool *Pool, err error) {
 	//dir := filepath.Dir(destPath)
 	err = os.MkdirAll(destPath, os.ModePerm)
 	if err != nil {
-		return pool, err
+		return &pool, err
 	}
-	return pool, nil
+	return &pool, nil
 }
 
 func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) {
 	//fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount)
 	if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems {
+		fmt.Printf("Aquiring Lock for GetCurrentWriteablepool\n")
 		p.lock.Lock()
 		defer p.lock.Unlock()
+		defer fmt.Printf("unlock GetCurrentWriteablepool\n")
+		fmt.Printf("Aquired Lock success for GetCurrentWriteablepool\n")
 		p.CurrentPool.ReadOnly = true
-		p.FullPools = append(p.FullPools, *p.CurrentPool)
+		p.FullPools = append(p.FullPools, p.CurrentPool)
 		// queue for compression
-		fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one", p.CurrentPool.PoolID, p.CurrentPool.itemCount)
+		fmt.Printf("GetCurrentWriteablePool(): current Pool (%s) is full (%d), creating new one\n", p.CurrentPool.PoolID, p.CurrentPool.itemCount)
 		p.CurrentPool = nil
 	}
 	if p.CurrentPool == nil {
-		pool, err = p.AcquireNewOrRecoverPool()
+		fmt.Printf("Creating new Pool")
+		p.CurrentPool, err = p.AcquireNewOrRecoverPool()
+		fmt.Printf("... got [%s]\n", p.CurrentPool.PoolID)
 		if err != nil {
 			return pool, err
 		}
-		p.CurrentPool = pool
-		return pool, nil
 	}
 	return p.CurrentPool, nil
 }
 
-func RestorePoolFromFolder(folderPath string) (pool Pool, err error) {
+func RestorePoolFromFolder(folderPath string) (pool *Pool, err error) {
+	pool = &Pool{}
 	pool.PoolID = path.Base(folderPath)
 
 	entries, err := os.ReadDir(folderPath)
@@ -285,8 +290,11 @@ func RestorePoolFromFolder(folderPath string) (pool Pool, err error) {
 	return pool, err
 }
 
 func (p *PoolMaster) ScanForLocalPools() (err error) {
+	fmt.Printf("Aquiring Lock for ScanForLocalPools\n")
 	p.lock.Lock()
 	defer p.lock.Unlock()
+	defer fmt.Printf("unlock ScanForLocalPools\n")
+	fmt.Printf("Aquired Lock success for ScanForLocalPools\n")
 	entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool"))
 	if err != nil {
 		return err
@@ -401,7 +409,10 @@ func (p *PoolMaster) MovePoolPackToWORM(packResult PoolPackResult) (err error) {
 	startTime := time.Now()
 	targetFileName := filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", packResult.PoolID))
-	os.Rename(packResult.outputFileName, targetFileName)
+	err = common.MoveFile(packResult.outputFileName, targetFileName)
+	if err != nil {
+		return err
+	}
 
 	tarFileCheck, err := os.Open(targetFileName)
 	if err != nil {
@@ -427,8 +438,11 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err
 	startTime := time.Now()
 	packResult.PoolID = poolID
+	fmt.Printf("Aquiring Lock for PackPool(%s)\n", poolID)
 	p.lock.Lock()
 	defer p.lock.Unlock()
+	defer fmt.Printf("unlock PackPool\n")
+	fmt.Printf("Aquired Lock success for PackPool(%s)\n", poolID)
 	packResult.outputFileName = filepath.Join(p.cachePath, "pool", fmt.Sprintf("%s.tar", poolID))
 	tarFile, err := os.Create(packResult.outputFileName)
@@ -521,8 +535,8 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err
 func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) {
 	// p.NewPool()
 	for _, localPool := range p.LocalPools {
-		if !localPool.ReadOnly {
-			return &localPool, nil
+		if !localPool.ReadOnly && localPool.itemCount < 500 {
+			return localPool, nil
 		}
 	}
 	return p.NewPool()
@@ -585,8 +599,11 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ
 	// else load wormPool into disk-cache extract to "worm"
 	// wormMode
+	fmt.Printf("Aquiring Lock for FetchLoadWORM\n")
 	p.lock.Lock()
 	defer p.lock.Unlock()
+	defer fmt.Printf("unlock FetchLoadWORM\n")
+	fmt.Printf("Aquired Lock success for FetchLoadWORM\n")
 
 	var dboChunk common.DB_Chunk
 	_, err = colChunk.ReadDocument(arangoCTX, chunkID, &dboChunk)
@@ -607,7 +624,7 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ
 		return err
 	}
 	fmt.Println("extracted")
-	p.WORMPools[loadedWormPool.PoolID] = loadedWormPool
+	p.WORMPools[loadedWormPool.PoolID] = &loadedWormPool
 	return loadedWormPool.Fetch(fileID, writer)
 	//return nil
 }
@@ -707,9 +724,11 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro
 	}
 	fmt.Printf("Store(%s)\n", id)
+	fmt.Printf("Aquiring Lock for Store\n")
 	p.lock.Lock()
 	defer p.lock.Unlock()
+	defer fmt.Printf("unlock Store\n")
+	fmt.Printf("Aquired Lock success for Store\n")
 	// figuring out paths
 	poolFolder := filepath.Join(p.cachePath, "pool", pool.PoolID)
 	destPath := filepath.Join(poolFolder, id)
@@ -764,7 +783,9 @@ func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err erro
 	return nil
 }
 
+func removeFromSlice(slice []*Pool, s int) []*Pool {
+	return append(slice[:s], slice[s+1:]...)
+}
+
 func main() {
 	err := InitDatabase()
 	if err != nil {
@@ -781,8 +802,39 @@ func main() {
 		panic(err)
 	}
 
+	go func() {
+		for {
+			for index, fullPool := range poolMaster.FullPools {
+				poolMaster.lock.Lock()
+				packResult, err := poolMaster.PackPool(fullPool.PoolID)
+				if err != nil {
+					panic(err)
+				}
+				err = poolMaster.ImportPoolPackResult(packResult)
+				if err != nil {
+					panic(err)
+				}
+				err = poolMaster.MovePoolPackToWORM(packResult)
+				if err != nil {
+					panic(err)
+				}
+				os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID))
+				poolMaster.FullPools = removeFromSlice(poolMaster.FullPools, index)
+				poolMaster.lock.Unlock()
+			}
+			time.Sleep(time.Second * 10)
+		}
+	}()
+
 	for _, localPool := range poolMaster.LocalPools {
 		if localPool.ReadOnly {
+			dboChunkExists, err := colChunk.DocumentExists(arangoCTX, localPool.PoolID)
+			if err != nil {
+				panic(err)
+			}
+			if !dboChunkExists {
 			fmt.Printf("Packing Pool %s\n", localPool.PoolID)
 			packResult, err := poolMaster.PackPool(localPool.PoolID)
 			if err != nil {
@@ -796,6 +848,9 @@ func main() {
 			if err != nil {
 				panic(err)
 			}
+				os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
+			}
+			//os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", localPool.PoolID))
 		}
 		// packResult.FileCount
 	}
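Technical note (not part of the commit): the lock tracing added throughout this commit, and the commit message itself, point at a self-deadlock. The new background goroutine in main() takes poolMaster.lock.Lock() and then calls poolMaster.PackPool(), which locks the same sync.Mutex again; Go mutexes are not reentrant, so the second Lock() blocks forever. A minimal, self-contained reproduction of that pattern (illustrative names only):

package main

import "sync"

type master struct{ lock sync.Mutex }

func (m *master) packPool() {
	m.lock.Lock() // second Lock() on a mutex already held by this goroutine: blocks forever
	defer m.lock.Unlock()
}

func main() {
	m := &master{}
	m.lock.Lock() // caller already holds the lock...
	m.packPool()  // ...so this never returns (the runtime reports a deadlock once no goroutine can run)
	m.lock.Unlock()
}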
