Wrappers for tasks

I’d like to create a wrapper for tasks that adds a retry mechanism for specific types of errors. What I have so far is this.

from prefect import flow, task
import prefect
from datetime import datetime

@task(retries=3, retry_delay_seconds=5)
def task_wrapper(fn, ignore_errors_of_type):
    try:
        # Call the function that was passed in rather than a hard-coded one
        return fn()
    except Exception as ex:
        # Compare by exception class name, e.g. "ZeroDivisionError"
        if type(ex).__name__ in ignore_errors_of_type:
            return prefect.orion.schemas.states.Failed()
        else:
            return prefect.orion.schemas.states.Cancelled()


def call_api():
    print(len(-1))

@flow
def task_retry_bypass():
    fact_json = task_wrapper(call_api,ignore_errors_of_type=["ZeroDivisionError"])

task_retry_bypass()

This works. However, I would like to turn the wrapper into a decorator, using the @ syntax. I expect the behaviour to be as follows:

@task_wrapper
def call_api(some_args):
    """code goes here"""

@flow
def task_retry_bypass():
    fact_json = call_api(some_args)

The issue is that I am unable to pass arguments in the former case, as I’d like to in the latter. How would I go about implementing this? I have tried, but I get an error stating that tasks cannot be called outside of flows.

Hi Krishna,

I will show Prefect 1 code, but I think it should be very similar in Prefect 2. You can check this:
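
Independently of the Prefect version, the general pattern can be sketched in plain Python as a decorator factory. This is only a minimal sketch, not Prefect API: the name bypass_errors and the choice to return None for ignored errors are assumptions made for illustration. One likely cause of the "tasks cannot be called outside of flows" error is that writing @task_wrapper above a function calls task_wrapper(call_api) at definition time, and since task_wrapper is itself a task, that call happens outside any flow.

```python
import functools


def bypass_errors(ignore_errors_of_type=()):
    """Hypothetical decorator factory (not part of Prefect).

    Exceptions whose class name is listed are swallowed and the wrapped
    function returns None; all other exceptions propagate unchanged.
    """
    ignored = set(ignore_errors_of_type)

    def decorator(fn):
        @functools.wraps(fn)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as ex:
                if type(ex).__name__ in ignored:
                    # Swallow the error so an outer retry layer sees no exception
                    return None
                raise  # anything else is re-raised for the caller to handle

        return wrapper

    return decorator


@bypass_errors(ignore_errors_of_type=["ZeroDivisionError"])
def call_api(numerator, denominator):
    """Stand-in for a real API call; accepts arbitrary arguments."""
    return numerator / denominator
```

Because the wrapper is an ordinary function here, it runs fine at definition time. Stacking @task(retries=3, retry_delay_seconds=5) above @bypass_errors(...) and calling call_api(...) inside the @flow function would presumably be the next step, but I have not verified that against a specific Prefect version.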
