I’d like to create a wrapper for tasks, which includes the functionality to for retry mechanisms on specific types of errors. What I have so far is this.
from prefect import flow, task
import prefect
from datetime import datetime
@task(retries=3,retry_delay_seconds=5)
def task_wrapper(fn,ignore_errors_of_type):
try:
call_api()
except Exception as ex:
if(type(ex).__name__ in ignore_errors_of_type):
return prefect.orion.schemas.states.Failed()
else:
return prefect.orion.schemas.states.Cancelled()
def call_api():
print(len(-1))
@flow
def task_retry_bypass():
fact_json = task_wrapper(call_api,ignore_errors_of_type=["ZeroDivisionError"])
task_retry_bypass()
This works. However, I would like to create a new wrapper, using the @ definition. I expect the behaviour to be as follows:
@task_wrapper
def call_api(some_args):
"""code goes here"""
@flow
def task_retry_bypass():
fact_json = call_api(some_args)
Issue being that I am unable to use arguments in the former case, as I’d like to in the latter. How would I go about implementing this? I have tried to, but am facing an error which states that tasks cannot be called outside of flows.