Is there a cleaner way to pass async task results downstream?

Example code:

from prefect import task, flow

class ResultObject():

    def __init__(self, x):
        self.x = x


@task
async def test_task():
    return ResultObject(25)


@task
async def test_task_downstream(x):
    return x + 5


@flow
async def test_flow():
    obj = await (await test_task()).result()
    return await test_task_downstream(obj.x)


await test_flow()

Is there a way to simplify the double await in one line?

If the task returns a plain value rather than a class instance, the flow is much cleaner, but I'm curious whether there's still a way to tidy up the version above.

from prefect import task, flow

@task
async def test_task():
    return 25


@task
async def test_task_downstream(x):
    return x + 5


@flow
async def test_flow():
    obj = await test_task()
    return await test_task_downstream(obj)


await test_flow()

Another way is to access x inside the downstream task rather than in the flow.

from prefect import task, flow

class ResultObject():

    def __init__(self, x):
        self.x = x


@task
async def test_task():
    return ResultObject(25)


@task
async def test_task_downstream(obj):
    return obj.x + 5


@flow
async def test_flow():
    obj = await test_task()
    return await test_task_downstream(obj)

Another solution is simply to save the intermediate result.

from prefect import task, flow

class ResultObject():

    def __init__(self, x):
        self.x = x


@task
async def test_task():
    return ResultObject(25)


@task
async def test_task_downstream(x):
    return x + 5


@flow
async def test_flow():
    task_future = await test_task()
    result_obj = await task_future.result()
    return await test_task_downstream(result_obj.x)
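
To get the double await onto one line without changing the tasks, a small helper can wrap both steps. Below is a minimal sketch in plain asyncio, not a Prefect API: `FakeFuture`, `fake_task`, and `resolve` are hypothetical names, and `FakeFuture` only imitates the future with an awaitable `result()` seen in the examples above.

```python
import asyncio


class ResultObject:
    def __init__(self, x):
        self.x = x


class FakeFuture:
    """Hypothetical stand-in for the future returned by a task call."""

    def __init__(self, value):
        self._value = value

    async def result(self):
        # Awaiting result() is the second await in the original one-liner.
        return self._value


async def fake_task():
    # Awaiting the call itself is the first await; it yields the future.
    return FakeFuture(ResultObject(25))


async def resolve(pending):
    """Hypothetical helper: await the call, then await the future's result."""
    future = await pending
    return await future.result()


async def main():
    # One await at the call site instead of two nested ones.
    obj = await resolve(fake_task())
    return obj.x + 5


print(asyncio.run(main()))  # prints 30
```

In a flow, the same idea would read `obj = await resolve(test_task())`, assuming the task call behaves like `fake_task` here.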

Given that this class is not a real task (it doesn't perform any action), I would personally mark this one as the solution.

But given that your original question was using a class, I can understand why you picked the other one @ahuang11.

There was apparently a bug. In a future release, the flow won't have to be marked as async, and it should still work with much less messy syntax:

from prefect import task, flow

class ResultObject():

    def __init__(self, x):
        self.x = x


@task
async def test_task():
    return ResultObject(25)


@task
async def test_task_downstream(x):
    return x + 5


@flow
def test_flow():
    task_result = test_task().result().x
    return test_task_downstream(task_result)

That’s so cool, way cleaner. Thanks so much for sharing! :heart: