You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

460 lines
11 KiB
Go

1 year ago
package main
import (
"archive/tar"
"context"
"crypto/tls"
"fmt"
"io"
"log"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"sync"
"time"
adriver "github.com/arangodb/go-driver"
ahttp "github.com/arangodb/go-driver/http"
"github.com/labstack/echo/v4"
"github.com/twinj/uuid"
)
var (
PoolMaxItems = 2500
PoolPathFinal = "/mnt/SC9000/storagePools"
PoolPathTemp = "/mnt/SC9000/storageTemp"
)
type Pool struct {
PoolID string `json:"_key"`
Finalized bool `json:"finalized"`
ReadOnly bool `json:"readOnly"`
Size uint64 `json:"size"`
folder string `json:"-"`
itemCount int
items []string
file *os.File
tarWriter *tar.Writer
tarReader *tar.Reader
}
type PoolFile struct {
FileID string
Size uint64
}
type PoolMaster struct {
cachePath string
finalPath string
CurrentPool *Pool
lock sync.Mutex
LocalPools []Pool
FullPools []Pool
}
func NewPoolMaster(finalPath string, cachePath string) (poolMaster PoolMaster, err error) {
poolMaster.finalPath = finalPath
poolMaster.cachePath = cachePath
//poolMaster.lock = sync.Mutex{}
destPath := filepath.Join(poolMaster.cachePath, "pool")
err = os.MkdirAll(destPath, os.ModePerm)
if err != nil {
return poolMaster, err
}
err = os.MkdirAll(poolMaster.finalPath, os.ModePerm)
if err != nil {
return poolMaster, err
}
return poolMaster, nil
}
func (p *PoolMaster) NewPool() (pool *Pool, err error) {
pool = &Pool{}
pool.PoolID = uuid.NewV4().String()
pool.Finalized = false
pool.ReadOnly = false
//TODO : Sync to DB
destPath := filepath.Join(p.cachePath, "pool", pool.PoolID)
//dir := filepath.Dir(destPath)
err = os.MkdirAll(destPath, os.ModePerm)
if err != nil {
return pool, err
}
return pool, nil
}
func (p *PoolMaster) GetCurrentWriteablePool() (pool *Pool, err error) {
if p.CurrentPool != nil && p.CurrentPool.itemCount >= PoolMaxItems {
p.lock.Lock()
defer p.lock.Unlock()
p.CurrentPool.ReadOnly = true
p.FullPools = append(p.FullPools, *p.CurrentPool)
// queue for compression
p.CurrentPool = nil
}
if p.CurrentPool == nil {
pool, err = p.AcquireNewOrRecoverPool()
if err != nil {
return pool, err
}
p.CurrentPool = pool
return pool, nil
}
return p.CurrentPool, nil
}
func RestorePoolFromFolder(folderPath string) (pool Pool, err error) {
pool.PoolID = path.Base(folderPath)
entries, err := os.ReadDir(folderPath)
if err != nil {
return pool, err
}
for _, e := range entries {
if !e.IsDir() {
pool.items = append(pool.items, e.Name())
pool.itemCount++
}
}
pool.ReadOnly = pool.itemCount >= PoolMaxItems
pool.Finalized = false // we are still local
return pool, err
}
func (p *PoolMaster) ScanForLocalPools() (err error) {
p.lock.Lock()
defer p.lock.Unlock()
entries, err := os.ReadDir(filepath.Join(p.cachePath, "pool"))
if err != nil {
return err
}
for _, e := range entries {
if e.IsDir() {
fmt.Printf("Scanning For Local Pools, found %s:\n", e.Name())
poolDirPath := filepath.Join(p.cachePath, "pool", e.Name())
restoredPool, err := RestorePoolFromFolder(poolDirPath)
if err != nil {
return err
}
fmt.Printf("is readonly %v itemCount=%d\n", restoredPool.ReadOnly, restoredPool.itemCount)
p.LocalPools = append(p.LocalPools, restoredPool)
}
}
return nil
}
func (p *PoolMaster) AcquireNewOrRecoverPool() (pool *Pool, err error) {
// p.NewPool()
for _, localPool := range p.LocalPools {
if !localPool.ReadOnly {
return &localPool, nil
}
}
return p.NewPool()
}
func (p *PoolMaster) Lookup(id string) (exists bool) {
if p.CurrentPool != nil {
for _, poolItem := range p.CurrentPool.items {
if poolItem == id {
return true
}
}
}
for _, fullPool := range p.FullPools {
for _, poolItem := range fullPool.items {
if poolItem == id {
return true
}
}
}
for _, localPool := range p.LocalPools {
for _, poolItem := range localPool.items {
if poolItem == id {
return true
}
}
}
// TODO : DB Check
return false
}
func (p *PoolMaster) Fetch(id string, writer io.Writer) (err error) {
if p.CurrentPool != nil {
for _, poolItem := range p.CurrentPool.items {
if poolItem == id {
fmt.Printf("Fetch CurrentPool %s\n", id)
poolLocalFilePath := filepath.Join(p.cachePath, "pool", p.CurrentPool.PoolID, id)
//fmt.Println(poolLocalFilePath)
//fmt.Printf("%s %s\n", p.CurrentPool.PoolID, poolItem)
srcLocalFile, err := os.Open(poolLocalFilePath)
if err != nil {
return err
}
//fmt.Println("Closer")
defer srcLocalFile.Close()
//fmt.Println("io.Copy")
if _, err = io.Copy(writer, srcLocalFile); err != nil {
return err
}
return nil
}
}
}
for _, fullPool := range p.FullPools {
for _, poolItem := range fullPool.items {
if poolItem == id {
fmt.Printf("Fetch FullPool %s\n", id)
poolLocalFilePath := filepath.Join(p.cachePath, "pool", fullPool.PoolID, id)
//fmt.Println(poolLocalFilePath)
//fmt.Printf("%s %s\n", fullPool.PoolID, poolItem)
srcLocalFile, err := os.Open(poolLocalFilePath)
if err != nil {
return err
}
//fmt.Println("Closer")
defer srcLocalFile.Close()
//fmt.Println("io.Copy")
if _, err = io.Copy(writer, srcLocalFile); err != nil {
return err
}
return nil
}
}
}
for _, localPool := range p.LocalPools {
for _, poolItem := range localPool.items {
if poolItem == id {
fmt.Printf("Fetch LocalPool %s\n", id)
poolLocalFilePath := filepath.Join(p.cachePath, "pool", localPool.PoolID, id)
//fmt.Println(poolLocalFilePath)
//fmt.Printf("%s %s\n", localPool.PoolID, poolItem)
srcLocalFile, err := os.Open(poolLocalFilePath)
if err != nil {
return err
}
//fmt.Println("Closer")
defer srcLocalFile.Close()
//fmt.Println("io.Copy")
if _, err = io.Copy(writer, srcLocalFile); err != nil {
return err
}
return nil
}
}
}
return nil
}
func (p *PoolMaster) Store(id string, src io.Reader, targetSize int64) (err error) {
pool, err := p.GetCurrentWriteablePool()
if err != nil {
return err
}
p.lock.Lock()
defer p.lock.Unlock()
poolFolder := filepath.Join(p.cachePath, "pool", pool.PoolID)
destPath := filepath.Join(poolFolder, id)
dst, err := os.Create(destPath)
if err != nil {
_ = os.Remove(destPath)
return err
}
defer dst.Close()
writtenBytes, err := io.Copy(dst, src)
if err != nil {
_ = os.Remove(destPath)
return err
}
if writtenBytes != targetSize {
_ = os.Remove(destPath)
return err
}
pool.itemCount++
pool.items = append(pool.items, id)
fmt.Printf("Current Pool %s, ItemCount = %d\n", pool.PoolID, pool.itemCount)
entries, err := os.ReadDir(poolFolder)
if err != nil {
return err
}
newItemCount := 0
for _, e := range entries {
if !e.IsDir() {
pool.items = append(pool.items, e.Name())
newItemCount++
}
}
pool.itemCount = newItemCount
fmt.Printf("Current Pool %s, Recounted ItemCount = %d\n", pool.PoolID, pool.itemCount)
if pool.itemCount >= PoolMaxItems {
pool.ReadOnly = true
}
return nil
}
var (
arangoDB adriver.Database
arangoCTX context.Context
colChunk adriver.Collection
colFile adriver.Collection
colFile2Chunk adriver.Collection
)
func ConnectDB(baseURL string, arangoUser string, arangoPWD string, arangoDatabase string) (driver adriver.Database, ctx context.Context, err error) {
log.Println("connectDB:", "Starting Connection Process...")
// Retry Loop for Failed Connections
for i := 0; i < 6; i++ {
if i == 5 {
return driver, ctx, fmt.Errorf("connectdb: unable to connect to database %d times!", i)
} else if i > 0 {
time.Sleep(30 * time.Second)
}
// Connect to ArangoDB URL
conn, err := ahttp.NewConnection(ahttp.ConnectionConfig{
Endpoints: []string{baseURL},
TLSConfig: &tls.Config{ /*...*/ },
})
if err != nil {
log.Println("connectDB:", "Cannot Connect to ArangoDB!", err)
continue
}
// Connect Driver to User
client, err := adriver.NewClient(adriver.ClientConfig{
Connection: conn,
Authentication: adriver.BasicAuthentication(arangoUser, arangoPWD),
})
if err != nil {
log.Println("connectDB:", "Cannot Authenticate ArangoDB User!", err)
continue
}
// Create Context for Database Access
ctx = context.Background()
driver, err = client.Database(ctx, arangoDatabase)
if err != nil {
log.Println("connectDB:", "Cannot Load ArangoDB Database!", err)
continue
}
log.Println("connectDB:", "Connection Sucessful!")
return driver, ctx, nil
}
return driver, ctx, fmt.Errorf("connectDB: FUCK HOW DID THIS EXCUTE?")
}
func InitDatabase() (err error) {
arangoDB, arangoCTX, err = ConnectDB("http://192.168.45.8:8529/", "gma-inator", "gma-inator", "gma-inator")
colChunk, err = arangoDB.Collection(arangoCTX, "chunk")
if err != nil {
return err
}
colFile, err = arangoDB.Collection(arangoCTX, "file")
if err != nil {
return err
}
colFile2Chunk, err = arangoDB.Collection(arangoCTX, "file_chunk_map")
if err != nil {
return err
}
return nil
}
var (
poolMaster PoolMaster
//poolFiles = []PoolFile{}
//seq = 1
//lock = sync.Mutex{}
)
func main() {
err := InitDatabase()
if err != nil {
panic(err)
}
poolMaster, err = NewPoolMaster(PoolPathFinal, PoolPathTemp)
if err != nil {
panic(err)
}
// Scan for local existing Pools
err = poolMaster.ScanForLocalPools()
if err != nil {
panic(err)
}
e := echo.New()
//e.Use(middleware.Logger())
e.GET("/", func(c echo.Context) error {
return c.String(http.StatusOK, "Hello, World!")
})
e.GET("/fetch/:id", func(c echo.Context) error {
id := c.Param("id")
exists := poolMaster.Lookup(id)
if exists {
c.Response().Header().Set(echo.HeaderContentType, echo.MIMEOctetStream)
c.Response().WriteHeader(http.StatusOK)
err = poolMaster.Fetch(id, c.Response())
if err != nil {
fmt.Printf("%v", err)
return c.String(http.StatusInternalServerError, err.Error())
}
c.Response().Flush()
} else {
fmt.Printf("/fetch/%s does not exist\n", id)
return c.String(http.StatusNotFound, "Not Found")
}
return nil
//return c.Stream(200, "application/x-octet-stream", nil)
})
e.POST("/stash/:id/:size", func(c echo.Context) error {
id := c.Param("id")
sizeStr := c.Param("size")
sizeVal, err := strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
fmt.Println(err)
return c.String(http.StatusExpectationFailed, "Error")
}
exists := poolMaster.Lookup(id)
if exists {
fmt.Printf("/stash/%s exists already\n", id)
return c.String(http.StatusAlreadyReported, "Exists already")
}
fmt.Printf("stashing %s", id)
file, err := c.FormFile("file")
if err != nil {
fmt.Println(err)
return c.String(http.StatusExpectationFailed, "Error")
}
formStream, err := file.Open()
if err != nil {
fmt.Println(err)
return c.String(http.StatusExpectationFailed, "Error")
}
defer formStream.Close()
err = poolMaster.Store(id, formStream, sizeVal)
if err != nil {
fmt.Println(err)
return c.String(http.StatusExpectationFailed, "Error")
}
fmt.Println("...stashed")
return c.JSON(http.StatusOK, true)
})
e.Logger.Fatal(e.Start(":13371"))
}