How to set upstream task dependencies between a ShellTask and downstream mapped tasks?

View in #prefect-community on Slack

@Philip_MacMenamin: Strange behaviour with multiple upstream_tasks on a ShellTask
This works:

        brt_commands = create_brt_command.map(adoc_fp=updated_adocs)
        brt_commands_logged = log(item=brt_commands, desc="BRT commands")
        brts = shell_task.map(
            command=brt_commands, upstream_tasks=[tomogram_fps]
        )

This fails:

        brt_commands = create_brt_command.map(adoc_fp=updated_adocs)
        brt_commands_logged = log(item=brt_commands, desc="BRT commands")
        brts = shell_task.map(
            command=brt_commands, upstream_tasks=[tomogram_fps, brt_commands_logged]
        )

@Kyle_McChesney: Not 100% sure what the issue is, but the top example likely works if you add unmapped(tomogram_fps) to upstream_tasks. It's kind of an interesting edge case between .map and upstream_tasks.

@Nate: can you describe the failure or share logs?

@Philip_MacMenamin: Just to be clear - the top example appears to work already.

[2022-04-19 12:56:09-0600] INFO - prefect.TaskRunner | Task 'create_brt_command[0]': Starting task run...
[2022-04-19 12:56:09-0600] INFO - prefect.TaskRunner | Task 'create_brt_command[0]': Finished task run for task with final state: 'Success'
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'log': Starting task run...
[2022-04-19 12:56:10-0600] INFO - prefect.log | BRT commands : ['/opt/rml/imod/bin/batchruntomo -di /gs1/home/macmenaminpe/tmp/tmpovrvi7nb/dirTemplate.adoc -cp 8 -gpu 1']
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'log': Finished task run for task with final state: 'Success'
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'ShellTask': Finished task run for task with final state: 'Failed'
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'log': Starting task run...
[2022-04-19 12:56:10-0600] INFO - prefect.log | updated Adoc files : [PosixPath('/gs1/home/macmenaminpe/tmp/tmpovrvi7nb/dirTemplate.adoc')]
[2022-04-19 12:56:10-0600] INFO - prefect.TaskRunner | Task 'log': Finished task run for task with final state: 'Success'
[2022-04-19 12:56:10-0600] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

@Kyle_McChesney - this idea of doing unmapped() stuff in the upstream_tasks is interesting. I think I have to get my head around that for a second.

@Kyle_McChesney: @Kevin_Kho we discussed this at some point, do you happen to have the link around?

@Philip_MacMenamin: ahh… ok, so the upstream_tasks should not be included within the map

@Nate: hold on, because if your first example works then I’m wrong, one sec

@Kyle_McChesney: upstream_tasks=[unmapped(tomogram_fps), unmapped(brt_commands_logged)]?
Does that work? If I recall, the issue was that an upstream task parameter needed to get passed to each mapped instance. By specifying unmapped, you ensure that the same one is used.

@Philip_MacMenamin: right - within the map? OK, hold on. I have to get into an annoying VPN and it kills Slack. BRB.

@Kevin_Kho: Kyle is right

mapped_task = task_one.map(.., upstream_tasks=[unmapped(tomogram_fps), unmapped(brt_commands_logged)])

should work if tomogram_fps and brt_commands_logged are not mapped tasks.
Making them mapped will map element 1 of the upstream list to element 1 of the downstream list as a dependency, so you get weirdness if the lists are not the same length.
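
The element-by-element pairing described above can be sketched in plain Python. This is a toy model for illustration only, not Prefect's implementation: `Unmapped` and `fan_out` are hypothetical names, the commands and file names are made up, and treating the result of the log step as `None` is an assumption.

```python
from typing import Any, Dict, List


class Unmapped:
    """Hypothetical marker: hand this value whole to every mapped child."""

    def __init__(self, value: Any) -> None:
        self.value = value


def fan_out(upstreams: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build one dict of upstream values per mapped child (toy model)."""
    mapped = {k: v for k, v in upstreams.items() if not isinstance(v, Unmapped)}
    shared = {k: v.value for k, v in upstreams.items() if isinstance(v, Unmapped)}

    # A mapped upstream must be a sequence so it can be split per child.
    for name, value in mapped.items():
        if not isinstance(value, (list, tuple)):
            raise TypeError(f"upstream {name!r} cannot be mapped over: {value!r}")

    # Mapped upstreams are paired by index, so the shortest one sets the count.
    n_children = min((len(v) for v in mapped.values()), default=0)
    return [
        {**{k: v[i] for k, v in mapped.items()}, **shared}
        for i in range(n_children)
    ]


commands = ["cmd-a", "cmd-b"]                # made-up commands
tomogram_fps = ["a.mrc", "b.mrc", "c.mrc"]   # made-up paths, one more than commands
log_result = None                            # assumption: the log step returns nothing

# Mapped upstream: child i only sees element i, and "c.mrc" is never paired.
paired = fan_out({"command": commands, "tomogram_fps": tomogram_fps})

# Unmapped upstreams: every child sees the whole list and the log result.
shared = fan_out({
    "command": commands,
    "tomogram_fps": Unmapped(tomogram_fps),
    "log": Unmapped(log_result),
})
```

In this sketch, passing `log_result` without the `Unmapped` wrapper raises a `TypeError`, because a single non-list value cannot be split across children. That is one plausible reading of why adding a second, non-list upstream to a mapped call could fail, though the logs above do not show the actual cause.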

@Philip_MacMenamin: can confirm. @Kyle_McChesney is correct. Nice