One Flow with Multiple Parameter Sets and Schedules

View in #prefect-community on Slack

Ethan_Veres @Ethan_Veres: Hi all! My company is just starting to use Prefect as our orchestration tool for our data pipeline for end customers and I have a few questions on how best to set it all up.

Is there a way to reuse a single flow for every company using our product dynamically? We want to use Prefect to orchestrate each company’s data/ETL pipeline and we have the configuration for the pipeline (which system they’re integrating with etc…) stored in our database. The ideal is to dynamically create flows per company on their own schedule. Is that possible? Am I thinking about this the correct way?

Kevin_Kho @Kevin_Kho: Hey @Ethan_Veres, we definitely have some people doing it. There are a few ways to achieve this, so let me walk you through what DOES NOT work first.

Some people try to have one flow and then add clocks to its schedule, one clock per parameter set, each overriding the default parameters. This is fine for a small number of parameter sets, but it doesn’t scale when you have more than 10 companies to create flow runs for. For any given flow, the Prefect scheduler only schedules the next 10 flow runs. The scheduler job runs on a 90-second interval. So if you have 15 runs scheduled for 8 AM with different companies, 10 will be scheduled and 5 will disappear. This can be worked around by doing something like 5 flow runs at 8 AM, 5 at 8:10 AM, and 5 at 8:20 AM, but I think at this point you may as well look into other options.
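
To make the staggering workaround concrete, here is a minimal sketch in plain Python with no Prefect calls. The helper name `stagger`, the batch size of 5, and the company names are made up for illustration. Each resulting pair would still have to be turned into a clock with its own parameter defaults on the flow's schedule.

```python
from datetime import datetime, timedelta

PER_SLOT = 5                      # assumed batch size, kept under the 10-run limit
SLOT_GAP = timedelta(minutes=10)  # assumed gap between batches


def stagger(companies, first_start, per_slot=PER_SLOT, gap=SLOT_GAP):
    """Return (start_time, parameter_defaults) pairs, per_slot companies per slot."""
    plan = []
    for i, company in enumerate(companies):
        # Integer division assigns companies 0-4 to slot 0, 5-9 to slot 1, ...
        start = first_start + gap * (i // per_slot)
        plan.append((start, {"company": company}))
    return plan


companies = [f"company{n}" for n in range(1, 16)]  # 15 hypothetical companies
plan = stagger(companies, datetime(2022, 1, 1, 8, 0))
for start, params in plan:
    print(start.strftime("%H:%M"), params)
```

With 15 companies this yields three batches, at 8:00, 8:10, and 8:20, which is exactly the bookkeeping that makes the approach awkward as the company list grows.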

So the end goal is to create one Flow and register it once per company with different parameter values. For this, you write a function that registers the flow:

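from prefect import Flow, Parameter

def registration_func(company):
    with Flow(...) as flow:
        # The company passed in becomes this copy's default parameter value.
        company_param = Parameter("company", default=company)
        ...
    flow.schedule = ...  # this company's schedule
    flow.register(project_name=...)  # Prefect 1.0 needs a project name here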

and then you call this whenever you want to register the flow for a new company. If you use the default pickle-based storage, the whole flow is serialized and a copy is uploaded (to S3 or GCS, for example). That makes each company’s stored copy independent of changes made for any other company.

And then if the Flow inside registration_func changes, just re-register all of them with some kind of loop for the change to take effect.
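
The re-registration loop could look roughly like the sketch below. It assumes a function shaped like registration_func above is passed in as `register`. The helper name `register_all`, the stand-in function, and the company names are hypothetical, and collecting failures instead of stopping at the first one is a design choice, not something Prefect requires.

```python
def register_all(companies, register):
    """Call register(company) for every company and collect any failures."""
    failed = {}
    for company in companies:
        try:
            register(company)
        except Exception as exc:
            # Keep going so one bad company config does not block the rest.
            failed[company] = exc
    return failed


# Stand-in for the real registration function so the sketch runs on its own.
registered = []


def fake_register(company):
    if not company:
        raise ValueError("empty company name")
    registered.append(company)


failures = register_all(["company1", "", "company2"], fake_register)
print(registered)
print(sorted(failures))
```

In practice the company list would come from wherever the pipeline configuration lives, such as the database mentioned in the question.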


Ethan_Veres @Ethan_Veres: Thanks for the detailed answer @Kevin_Kho!
To spell it out even further, once I wrap the flow in its own function and make that callable, how do I deploy it?

Kevin_Kho @Kevin_Kho: It’s a bit open-ended depending on your CI/CD process, but as long as you have a script that imports this function and calls it with the parameters you want to pass, it will work. For example, I could see you having a registration script and calling it like:

python registration_flow.py --arg1 company1 --arg2 parameter2

I don’t know the exact syntax, but I think you can accept arguments to this script through the command line. You can even go so far as to build your own CLI, and then you can call this from CI/CD, right? I think there are a lot of possibilities and this is just one.
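
One way to accept those command-line arguments is the standard library's argparse module. The sketch below is only an illustration: the `--company` flag and the `main` wrapper are hypothetical names, and `register` defaults to `print` as a placeholder for the real registration function.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(description="Register one flow per company.")
    # action="append" lets the flag be repeated: --company a --company b
    parser.add_argument("--company", action="append", required=True,
                        help="company to register a flow for (repeatable)")
    return parser


def main(argv=None, register=print):
    """Parse arguments and call register once per company."""
    args = build_parser().parse_args(argv)
    for company in args.company:
        register(company)
    return args.company


# Equivalent to: python registration_flow.py --company company1 --company company2
main(["--company", "company1", "--company", "company2"])
```

A CI/CD job could then invoke the script with whichever companies changed, or with all of them after the flow definition itself changes.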

Ethan_Veres @Ethan_Veres: I like that. This seems really straightforward. Thank you and appreciate your time!

Kevin_Kho @Kevin_Kho: Of course!

Donnchadh_McAuliffe @Donnchadh_McAuliffe: @Kevin_Kho

“For any given flow, the Prefect scheduler only schedules the next 10 flow runs. This job runs on a 90-second interval. So if you have 15 jobs scheduled for 8 AM with different companies, 10 will be scheduled and 5 will disappear.”

Can you deploy a flow and then send multiple API requests to schedule flow runs (of that flow) at the same time? For example, can you send 20 API requests to schedule a flow run for 10 AM? If so, will they all execute or will 10 be missed?

Kevin_Kho @Kevin_Kho: You can do it with create_flow_run because those are not runs created by the scheduler service. Those are runs created to “run now”. This is all for 1.0 though, not Orion
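
As a rough sketch of what that could look like: the helper below creates one run per company through whatever client object it is given. It assumes the Prefect 1.0 client's create_flow_run method accepts `flow_id`, `parameters`, and `scheduled_start_time` keyword arguments, which is worth checking against the 1.0 API reference. The helper name, the `FakeClient` stand-in, and the flow ID are hypothetical, and the stand-in exists only so the example runs without a Prefect backend.

```python
from datetime import datetime, timezone


def schedule_company_runs(client, flow_id, companies, start_time):
    """Create one flow run per company, all with the same start time."""
    run_ids = []
    for company in companies:
        run_id = client.create_flow_run(
            flow_id=flow_id,
            parameters={"company": company},
            scheduled_start_time=start_time,
        )
        run_ids.append(run_id)
    return run_ids


class FakeClient:
    """Stand-in that records calls instead of talking to a Prefect backend."""

    def __init__(self):
        self.calls = []

    def create_flow_run(self, **kwargs):
        self.calls.append(kwargs)
        return f"run-{len(self.calls)}"


fake = FakeClient()
ten_am = datetime(2022, 1, 1, 10, 0, tzinfo=timezone.utc)
ids = schedule_company_runs(
    fake, "my-flow-id", [f"company{n}" for n in range(20)], ten_am
)
print(len(ids))
```

Against a real 1.0 backend, the `client` argument would be a `prefect.Client()` instance and `flow_id` the ID of the registered flow.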
