View in #prefect-community on Slack
@richard-social_812: Hello,
Is this the correct way to ensure a task only runs once a week, in a flow scheduled to run daily?
```python
@task(result=LocalResult(dir=f'{os.getcwd()}/storage/pre_delinquency_models',
                         serializer=H2OModelSerializer()),
      checkpoint=True, target='{date:%Y}-Week {date:%U}', log_stdout=True)
def train_new_model(data: pd.DataFrame):
    ...
```
I have it running on Prefect Cloud with the following run_config:
```python
run_config = LocalRun(env={'PREFECT__FLOWS__CHECKPOINTING':
                           os.environ.get('PREFECT__FLOWS__CHECKPOINTING', 'true')})
```
However, looking at the results folder, I see that a new task result file is created with each daily run. What am I missing?
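For reference, the target is a Python format template. Below is a minimal sketch of how it renders, assuming `date` is a standard `datetime` (Prefect fills it from its run context; the dates and the `render_target` helper are illustrative only). Because `%U` numbers weeks starting on Sunday, every day from Sunday through Saturday yields the same target name, so only the first run of each week should need to write a new file:

```python
from datetime import datetime, timedelta

# The target template from the task definition above
TARGET = "{date:%Y}-Week {date:%U}"


def render_target(date: datetime) -> str:
    # str.format passes "%Y" / "%U" to datetime.__format__, which calls strftime
    return TARGET.format(date=date)


# One week of daily runs: Sunday 2021-11-07 through Saturday 2021-11-13
start = datetime(2021, 11, 7)
week = [render_target(start + timedelta(days=i)) for i in range(7)]

print(week[0])                                   # 2021-Week 45
print(len(set(week)))                            # 1: all seven days share one target
print(render_target(start + timedelta(days=7)))  # 2021-Week 46 (the next Sunday)
```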
@emre: Hi Richard,
I just built a similar setup, shown below, and it works with both Prefect Core runs and Prefect Server. I can’t test with Cloud at the moment.
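```python
import os
import random

from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.run_configs import LocalRun


@task(log_stdout=True)
def out_sumthin():
    # Upstream task producing a random value
    x = random.randint(0, 20)
    print(x)
    return x


@task(
    result=LocalResult(dir=f"{os.getcwd()}/storage/pre_delinquency_models"),
    checkpoint=True,
    # One result file per calendar week, e.g. "2021-Week 45"
    target="{date:%Y}-Week {date:%U}",
    log_stdout=True,
)
def train_new_model(x):
    return 5


with Flow("Checkpoint") as f:
    out = out_sumthin()
    train_new_model(out)

# Environment variables passed to flow runs started by an agent
f.run_config = LocalRun(
    env={
        "PREFECT__FLOWS__CHECKPOINTING": os.environ.get(
            "PREFECT__FLOWS__CHECKPOINTING", "true"
        )
    }
)

# For a local f.run(), checkpointing is read from the local environment,
# so PREFECT__FLOWS__CHECKPOINTING=true must be set in the shell
f.run()
```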
So I couldn’t find the problem in your setup. Could you post the debug logs of your flow run? You can get them by running the flow on an agent with the following settings:
```
prefect agent local start --show-flow-logs --log-level DEBUG -e PREFECT__LOGGING__LEVEL=DEBUG
```
Finally, you could try the cache_for and cache_validator kwargs in the task definition; those might work for this.
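Conceptually, cache_for sets how long a successful result stays valid, and cache_validator decides whether that cached result may be reused for a new run (Prefect ships validators such as duration_only, its default when cache_for is set, and all_inputs in prefect.engine.cache_validators). The following is a plain-Python sketch of that idea, not Prefect's implementation; the `cached_for` decorator, the `same_inputs` validator and the injected clock are hypothetical, chosen so the example is deterministic:

```python
from datetime import datetime, timedelta
from typing import Any, Callable, Dict

# Hypothetical sketch: duration-based caching with a pluggable validator.
# It mimics the idea behind Prefect's cache_for/cache_validator, not Prefect's code.
Validator = Callable[[Dict[str, Any], Dict[str, Any]], bool]


def duration_only(old_inputs: Dict[str, Any], new_inputs: Dict[str, Any]) -> bool:
    # Reuse the cached result whatever the inputs are; only expiry matters
    return True


def same_inputs(old_inputs: Dict[str, Any], new_inputs: Dict[str, Any]) -> bool:
    # Reuse the cached result only if the inputs are unchanged
    return old_inputs == new_inputs


def cached_for(duration: timedelta, validator: Validator = duration_only,
               clock: Callable[[], datetime] = datetime.now):
    def decorator(fn):
        cache: Dict[str, Any] = {}  # inputs, result and expiry of the last real run

        def wrapper(**inputs):
            now = clock()
            if cache and now < cache["expires"] and validator(cache["inputs"], inputs):
                return cache["result"]  # cache still valid: skip the work
            result = fn(**inputs)
            cache.update(inputs=inputs, result=result, expires=now + duration)
            return result

        return wrapper
    return decorator


# Simulate eight daily runs with a fake clock and an input that changes every day
today = [datetime(2021, 11, 7)]
runs_duration, runs_inputs = [], []


@cached_for(timedelta(days=7), clock=lambda: today[0])
def train_weekly(x):
    runs_duration.append(x)
    return x


@cached_for(timedelta(days=7), validator=same_inputs, clock=lambda: today[0])
def train_if_inputs_same(x):
    runs_inputs.append(x)
    return x


for day in range(8):
    today[0] = datetime(2021, 11, 7) + timedelta(days=day)
    train_weekly(x=day)
    train_if_inputs_same(x=day)

print(runs_duration)  # [0, 7]: recomputed only after the 7 days expired
print(runs_inputs)    # [0, 1, 2, 3, 4, 5, 6, 7]: the changing input invalidates the cache
```

The second case shows why the validator matters: with an input-checking validator, an upstream value that changes on every run (like the random x in the example above) would defeat the weekly cache.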
@Anna_Geller: The easiest way to tell Prefect to run this task only once per week is to use caching rather than targets. Targets are file-based persistence, and they carry more risk in your use case: for example, someone could (accidentally) delete the file, causing the task to be recomputed.
With cache_for, Prefect stores that information in the backend and prevents this task from rerunning for the given duration after it has run successfully.
```python
@task(cache_for=datetime.timedelta(days=7))
def train_new_model(data: pd.DataFrame): ...
```
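To make the risk of file-based targets concrete, here is a plain-Python sketch (not Prefect's code) of a target check: the work is skipped only while the target file exists, so deleting it, even by accident, forces a recompute within the same week. The helper `run_with_target` and the file name are hypothetical:

```python
import os
import tempfile
from typing import Callable, List


def run_with_target(target_path: str, compute: Callable[[], str]) -> str:
    # Target-style persistence: if the file exists, read it instead of recomputing
    if os.path.exists(target_path):
        with open(target_path) as fh:
            return fh.read()
    result = compute()
    with open(target_path, "w") as fh:
        fh.write(result)
    return result


calls: List[int] = []


def train() -> str:
    calls.append(1)  # record every real (non-cached) run
    return "model-v1"


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "2021-Week 45")
    run_with_target(target, train)  # first run of the week: computes and writes the file
    run_with_target(target, train)  # later run: file exists, nothing is recomputed
    os.remove(target)               # someone deletes the file...
    run_with_target(target, train)  # ...and the task runs again in the same week

print(len(calls))  # 2
```

A cache stored in the backend via cache_for does not depend on a file on disk, which is the advantage described above.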