-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: enhanced processor system with retry, workflow orchestration, and tripwire improvements #10947
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: enhanced processor system with retry, workflow orchestration, and tripwire improvements #10947
Changes from all commits
7df83b6
12e5d2e
eeaf923
6de603b
946e264
0e1c99a
b357213
fd20038
d216181
3672baf
43e5989
6fa1984
73d32e5
6dbd64c
de5899d
31f1a1b
e357357
0242c3b
c1d45d0
382d3d1
8a1545c
4f8fe9a
d1fe0dc
208ece0
fbb7694
58d1208
2b96ecf
c93d7fd
4250136
65a9ef7
778a49e
a358e2e
b49d156
f78f05b
644420f
273dd3b
7a043eb
3e4653a
a9da2a9
5efae23
1d95c64
f404ea7
710d747
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| --- | ||
| '@mastra/playground-ui': patch | ||
| '@mastra/client-js': patch | ||
| '@mastra/ai-sdk': patch | ||
| '@mastra/react': patch | ||
| '@mastra/server': patch | ||
| '@mastra/inngest': patch | ||
| --- | ||
|
|
||
| Support new Workflow tripwire run status. Tripwires that are thrown from within a workflow will now bubble up and return a graceful state with information about tripwires. | ||
|
|
||
| When a workflow contains an agent step that triggers a tripwire, the workflow returns with `status: 'tripwire'` and includes tripwire details: | ||
|
|
||
| ```typescript showLineNumbers copy | ||
| const run = await workflow.createRun(); | ||
| const result = await run.start({ inputData: { message: 'Hello' } }); | ||
|
|
||
| if (result.status === 'tripwire') { | ||
| console.log('Workflow terminated by tripwire:', result.tripwire?.reason); | ||
| console.log('Processor ID:', result.tripwire?.processorId); | ||
| console.log('Retry requested:', result.tripwire?.retry); | ||
| } | ||
| ``` | ||
|
|
||
| Adds new UI state for tripwire in agent chat and workflow UI. | ||
|
|
||
| This is distinct from `status: 'failed'` which indicates an unexpected error. A tripwire status means a processor intentionally stopped execution (e.g., for content moderation). | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| --- | ||
| '@mastra/core': patch | ||
| --- | ||
|
|
||
| Multiple Processor improvements including: | ||
|
|
||
| - Workflows can now return tripwires, they bubble up from agents that return tripwires in a step | ||
| - You can write processors as workflows using the existing Workflow primitive, every processor flow is now a workflow. | ||
| - tripwires that you throw can now return additional information including ability to retry the step | ||
| - New processor method `processOutputStep` added which runs after every step. | ||
|
|
||
| **What's new:** | ||
|
|
||
| **1. Retry mechanism with LLM feedback** - Processors can now request retries with feedback that gets sent back to the LLM: | ||
|
|
||
| ```typescript | ||
| processOutputStep: async ({ text, abort, retryCount }) => { | ||
| if (isLowQuality(text)) { | ||
| abort('Response quality too low', { retry: true, metadata: { score: 0.6 } }); | ||
| } | ||
| return []; | ||
| }; | ||
| ``` | ||
|
|
||
| Configure with `maxProcessorRetries` (default: 3). Rejected steps are preserved in `result.steps[n].tripwire`. Retries are only available in `processOutputStep` and `processInputStep`. It will replay the step with additional context added. | ||
|
|
||
| **2. Workflow orchestration for processors** - Processors can now be composed using workflow primitives: | ||
|
|
||
| ```typescript | ||
| import { createStep, createWorkflow } from '@mastra/core/workflows'; | ||
| import { | ||
| ProcessorStepSchema, | ||
| } from '@mastra/core/processors'; | ||
|
|
||
| const moderationWorkflow = createWorkflow({ id: 'moderation', inputSchema: ProcessorStepSchema, outputSchema: ProcessorStepSchema }) | ||
| .then(createStep(new lengthValidator({...}))) | ||
| .parallel([createStep(new piiDetector({...}), createStep(new toxicityChecker({...}))]) | ||
| .commit(); | ||
|
|
||
| const agent = new Agent({ inputProcessors: [moderationWorkflow] }); | ||
| ``` | ||
|
|
||
| Every processor array that gets passed to an agent gets added as a workflow | ||
| <img width="614" height="673" alt="image" src="https://github.com/user-attachments/assets/0d79f1fd-8fca-4d86-8b45-22fddea984a8" /> | ||
|
|
||
| **3. Extended tripwire API** - `abort()` now accepts options for retry control and typed metadata: | ||
|
|
||
| ```typescript | ||
| abort('reason', { retry: true, metadata: { score: 0.8, category: 'quality' } }); | ||
| ``` | ||
|
|
||
| **4. New `processOutputStep` method** - Per-step output processing with access to step number, finish reason, tool calls, and retry count. | ||
|
|
||
| **5. Workflow tripwire status** - Workflows now have a `'tripwire'` status distinct from `'failed'`, properly bubbling up processor rejections. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,7 +42,9 @@ export const mapWorkflowStreamChunkToWatchResult = ( | |
| ? { result: lastStep?.output } | ||
| : finalStatus === 'failed' && lastStep?.status === 'failed' | ||
| ? { error: lastStep?.error } | ||
| : {}), | ||
| : finalStatus === 'tripwire' && chunk.payload.tripwire | ||
| ? { tripwire: chunk.payload.tripwire } | ||
| : {}), | ||
|
Comment on lines
+45
to
+47
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: fd -e ts -e tsx packages/core/src | head -20Repository: mastra-ai/mastra Length of output: 433 🏁 Script executed: # Find ChunkType definition
rg -nP --type=ts 'type\s+ChunkType|interface\s+ChunkType' --max-count=10Repository: mastra-ai/mastra Length of output: 186 🏁 Script executed: # Search for ChunkType and tripwire definitions more broadly
rg -nP --type=ts 'ChunkType.*=|payload.*tripwire' --max-count=20Repository: mastra-ai/mastra Length of output: 22628 🏁 Script executed: cat -n packages/core/src/stream/types.ts | sed -n '630,660p'Repository: mastra-ai/mastra Length of output: 1659 🏁 Script executed: # Find TypedChunkType definition
cat -n packages/core/src/stream/types.ts | sed -n '639,645p'Repository: mastra-ai/mastra Length of output: 437 🏁 Script executed: # Search for payload structure and tripwire in the types file
rg -n 'payload.*tripwire|type.*Payload|interface.*Payload' packages/core/src/stream/types.tsRepository: mastra-ai/mastra Length of output: 6248 🏁 Script executed: cat -n packages/core/src/stream/types.ts | sed -n '328,340p'Repository: mastra-ai/mastra Length of output: 591 🏁 Script executed: # Also check StepTripwireData which is referenced in BaseChunkType
rg -n 'type StepTripwireData|interface StepTripwireData' packages/core/src/stream/types.tsRepository: mastra-ai/mastra Length of output: 99 🏁 Script executed: cat -n client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.ts | sed -n '30,60p'Repository: mastra-ai/mastra Length of output: 1154 🏁 Script executed: # Also check around line 150-173 mentioned in the original comment
cat -n client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.ts | sed -n '140,180p'Repository: mastra-ai/mastra Length of output: 1426 🏁 Script executed: rg -n 'WorkflowFinishPayload|workflow-finish' packages/core/src/stream/types.ts | head -20Repository: mastra-ai/mastra Length of output: 94 🏁 Script executed: # Find the payload structure for workflow-finish
cat -n packages/core/src/stream/types.ts | sed -n '415,435p'Repository: mastra-ai/mastra Length of output: 608 🏁 Script executed: cat -n packages/core/src/stream/types.ts | sed -n '560,575p'Repository: mastra-ai/mastra Length of output: 546 🏁 Script executed: # Also search for any payload type that has tripwire field
rg -n 'tripwire:' packages/core/src/stream/types.tsRepository: mastra-ai/mastra Length of output: 42 🏁 Script executed: cat -n packages/core/src/stream/types.ts | sed -n '564,585p'Repository: mastra-ai/mastra Length of output: 718 🏁 Script executed: # Search for tripwire in workflow-finish context more broadly
rg -B5 -A5 'workflow-finish' packages/core/src/stream/types.tsRepository: mastra-ai/mastra Length of output: 304 🏁 Script executed: rg -n 'workflow-finish.*tripwire|tripwire.*workflow-finish' client-sdks/react/src/lib/ai-sdk/utils/Repository: mastra-ai/mastra Length of output: 42 🏁 Script executed: # Check the test file for toUIMessage
rg -B3 -A3 'workflow-finish' client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.test.ts | head -40Repository: mastra-ai/mastra Length of output: 1064 🏁 Script executed: rg -n "workflowStatus.*tripwire|'tripwire'" client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.test.tsRepository: mastra-ai/mastra Length of output: 187 🏁 Script executed: # Find WorkflowRunStatus type definition
rg -n 'type WorkflowRunStatus|enum WorkflowRunStatus' packages/core/src/Repository: mastra-ai/mastra Length of output: 132 🏁 Script executed: # Search for tripwire in workflow context
rg -n "workflowStatus.*=.*'tripwire'" packages/core/src/Repository: mastra-ai/mastra Length of output: 42 🏁 Script executed: cat -n packages/core/src/workflows/types.ts | sed -n '215,225p'Repository: mastra-ai/mastra Length of output: 298 🏁 Script executed: # Also check the tripwire test cases in toUIMessage.test.ts
rg -B5 -A10 "type: 'tripwire'" client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.test.ts | head -50Repository: mastra-ai/mastra Length of output: 1036 🏁 Script executed: rg -B5 -A15 "workflow-finish.*tripwire|tripwire.*workflow-finish" client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.test.tsRepository: mastra-ai/mastra Length of output: 42 🏁 Script executed: # Search more broadly for workflow-finish with tripwire in tests
rg -B10 "workflowStatus.*'tripwire'|'tripwire'.*workflow" client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.test.tsRepository: mastra-ai/mastra Length of output: 42 🏁 Script executed: cat -n client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.ts | sed -n '34,50p'Repository: mastra-ai/mastra Length of output: 803 🏁 Script executed: # Check if there are any recent changes to workflow-finish handling
git log --oneline -n 20 -- client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.tsRepository: mastra-ai/mastra Length of output: 73 🏁 Script executed: # Check git diff to see what was changed in this PR
git diff HEAD~1 client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.ts | head -100Repository: mastra-ai/mastra Length of output: 246 🏁 Script executed: # Look at the workflow-finish in the actual code to see what metadata contains
rg -B5 -A5 'workflowStatus.*tripwire' packages/core/src/Repository: mastra-ai/mastra Length of output: 42 🏁 Script executed: # Check if there's any tripwire data being added to workflow-finish metadata
rg -n 'workflow-finish' packages/core/src/workflows/ | grep -i tripwireRepository: mastra-ai/mastra Length of output: 42 🏁 Script executed: # Look at step-finish handling which might show the pattern
cat -n client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.ts | sed -n '60,120p'Repository: mastra-ai/mastra Length of output: 2088 🏁 Script executed: # Get the full workflow-finish payload structure including all fields
cat -n packages/core/src/stream/types.ts | sed -n '564,580p'Repository: mastra-ai/mastra Length of output: 583 🏁 Script executed: # Check if tripwire was supposed to be added to WorkflowFinishPayload
rg -A20 "type: 'workflow-finish'" packages/core/src/stream/types.tsRepository: mastra-ai/mastra Length of output: 541 🏁 Script executed: # Verify what the actual current state of the file is
head -1 client-sdks/react/src/lib/ai-sdk/utils/toUIMessage.tsRepository: mastra-ai/mastra Length of output: 124 🏁 Script executed: # Verify the tripwire chunk type definition
rg -B2 -A2 "type: 'tripwire'" packages/core/src/stream/types.tsRepository: mastra-ai/mastra Length of output: 301 Remove the non-existent The Lines 45-46 should be simplified to: The condition 🤖 Prompt for AI Agents |
||
| }; | ||
| } | ||
|
|
||
|
|
@@ -146,19 +148,24 @@ export const toUIMessage = ({ chunk, conversation, metadata }: ToUIMessageArgs): | |
|
|
||
| switch (chunk.type) { | ||
| case 'tripwire': { | ||
| // Create a new assistant message | ||
| // Create a new assistant message with tripwire-specific metadata | ||
| const newMessage: MastraUIMessage = { | ||
| id: `tripwire-${chunk.runId + Date.now()}`, | ||
| role: 'assistant', | ||
| parts: [ | ||
| { | ||
| type: 'text', | ||
| text: chunk.payload.tripwireReason, | ||
| text: chunk.payload.reason, | ||
| }, | ||
| ], | ||
| metadata: { | ||
| ...metadata, | ||
| status: 'warning', | ||
| status: 'tripwire', | ||
| tripwire: { | ||
| retry: chunk.payload.retry, | ||
| tripwirePayload: chunk.payload.metadata, | ||
| processorId: chunk.payload.processorId, | ||
| }, | ||
| }, | ||
| }; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# Read the full changeset file content cat -n .changeset/three-falcons-pump.mdRepository: mastra-ai/mastra
Length of output: 1356
🏁 Script executed:
Repository: mastra-ai/mastra
Length of output: 7300
🏁 Script executed:
Repository: mastra-ai/mastra
Length of output: 87
🏁 Script executed:
Repository: mastra-ai/mastra
Length of output: 461
🏁 Script executed:
Repository: mastra-ai/mastra
Length of output: 8621
🏁 Script executed:
Repository: mastra-ai/mastra
Length of output: 42
Add a link to the related issue or PR for context.
The frontmatter correctly lists all six packages, including
@mastra/inngest. The changes toworkflows/inngest/src/execution-engine.tsconfirm that the inngest package has legitimate tripwire support modifications (handling tripwire status and emitting workflow step results). However, per the coding guidelines, changeset files should include links to related issues or PRs when relevant. Consider adding a reference to issue #7923 or the associated PR.🤖 Prompt for AI Agents