Skip to content

Commit

Permalink
feat(orchestration): Provide hint in workflows error (#9400)
Browse files Browse the repository at this point in the history
* feat(orchestration): Provide hint in workflows error

* remove log

* fix tests

* improve stack

* fix type

* formatting

---------

Co-authored-by: Riqwan Thamir <[email protected]>
  • Loading branch information
adrien2p and riqwan authored Oct 2, 2024
1 parent 7ce9121 commit 0262962
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,16 @@ export class TransactionOrchestrator extends EventEmitter {

step.changeState(TransactionStepState.TIMEOUT)

if (error?.stack) {
const workflowId = transaction.modelId
const stepAction = step.definition.action
const sourcePath = transaction.getFlow().metadata?.sourcePath
const sourceStack = sourcePath
? `\n⮑ \sat ${sourcePath}: [${workflowId} -> ${stepAction} (${TransactionHandlerType.INVOKE})]`
: `\n⮑ \sat [${workflowId} -> ${stepAction} (${TransactionHandlerType.INVOKE})]`
error.stack += sourceStack
}

transaction.addError(
step.definition.action!,
TransactionHandlerType.INVOKE,
Expand Down Expand Up @@ -602,13 +612,21 @@ export class TransactionOrchestrator extends EventEmitter {
step.changeStatus(TransactionStepStatus.PERMANENT_FAILURE)

if (!isTimeout) {
transaction.addError(
step.definition.action!,
step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE,
error
)
const handlerType = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE

if (error?.stack) {
const workflowId = transaction.modelId
const stepAction = step.definition.action
const sourcePath = transaction.getFlow().metadata?.sourcePath
const sourceStack = sourcePath
? `\n⮑ \sat ${sourcePath}: [${workflowId} -> ${stepAction} (${TransactionHandlerType.INVOKE})]`
: `\n⮑ \sat [${workflowId} -> ${stepAction} (${TransactionHandlerType.INVOKE})]`
error.stack += sourceStack
}

transaction.addError(step.definition.action!, handlerType, error)
}

if (!step.isCompensating()) {
Expand Down
1 change: 1 addition & 0 deletions packages/core/orchestration/src/transaction/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ export type TransactionFlow = {
metadata?: {
eventGroupId?: string
parentIdempotencyKey?: string
sourcePath?: string
[key: string]: unknown
}
hasAsyncSteps: boolean
Expand Down
3 changes: 3 additions & 0 deletions packages/core/workflows-sdk/src/helper/workflow-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ function createContextualWorkflowRunner<
dataPreparation?: (data: TData) => Promise<unknown>
options?: {
wrappedInput?: boolean
sourcePath?: string
}
container?: LoadedModule[] | MedusaContainer
}): Omit<
Expand Down Expand Up @@ -93,6 +94,7 @@ function createContextualWorkflowRunner<
const flowMetadata = {
eventGroupId,
parentStepIdempotencyKey,
sourcePath: options?.sourcePath,
}

const args = [
Expand Down Expand Up @@ -334,6 +336,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
dataPreparation?: (data: TData) => Promise<unknown>,
options?: {
wrappedInput?: boolean
sourcePath?: string
}
): MainExportedWorkflow<TData, TResult> => {
function exportedWorkflow<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ import {
} from "@medusajs/orchestration"
import { IEventBusModuleService } from "@medusajs/types"
import {
Modules,
composeMessage,
createMedusaContainer,
Modules,
promiseAll,
} from "@medusajs/utils"
import { asValue } from "awilix"
import {
StepResponse,
WorkflowResponse,
createStep,
createWorkflow,
parallelize,
StepResponse,
transform,
WorkflowResponse,
} from ".."
import { MedusaWorkflow } from "../../../medusa-workflow"
import { createHook } from "../create-hook"
Expand Down Expand Up @@ -1903,7 +1903,7 @@ describe("Workflow composer", function () {
action: "step1",
handlerType: "invoke",
error: expect.objectContaining({
message: "invoke fail",
message: "invoke fail",
}),
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import {
WorkflowManager,
} from "@medusajs/orchestration"
import { LoadedModule, MedusaContainer } from "@medusajs/types"
import { OrchestrationUtils, isString } from "@medusajs/utils"
import {
getCallerFilePath,
isString,
OrchestrationUtils,
} from "@medusajs/utils"
import { ulid } from "ulid"
import { exportWorkflow } from "../../helper"
import { createStep } from "./create-step"
Expand Down Expand Up @@ -34,7 +38,7 @@ global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null
* @returns The created workflow. You can later execute the workflow by invoking it, then using its `run` method.
*
* @example
* import {
* import {
* createWorkflow,
* WorkflowResponse
* } from "@medusajs/framework/workflows-sdk"
Expand All @@ -50,7 +54,7 @@ global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null
* }
*
* const myWorkflow = createWorkflow(
* "my-workflow",
* "my-workflow",
* (input: WorkflowInput) => {
* // Everything here will be executed and resolved later
* // during the execution. Including the data access.
Expand Down Expand Up @@ -92,6 +96,7 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
input: WorkflowData<TData>
) => void | WorkflowResponse<TResult, THooks>
): ReturnWorkflow<TData, TResult, THooks> {
const fileSourcePath = getCallerFilePath() as string
const name = isString(nameOrConfig) ? nameOrConfig : nameOrConfig.name
const options = isString(nameOrConfig) ? {} : nameOrConfig

Expand Down Expand Up @@ -153,6 +158,7 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
undefined,
{
wrappedInput: true,
sourcePath: fileSourcePath,
}
)

Expand Down

0 comments on commit 0262962

Please sign in to comment.