Example code:
from prefect import task, flow

class ResultObject():
    def __init__(self, x):
        self.x = x

@task
async def test_task():
    return ResultObject(25)

@task
async def test_task_downstream(x):
    return x + 5

@flow
async def test_flow():
    obj = await (await test_task()).result()
    return await test_task_downstream(obj.x)

await test_flow()
Is there a way to simplify the double await into one line?
If the task returns a plain value rather than a class instance, it’s much cleaner, but I’m curious whether there’s still a way to make the above cleaner.
from prefect import task, flow

@task
async def test_task():
    return 25

@task
async def test_task_downstream(x):
    return x + 5

@flow
async def test_flow():
    obj = await test_task()
    return await test_task_downstream(obj)

await test_flow()
Another way is to access x inside the downstream task rather than in the flow.
from prefect import task, flow

class ResultObject():
    def __init__(self, x):
        self.x = x

@task
async def test_task():
    return ResultObject(25)

@task
async def test_task_downstream(obj):
    return obj.x + 5

@flow
async def test_flow():
    obj = await test_task()
    return await test_task_downstream(obj)
Another solution is simply to save the intermediate result.
from prefect import task, flow

class ResultObject():
    def __init__(self, x):
        self.x = x

@task
async def test_task():
    return ResultObject(25)

@task
async def test_task_downstream(x):
    return x + 5

@flow
async def test_flow():
    task_future = await test_task()
    result_obj = await task_future.result()
    return await test_task_downstream(result_obj.x)
Given that this class is not a real task (it doesn’t perform any action), I would personally mark this one as the Solution.
But given that your original question was using a class, I can understand why you picked the other one @ahuang11.
Apparently this was a bug. In a future release, the flow won’t have to be marked as async, and it should still work with much less messy syntax:
from prefect import task, flow

class ResultObject():
    def __init__(self, x):
        self.x = x

@task
async def test_task():
    return ResultObject(25)

@task
async def test_task_downstream(x):
    return x + 5

@flow
def test_flow():
    task_result = test_task().result().x
    return test_task_downstream(task_result)
That’s so cool, way cleaner. Thanks so much for sharing!