package grabber import ( "errors" "fmt" "time" "git.cheetah.cat/archivinator-ng/workerclient/db" "github.com/ttacon/chalk" "go.temporal.io/sdk/converter" "go.temporal.io/sdk/workflow" ) func RequestProcessingWorkflow(ctx workflow.Context) error { logger := workflow.GetLogger(ctx) execution := workflow.GetInfo(ctx).WorkflowExecution ao := workflow.ActivityOptions{ StartToCloseTimeout: 10 * time.Second, HeartbeatTimeout: 2 * time.Second, } ctx1 := workflow.WithActivityOptions(ctx, ao) info := workflow.GetInfo(ctx1) // Workflow Executions started by a Schedule have the following additional properties appended to their search attributes scheduledByIDPayload := info.SearchAttributes.IndexedFields["TemporalScheduledById"] var scheduledByID string err := converter.GetDefaultDataConverter().FromPayload(scheduledByIDPayload, &scheduledByID) if err != nil { return err } startTimePayload := info.SearchAttributes.IndexedFields["TemporalScheduledStartTime"] var startTime time.Time err = converter.GetDefaultDataConverter().FromPayload(startTimePayload, &startTime) if err != nil { return err } logger.Warn(chalk.Red.String(), "RequestProcessingWorkflow started.", scheduledByIDPayload, startTime, chalk.Reset) // var dueRequestIDs []int64 err = workflow.ExecuteActivity(ctx1, DBSelectOutdatedRequests).Get(ctx1, &dueRequestIDs) if err != nil { workflow.GetLogger(ctx).Error("schedule workflow failed.", "Error", err) return err } if len(dueRequestIDs) > 0 { logger.Info(fmt.Sprintf("Processing %d Requests", len(dueRequestIDs))) childID := fmt.Sprintf("FetchRequestContentsWorkflow:%v", execution.RunID) cwo := workflow.ChildWorkflowOptions{ WorkflowID: childID, } ctx2 := workflow.WithChildOptions(ctx1, cwo) var result string err := workflow.ExecuteChildWorkflow(ctx2, FetchRequestContentsWorkflow, dueRequestIDs).Get(ctx2, &result) if err != nil { logger.Error("Parent execution received child execution failure.", "Error", err) return err } } return nil } /* // SampleFileProcessingWorkflow workflow definition func SampleFileProcessingWorkflow(ctx workflow.Context, fileName string) (err error) { ao := workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, HeartbeatTimeout: 2 * time.Second, // such a short timeout to make sample fail over very fast RetryPolicy: &temporal.RetryPolicy{ InitialInterval: time.Second, BackoffCoefficient: 2.0, MaximumInterval: time.Minute, }, } ctx = workflow.WithActivityOptions(ctx, ao) // Retry the whole sequence from the first activity on any error // to retry it on a different host. In a real application it might be reasonable to // retry individual activities as well as the whole sequence discriminating between different types of errors. // See the retryactivity sample for a more sophisticated retry implementation. for i := 1; i < 5; i++ { err = processFile(ctx, fileName) if err == nil { break } } if err != nil { workflow.GetLogger(ctx).Error("Workflow failed.", "Error", err.Error()) } else { workflow.GetLogger(ctx).Info("Workflow completed.") } return err } func processFile(ctx workflow.Context, fileName string) (err error) { so := &workflow.SessionOptions{ CreationTimeout: time.Minute, ExecutionTimeout: time.Minute, } sessionCtx, err := workflow.CreateSession(ctx, so) if err != nil { return err } defer workflow.CompleteSession(sessionCtx) var downloadedName string var a *Activities err = workflow.ExecuteActivity(sessionCtx, a.DownloadFileActivity, fileName).Get(sessionCtx, &downloadedName) if err != nil { return err } var processedFileName string err = workflow.ExecuteActivity(sessionCtx, a.ProcessFileActivity, downloadedName).Get(sessionCtx, &processedFileName) if err != nil { return err } err = workflow.ExecuteActivity(sessionCtx, a.UploadFileActivity, processedFileName).Get(sessionCtx, nil) return err } */ func FetchRequestContentsWorkflow(ctx workflow.Context, leftoverRequests []int64) error { logger := workflow.GetLogger(ctx) ao := workflow.ActivityOptions{ StartToCloseTimeout: 10 * time.Second, HeartbeatTimeout: 2 * time.Second, } ctx1 := workflow.WithActivityOptions(ctx, ao) so := &workflow.SessionOptions{ CreationTimeout: time.Minute, ExecutionTimeout: 15 * time.Minute, } logger.Warn(chalk.Green.String(), "FetchRequestContentsWorkflow started.", chalk.Reset) sessionCtx, err := workflow.CreateSession(ctx1, so) if err != nil { logger.Error(chalk.Green.String(), "FetchRequestContentsWorkflow started.", err, chalk.Reset) return err } defer workflow.CompleteSession(sessionCtx) if len(leftoverRequests) > 0 { var workingRequest db.Request err := workflow.ExecuteActivity(sessionCtx, DBLoadRequestEntry, leftoverRequests[0]).Get(sessionCtx, &workingRequest) if err != nil { return err } err = workflow.ExecuteActivity(sessionCtx, FetchRequestContents, workingRequest).Get(sessionCtx, nil) if err != nil { return err } err = workflow.ExecuteActivity(sessionCtx, DBUpdateRequestLastChecked, workingRequest).Get(sessionCtx, nil) if err != nil { return err } } if len(leftoverRequests) > 1 { return workflow.NewContinueAsNewError(ctx1, FetchRequestContentsWorkflow, leftoverRequests[1:]) } else { logger.Info("Child workflow completed.") return nil } } func SampleParentWorkflow(ctx workflow.Context) error { logger := workflow.GetLogger(ctx) execution := workflow.GetInfo(ctx).WorkflowExecution // Parent Workflows can choose to specify Ids for child executions. // Make sure Ids are unique for each execution. // Do not specify if you want the Temporal Server to generate a unique ID for the child execution. childID := fmt.Sprintf("child_workflow:%v", execution.RunID) cwo := workflow.ChildWorkflowOptions{ WorkflowID: childID, } ctx = workflow.WithChildOptions(ctx, cwo) var result string err := workflow.ExecuteChildWorkflow(ctx, SampleChildWorkflow, 0, 5).Get(ctx, &result) if err != nil { logger.Error("Parent execution received child execution failure.", "Error", err) return err } logger.Info("Parent execution completed.", "Result", result) return nil } func SampleChildWorkflow(ctx workflow.Context, totalCount, runCount int) (string, error) { logger := workflow.GetLogger(ctx) logger.Info(fmt.Sprintf("Processing %d Request", totalCount)) logger.Info("Child workflow execution started.") if runCount <= 0 { logger.Error("Invalid valid for run count.", "RunCount", runCount) return "", errors.New("invalid run count") } workflow.Sleep(ctx, 5*time.Second) totalCount++ runCount-- if runCount == 0 { result := fmt.Sprintf("Child workflow execution completed after %v runs", totalCount) logger.Info("Child workflow completed.", "Result", result) return result, nil } logger.Info("Child workflow starting new run.", "RunCount", runCount, "TotalCount", totalCount) return "", workflow.NewContinueAsNewError(ctx, SampleChildWorkflow, totalCount, runCount) }