You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
116 lines
2.9 KiB
Go
116 lines
2.9 KiB
Go
9 months ago
|
package grabber
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"git.cheetah.cat/archivinator-ng/workerclient/db"
|
||
|
"github.com/lrstanley/go-ytdlp"
|
||
|
"go.temporal.io/sdk/activity"
|
||
|
)
|
||
|
|
||
|
// returns all database request ids that need a refresh
|
||
|
func DBSelectOutdatedRequests(ctx context.Context) (requestIDs []int64, err error) {
|
||
|
logger := activity.GetLogger(ctx)
|
||
|
if db.BunDB == nil {
|
||
|
db.Initialize()
|
||
|
ytdlp.MustInstall(context.TODO(), nil)
|
||
|
}
|
||
|
|
||
|
var requests []db.Request
|
||
|
count, err := db.BunDB.NewSelect().Model(&requests).Where("chipselect = true").ScanAndCount(ctx)
|
||
|
if err != nil {
|
||
|
return requestIDs, err
|
||
|
}
|
||
|
activity.RecordHeartbeat(ctx)
|
||
|
|
||
|
logger.Info("Processing %d Requests", count)
|
||
|
for _, requestItem := range requests {
|
||
|
if time.Since(requestItem.LastCheck).Minutes() > float64(requestItem.Interval) {
|
||
|
requestIDs = append(requestIDs, requestItem.ID)
|
||
|
}
|
||
|
}
|
||
|
return requestIDs, nil
|
||
|
}
|
||
|
|
||
|
// Returns the Request Entry from the DB
|
||
|
func DBLoadRequestEntry(ctx context.Context, requestId int64) (request db.Request, err error) {
|
||
|
if db.BunDB == nil {
|
||
|
db.Initialize()
|
||
|
}
|
||
|
activity.RecordHeartbeat(ctx)
|
||
|
|
||
|
logger := activity.GetLogger(ctx)
|
||
|
logger.Info(fmt.Sprintf("started DBLoadRequestEntry for ID=%d", requestId))
|
||
|
var requestObj db.Request
|
||
|
err = db.BunDB.NewSelect().
|
||
|
Model(&requestObj).
|
||
|
Where("id = ?", requestId).
|
||
|
Limit(1).
|
||
|
Scan(ctx)
|
||
|
if err != nil {
|
||
|
return request, err
|
||
|
}
|
||
|
return requestObj, nil
|
||
|
}
|
||
|
|
||
|
// Grabs Data from YT-DLP
|
||
|
func FetchRequestContents(ctx context.Context, workingRequest db.Request) (decision string, err error) {
|
||
|
logger := activity.GetLogger(ctx)
|
||
|
logger.Info("Starting YT-DLP with Activity Context")
|
||
|
|
||
|
playlistGrabber := ytdlp.New().FlatPlaylist().PrintJSON().UnsetPrintToFile()
|
||
|
|
||
|
// sends regular heartbeats to temporal
|
||
|
keepAliveKiller := make(chan struct{})
|
||
|
keepAliveTimer := time.NewTicker(time.Second * 1)
|
||
|
go func() {
|
||
|
for {
|
||
|
select {
|
||
|
case <-keepAliveTimer.C:
|
||
|
activity.RecordHeartbeat(ctx)
|
||
|
case <-keepAliveKiller:
|
||
|
keepAliveTimer.Stop()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
defer keepAliveTimer.Stop()
|
||
|
|
||
|
output, err := playlistGrabber.Run(ctx, workingRequest.URL)
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
|
||
|
info, err := output.GetExtractedInfo()
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
for _, infoobj := range info {
|
||
|
logger.Info(fmt.Sprint(*infoobj.Title, *infoobj.Duration, infoobj.Type, *infoobj.Extractor, infoobj.Thumbnails))
|
||
|
}
|
||
|
|
||
|
return fmt.Sprintf("Request returned %d objects", len(info)), nil
|
||
|
}
|
||
|
|
||
|
func DBUpdateRequestLastChecked(ctx context.Context, workingRequest db.Request) (err error) {
|
||
|
if db.BunDB == nil {
|
||
|
db.Initialize()
|
||
|
}
|
||
|
activity.RecordHeartbeat(ctx)
|
||
|
|
||
|
logger := activity.GetLogger(ctx)
|
||
|
logger.Info(fmt.Sprintf("started DBUpdateRequestLastChecked for ID=%d", workingRequest.ID))
|
||
|
var requestObj db.Request
|
||
|
_, err = db.BunDB.NewUpdate().
|
||
|
Model(&requestObj).
|
||
|
Where("id = ?", workingRequest.ID).
|
||
|
Set("lastcheck = ?", time.Now()).
|
||
|
Exec(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|