You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

214 lines
6.9 KiB
Go

9 months ago
package grabber
import (
"errors"
"fmt"
"time"
"git.cheetah.cat/archivinator-ng/workerclient/db"
"github.com/ttacon/chalk"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/workflow"
)
// RequestProcessingWorkflow is the periodic entry point of the grabber.
//
// It runs the DBSelectOutdatedRequests activity to obtain the IDs of the
// requests that are due and, when there is at least one, starts
// FetchRequestContentsWorkflow as a child workflow with those IDs and waits
// for it to finish.
//
// It returns the first error produced while decoding the schedule search
// attributes, by the activity, or by the child workflow; otherwise nil.
func RequestProcessingWorkflow(ctx workflow.Context) error {
	logger := workflow.GetLogger(ctx)
	info := workflow.GetInfo(ctx)

	ctx1 := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Second,
		HeartbeatTimeout:    2 * time.Second,
	})

	// Workflow Executions started by a Schedule have TemporalScheduledById and
	// TemporalScheduledStartTime appended to their search attributes. The
	// nil-safe getter is used because SearchAttributes is a pointer and a
	// direct field access would panic if it is nil.
	indexed := info.SearchAttributes.GetIndexedFields()
	dc := converter.GetDefaultDataConverter()

	var scheduledByID string
	if err := dc.FromPayload(indexed["TemporalScheduledById"], &scheduledByID); err != nil {
		return err
	}
	var startTime time.Time
	if err := dc.FromPayload(indexed["TemporalScheduledStartTime"], &startTime); err != nil {
		return err
	}

	// The logger takes (msg, key, value, ...): the colour wraps the message
	// itself and the decoded values are passed as proper key/value pairs.
	logger.Warn(chalk.Red.Color("RequestProcessingWorkflow started."),
		"ScheduledByID", scheduledByID,
		"ScheduledStartTime", startTime)

	var dueRequestIDs []int64
	if err := workflow.ExecuteActivity(ctx1, DBSelectOutdatedRequests).Get(ctx1, &dueRequestIDs); err != nil {
		logger.Error("schedule workflow failed.", "Error", err)
		return err
	}
	if len(dueRequestIDs) == 0 {
		// Nothing is due; no child workflow is started.
		return nil
	}

	logger.Info(fmt.Sprintf("Processing %d Requests", len(dueRequestIDs)))

	// The child ID is derived from this run's RunID so that every parent run
	// uses a distinct child workflow ID.
	ctx2 := workflow.WithChildOptions(ctx1, workflow.ChildWorkflowOptions{
		WorkflowID: fmt.Sprintf("FetchRequestContentsWorkflow:%v", info.WorkflowExecution.RunID),
	})

	// FetchRequestContentsWorkflow returns only an error, so no result
	// pointer is supplied.
	if err := workflow.ExecuteChildWorkflow(ctx2, FetchRequestContentsWorkflow, dueRequestIDs).Get(ctx2, nil); err != nil {
		logger.Error("Parent execution received child execution failure.", "Error", err)
		return err
	}
	return nil
}
/*
// SampleFileProcessingWorkflow workflow definition
func SampleFileProcessingWorkflow(ctx workflow.Context, fileName string) (err error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
HeartbeatTimeout: 2 * time.Second, // such a short timeout to make sample fail over very fast
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Retry the whole sequence from the first activity on any error
// to retry it on a different host. In a real application it might be reasonable to
// retry individual activities as well as the whole sequence discriminating between different types of errors.
// See the retryactivity sample for a more sophisticated retry implementation.
for i := 1; i < 5; i++ {
err = processFile(ctx, fileName)
if err == nil {
break
}
}
if err != nil {
workflow.GetLogger(ctx).Error("Workflow failed.", "Error", err.Error())
} else {
workflow.GetLogger(ctx).Info("Workflow completed.")
}
return err
}
func processFile(ctx workflow.Context, fileName string) (err error) {
so := &workflow.SessionOptions{
CreationTimeout: time.Minute,
ExecutionTimeout: time.Minute,
}
sessionCtx, err := workflow.CreateSession(ctx, so)
if err != nil {
return err
}
defer workflow.CompleteSession(sessionCtx)
var downloadedName string
var a *Activities
err = workflow.ExecuteActivity(sessionCtx, a.DownloadFileActivity, fileName).Get(sessionCtx, &downloadedName)
if err != nil {
return err
}
var processedFileName string
err = workflow.ExecuteActivity(sessionCtx, a.ProcessFileActivity, downloadedName).Get(sessionCtx, &processedFileName)
if err != nil {
return err
}
err = workflow.ExecuteActivity(sessionCtx, a.UploadFileActivity, processedFileName).Get(sessionCtx, nil)
return err
}
*/
// FetchRequestContentsWorkflow handles the first ID in leftoverRequests and
// then continues as new with the remaining IDs.
//
// For the first ID it runs, on a session context, the activities
// DBLoadRequestEntry, FetchRequestContents and DBUpdateRequestLastChecked in
// that order, stopping at the first error. When more than one ID was passed
// in, it returns a continue-as-new error carrying leftoverRequests[1:];
// otherwise it returns nil.
func FetchRequestContentsWorkflow(ctx workflow.Context, leftoverRequests []int64) error {
	logger := workflow.GetLogger(ctx)

	ctx1 := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Second,
		HeartbeatTimeout:    2 * time.Second,
	})
	so := &workflow.SessionOptions{
		CreationTimeout:  time.Minute,
		ExecutionTimeout: 15 * time.Minute,
	}

	// The logger takes (msg, key, value, ...), so the colour wraps the
	// message instead of being passed as the message itself.
	logger.Warn(chalk.Green.Color("FetchRequestContentsWorkflow started."))

	sessionCtx, err := workflow.CreateSession(ctx1, so)
	if err != nil {
		logger.Error(chalk.Green.Color("FetchRequestContentsWorkflow could not create session."), "Error", err)
		return err
	}
	defer workflow.CompleteSession(sessionCtx)

	if len(leftoverRequests) > 0 {
		var workingRequest db.Request
		if err := workflow.ExecuteActivity(sessionCtx, DBLoadRequestEntry, leftoverRequests[0]).Get(sessionCtx, &workingRequest); err != nil {
			return err
		}
		if err := workflow.ExecuteActivity(sessionCtx, FetchRequestContents, workingRequest).Get(sessionCtx, nil); err != nil {
			return err
		}
		if err := workflow.ExecuteActivity(sessionCtx, DBUpdateRequestLastChecked, workingRequest).Get(sessionCtx, nil); err != nil {
			return err
		}
	}

	if len(leftoverRequests) > 1 {
		// Hand the rest of the queue to a fresh run of this workflow.
		return workflow.NewContinueAsNewError(ctx1, FetchRequestContentsWorkflow, leftoverRequests[1:])
	}
	logger.Info("Child workflow completed.")
	return nil
}
// SampleParentWorkflow runs SampleChildWorkflow(0, 5) as a child execution,
// waits for its string result and logs it. A child failure is logged and
// returned to the caller.
func SampleParentWorkflow(ctx workflow.Context) error {
	wfLog := workflow.GetLogger(ctx)
	runID := workflow.GetInfo(ctx).WorkflowExecution.RunID

	// A parent may pick the ID of its child execution; it has to be unique per
	// execution, which is why this run's RunID is embedded in it. Leaving the
	// ID unset lets the Temporal Server generate one instead.
	childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
		WorkflowID: fmt.Sprintf("child_workflow:%v", runID),
	})

	var outcome string
	childRun := workflow.ExecuteChildWorkflow(childCtx, SampleChildWorkflow, 0, 5)
	if err := childRun.Get(childCtx, &outcome); err != nil {
		wfLog.Error("Parent execution received child execution failure.", "Error", err)
		return err
	}

	wfLog.Info("Parent execution completed.", "Result", outcome)
	return nil
}
// SampleChildWorkflow sleeps for five seconds per run and restarts itself
// with continue-as-new until runCount reaches zero.
//
// totalCount is the number of runs completed so far and runCount the number
// of runs still to perform. Each run increments totalCount and decrements
// runCount. When runCount reaches zero the workflow returns a summary string;
// otherwise it returns a continue-as-new error carrying the updated counters.
// A runCount of zero or less on entry is rejected with an error, and an error
// from the sleep (for example on cancellation) is returned as is.
func SampleChildWorkflow(ctx workflow.Context, totalCount, runCount int) (string, error) {
	logger := workflow.GetLogger(ctx)
	logger.Info(fmt.Sprintf("Processing %d Request", totalCount))
	logger.Info("Child workflow execution started.")

	if runCount <= 0 {
		logger.Error("Invalid value for run count.", "RunCount", runCount)
		return "", errors.New("invalid run count")
	}

	// Propagate the timer error instead of dropping it, so an interrupted
	// sleep does not fall through to the next run.
	if err := workflow.Sleep(ctx, 5*time.Second); err != nil {
		return "", err
	}

	totalCount++
	runCount--
	if runCount == 0 {
		result := fmt.Sprintf("Child workflow execution completed after %v runs", totalCount)
		logger.Info("Child workflow completed.", "Result", result)
		return result, nil
	}

	logger.Info("Child workflow starting new run.", "RunCount", runCount, "TotalCount", totalCount)
	return "", workflow.NewContinueAsNewError(ctx, SampleChildWorkflow, totalCount, runCount)
}