Singleton task dependence

I have tasks D, E, and F that cannot run in parallel, but I would like each of D, E, F to run as soon as its respective upstream conditions allow; I just cannot allow them to run at the same time. Let’s say this is the upstream dependency structure:
A upstream of D
A, B upstream of E, F.

One way I solved this without Prefect was to add a singleton-style lock-file decorator, so that each task needs to acquire a “shared” lock before it can proceed; otherwise it waits. I would like to abandon that type of solution in favor of a Prefect-based one if possible, because Prefect offers other goodies like task timeouts and notifications, but I’m not sure how.

Another solution is to run the tasks serially. However, that approach regularly produces a situation where some tasks could have run but do not, simply because an upstream condition of a task earlier in the sequence is not yet met. I cannot accept leaving something unrun when it is ready to run.

It seems that what you need is either to enforce a sequential order of execution so the tasks don’t run in parallel, or another way of defining state dependencies, say running E and F only after D has finished - all of that can be accomplished with state dependencies.
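As a rough sketch (not from the original posts), and assuming Prefect 2.x, state dependencies can be expressed by passing `wait_for=` to a task’s `.submit()` call; the task and flow names below are placeholders:

```python
from prefect import flow, task


@task
def d():
    ...


@task
def e():
    ...


@task
def f():
    ...


@flow
def my_flow():
    # wait_for creates a state dependency without passing any data:
    # E starts only after D has finished, and F only after E,
    # so the three tasks never overlap
    d_future = d.submit()
    e_future = e.submit(wait_for=[d_future])
    f.submit(wait_for=[e_future])
```

Chaining the futures with `wait_for` this way keeps D, E, and F strictly sequential, which is the first of the two options described above.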

This topic answers your question:

Thank you Anna. Having E and F run after D does not always work. There could be instances where E and F should run before D, and I cannot allow E and F to run in parallel with each other either. I could have made my example dependencies a bit clearer.

Could an alternative be to run these tasks on a single-process agent?

Agents are responsible for polling for and deploying flow runs; they are not used to set dependencies.

How would you design such a workflow in Python (without Prefect) to ensure it runs in the right order?

Thank you Anna. This is what I have done at the moment:

  1. created a decorator that I apply to tasks that need to acquire a lock
  2. run those functions serially

This is suboptimal and I’d like to use your solution instead. I can easily add @lock_exclusive to Prefect tasks, but I was hoping you could make that part of your solution. The Stack Overflow link referenced below has the details, and I can provide my lock class as well.

running example

```python
while True:  # wait for delivery
    task1()

while True:  # wait for delivery
    task2()
```

creating tasks

```python
@lock_exclusive
def task1():
    ...

@lock_exclusive
def task2():
    ...
```

where

```python
def lock_exclusive(orig):
    """
    Decorator to allow one process to execute at a time and make the others wait.
    Mutex decorator, see:
    "locking - mutex decorator in python" - Stack Overflow

    decorator library:
    https://wiki.python.org/moin/PythonDecoratorLibrary
    """
    lock = Lock()  # simple class that I can provide if needed

    def inner(*args, **kwargs):
        lock.acquire()  # creates temp file
        try:
            ret = orig(*args, **kwargs)
            return ret
        except Exception as err:
            error_msg(err, "could not acquire lock.")
        finally:
            lock.release()

    return inner
```
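For completeness, a minimal sketch of how the decorator above might be combined with Prefect tasks, assuming Prefect 2.x; the module name `my_locks`, the flow name, and the timeout value are placeholders rather than anything from the original post:

```python
from prefect import flow, task

from my_locks import lock_exclusive  # hypothetical module holding the decorator above


# @task must be the outermost decorator so Prefect wraps the already-locked function;
# timeout_seconds is one of the Prefect features mentioned earlier in the thread
@task(timeout_seconds=3600)
@lock_exclusive
def task1():
    ...


@task(timeout_seconds=3600)
@lock_exclusive
def task2():
    ...


@flow
def my_flow():
    # both tasks can be submitted as soon as their upstream conditions allow;
    # the shared lock ensures that only one task body executes at a time
    task1.submit()
    task2.submit()
```

One detail worth noting: because `lock_exclusive` returns a wrapper named `inner`, Prefect would register the tasks under that name; adding `@functools.wraps(orig)` to the wrapper inside the decorator keeps the original task names.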

Since you mentioned running a while loop, perhaps this is what you need?

and if you are looking for event-driven processing, you may look at the pattern shown here: