Prefect 2.9.0 is here with Artifacts, configurable result storage keys, and runtime enhancements 🎉

Track and manage artifacts :woman_technologist:

Most workflows produce or update an artifact of some kind, whether it’s a table, a file, or a model. With Prefect Artifacts, you can track changes to these outputs and richly display them in the UI as tables, markdown, and links.

Artifacts may be associated with a particular task run, flow run, or even exist outside a flow run context, enabling you to not only observe your flows, but the objects that they interact with as well:

A variety of artifact types are available. To create an artifact that produces a table, for example, you can use the create_table_artifact() function.

from prefect import task, flow
from prefect.artifacts import create_table_artifact

@task
def my_table_task():
    table_data = [
        {"id": 0, "name": "Dublin", "lat": 53.3498, "lon": -6.2603,},
        {"id": 1, "name": "London", "lat": 51.5074, "lon": -0.1278,},
        {"id": 2, "name": "New York", "lat": 40.7128, "lon": -74.0060,},
        {"id": 3, "name": "Oslo", "lat": 59.9139, "lon": 10.7522,},
        {"id": 4, "name": "Paris", "lat": 48.8566, "lon": 2.3522,},
        {"id": 5, "name": "Rome", "lat": 41.9028, "lon": 12.4964,},
        {"id": 6, "name": "Tokyo", "lat": 35.6895, "lon": 139.6917,},
        {"id": 7, "name": "Vancouver", "lat": 49.2827, "lon": -123.1207,}
    ]

    return create_table_artifact(
        key="cities-table",
        table=table_data,
        description="A table of cities and their coordinates",
    )

@flow
def my_flow():
    table = my_table_task()
    return table

if __name__ == "__main__":
    my_flow()

You can view your artifacts in the Artifacts page of the Prefect UI, easily search the data in your new table artifact, and toggle between a rendered and raw version of your data.

See the documentation for more information, as well as the following pull requests for implementation details:

Configure result storage keys

When persisting results, Prefect stores data at a unique, randomly-generated path. While this is convenient for ensuring the result is never overwritten, it limits organization of result files. In this release, we’ve added configuration of result storage keys, which gives you control over the result file path. Result storage keys can be dynamically formatted with access to all of the modules in prefect.runtime and the run’s parameters.

For example, you can name each result to correspond to the flow run that produced it and a parameter it received:

from prefect import flow, task

@flow()
def my_flow():
    hello_world()
    hello_world(name="foo")
    hello_world(name="bar")

@task(
    persist_result=True,
    result_storage_key="hello__{flow_run.name}__{parameters[name]}.json",
)
def hello_world(name: str = "world"):
    return f"hello {name}"

my_flow()

Which will persist three result files in the storage directory:

$ ls ~/.prefect/storage | grep "hello__" 
hello__rousing-mushroom__bar.json
hello__rousing-mushroom__foo.json
hello__rousing-mushroom__world.json

See the documentation for more information.

Expanded prefect.runtime

The prefect.runtime module is now the preferred way to access information about the current run. In this release, we’ve added the following attributes:

  • prefect.runtime.task_run.id
  • prefect.runtime.task_run.name
  • prefect.runtime.task_run.task_name
  • prefect.runtime.task_run.tags
  • prefect.runtime.task_run.parameters
  • prefect.runtime.flow_run.name
  • prefect.runtime.flow_run.flow_name
  • prefect.runtime.flow_run.parameters

See the documentation for more information, and the following pull requests for implementation details:

Contributors

We have several first-time contributors in this release. Let’s give them a round of applause! :clap:

  • @andreadistefano made their first contribution in #8942
  • @knl made their first contribution in #8974
  • @thomas-te made their first contribution in #8959

Here are a few key enhancements and fixes:

Enhancements

  • :rocket: Add unique integers to worker thread names for inspection - #8908
  • :clipboard: Add support to JSONSerializer for serialization of exceptions so they are persisted even on failure - #8922
  • :compression: Add Gzip middleware to the UI and API FastAPI apps for compressing responses - #8931 -:hourglass: Update the runtime to detect flow run information from task run contexts — #8951

Fixes

  • :wrench: Fix imports in copytree backport for Python 3.7 - #8925
  • :repeat: Retry on sqlite operational errors - #8950
  • :alarm_clock: Add 30 second timeout to shutdown of the log worker thread — #8983

See the release notes for full details on all the updates in Prefect 2.9.0!

2 Likes