Wrappers for tasks

I’d like to create a wrapper for tasks that adds a retry mechanism for specific types of errors. What I have so far is this:

from prefect import flow, task
import prefect
from datetime import datetime

def task_wrapper(fn, ignore_errors_of_type):
    try:
        return fn()
    except Exception as ex:
        # Match on the exception class name, e.g. "ZeroDivisionError"
        if type(ex).__name__ in ignore_errors_of_type:
            return prefect.orion.schemas.states.Failed()
        return prefect.orion.schemas.states.Cancelled()

@task
def call_api():
    """code goes here"""

@flow
def task_retry_bypass():
    fact_json = task_wrapper(call_api, ignore_errors_of_type=["ZeroDivisionError"])


This works. However, I would like to turn the wrapper into a decorator, applied with the @ syntax. I expect the behaviour to be as follows:

@task_wrapper(ignore_errors_of_type=["ZeroDivisionError"])
def call_api(some_args):
    """code goes here"""

@flow
def task_retry_bypass():
    fact_json = call_api(some_args)

The issue is that I cannot pass arguments to the task in the former case, as I’d like to in the latter. How would I go about implementing this? I have tried, but I get an error stating that tasks cannot be called outside of flows.

Hi Krishna,

I will show Prefect 1 code, but I think it should be very similar. You can check this:
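Independently of the Prefect version, the general pattern for a decorator that takes its own arguments and still forwards the call arguments is a decorator factory. Below is a minimal sketch in plain Python with no Prefect calls at all. The tuples ("failed", ex) and ("cancelled", ex) are hypothetical stand-ins for the Failed and Cancelled states in the question, and the two-argument call_api is invented purely for illustration.

```python
import functools


def task_wrapper(ignore_errors_of_type=()):
    """Decorator factory: takes the error names, returns the real decorator."""

    def decorator(fn):
        @functools.wraps(fn)  # keep fn's name and docstring on the wrapper
        def wrapped(*args, **kwargs):
            try:
                # Forward whatever arguments the caller passed
                return fn(*args, **kwargs)
            except Exception as ex:
                if type(ex).__name__ in ignore_errors_of_type:
                    # Stand-in for returning a Failed state (assumption)
                    return ("failed", ex)
                # Stand-in for returning a Cancelled state (assumption)
                return ("cancelled", ex)

        return wrapped

    return decorator


# Hypothetical function used only to exercise the decorator
@task_wrapper(ignore_errors_of_type=["ZeroDivisionError"])
def call_api(numerator, denominator):
    return numerator / denominator
```

With this, call_api(1, 2) returns the normal result, while call_api(1, 0) returns the "failed" stand-in instead of raising. In Prefect 2 you would presumably stack @task above @task_wrapper(...) and call the function from inside a @flow, but that combination is an assumption and is not tested here.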
