I have tasks D, E, and F that cannot run in parallel. I would like each of them to run as soon as its respective upstream conditions allow, but no two of them may run at the same time. Let’s say these are the upstream dependencies:
A upstream of D
A, B upstream of E, F.
One way I solved this without Prefect was to add a singleton-style lock-file decorator, so that each task needs to acquire a “shared” lock before it can proceed; otherwise it waits. I would like to replace that with a Prefect-based solution if possible, because Prefect offers other goodies such as task timeouts and notifications, but I’m not sure how.
Another solution is to run the tasks serially. However, that approach always leads to situations where other tasks could have run but did not, because one upstream condition was not yet met. I cannot accept leaving a task idle when it is ready to run.
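To make the requirement concrete, here is a minimal standard-library sketch (not Prefect code) of the behavior described above: D waits only for A, E and F wait for A and B, and a shared lock keeps D, E, and F from overlapping. The helper names (`run_exclusive`, `upstream_task`), the delays, and the use of threads are assumptions made purely for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

exclusive = threading.Lock()  # shared by D, E and F only
active = 0                    # number of exclusive tasks running right now
max_active = 0                # highest value `active` ever reached
finished = []                 # completion order, for inspection


def upstream_task(delay):
    """Stand-in for A or B: just takes some time."""
    time.sleep(delay)


def run_exclusive(name, upstream):
    """Wait for the upstream futures, then work while holding the shared lock."""
    global active, max_active
    for fut in upstream:
        fut.result()          # block until this upstream task is done
    with exclusive:           # only one of D, E, F gets past this point at a time
        active += 1
        max_active = max(max_active, active)
        time.sleep(0.01)      # stand-in for the real work
        finished.append(name)
        active -= 1


with ThreadPoolExecutor(max_workers=5) as pool:
    a = pool.submit(upstream_task, 0.01)
    b = pool.submit(upstream_task, 0.05)  # B is slower than A
    d = pool.submit(run_exclusive, "D", [a])
    e = pool.submit(run_exclusive, "E", [a, b])
    f = pool.submit(run_exclusive, "F", [a, b])
    for fut in (d, e, f):
        fut.result()          # re-raise any error from the tasks
```

In this sketch D can start as soon as A finishes, without waiting for B, while E and F queue behind whichever task currently holds the lock.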
It seems that what you need is to enforce a sequential order of execution so that the tasks don’t run in parallel, or to define the state dependencies differently, say, run E and F after D is finished. All of that can be accomplished with state dependencies.
Thank you Anna. Having E and F run after D does not always work. There could be instances where E and F could run before D, and I cannot allow E and F to run in parallel either. I could have made my example dependencies a bit clearer.
Could an alternative be to run these tasks on a single-process agent?
Thank you Anna. This is what I have done at the moment:
created a decorator that I apply to tasks that need to get a lock
run those functions serially
This is suboptimal, and I’d like to use your solution. I can easily add @lock_exclusive to Prefect tasks, but I was hoping you could make that part of your solution. I provide links to Stack Overflow that have the details. I can provide my class as well.
decorator library:
https://wiki.python.org/moin/PythonDecoratorLibrary
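import functools

def lock_exclusive(orig):
    # Wrap a task so that it runs only while holding the shared lock.
    lock = Lock()  # simple class that I can provide if needed
    @functools.wraps(orig)
    def inner(*args, **kwargs):
        lock.acquire()  # creates temp file; waits if another task holds it
        try:
            return orig(*args, **kwargs)
        except Exception as err:
            # Note: this catches errors raised by the task itself,
            # because lock.acquire() runs before the try block.
            error_msg(err, "could not acquire lock.")
        finally:
            lock.release()  # always release, even if the task failed
    return inner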
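The Lock class used by the decorator is not shown in the thread. As a rough idea of what a temp-file lock with `acquire()` and `release()` could look like, here is a hypothetical stand-in. The name `FileLock`, the polling interval, and the lock-file path are assumptions, not the original class.

```python
import os
import tempfile
import time


class FileLock:
    """Hypothetical stand-in for the Lock class mentioned in the thread."""

    def __init__(self, path, poll_interval=0.05):
        self.path = path
        self.poll_interval = poll_interval
        self._fd = None

    def acquire(self):
        """Block until the lock file can be created by this caller."""
        while True:
            try:
                # O_CREAT | O_EXCL fails if the file already exists,
                # so only one caller at a time can create it.
                self._fd = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                return
            except FileExistsError:
                time.sleep(self.poll_interval)  # someone else holds the lock

    def release(self):
        """Close and delete the lock file so the next waiter can proceed."""
        if self._fd is not None:
            os.close(self._fd)
            os.remove(self.path)
            self._fd = None


# Usage: the path is an arbitrary example in the system temp directory.
lock_path = os.path.join(tempfile.gettempdir(), f"exclusive_sketch_{os.getpid()}.lock")
lock = FileLock(lock_path)
lock.acquire()
held = os.path.exists(lock_path)          # lock file exists while held
lock.release()
released = not os.path.exists(lock_path)  # and is gone afterwards
```

One caveat with this kind of lock: if a process dies while holding it, the file stays behind and later tasks wait forever, so a real implementation would likely need a timeout or stale-lock cleanup.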