Concurrent subflows with conditional execution at runtime

Hello everyone,

I’m learning Prefect and trying to integrate it into my current workflow.

I’m trying to run subflows concurrently, with conditional execution decided at runtime. Some of the subflows I want to run concurrently may have to wait for another subflow to finish, but I only know which ones after a JSON variable has been loaded at runtime.

So far, the only way I have managed to fit my workflow is with a single flow and tasks.
Here is my code (shortened for readability):

from my_code import functionA, functionB, functionC, ParserFactory
from prefect import flow, Task

modules_json = {
    "moduleA": {
        "arg1": "foo",
        "arg2": "bar"
    },
    "moduleB": {
        "arg1": "foofoo",
        "arg2": "barbar"
    },
    "moduleC": {
        "arg1": "foobar",
        "arg2": "barfoo",
        "wait_for": "moduleA"
    }
}


def proc_mod(module):
    module.functionA()
    module.functionB()
    module.functionC()


# Wrap proc_mod as a Prefect task once, instead of on every loop iteration
proc_mod_task = Task(proc_mod)


@flow
def send_to_processing():
    futures = {}  # parser name -> future of its proc_mod task run
    for parser_name, config in modules_json.items():
        # ParserFactory returns the module class for this parser name
        module = ParserFactory.get_parsers(parser_name)
        if not module:
            continue  # no parser for this name: skip it
        mod = module(parser_name)
        module_to_wait = config.get("wait_for")
        if module_to_wait:
            # wait_for expects an iterable of futures
            future = proc_mod_task.submit(mod, wait_for=[futures[module_to_wait]])
        else:
            future = proc_mod_task.submit(mod)
        futures[parser_name] = future


send_to_processing()

modules_json is hard-coded for testing purposes, but in production it will be an input that may change on every run.
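Because the JSON can change on every run, a module's "wait_for" target may be listed after the module itself, and then `futures[module_to_wait]` raises a KeyError. One way to guard against that is to compute a submission order first and loop over it instead of over modules_json directly. This is a minimal sketch using the standard library's graphlib (Python 3.9+); the helper name `submission_order` is hypothetical, and it assumes "wait_for" holds a single module name, as in the example JSON.

```python
from graphlib import TopologicalSorter


def submission_order(modules: dict) -> list:
    """Return module names ordered so each one comes after its "wait_for" target.

    Hypothetical helper: assumes "wait_for" is a single module name.
    Raises graphlib.CycleError if the dependencies form a cycle.
    """
    sorter = TopologicalSorter()
    for name, config in modules.items():
        dependency = config.get("wait_for")
        if dependency is None:
            sorter.add(name)
        elif dependency not in modules:
            raise ValueError(f"{name} waits for unknown module {dependency!r}")
        else:
            sorter.add(name, dependency)  # name runs after dependency
    return list(sorter.static_order())


# moduleC is listed before the module it waits for
modules_json = {
    "moduleC": {"arg1": "foobar", "arg2": "barfoo", "wait_for": "moduleA"},
    "moduleA": {"arg1": "foo", "arg2": "bar"},
    "moduleB": {"arg1": "foofoo", "arg2": "barbar"},
}

order = submission_order(modules_json)
print(order)
```

In the flow, the loop would then iterate over `submission_order(modules_json)` and read each module's config from modules_json. If a dependency has no parser and is skipped, its future will still be missing, so that case would need its own handling.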

To get more granularity, I would like functionA, functionB and functionC to be tasks. But that would mean turning proc_mod into a flow, and I would lose:

  • Easy concurrent execution of the modules (which ConcurrentTaskRunner gives me for free), since I would have to handle subflow concurrency myself with AnyIO
  • The ability to easily wait for one module to finish before starting another (the wait_for argument on tasks)
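For the AnyIO/asyncio route, one way to keep a wait_for-like dependency between concurrently running subflows is to start every module as an asyncio task up front and have each one await its dependency's task before doing its own work. This is a plain-asyncio sketch of that pattern, not a Prefect API: `process_module` and `run_modules` are hypothetical names, `process_module` stands in for what would be an async subflow, and the sleep replaces the real functionA/B/C calls. It assumes single-name "wait_for" entries and no dependency cycles.

```python
import asyncio
from typing import Optional


async def process_module(name: str, dependency: Optional[asyncio.Task], log: list) -> str:
    # Hypothetical stand-in for an async subflow that would call functionA/B/C.
    if dependency is not None:
        await dependency  # wait until the module named in "wait_for" is done
    log.append(f"start {name}")
    await asyncio.sleep(0.01)  # placeholder for the real work
    log.append(f"end {name}")
    return name


async def run_modules(modules: dict) -> list:
    """Run all modules concurrently, honouring single-name "wait_for" entries.

    Assumes the dependencies contain no cycles.
    """
    log: list = []
    tasks: dict = {}

    def start(name: str) -> asyncio.Task:
        # Create the dependency's task first, so JSON order does not matter.
        if name not in tasks:
            dependency = modules[name].get("wait_for")
            dep_task = start(dependency) if dependency else None
            tasks[name] = asyncio.create_task(process_module(name, dep_task, log))
        return tasks[name]

    for name in modules:
        start(name)
    await asyncio.gather(*tasks.values())
    return log


modules_json = {
    "moduleA": {"arg1": "foo", "arg2": "bar"},
    "moduleB": {"arg1": "foofoo", "arg2": "barbar"},
    "moduleC": {"arg1": "foobar", "arg2": "barfoo", "wait_for": "moduleA"},
}

log = asyncio.run(run_modules(modules_json))
print(log)
```

moduleA and moduleB run side by side, while moduleC only starts once moduleA has finished. If moduleA raised an exception, `await dependency` would re-raise it inside moduleC, so moduleC would skip its own work.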

So I was wondering:

  • Will concurrent subflow execution be implemented? A statement back in January 2022 said it probably would be, but I wonder whether that is still the case.
  • Is there any way to link two subflows together the way tasks can (with wait_for), or is this planned?

Thanks for reading, and thanks in advance for your answers.