You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
214 lines
6.9 KiB
Go
214 lines
6.9 KiB
Go
package grabber
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.cheetah.cat/archivinator-ng/workerclient/db"
|
|
"github.com/ttacon/chalk"
|
|
"go.temporal.io/sdk/converter"
|
|
"go.temporal.io/sdk/workflow"
|
|
)
|
|
|
|
func RequestProcessingWorkflow(ctx workflow.Context) error {
|
|
logger := workflow.GetLogger(ctx)
|
|
execution := workflow.GetInfo(ctx).WorkflowExecution
|
|
|
|
ao := workflow.ActivityOptions{
|
|
StartToCloseTimeout: 10 * time.Second,
|
|
HeartbeatTimeout: 2 * time.Second,
|
|
}
|
|
ctx1 := workflow.WithActivityOptions(ctx, ao)
|
|
|
|
info := workflow.GetInfo(ctx1)
|
|
|
|
// Workflow Executions started by a Schedule have the following additional properties appended to their search attributes
|
|
scheduledByIDPayload := info.SearchAttributes.IndexedFields["TemporalScheduledById"]
|
|
var scheduledByID string
|
|
err := converter.GetDefaultDataConverter().FromPayload(scheduledByIDPayload, &scheduledByID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
startTimePayload := info.SearchAttributes.IndexedFields["TemporalScheduledStartTime"]
|
|
var startTime time.Time
|
|
err = converter.GetDefaultDataConverter().FromPayload(startTimePayload, &startTime)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
logger.Warn(chalk.Red.String(), "RequestProcessingWorkflow started.", scheduledByIDPayload, startTime, chalk.Reset)
|
|
//
|
|
|
|
var dueRequestIDs []int64
|
|
err = workflow.ExecuteActivity(ctx1, DBSelectOutdatedRequests).Get(ctx1, &dueRequestIDs)
|
|
if err != nil {
|
|
workflow.GetLogger(ctx).Error("schedule workflow failed.", "Error", err)
|
|
return err
|
|
}
|
|
|
|
if len(dueRequestIDs) > 0 {
|
|
logger.Info(fmt.Sprintf("Processing %d Requests", len(dueRequestIDs)))
|
|
childID := fmt.Sprintf("FetchRequestContentsWorkflow:%v", execution.RunID)
|
|
cwo := workflow.ChildWorkflowOptions{
|
|
WorkflowID: childID,
|
|
}
|
|
ctx2 := workflow.WithChildOptions(ctx1, cwo)
|
|
var result string
|
|
err := workflow.ExecuteChildWorkflow(ctx2, FetchRequestContentsWorkflow, dueRequestIDs).Get(ctx2, &result)
|
|
if err != nil {
|
|
logger.Error("Parent execution received child execution failure.", "Error", err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
// SampleFileProcessingWorkflow workflow definition
|
|
func SampleFileProcessingWorkflow(ctx workflow.Context, fileName string) (err error) {
|
|
ao := workflow.ActivityOptions{
|
|
StartToCloseTimeout: time.Minute,
|
|
HeartbeatTimeout: 2 * time.Second, // such a short timeout to make sample fail over very fast
|
|
RetryPolicy: &temporal.RetryPolicy{
|
|
InitialInterval: time.Second,
|
|
BackoffCoefficient: 2.0,
|
|
MaximumInterval: time.Minute,
|
|
},
|
|
}
|
|
ctx = workflow.WithActivityOptions(ctx, ao)
|
|
|
|
// Retry the whole sequence from the first activity on any error
|
|
// to retry it on a different host. In a real application it might be reasonable to
|
|
// retry individual activities as well as the whole sequence discriminating between different types of errors.
|
|
// See the retryactivity sample for a more sophisticated retry implementation.
|
|
for i := 1; i < 5; i++ {
|
|
err = processFile(ctx, fileName)
|
|
if err == nil {
|
|
break
|
|
}
|
|
}
|
|
if err != nil {
|
|
workflow.GetLogger(ctx).Error("Workflow failed.", "Error", err.Error())
|
|
} else {
|
|
workflow.GetLogger(ctx).Info("Workflow completed.")
|
|
}
|
|
return err
|
|
}
|
|
|
|
func processFile(ctx workflow.Context, fileName string) (err error) {
|
|
so := &workflow.SessionOptions{
|
|
CreationTimeout: time.Minute,
|
|
ExecutionTimeout: time.Minute,
|
|
}
|
|
sessionCtx, err := workflow.CreateSession(ctx, so)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer workflow.CompleteSession(sessionCtx)
|
|
|
|
var downloadedName string
|
|
var a *Activities
|
|
err = workflow.ExecuteActivity(sessionCtx, a.DownloadFileActivity, fileName).Get(sessionCtx, &downloadedName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var processedFileName string
|
|
err = workflow.ExecuteActivity(sessionCtx, a.ProcessFileActivity, downloadedName).Get(sessionCtx, &processedFileName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = workflow.ExecuteActivity(sessionCtx, a.UploadFileActivity, processedFileName).Get(sessionCtx, nil)
|
|
return err
|
|
}
|
|
*/
|
|
|
|
func FetchRequestContentsWorkflow(ctx workflow.Context, leftoverRequests []int64) error {
|
|
logger := workflow.GetLogger(ctx)
|
|
ao := workflow.ActivityOptions{
|
|
StartToCloseTimeout: 10 * time.Second,
|
|
HeartbeatTimeout: 2 * time.Second,
|
|
}
|
|
ctx1 := workflow.WithActivityOptions(ctx, ao)
|
|
so := &workflow.SessionOptions{
|
|
CreationTimeout: time.Minute,
|
|
ExecutionTimeout: 15 * time.Minute,
|
|
}
|
|
logger.Warn(chalk.Green.String(), "FetchRequestContentsWorkflow started.", chalk.Reset)
|
|
sessionCtx, err := workflow.CreateSession(ctx1, so)
|
|
if err != nil {
|
|
logger.Error(chalk.Green.String(), "FetchRequestContentsWorkflow started.", err, chalk.Reset)
|
|
return err
|
|
}
|
|
defer workflow.CompleteSession(sessionCtx)
|
|
|
|
if len(leftoverRequests) > 0 {
|
|
var workingRequest db.Request
|
|
err := workflow.ExecuteActivity(sessionCtx, DBLoadRequestEntry, leftoverRequests[0]).Get(sessionCtx, &workingRequest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = workflow.ExecuteActivity(sessionCtx, FetchRequestContents, workingRequest).Get(sessionCtx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = workflow.ExecuteActivity(sessionCtx, DBUpdateRequestLastChecked, workingRequest).Get(sessionCtx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if len(leftoverRequests) > 1 {
|
|
return workflow.NewContinueAsNewError(ctx1, FetchRequestContentsWorkflow, leftoverRequests[1:])
|
|
} else {
|
|
logger.Info("Child workflow completed.")
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func SampleParentWorkflow(ctx workflow.Context) error {
|
|
logger := workflow.GetLogger(ctx)
|
|
execution := workflow.GetInfo(ctx).WorkflowExecution
|
|
// Parent Workflows can choose to specify Ids for child executions.
|
|
// Make sure Ids are unique for each execution.
|
|
// Do not specify if you want the Temporal Server to generate a unique ID for the child execution.
|
|
childID := fmt.Sprintf("child_workflow:%v", execution.RunID)
|
|
cwo := workflow.ChildWorkflowOptions{
|
|
WorkflowID: childID,
|
|
}
|
|
ctx = workflow.WithChildOptions(ctx, cwo)
|
|
var result string
|
|
err := workflow.ExecuteChildWorkflow(ctx, SampleChildWorkflow, 0, 5).Get(ctx, &result)
|
|
if err != nil {
|
|
logger.Error("Parent execution received child execution failure.", "Error", err)
|
|
return err
|
|
}
|
|
|
|
logger.Info("Parent execution completed.", "Result", result)
|
|
return nil
|
|
}
|
|
func SampleChildWorkflow(ctx workflow.Context, totalCount, runCount int) (string, error) {
|
|
logger := workflow.GetLogger(ctx)
|
|
|
|
logger.Info(fmt.Sprintf("Processing %d Request", totalCount))
|
|
logger.Info("Child workflow execution started.")
|
|
if runCount <= 0 {
|
|
logger.Error("Invalid valid for run count.", "RunCount", runCount)
|
|
return "", errors.New("invalid run count")
|
|
}
|
|
workflow.Sleep(ctx, 5*time.Second)
|
|
totalCount++
|
|
runCount--
|
|
if runCount == 0 {
|
|
result := fmt.Sprintf("Child workflow execution completed after %v runs", totalCount)
|
|
logger.Info("Child workflow completed.", "Result", result)
|
|
return result, nil
|
|
}
|
|
|
|
logger.Info("Child workflow starting new run.", "RunCount", runCount, "TotalCount", totalCount)
|
|
return "", workflow.NewContinueAsNewError(ctx, SampleChildWorkflow, totalCount, runCount)
|
|
}
|