I find reasoning about a Flow's final state confusing. #18342
Replies: 2 comments 1 reply
I don't know if it helps, but I am having similar issues (#18373). I noticed that I had unexpected behaviours when a […]

Now, what looks like it is doing the trick (fingers crossed 🤞) is to add a […]

Something like this:

Prefect 2 code:

    @flow()
    def my_flow():
        foo.map(...)

to Prefect 3 code:

    from prefect.futures import wait

    @flow()
    def my_flow():
        futures = foo.map(...)
        wait(futures)
Appreciate the feedback, I totally get it. We made these design choices because Prefect over time has leaned more and more into a workflows-as-code philosophy where your Python code defines exactly what happens, including how failures propagate. When you use .submit(), you're explicitly choosing asynchronous execution and taking responsibility for handling the futures - this gives you fine-grained control over error handling, retry logic, and partial failure scenarios that many real-world workflows require.

All distributed workflow engines require this explicit resolution of futures because implicit handling creates significant problems, particularly with memory management in long-running workflows with thousands of tasks. Integrating with these systems and layering in this implicit future resolution caused a lot of hidden complexity.
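To make the "explicit resolution" point concrete, here is a minimal sketch that uses Python's standard-library `concurrent.futures` rather than Prefect itself. It is only an analogy for the general futures model, not Prefect's implementation, and `foo`, `unresolved`, and `resolved` are made-up names for illustration. The exception raised inside the submitted callable is stored on the future and only surfaces in the calling function when the future is resolved:

```python
from concurrent.futures import ThreadPoolExecutor


def foo(x: int) -> int:
    """Hypothetical unit of work that fails for one particular input."""
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


def unresolved() -> str:
    """Submit work but never resolve the futures."""
    with ThreadPoolExecutor() as pool:
        for x in (1, 2, 3):
            # The ValueError from foo(2) is captured inside the future
            # and is never re-raised here.
            pool.submit(foo, x)
    return "completed"


def resolved() -> str:
    """Submit the same work, then resolve every future explicitly."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(foo, x) for x in (1, 2, 3)]
    for future in futures:
        # result() re-raises the exception stored on the future.
        future.result()
    return "completed"
```

Calling `unresolved()` returns normally even though one unit of work failed, while `resolved()` raises the `ValueError` in the caller. That is the same shape of behaviour being discussed in this thread for `.submit()` and `.map()`.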
I've been working on migrating our code from Prefect 1 -> Prefect 2 -> Prefect 3. I'm about done with the final migration; however, I'm left confused about what the final state of my flow will be when one of its constituent tasks fails. I'm hoping to start a discussion sharing what I'm experiencing as a user; at minimum, this will serve as my rubber duck and give me a guide I can reference in the future.
Below, I've shown numerous flows which should all (IMHO) terminate in a `failed` state since one of the tasks it has executed raises a `ValueError`. I should note that I'm new to the `async` world, so it's perfectly conceivable that I'm just not grokking something. And yes, I've read this and this doc about terminal state.

I should also be explicit that my baseline expectations for the execution of flows are:

[…] `failed` (this of course assumes no use of `raise_on_failure=False` or any `try`/`except` code) - it's the flow's job to check error states, especially when a flow has a ton of tasks. This is why I generally don't `return` anything from my flow.

My flow examples can be found below; flow methods are named either:

- `does_failX` - when the final state of the flow is `failed`, as expected ✅
- `should_failX` - when the final state of the flow is `completed`, which is NOT expected 😠

The tasks I'll be constructing my flows with are:
On to the flows ...
Nice and simple, my flow does what I expect and fails because:
Flow does not fail, I guess because the concurrent runner is executing it and doesn't raise the error "in the flow function"?
Adding in a return does produce the expected state because:
I feel like this could produce some false-negative situations in cases where my flow has a ton of tasks and a developer forgot to return all needed tasks.
Let's now look at a slightly more complex set of flows.
Fails as expected, presumably because all the work happens inside the flow.
Does not fail, I guess for the same reason as `should_fail1`. We fixed that flow by adding a return statement, so let's do that here.

Hmmm, does not fail. Why?
Let's now look at mapped versions of this flow.
Doesn't fail; let's add a return
Hmm, that didn't fail either. Why? Let's ask Marvin how to fix it.
Technically does fail and my flow executes all the work I expected, but the failure reason is not what I expect:
What if we use `map` instead of `asyncio`? I thought I had read docs that said `submit`/`map` don't have an async interface, so I wasn't aware I could try `map`. But @zzstoatzz suggested I try it:

No dice 😢
Adding a return seems to do the trick; again, I'm not a big fan of this because I have to track all my submitted/mapped tasks and make sure I return them. This could potentially lead to errors as the flow grows in task count or as it's modified with new tasks.
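To sketch the bookkeeping I mean, here is a standard-library `concurrent.futures` version, not Prefect code: `foo`, `bar`, `run_pipeline`, and `raise_if_any_failed` are all hypothetical names, and I'm only assuming the same shape would carry over to Prefect futures. Every future is collected in one list, downstream work still runs for the upstream results that succeeded, and only at the very end is the first failure re-raised:

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Sequence


def foo(x: int) -> int:
    """Hypothetical upstream task that fails for one input."""
    if x == 2:
        raise ValueError(f"foo failed for {x}")
    return x


def bar(x: int) -> int:
    """Hypothetical downstream task."""
    return x + 100


def raise_if_any_failed(futures: Sequence[Future]) -> None:
    """Block until every future is done, then re-raise the first failure."""
    wait(futures)
    for future in futures:
        error = future.exception()
        if error is not None:
            raise error


def run_pipeline(inputs):
    """Run bar for every foo that succeeded; return all futures and bar results."""
    with ThreadPoolExecutor() as pool:
        foo_futures = [pool.submit(foo, x) for x in inputs]
        wait(foo_futures)
        # Only schedule bar for the foo runs that did not raise.
        bar_futures = [
            pool.submit(bar, f.result())
            for f in foo_futures
            if f.exception() is None
        ]
        wait(bar_futures)
    bar_results = [f.result() for f in bar_futures if f.exception() is None]
    return foo_futures + bar_futures, bar_results
```

With inputs `[1, 2, 3]`, the `bar` work for 1 and 3 still runs, and calling `raise_if_any_failed` on the collected futures afterwards raises the `ValueError` from `foo(2)`. The weak spot is exactly the one I'm worried about: any future that never makes it into the list is silently ignored.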
In those last few flow examples, I understand that I could call `result()` on the foos and this might(?) end up failing the flow; however, in that scenario none of my `bar` tasks have been executed, even if some of them could have been. I.e., I expect the flow run to look like:

not
OK, that was a lot! I really enjoy Prefect, but all of the above leaves me feeling less than confident when I author flows. I'm wondering if perhaps a remedy for this is more documentation around scenarios where a failed task will NOT fail a flow. Or a flow arg like `fail_flow_if_any_task_fails` 🤣