You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

116 lines
2.9 KiB
Go

package grabber
import (
"context"
"fmt"
"time"
"git.cheetah.cat/archivinator-ng/workerclient/db"
"github.com/lrstanley/go-ytdlp"
"go.temporal.io/sdk/activity"
)
// returns all database request ids that need a refresh
func DBSelectOutdatedRequests(ctx context.Context) (requestIDs []int64, err error) {
logger := activity.GetLogger(ctx)
if db.BunDB == nil {
db.Initialize()
ytdlp.MustInstall(context.TODO(), nil)
}
var requests []db.Request
count, err := db.BunDB.NewSelect().Model(&requests).Where("chipselect = true").ScanAndCount(ctx)
if err != nil {
return requestIDs, err
}
activity.RecordHeartbeat(ctx)
logger.Info("Processing %d Requests", count)
for _, requestItem := range requests {
if time.Since(requestItem.LastCheck).Minutes() > float64(requestItem.Interval) {
requestIDs = append(requestIDs, requestItem.ID)
}
}
return requestIDs, nil
}
// Returns the Request Entry from the DB
func DBLoadRequestEntry(ctx context.Context, requestId int64) (request db.Request, err error) {
if db.BunDB == nil {
db.Initialize()
}
activity.RecordHeartbeat(ctx)
logger := activity.GetLogger(ctx)
logger.Info(fmt.Sprintf("started DBLoadRequestEntry for ID=%d", requestId))
var requestObj db.Request
err = db.BunDB.NewSelect().
Model(&requestObj).
Where("id = ?", requestId).
Limit(1).
Scan(ctx)
if err != nil {
return request, err
}
return requestObj, nil
}
// Grabs Data from YT-DLP
func FetchRequestContents(ctx context.Context, workingRequest db.Request) (decision string, err error) {
logger := activity.GetLogger(ctx)
logger.Info("Starting YT-DLP with Activity Context")
playlistGrabber := ytdlp.New().FlatPlaylist().PrintJSON().UnsetPrintToFile()
// sends regular heartbeats to temporal
keepAliveKiller := make(chan struct{})
keepAliveTimer := time.NewTicker(time.Second * 1)
go func() {
for {
select {
case <-keepAliveTimer.C:
activity.RecordHeartbeat(ctx)
case <-keepAliveKiller:
keepAliveTimer.Stop()
return
}
}
}()
defer keepAliveTimer.Stop()
output, err := playlistGrabber.Run(ctx, workingRequest.URL)
if err != nil {
return "", err
}
info, err := output.GetExtractedInfo()
if err != nil {
return "", err
}
for _, infoobj := range info {
logger.Info(fmt.Sprint(*infoobj.Title, *infoobj.Duration, infoobj.Type, *infoobj.Extractor, infoobj.Thumbnails))
}
return fmt.Sprintf("Request returned %d objects", len(info)), nil
}
func DBUpdateRequestLastChecked(ctx context.Context, workingRequest db.Request) (err error) {
if db.BunDB == nil {
db.Initialize()
}
activity.RecordHeartbeat(ctx)
logger := activity.GetLogger(ctx)
logger.Info(fmt.Sprintf("started DBUpdateRequestLastChecked for ID=%d", workingRequest.ID))
var requestObj db.Request
_, err = db.BunDB.NewUpdate().
Model(&requestObj).
Where("id = ?", workingRequest.ID).
Set("lastcheck = ?", time.Now()).
Exec(ctx)
if err != nil {
return err
}
return nil
}