Parameter defaults are meant to be static values that can optionally be overridden at runtime. Dynamic date intervals should therefore be computed in separate tasks rather than passed in as Parameter defaults.
This is what you should NOT do:
with Flow("backfilling_flow") as flow:
    start_date = Parameter('start_date', default='2022-02-01')
    end_date = Parameter('end_date', default='2022-02-02')
    fetch_data(start_date=start_date, end_date=end_date)
This is what you can do instead:
from typing import Optional

import pendulum
from prefect import task, Flow, Parameter


@task
def get_start_date(param_start_date: Optional[str]) -> str:
    # Fall back to yesterday only when no start_date was passed at runtime
    if param_start_date is None:
        param_start_date = pendulum.yesterday(tz="America/New_York").isoformat()
    return param_start_date


@task
def get_end_date(param_end_date: Optional[str]) -> str:
    # Fall back to today only when no end_date was passed at runtime
    if param_end_date is None:
        param_end_date = pendulum.today(tz="America/New_York").isoformat()
    return param_end_date


@task(log_stdout=True)
def extract_data(start_date: str, end_date: str):
    # your extract logic based on those dates
    print(f"Backloading data for the interval {start_date} - {end_date}")


with Flow("backfilling_flow") as flow:
    custom_start_date = Parameter("start_date", default=None)
    custom_end_date = Parameter("end_date", default=None)
    start_date = get_start_date(custom_start_date)
    end_date = get_end_date(custom_end_date)
    extract_data(start_date=start_date, end_date=end_date)
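The key to this pattern is the `None` sentinel: an explicit value always wins, and the date is only derived when the run actually happens. Here is a minimal plain-Python sketch of that logic without Prefect, so it can be checked in isolation. The helper name `resolve_date` and the injected `today` argument are hypothetical and only used for illustration.

```python
from datetime import date, timedelta
from typing import Optional


def resolve_date(param_value: Optional[str], today: date, days_back: int = 0) -> str:
    """Return the explicit override if given, else a date derived from `today`."""
    if param_value is not None:
        return param_value
    return (today - timedelta(days=days_back)).isoformat()


# No override: the interval follows whatever "today" is when the run happens.
run_day = date(2022, 2, 17)
start = resolve_date(None, run_day, days_back=1)
end = resolve_date(None, run_day)

# Explicit override, as in a backfill run: the supplied value is used unchanged.
backfill_start = resolve_date("2022-02-11", run_day, days_back=1)
```

Passing `today` in explicitly, rather than reading the clock inside the function, is just a convenience that makes the fallback easy to test.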
To backfill specific dates, you can trigger one run of that flow per interval from a separate parent flow:
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run

with Flow("trigger") as flow:
    backfill_intervals = [
        dict(
            start_date="2022-02-11T00:00:00-05:00", end_date="2022-02-12T00:00:00-05:00"
        ),
        dict(
            start_date="2022-02-12T00:00:00-05:00", end_date="2022-02-13T00:00:00-05:00"
        ),
        dict(
            start_date="2022-02-13T00:00:00-05:00", end_date="2022-02-14T00:00:00-05:00"
        ),
        dict(
            start_date="2022-02-14T00:00:00-05:00", end_date="2022-02-15T00:00:00-05:00"
        ),
        dict(
            start_date="2022-02-15T00:00:00-05:00", end_date="2022-02-16T00:00:00-05:00"
        ),
        dict(
            start_date="2022-02-16T00:00:00-05:00", end_date="2022-02-17T00:00:00-05:00"
        ),
    ]
    create_flow_run.map(
        flow_name=unmapped("backfilling_flow"),
        project_name=unmapped("p"),
        parameters=backfill_intervals,
    )
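Writing each interval by hand gets tedious for longer ranges. Below is a minimal sketch of building the same list programmatically with only the standard library. The helper name `daily_intervals` and the fixed UTC-05:00 offset are assumptions for illustration. A fixed offset ignores daylight saving time, so a real time zone would be needed for ranges that cross a DST change.

```python
from datetime import date, datetime, time, timedelta, timezone, tzinfo
from typing import Dict, List

# Assumption: a fixed UTC-05:00 offset, matching the timestamps used in this post.
FIXED_OFFSET = timezone(timedelta(hours=-5))


def daily_intervals(
    first_day: date, last_day: date, tz: tzinfo = FIXED_OFFSET
) -> List[Dict[str, str]]:
    """Build one start_date/end_date dict per day in [first_day, last_day)."""
    intervals = []
    current = datetime.combine(first_day, time.min, tzinfo=tz)
    stop = datetime.combine(last_day, time.min, tzinfo=tz)
    while current < stop:
        next_day = current + timedelta(days=1)
        intervals.append(
            {"start_date": current.isoformat(), "end_date": next_day.isoformat()}
        )
        current = next_day
    return intervals


# Six one-day intervals, from 2022-02-11 up to 2022-02-17
backfill_intervals = daily_intervals(date(2022, 2, 11), date(2022, 2, 17))
```

The resulting list has the same shape as the hand-written one, so it can be passed as `parameters=backfill_intervals` to `create_flow_run.map`.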
Additional resources
- An even better approach than using Parameters is to leverage the KV store for that purpose, as shown on this documentation page.
- This tutorial provides more information about the Parameter task.
- StackOverflow question on this topic, answered by Chris White: etl - Is there a way to backfill historical data (once) for a new Flow in Prefect? - Stack Overflow