View in #prefect-community on Slack
@Emma_Rizzi: Hello! I have a flow that uses DateTimeParameters. I would like to schedule it with one date defaulting to the day of the scheduled run (or, for example, to the first day of the month). Is there a way to do this through schedules, or should I handle it in my code?
@Stéphan_Taljaard: Hi Emma
I also have ETL flows, where I need to give a start and end date. These are the tasks I wrote to generate the dates from parameters when the task is run:
from datetime import datetime
from typing import Tuple
import pendulum
import prefect
from prefect import task
# Timezone that every date produced by generate_date() is converted to;
# date strings without an explicit offset are also interpreted in this zone.
TIMEZONE = pendulum.timezone("Africa/Johannesburg")
@task
def generate_date(date_parameter, hours_delta=None) -> datetime:  # pendulum.datetime.DateTime
    """Generate a date value from parameter inputs.

    Args:
        date_parameter: The date to generate. It can take one of three forms:

            - One of the keywords ``"scheduled_start_time"``, ``"yesterday"``,
              ``"today"`` or ``"now"``.
            - Any string accepted by ``pendulum.parse``. A string without an
              offset is interpreted in ``TIMEZONE``.
            - A ``datetime`` instance, such as the value of a
              DateTimeParameter.

            It must be a ``datetime`` when ``hours_delta`` is given.
        hours_delta: Optional number of hours to subtract from
            ``date_parameter``. Its absolute value is converted to whole
            minutes (rounded), so the offset always moves back in time. A falsy
            value (``None`` or ``0``) means "no offset".

    Returns:
        The resulting pendulum ``DateTime``, converted to ``TIMEZONE``.

    Raises:
        TypeError: If ``hours_delta`` is given but ``date_parameter`` is not a
            ``datetime``.
        ValueError: If ``"scheduled_start_time"`` is requested but the Prefect
            context holds no such value. The context is only populated at run
            time, so this can happen when ``.run()`` is called outside a flow
            run.
    """
    if not hours_delta:
        minutes_delta_log = ""
        if isinstance(date_parameter, datetime):
            # pendulum.parse only accepts strings, so wrap datetimes directly;
            # naive values are interpreted in TIMEZONE, like parsed strings.
            date_ = pendulum.instance(date_parameter, tz=TIMEZONE)
        elif date_parameter == "scheduled_start_time":
            date_ = prefect.context.get("scheduled_start_time")
            if date_ is None:
                # Fail with a clear message instead of AttributeError on None below.
                raise ValueError(
                    "`scheduled_start_time` is not available in the Prefect context"
                )
        elif date_parameter == "yesterday":
            date_ = pendulum.today(TIMEZONE).subtract(days=1)
        elif date_parameter == "today":
            date_ = pendulum.today(TIMEZONE)
        elif date_parameter == "now":
            date_ = pendulum.now(TIMEZONE)
        else:
            date_ = pendulum.parse(date_parameter, tz=TIMEZONE)
    else:
        if not isinstance(date_parameter, datetime):
            raise TypeError("date_parameter should be a DateTime object")
        # abs() makes the offset always subtract, whatever its sign.
        minutes_delta = round(abs(float(hours_delta)) * 60)
        minutes_delta_log = f" - {minutes_delta} minutes"
        date_ = pendulum.instance(date_parameter).subtract(minutes=minutes_delta)
    date_ = date_.in_timezone(TIMEZONE)
    prefect.context.get("logger").info(
        f"Generated date for `{date_parameter}`{minutes_delta_log}: {date_}"
    )
    return date_
@task
def check_if_start_hours_delta_given(start_hours_delta_parameter):
    """Report whether a start-hours offset was supplied.

    Any truthy value counts as "given"; ``None``, ``0`` and ``""`` do not.
    """
    return True if start_hours_delta_parameter else False
@task
def generate_start_and_end_dates(
    start_date, start_date_hours_delta, end_date
) -> Tuple[pendulum.DateTime, pendulum.DateTime]:
    """
    Generates a start and end date that can be used in queries.

    The start date can be a fixed value, or an amount of hours to offset the
    end date with. If `start_date_hours_delta` is given (truthy), it is used
    instead of `start_date`: the start date becomes the end date minus that
    many hours.

    Args:
        start_date: Value understood by `generate_date` (e.g. "today" or a
            date string). Ignored when `start_date_hours_delta` is truthy.
        start_date_hours_delta: Optional number of hours before the end date
            at which the range starts.
        end_date: Value understood by `generate_date`.

    Returns:
        A `(date_start, date_end)` tuple, both converted to `TIMEZONE` by
        `generate_date`.
    """
    # Call the sibling tasks' plain functions via .run() from inside this task.
    date_end = generate_date.run(end_date)
    use_start_hours_delta = check_if_start_hours_delta_given.run(start_date_hours_delta)
    if use_start_hours_delta:
        prefect.context.get("logger").info("Using hours delta")
        date_start = generate_date.run(date_end, start_date_hours_delta)
    else:
        date_start = generate_date.run(start_date)
    return date_start, date_end
I then use `generate_start_and_end_dates` in all my flows.
@Anna_Geller: While it’s possible to retrieve the date of your schedule using the context:
prefect.context.get("scheduled_start_time")
Be careful to call it only within a task: the context information is populated at run time, while the Flow is built at registration time. That’s why using it as a default parameter value may not work as you intend, and you may need to pass it as a data dependency instead, as shown here:
import prefect
from prefect import Flow, task
from prefect.core.parameter import DateTimeParameter
@task
def get_scheduled_start_time():
    """Return the run's scheduled start time, read from the Prefect context.

    The lookup happens when the task runs (not when the flow is built), which
    is when the context is populated.
    """
    run_context = prefect.context
    scheduled_start = run_context.get("scheduled_start_time")
    return scheduled_start
@task(log_stdout=True)
def print_val(x):
    """Print ``x``; the task is declared with ``log_stdout=True``."""
    value_to_show = x
    print(value_to_show)
@task(log_stdout=True)
def determine_parameter_to_use(sched_start_time, dt_param_val):
    """Pick the DateTimeParameter value if one was supplied, else the scheduled start time.

    Args:
        sched_start_time: Scheduled start time of the run (from the context).
        dt_param_val: Value of the optional DateTimeParameter, or ``None``.

    Returns:
        The chosen value (also printed), so downstream tasks can depend on it.
    """
    # Use the argument, not the module-level Parameter object `dt_param_value`
    # (the original printed the latter by mistake).
    chosen = dt_param_val if dt_param_val is not None else sched_start_time
    print(chosen)
    return chosen
# Build the flow. Task calls inside this block only wire up the task graph;
# nothing executes until flow.run() is called.
with Flow("parametrized_dt_flow") as flow:
    # Read the scheduled start time inside a task, where the context is populated.
    sched_start_time = get_scheduled_start_time()
    print_val(sched_start_time)
    # Optional (required=False) datetime parameter.
    dt_param_value = DateTimeParameter("some_dt", required=False)
    # Pass both as data dependencies; the task decides which one to use.
    determine_parameter_to_use(sched_start_time, dt_param_value)

if __name__ == "__main__":
    # Run the flow locally when this file is executed as a script.
    flow.run()
@Emma_Rizzi: @Stéphan_Taljaard @Anna_Geller thank you both for your input! This looks like exactly what I’m trying to do.
I didn’t know about the pendulum library either; it’s very interesting, thanks for sharing!