first release
This commit is contained in:
parent
ec9bbe8a6c
commit
3d9b5feba9
2 changed files with 64 additions and 22 deletions
25
main.go
25
main.go
|
@ -420,17 +420,30 @@ func ProcessGMA(filePath string) (err error) {
|
|||
fmt.Printf("Import Duration %dms\n", time.Since(importStartTime).Milliseconds())
|
||||
fmt.Println()
|
||||
// TODO: upload all unknownNewFiles to StorageServer
|
||||
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 200
|
||||
var httpClient *http.Client = http.DefaultClient
|
||||
|
||||
for _, unknownFile := range newUnknownFiles {
|
||||
unknownFileID := fmt.Sprintf("file/%s", unknownFile)
|
||||
for _, dboGMA2File := range dboGMA2Files {
|
||||
if unknownFileID == dboGMA2File.File {
|
||||
fmt.Printf("Uploading %s (local %s) to Storage\n", dboGMA2File.UploadID, dboGMA2File.LocalFileName)
|
||||
err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName)
|
||||
if err != nil {
|
||||
fmt.Println("oopsie")
|
||||
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
|
||||
return err
|
||||
uploadSuccess := true
|
||||
for {
|
||||
fmt.Printf("Uploading %s (local %s) to Storage\n", dboGMA2File.UploadID, dboGMA2File.LocalFileName)
|
||||
err = common.MultipartUpload(httpClient, fmt.Sprintf("http://127.0.0.1:13371/stash/%s/%d", dboGMA2File.UploadID, dboGMA2File.FileSize), dboGMA2File.LocalFileName)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "cannot assign requested address") {
|
||||
uploadSuccess = false
|
||||
} else {
|
||||
fmt.Println("oopsie")
|
||||
undoBatch(true, dboGMA.ID, fileIDs, gma2FileIDs)
|
||||
return err
|
||||
}
|
||||
}
|
||||
if uploadSuccess {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,11 +32,11 @@ var (
|
|||
)
|
||||
|
||||
type Pool struct {
|
||||
PoolID string `json:"_key"`
|
||||
Finalized bool `json:"finalized"`
|
||||
ReadOnly bool `json:"readOnly"`
|
||||
Size uint64 `json:"size"`
|
||||
//folder string `json:"-"`
|
||||
PoolID string `json:"_key"`
|
||||
Finalized bool `json:"finalized"`
|
||||
ReadOnly bool `json:"readOnly"`
|
||||
Size uint64 `json:"size"`
|
||||
LastTouchy time.Time `json:"-"`
|
||||
|
||||
itemCount int
|
||||
items []string
|
||||
|
@ -88,7 +88,7 @@ func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDataba
|
|||
// Retry Loop for Failed Connections
|
||||
for i := 0; i < 6; i++ {
|
||||
if i == 5 {
|
||||
return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i)
|
||||
return driver, ctx, fmt.Errorf("connectdb unable to connect to database %d times", i)
|
||||
} else if i > 0 {
|
||||
time.Sleep(30 * time.Second)
|
||||
}
|
||||
|
@ -128,6 +128,9 @@ func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDataba
|
|||
}
|
||||
func InitDatabase() (err error) {
|
||||
arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
|
||||
if err != nil {
|
||||
|
@ -189,6 +192,7 @@ func (p *Pool) Fetch(id string, writer io.Writer) (err error) {
|
|||
for _, poolItem := range p.items {
|
||||
if poolItem == id {
|
||||
fmt.Printf("Fetch WORMPool %s\n", id)
|
||||
p.LastTouchy = time.Now()
|
||||
poolLocalFilePath := filepath.Join(poolMaster.cachePath, "worm", p.PoolID, id)
|
||||
srcLocalFile, err := os.Open(poolLocalFilePath)
|
||||
if err != nil {
|
||||
|
@ -203,6 +207,10 @@ func (p *Pool) Fetch(id string, writer io.Writer) (err error) {
|
|||
}
|
||||
return fmt.Errorf("%s not found", id)
|
||||
}
|
||||
func (p *Pool) Unload() {
|
||||
log.Printf("Unloading WORMPool [%s]\n", p.PoolID)
|
||||
p.file.Close()
|
||||
}
|
||||
|
||||
func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) {
|
||||
poolMaster.finalPath = finalPath
|
||||
|
@ -213,20 +221,20 @@ func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, e
|
|||
destPath := filepath.Join(poolMaster.cachePath, "pool")
|
||||
err = os.MkdirAll(destPath, os.ModePerm)
|
||||
if err != nil {
|
||||
return poolMaster, err
|
||||
return PoolMaster{}, err
|
||||
}
|
||||
|
||||
destPath = filepath.Join(poolMaster.cachePath, "worm")
|
||||
err = os.MkdirAll(destPath, os.ModePerm)
|
||||
if err != nil {
|
||||
return poolMaster, err
|
||||
return PoolMaster{}, err
|
||||
}
|
||||
|
||||
err = os.MkdirAll(poolMaster.finalPath, os.ModePerm)
|
||||
if err != nil {
|
||||
return poolMaster, err
|
||||
return PoolMaster{}, err
|
||||
}
|
||||
return poolMaster, nil
|
||||
return
|
||||
}
|
||||
func (p *PoolMaster) NewPool() (*Pool, error) {
|
||||
var err error
|
||||
|
@ -244,7 +252,6 @@ func (p *PoolMaster) NewPool() (*Pool, error) {
|
|||
|
||||
return &pool, nil
|
||||
}
|
||||
|
||||
func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) {
|
||||
//fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount)
|
||||
if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems {
|
||||
|
@ -536,7 +543,7 @@ func (p *PoolMaster) PackPool(poolID string) (packResult PoolPackResult, err err
|
|||
func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) {
|
||||
// p.NewPool()
|
||||
for _, localPool := range p.LocalPools {
|
||||
if !localPool.ReadOnly && localPool.itemCount < 500 {
|
||||
if !localPool.ReadOnly && localPool.itemCount < PoolMaxItems {
|
||||
return localPool, nil
|
||||
}
|
||||
}
|
||||
|
@ -612,10 +619,11 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ
|
|||
return err
|
||||
}
|
||||
loadedWormPool := Pool{
|
||||
PoolID: dboChunk.ID,
|
||||
Size: uint64(dboChunk.Size),
|
||||
ReadOnly: dboChunk.ReadOnly,
|
||||
Finalized: dboChunk.Finalized,
|
||||
PoolID: dboChunk.ID,
|
||||
Size: uint64(dboChunk.Size),
|
||||
ReadOnly: dboChunk.ReadOnly,
|
||||
Finalized: dboChunk.Finalized,
|
||||
LastTouchy: time.Now(),
|
||||
|
||||
filePath: filepath.Join(p.finalPath, fmt.Sprintf("%s.tar", dboChunk.ID)),
|
||||
}
|
||||
|
@ -629,6 +637,19 @@ func (p *PoolMaster) FetchLoadWORM(chunkID string, fileID string, writer io.Writ
|
|||
return loadedWormPool.Fetch(fileID, writer)
|
||||
//return nil
|
||||
}
|
||||
func (p *PoolMaster) CleanWORMTemp() (err error) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
for _, wormPool := range p.WORMPools {
|
||||
if time.Since(wormPool.LastTouchy).Minutes() > 2 {
|
||||
wormPool.Unload()
|
||||
delete(p.WORMPools, wormPool.PoolID)
|
||||
os.RemoveAll(filepath.Join(poolMaster.cachePath, "worm", wormPool.PoolID))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
|
||||
if p.CurrentPool != nil {
|
||||
|
@ -823,6 +844,12 @@ func main() {
|
|||
}
|
||||
os.RemoveAll(filepath.Join(poolMaster.cachePath, "pool", fullPool.PoolID))
|
||||
deletedPools = append(deletedPools, fullPool.PoolID)
|
||||
_, err = colChunk.UpdateDocument(arangoCTX, packResult.PoolID, common.DB_Chunk{
|
||||
NotReady: false,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
poolMaster.lock.Unlock()
|
||||
}
|
||||
for _, deletedPoolID := range deletedPools {
|
||||
|
@ -833,6 +860,8 @@ func main() {
|
|||
}
|
||||
}
|
||||
}
|
||||
//
|
||||
poolMaster.CleanWORMTemp()
|
||||
time.Sleep(time.Second * 10)
|
||||
}
|
||||
}()
|
||||
|
|
Loading…
Add table
Reference in a new issue