Set upstream to shell task

Hi all,
While migrating from Prefect 0.15 to Prefect 2.0 I ran into a huge number of improvements, but right now I am a bit stuck with ShellTask.

My code was like:

from prefect import Flow
from prefect.tasks.shell import ShellTask

move_data_shell_tsk = ShellTask(name="Move data", command="sudo mv src dst")
start_db_shell_tsk = ShellTask(name="Start db", command="make start-db")

# Flow requires a name; "shell-flow" is a placeholder
with Flow("shell-flow") as flow:
    move_data_tsk = move_data_shell_tsk()
    start_db_tsk = start_db_shell_tsk()
    # Run "Start db" only after "Move data" has finished
    start_db_tsk.set_upstream(move_data_tsk)

I used .set_upstream() to control the order of tasks.

Now I use shell_run_command():

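from prefect import flow
from prefect_shell import shell_run_command

def mv_data(src, dst):
    # Interpolate src and dst into the command
    return shell_run_command(command=f'sudo mv {src} {dst}')

def start_db():
    return shell_run_command(command='make start-db')

@flow
def flow1(src, dst):
    mv_data_tsk = mv_data(src, dst)
    start_db_tsk = start_db()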

I checked the with_options() method, but it does not seem possible to set the upstream of a task imperatively. Do I have to use a sub-flow for each shell task to control the order of execution, or is it possible at the task level within one flow?

Thanks for your help.

Does the wait_for kwarg work?
For example:
start_db_tsk = start_db(wait_for=[mv_data_tsk])

Also, I think that if you do not use the submit method (e.g. mv_data.submit(src, dst)), the tasks will run sequentially, so wait_for might be unnecessary.

Yes, you are right. Without the submit method, they run sequentially. I also found SequentialTaskRunner, which forces tasks to run sequentially even when they are called with submit(). Thanks a lot!
