View in #prefect-community on Slack
@Nick_Hart: Hi, I’m trying to create a conditional Flow of Flows that runs a flow and then, based on that flow’s final state, runs either `flow_if_success` or `flow_if_failure`. Basically, I want to know how I can take the final state signal from `conditional-flow` and decide whether I want to run `flow_if_success` or `flow_if_failure`. I was having trouble matching the task signal with a value so that the case works properly. I was hoping you would be able to help me out. Would I need to use `get_task_run_result`, or is there a better way I can just grab the task result? Here is a snippet of my current test code:
# Parameters forwarded to the child flows.
var1 = Parameter("var1", default=4)
var2 = Parameter("var2", default=12)

# One StartFlowRun task per child flow.
conditional_id = StartFlowRun(flow_name="Conditional-Flow", project_name="Test", wait=True)
flow_if_success_id = StartFlowRun(flow_name="Flow_if_success", project_name="Test", wait=True)
flow_if_failure_id = StartFlowRun(flow_name="Flow_if_failure", project_name="Test", wait=True)

# Idea: Conditional flow runs first. If it succeeds with Success signal, run
# flow_if_success, if it fails with failure signal, run flow_if_failure
with Flow("Conditional FoF") as parent_flow:
    conditional_run = conditional_id()
    with case(conditional_run, Success):  # Conditional_run never matches Success even if successful task run
        flow_if_success_run = flow_if_success_id(parameters=dict(num=var1))
    with case(conditional_run, Failed):
        flow_if_failure_run = flow_if_failure_id(parameters=dict(number=var2))
@Anna_Geller: You would need to switch to create_flow_run + wait_for_flow_run and include a case task to implement this condition. Here is a slightly different use case, but you can use similar syntax and building blocks: “Can I run a flow-of-flows that triggers each child flow on a different machine?”
@Nick_Hart: Alright I’ll try this out. Thank you!
@Anna_Geller: and if you have any questions about it, let us know, happy to provide more examples when needed
@Nick_Hart: So I’m wondering is there a way to not have a case task in my parent flow and just have a way to match the cond_flow final signal to something. Basically the problem I get is that it skips the case because of the following output:
SKIP signal raised: SKIP('Provided value "FlowRunView(flow_run_id=\'d0bc26fe-9693-4890-832e-182d5035a90e\', name=\'Conditional-Flow_Run_2022-02-07 19:22:47.235833\', state=<Success: "All reference tasks succeeded.">, labels=<BoxList: []>, cached_task_runs=0)" did not match "<class \'prefect.engine.state.Success\'>"')
@Anna_Geller: to get a state from a FlowRunView, you need to add .state
Example:
with Flow("parent_flow") as flow:
    infra = check_the_infrastructure()
    # Everything inside the case block only runs when `infra` matches the value.
    with case(infra, "Docker with label machine-1"):
        child_flow_run_id = create_flow_run(
            flow_name="child_flow_name",
            run_config=DockerRun(
                labels=["machine-1"]
            ),  # with a specific condition like image
        )
        child_flowrunview = wait_for_flow_run(
            child_flow_run_id, raise_final_state=True, stream_logs=True
        )
        # NOTE: while the flow is being built, child_flowrunview is a task, not
        # a FlowRunView, so `.state` has to be read inside a downstream task
        # that receives the result at run time.
        child_flowrunview.state  # state of the task you can use in some downstream task to determine what should happen next based on this task
@Nick_Hart: I was trying that before, but it gives me the error `AttributeError: 'FunctionTask' object has no attribute 'state'`. Is there something I need to import to get that to work?
@Anna_Geller: Can you share your flow?
@Nick_Hart:
import prefect
from prefect import Flow, Parameter, case
from prefect.engine.state import Failed, Success
from prefect.storage import Docker
from prefect.tasks.prefect import RenameFlowRun, create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
import datetime
# Logger looked up from the Prefect context under the "logger" key.
logger = prefect.context.get("logger")

# Flow parameters and their default values.
var1 = Parameter("var1", default=4)
var2 = Parameter("var2", default=12)
def renameFlowRun(obj, old_state, new_state):
    """Rename the flow run with a timestamp when it enters a running state.

    Prefect invokes state handlers positionally as
    ``handler(obj, old_state, new_state)``, so the parameters are declared in
    that order; with the two state names swapped, the check would run against
    the state being left instead of the one being entered.

    Args:
        obj: The object whose state is changing (the flow).
        old_state: The state being left.
        new_state: The state being entered.

    Returns:
        ``new_state`` unchanged, so the transition proceeds as requested.
    """
    if new_state.is_running():
        run_name = "Conditional_FoF_{}".format(datetime.datetime.now())
        RenameFlowRun().run(flow_run_name=run_name)
    return new_state
with Flow("Conditional FoF", state_handlers=[renameFlowRun]) as parent_flow:
    conditional_id = create_flow_run(flow_name="Conditional-Flow", project_name="Test")
    conditional_run = wait_for_flow_run(conditional_id, raise_final_state=True, stream_logs=True)
    # NOTE: at this point conditional_run is a task, not a FlowRunView, so the
    # `.state` accesses below fail while the flow is being built with
    # "AttributeError: 'FunctionTask' object has no attribute 'state'".
    logger.info("Conditional_task State: "+str(conditional_run.state))
    with case(conditional_run.state, Success):
        flow_if_success_id = create_flow_run(flow_name="Flow_if_success", project_name="Test", parameters=dict(num=var1))
        flow_if_success_run = wait_for_flow_run(flow_if_success_id, raise_final_state=True, stream_logs=True)
    with case(conditional_run.state, Failed):
        flow_if_failure_id = create_flow_run(flow_name="Flow_if_failure", project_name="Test", parameters=dict(num=var2))
        flow_if_failure_run = wait_for_flow_run(flow_if_failure_id, raise_final_state=True, stream_logs=True)
# Execution and storage configuration for the parent flow.
parent_flow.executor = LocalDaskExecutor()
parent_flow.storage = Docker(
    image_name="prefect/conditionalfof",
    image_tag="7feb2022",
    # Copy this script into the image at the path the flow is loaded from.
    files={
        "/home/test/Documents/conditional-fof.py": "/usr/local/share/prefect/conditional-fof.py"
    },
    stored_as_script=True,
    local_image=True,
    path="/usr/local/share/prefect/conditional-fof.py",
    prefect_directory="/usr/local/share/prefect",
)

if __name__ == "__main__":
    parent_flow.run()
@Anna_Geller: This is not as straightforward so I can totally understand why this is difficult to configure. Here is one idea of how you can approach this:
import prefect
from prefect import Flow, Parameter, task, Task
from prefect.engine.state import Success
from prefect.storage import Docker
from prefect.tasks.prefect import RenameFlowRun, create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
import datetime
def rename_flow_run(obj, old_state, new_state):
    """Rename the flow run with a timestamp when it enters a running state.

    Prefect invokes state handlers positionally as
    ``handler(obj, old_state, new_state)``, so the parameters are declared in
    that order; with the two state names swapped, the check would run against
    the state being left instead of the one being entered.

    Args:
        obj: The object whose state is changing (the flow).
        old_state: The state being left.
        new_state: The state being entered.

    Returns:
        ``new_state`` unchanged, so the transition proceeds as requested.
    """
    if new_state.is_running():
        run_name = "Conditional_FoF_{}".format(datetime.datetime.now())
        RenameFlowRun().run(flow_run_name=run_name)
    return new_state
@task
def trigger_conditional_flow_runs(flow_run_view):
    """Start and wait for the follow-up flow that matches the upstream result.

    Runs "Flow_if_success" when the upstream flow run finished in a Success
    state and "Flow_if_failure" otherwise.

    This function body executes inside a task run, where there is no active
    Flow context, so the Prefect tasks are invoked through ``.run(...)``
    rather than called directly; calling them directly would try to add an
    edge to a flow and raise ``ValueError``.

    Args:
        flow_run_view: Result of the upstream wait task; only its ``state``
            attribute is read.
    """
    params = prefect.context["parameters"]
    if isinstance(flow_run_view.state, Success):
        flow_if_success_id = create_flow_run.run(
            flow_name="Flow_if_success",
            project_name="Test",
            parameters=dict(num=params.get("var1")),
        )
        wait_for_flow_run.run(
            flow_if_success_id, raise_final_state=True, stream_logs=True
        )
    else:
        flow_if_failure_id = create_flow_run.run(
            flow_name="Flow_if_failure",
            project_name="Test",
            parameters=dict(num=params.get("var2")),
        )
        wait_for_flow_run.run(
            flow_if_failure_id, raise_final_state=True, stream_logs=True
        )
with Flow(
    "Conditional FoF",
    state_handlers=[rename_flow_run],
    storage=Docker(
        image_name="prefect/conditionalfof",
        image_tag="7feb2022",
        files={
            "/home/test/Documents/conditional-fof.py": "/usr/local/share/prefect/conditional-fof.py"
        },
        stored_as_script=True,
        local_image=True,
        path="/usr/local/share/prefect/conditional-fof.py",
        prefect_directory="/usr/local/share/prefect",
    ),
    executor=LocalDaskExecutor(),
) as parent_flow:
    var1 = Parameter("var1", default=4)
    var2 = Parameter("var2", default=12)
    conditional_id = create_flow_run(flow_name="Conditional-Flow", project_name="Test")
    # raise_final_state is False here on purpose: if this task took on the
    # child's failed state, the branching task below would not run at all and
    # its failure branch could never be reached.
    conditional_run = wait_for_flow_run(
        conditional_id, raise_final_state=False, stream_logs=True
    )
    trigger_conditional_flow_runs(conditional_run)

# The parameters are read from the context at run time rather than passed to
# a task, so they are registered on the flow explicitly.
parent_flow.add_task(var1)
parent_flow.add_task(var2)

if __name__ == "__main__":
    parent_flow.visualize()
Does this pattern work for you?
@Nick_Hart: It looks like it should, but for some reason it gave me: ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>. This often means you called a task outside a `with Flow(…)` block. If you're trying to run this task outside of a Flow context, you need to call `create_flow_run.run(…)`. I tried putting .run() in different spots in the trigger conditional flows task, but it didn’t seem to help. I’ll keep trying again tomorrow morning. Thank you for all your help, I appreciate it! I’ll let you know if I get it working.
@Anna_Geller: @Nick_Hart with help from @Kevin_Kho I found a much better solution! Here is an example flow you may try.
from prefect import Flow, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.triggers import all_successful, any_failed
from random import random
@task(log_stdout=True)
def say_hi(flow_name):
    """Print a greeting that names the given flow.

    Args:
        flow_name: Name inserted into the greeting.
    """
    print(f"Hi from {flow_name}")
@task(log_stdout=True)
def randomly_fail():
    """Draw a random number, print it, and fail when it is above 0.5.

    Raises:
        ValueError: If the drawn number is greater than 0.5.
    """
    nr = random()
    print(f"Number: {nr}")
    if nr > 0.5:
        raise ValueError("Big number!")
# child1 greets and then may fail at random, so the parent flow has both
# outcomes to react to.
with Flow("child1") as child1:
    say_hello = say_hi("child1")
    randomly_fail(upstream_tasks=[say_hello])

with Flow("child2") as child2:
    say_hi("child2")

with Flow("child3") as child3:
    say_hi("child3")
with Flow("conditional_flow_of_flows") as parent_flow:
    hi = say_hi("Parent flow")
    child1_id = create_flow_run(
        flow_name="child1",
        project_name="xyz",
        task_args=dict(name="Main Flow Run"),
        upstream_tasks=[hi],
    )
    conditional_run = wait_for_flow_run(
        child1_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Condition"),
    )

    # on success of conditional_run:
    child2_id = create_flow_run(
        flow_name="child2",
        project_name="xyz",
        upstream_tasks=[conditional_run],
        task_args=dict(name="Flow run on Success", trigger=all_successful),
    )
    conditional_run_child2 = wait_for_flow_run(
        child2_id, raise_final_state=True, stream_logs=True
    )

    # on failure of conditional_run:
    child3_id = create_flow_run(
        flow_name="child3",
        project_name="xyz",
        upstream_tasks=[conditional_run],
        task_args=dict(name="Flow run on Failure", trigger=any_failed),
    )
    conditional_run_child3 = wait_for_flow_run(
        child3_id, raise_final_state=True, stream_logs=True
    )

# Only `hi` is a reference task, so the branch that does not run does not
# decide the parent flow run's final state.
parent_flow.set_reference_tasks([hi])

if __name__ == "__main__":
    # parent_flow.visualize()
    child1.register("xyz")
    child2.register("xyz")
    child3.register("xyz")
    parent_flow.register("xyz")
It works exactly as you wanted - based on a condition, it runs the respective child flow: