Whenever a non-Task input such as a Python collection is passed to a task, Prefect automatically converts it into a special task representing the collection type, e.g. List, Tuple, or Dict. If you want to avoid this extra task, build the collection inside a task of your own and pass that task's result as a data dependency.
This flow visualizes the problem:
import random

from prefect import Flow, task


@task
def a_number():
    return random.randint(0, 100)


@task
def get_sum(x):
    return sum(x)


with Flow("Using Collections") as flow:
    a = a_number()
    b = a_number()
    # [a, b] is a plain list, not a Task, so Prefect wraps it in a List task
    s = get_sum([a, b])

if __name__ == "__main__":
    flow.visualize()
And here is how you can solve it by using an intermediate task. In this example, the task is called merge_into_list:
import random

from prefect import Flow, task


@task
def a_number():
    return random.randint(0, 100)


@task
def merge_into_list(x, y):
    # Builds the list at runtime inside a regular task
    return [x, y]


@task
def get_sum(x):
    return sum(x)


with Flow("Using Collections") as flow:
    a = a_number()
    b = a_number()
    a_list = merge_into_list(a, b)
    # a_list is already a Task result, so no automatic List task is added
    s = get_sum(a_list)

if __name__ == "__main__":
    flow.visualize()
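
To make the difference between the two flows above easier to see without rendering a graph, here is a small toy model in plain Python. It is only a sketch and not Prefect's actual implementation: the names Node, as_node, call, and node_names are hypothetical and exist purely for illustration. It assumes that a graph builder wraps any list argument in an extra node named "List", which mirrors the behavior described at the top.

```python
# Toy model only: NOT Prefect's implementation. All names here are hypothetical.

class Node:
    """A stand-in for a task in a dependency graph."""

    def __init__(self, name, upstream=()):
        self.name = name
        self.upstream = list(upstream)


def as_node(value):
    """Return a Node unchanged; wrap a plain list in an automatic 'List' node."""
    if isinstance(value, Node):
        return value
    if isinstance(value, list):
        return Node("List", upstream=[as_node(v) for v in value])
    raise TypeError("this toy model only handles nodes and lists")


def call(name, *args):
    """Create a node whose upstream nodes are the (possibly wrapped) arguments."""
    return Node(name, upstream=[as_node(arg) for arg in args])


def node_names(node):
    """Collect the names of everything upstream of a node, then the node itself."""
    names = []
    for up in node.upstream:
        names.extend(node_names(up))
    names.append(node.name)
    return names


a = Node("a_number")
b = Node("a_number")

# Passing a plain list: the builder inserts an automatic "List" node.
implicit = node_names(call("get_sum", [a, b]))

# Passing the result of an explicit node: nothing needs to be wrapped.
explicit = node_names(call("get_sum", call("merge_into_list", a, b)))

print(implicit)
print(explicit)
```

In this sketch both graphs contain the same number of nodes. The explicit version simply replaces the automatically generated "List" node with one you name and control yourself, which is what merge_into_list does in the flow above.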