How to use the task library?

What is the task library?

The Prefect task library is a constantly growing list of pre-defined tasks that provide off-the-shelf functionality for working with a wide range of tools.

  • A majority of the task library is community supported.
  • Tasks marked with the Verified badge have been reviewed and approved by Prefect.
  • For a full list of tasks in the library and more information on how to use them, visit the API reference documentation for the prefect.tasks module.

How to use the task library in a flow?

The tasks in the task library are implemented as Python classes. To use one, you need to perform two actions:

  • instantiate the task, i.e. initialize the class with the arguments its __init__ method accepts,
  • call the instantiated task object in your flow, so that the task's .run() method is executed when the flow runs.
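The two steps can be illustrated without Prefect at all. The sketch below uses a hypothetical stand-in class, EchoTask, which is not part of the task library; it only mimics the shape of a library task (configuration in __init__, work in run, and a call that forwards to run). One simplification to keep in mind: in Prefect, a call made inside a flow block is deferred until the flow runs, whereas this stand-in runs immediately.

```python
class EchoTask:
    """Hypothetical stand-in for a library task; NOT a Prefect class."""

    def __init__(self, prefix="", upper=False):
        # Step 1: configuration is captured when the class is instantiated.
        self.prefix = prefix
        self.upper = upper

    def run(self, message):
        # The actual work of the task lives in run().
        text = f"{self.prefix}{message}"
        return text.upper() if self.upper else text

    def __call__(self, *args, **kwargs):
        # Step 2: calling the instance forwards to run().
        # Simplification: this runs right away instead of being deferred.
        return self.run(*args, **kwargs)


# Instantiate once, then call (mirrors instantiating at the module level).
echo = EchoTask(prefix="> ")
first = echo("hello")

# Instantiate and call in one expression (mirrors the two pairs of brackets).
second = EchoTask(prefix="> ", upper=True)("hello")
```

Both styles produce the same kind of object; the only difference is whether the configured instance gets a name you can reuse.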

There are two ways to instantiate and call tasks in your flow. Both are shown below, using ShellTask as the example.

Instantiating the task at the module level

from prefect import task, Flow
from prefect.tasks.shell import ShellTask

ls_task = ShellTask(command="ls", return_all=True, stream_output=True)

@task
def show_output(std_out):
    print(std_out)

with Flow("list_files") as flow:
    ls = ls_task()
    show_output(ls)

Instantiating the task within the flow

Note how the first pair of parentheses initializes the class with its __init__ arguments, while the second pair calls the task object, which is what causes its .run() method to be executed.

from prefect import task, Flow
from prefect.tasks.shell import ShellTask

@task
def show_output(std_out):
    print(std_out)

with Flow("list_files") as flow:
    ls = ShellTask(command="ls", return_all=True, stream_output=True)()
    show_output(ls)

In Orion (Prefect 2.0), tasks are now in Prefect Collections.

Here’s an example usage for prefect-shell:

from prefect import flow
from prefect_shell import shell_run_command

@flow
def example_shell_run_command_flow():
    return shell_run_command(command="ls .", return_all=True)

example_shell_run_command_flow()

For all collections available, check out the Collections Catalog.

Hi Anna,
What are the main differences between using a Prefect-provided task from the task library and using a non-Prefect Python package that offers similar functions, or other functions that my flow needs?

The only difference is that you are using a different library or module :smiley: It's all just Python code in the end.

Oh ok. So when Airflow says they have a lot more integrations/providers/modules, what advantage are developers getting compared to using different libraries/modules? Is it just that they don't have to find the relevant modules themselves and import them? Or does it also help with managing and maintaining that library usage (e.g. versions, which are handled automatically when using providers/task libraries)?

They need far more integrations because of an Airflow limitation: tasks cannot pass data to each other except through XComs.

To explain: if you want to move data between arbitrary systems in every direction, e.g. S3, Postgres and Redshift, in Prefect you need only 3 extract tasks and 3 load tasks, which you can then combine by passing data between them. In contrast, in Airflow you would need a separate operator for every pair of systems, times 2 to make it work in the reverse direction.
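A minimal plain-Python sketch of that composition idea, under stated assumptions: the three "systems" are in-memory lists rather than real connections, and all names (STORES, make_extract, make_load, transfer) are hypothetical and not part of Prefect or Airflow. It shows one extract and one load function per system being combined freely, plus the arithmetic for how the two approaches scale.

```python
# In-memory stand-ins for the three systems (hypothetical; no real connections).
STORES = {"s3": [], "postgres": [], "redshift": []}


def make_extract(system):
    """Build the single extract function for one system."""
    def extract():
        return list(STORES[system])
    return extract


def make_load(system):
    """Build the single load function for one system."""
    def load(rows):
        STORES[system].extend(rows)
        return len(rows)
    return load


EXTRACT = {name: make_extract(name) for name in STORES}
LOAD = {name: make_load(name) for name in STORES}


def transfer(source, target):
    # Any extract can feed any load because data is passed between them.
    return LOAD[target](EXTRACT[source]())


def composable_blocks(n_systems):
    # One extract plus one load per system.
    return 2 * n_systems


def pairwise_transfers(n_systems):
    # One dedicated transfer per ordered (source, target) pair.
    return n_systems * (n_systems - 1)
```

With exactly three systems both counts come out at six; the difference shows up as systems are added, since composable_blocks(5) is 10 while pairwise_transfers(5) is 20.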

Got it.
Also, we have a use case where a task needs to start only after an external approval happens in another system. I have seen your examples of RETRY where you poll until your desired state is ready and then proceed with processing. Is there an alternative way of registering a callback with the external system where they invoke the task when they are ready?

I think the easiest would be to trigger this as a flow run via an API call - this topic provides many actionable examples
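As a rough sketch of what such a callback could look like on the external system's side, the snippet below only builds the HTTP request for a GraphQL call and does not send it. It assumes the Prefect 1 GraphQL API with a create_flow_run mutation and bearer-token authentication; the endpoint URL, API key, and flow ID are placeholders you would need to supply, and the helper name build_flow_run_request is made up for this example. Please check the exact mutation and endpoint against the API documentation for your Prefect version.

```python
import json
import urllib.request

# Assumed mutation shape for the Prefect 1 GraphQL API; verify before use.
CREATE_FLOW_RUN = """
mutation($input: create_flow_run_input!) {
  create_flow_run(input: $input) { id }
}
"""


def build_flow_run_request(api_url, api_key, flow_id):
    """Build (but do not send) a POST request that asks for a new flow run.

    api_url, api_key and flow_id are placeholders supplied by the caller.
    """
    payload = {
        "query": CREATE_FLOW_RUN,
        "variables": {"input": {"flow_id": flow_id}},
    }
    return urllib.request.Request(
        api_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

The external system would call build_flow_run_request(...) once the approval is granted and pass the result to urllib.request.urlopen; that way nothing has to poll, and the flow run starts only when the approval event actually happens.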