Can I define my tasks as classes rather than functions?

Prefect 2.0

While you can build your task logic within a class, we recommend defining tasks as functions. Since the flow’s computational graph is discovered at runtime, there is no need for a rigid class-based task definition.

Classes are stateful, while tasks are executed in a stateless manner to support concurrency and distributed parallelism. Additionally, splitting the orchestration framework into two APIs confuses many users.

If you need to define some logic in classes, you can still either:

Prefect 1.0

Prefect 1.0 is primarily based on a functional API with a task decorator, but it additionally allows the definition of tasks as classes with a run() method.

Example flow using the imperative API:

from prefect import task, Task, Flow

@task  # functional
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))

class AddTask(Task):  # imperative
    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default

    def run(self, x: int, y: int = None) -> int:
        if y is None:
            y = self.default
        return x + y

# initialize the task instance
add = AddTask(default=1)  # imperative

with Flow("My first flow!") as flow:  # functional
    first_result = add(1, y=2)
    second_result = add(x=first_result, y=100)

I came to discourse to ask this question from scratch, but I think you’ve answered most of it here.

We have an internal library where we build task classes that inherit from Task(). Will we need to rebuild these (as functions) to work correctly with Prefect 2.0?

I actually prefer using functions – so if we’ll need to change them I’d like to adjust our practice now rather than later (still assuming we may still need to make some small adjustments in the future as well).

Thanks!

1 Like

There will be some work involved to make those tasks written in imperative API work because even though a Task class exists, the logic is different due to changes in the backend.

I will ask the team what is the easiest approach here and will get back to you. It could involve writing some wrapper base class called Task that will under the hood create a Prefect 2.0 task and changing imports to use that class instead of Prefect 1.0 Task class.

1 Like

Thanks.

One of the reasons that we went the route we did is because it made it easier to assign tags and other arguments that typically would go into the @task() decorator when using the imperative api to initialize the tasks (that said, we build flow logic using the functional api). Our assumption is that it would make it so that we could use our established logic without having to add extra boilerplate-like code when we wanted to, say, assign a different tag or retry value.

# import function containing no prefect dependencies
from some_library import oracle_query

@task(tags=['db-source'], max_retries=2)
def oracle_query_1(sql, user, password, dsn):
    return oracle_query(sql, user, password, dsn)

@task(tags=['db-target'], max_retries=3)
def oracle_query_2(sql, user, password, dsn):
    return oracle_query(sql, user, password, dsn)

with Flow('name') as flow:
    result1 = oracle_query_1(sql, user, password, dsn1)
    result2 = oracle_query_2(sql, user, password, dsn2)
    ...

vs

# import task class that inherits from Task()
from some_library import OracleQuery

oracle_query_1 = OracleQuery(tags=['db-source'], max_retries=2)
oracle_query_2 = OracleQuery(tags=['db-target'], max_retries=3)

with Flow('name') as flow:
    result1 = oracle_query_1(sql, user, password, dsn1)
    result2 = oracle_query_2(sql, user, password, dsn2)
    ...

Is there anyway to add task initialization level parameters after a function/task is already created using the task() decorator?

My ideal would be to import a function that was built using the task decorator, but then be able to adjust tags, etc on a per task basis if I were to use it more than once in a flow (similar to what I can do with task classes). Similar to how we are doing that with task classes. However, my assumption is that’s not possible, since the **kwargs argument in task classes is just used at the constructor level.

If that’s not easily possible, maybe it would be better to move to our own internal library being stand alone (since we do use it for non-prefect based work), and just always wrap our internal re-usable code in @task() decorated functions in our flow scripts. That scenario would also decouple our internal re-usable code further from Prefect library changes as well.

I’m also very open to the fact that perhaps there is a better approach to our general way or doing things to begin with.

1 Like

I think the problem/user story you are describing is not as much tied to imperative or functional API but rather: "As a user, I would like to set defaults, such as default max retries or default tags, that can be optionally overridden on a per-task basis". Is that true or did I misinterpret that?

In Prefect 1.0 in theory this is possible by setting:

prefect.config.tasks.defaults.max_retries=3

The same could be configured using environment variables or be set in config.toml:

export PREFECT__TASKS_DEFAULTS_MAX_RETRIES=3

I created issues for both:

Hello @anna_geller,

Do you have any news/recommendations about:

  • migrating custom tasks from the imperative API to Orion (for this I created an internal issue)

Yes, 100%! Our recommendation is to leverage Prefect Collections or Prefect Blocks or both!

Check out:

for building custom blocks: