I’d like to create a wrapper for tasks that adds a retry mechanism for specific types of errors. This is what I have so far:
from prefect import flow, task
import prefect
from datetime import datetime

@task(retries=3, retry_delay_seconds=5)
def task_wrapper(fn, ignore_errors_of_type):
    try:
        fn()  # call the function that was passed in
    except Exception as ex:
        # Pick the returned state based on the exception's class name
        if type(ex).__name__ in ignore_errors_of_type:
            return prefect.orion.schemas.states.Failed()
        else:
            return prefect.orion.schemas.states.Cancelled()

def call_api():
    print(len(-1))  # deliberately raises TypeError

@flow
def task_retry_bypass():
    fact_json = task_wrapper(call_api, ignore_errors_of_type=["ZeroDivisionError"])

task_retry_bypass()
This works. However, I would like to turn the wrapper into a decorator that is applied with the @ syntax. I expect the behaviour to be as follows:
@task_wrapper
def call_api(some_args):
    """code goes here"""

@flow
def task_retry_bypass():
    fact_json = call_api(some_args)
The issue is that in the first version I cannot pass arguments to the wrapped function, which I would like to do in the second. How would I go about implementing this? I have tried, but I get an error stating that tasks cannot be called outside of flows.
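For reference, here is a minimal sketch of the decorator shape I am after, written in plain Python so that it runs without Prefect. It is only an illustration under assumptions: the parameter name `retry_on`, the manual retry loop, and the demo function are all hypothetical, and the loop stands in for Prefect's own `retries` handling rather than reproducing it. One likely cause of the "outside of flows" error is that `@task_wrapper` calls `task_wrapper(call_api)` at definition time, and if `task_wrapper` is itself a task, that is a task call made outside any flow. In the sketch the decorator is an ordinary function that returns a new callable, and the arguments are forwarded with `*args, **kwargs`.

```python
import functools
import time


def task_wrapper(retry_on, retries=3, retry_delay_seconds=0):
    """Hypothetical decorator factory: retry only on the named error types."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            # One initial attempt plus up to `retries` further attempts.
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as ex:
                    # Re-raise immediately if the error type is not listed,
                    # or if this was the last allowed attempt.
                    if type(ex).__name__ not in retry_on or attempt == retries:
                        raise
                    time.sleep(retry_delay_seconds)

        return wrapped

    return decorator


# Demo: fails twice with a retryable error, then succeeds.
calls = []


@task_wrapper(retry_on=["ZeroDivisionError"], retries=3)
def call_api(value):
    calls.append(value)
    if len(calls) < 3:
        raise ZeroDivisionError("transient failure")
    return value * 2
```

With Prefect, the `wrapped` function could presumably be passed through `task(retries=..., retry_delay_seconds=...)` inside `decorator`, so that the decorator returns a task object without calling it; that step is left out here and I have not verified it.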