Best way to create a DAG with tasks that should occur repeatedly at different time intervals

I would like to create a program that does the following:
Once a day, generate a list of 100 or so random usernames.
After the above task is complete, once every hour, generate a random score for each user and add it to that user's total score (summing with the previous total).

How would I go about constructing a DAG for this? Will I necessarily need 2 flows, or sub flows?
Essentially, is there a way to create 2 flows, where one is the parent and the other is a child, and have them run on different schedules, i.e. with the child flow executed more frequently?

Hi @krishna_somasundaram, welcome to Discourse!

How would you approach this in Python without Prefect? In Prefect 2.0, a DAG is no longer a requirement: you can build your logic in plain Python and use Prefect only to:

  • operationalize
  • schedule
  • and add observability to

your flows.

Are you new to Prefect? Are you asking this question for Prefect 2.0, or are you already a long-term Prefect 1.0 user? I'd need to know, since the answer will be different for 2.0 and 1.0.

I imagine that in Python without Prefect, I would schedule these tasks as individual scripts running at repeated intervals. As I understand it, the main flow would write to a database, and the sub-flow, which is scheduled to run more frequently, would read from that database. I am a beginner with data pipeline tools such as Prefect or Airflow, and would like to proceed with Prefect 2.0.
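
A minimal plain-Python sketch of that idea, assuming a SQLite file stands in for the shared database. The table name `scores`, the function names, the 1-10 score range, and the choice to reset totals when the daily list is regenerated are all illustrative assumptions, not anything Prefect prescribes:

```python
import random
import sqlite3
import string


def init_db(conn):
    # Hypothetical schema: one row per user with a running total.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS scores "
        "(username TEXT PRIMARY KEY, total INTEGER NOT NULL)"
    )


def daily_job(conn, n_users=100, rng=random):
    """Run once a day: replace the user list with fresh random usernames."""
    names = set()
    while len(names) < n_users:
        names.add("".join(rng.choices(string.ascii_lowercase, k=8)))
    with conn:  # commit on success, roll back on error
        conn.execute("DELETE FROM scores")  # assumption: totals reset daily
        conn.executemany(
            "INSERT INTO scores (username, total) VALUES (?, 0)",
            [(name,) for name in names],
        )


def hourly_job(conn, rng=random):
    """Run once an hour: add a random score to every user's running total."""
    users = [row[0] for row in conn.execute("SELECT username FROM scores")]
    with conn:
        conn.executemany(
            "UPDATE scores SET total = total + ? WHERE username = ?",
            [(rng.randint(1, 10), user) for user in users],
        )
```

Each function could live in its own script and be run by any scheduler; the only coupling between them is the database they share.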

Great choice!

This seems totally doable. Not sure whether this is the right solution since you didn’t explain the business problem here, but definitely something you can accomplish with Prefect 2.0:

Perhaps I can rephrase my problem. In Prefect 1.0, I would have used a single flow with, say, a starter task that runs once a day; once this task has run, subtasks would trigger every 15 minutes. I would schedule the flow to run every 15 minutes and give the starter task a cache duration of 1 day, so that it is only executed once a day. The advantage is that the cached data persists across multiple flow runs. This does not seem to be the case with Prefect 2.0: whenever I schedule a flow, each run seems to be independent of the previous one, and tasks do not carry their cache information across flow runs. Should I go about solving this problem in a different way?

Thanks for this explanation. I still struggle to understand what you're trying to do. Could you explain it purely from a business perspective? Forget about Prefect, Python, and scheduling.

This is just from a sample project, but let me give an example maybe. I have 3 pipelines.
i) Sync pipeline - Syncs all user login data from a source
ii) Aggregate pipeline - At regular intervals throughout the day, we accumulate which users used the app and how much time they spent on it.
iii) Cache pipeline - At the end of the day, we'd like to store this information in a database.

I am just a beginner too, so I apologize if my explanations are vague. How would I go about writing a Prefect program to do the above?

Thanks, this is helpful. You can do:

  • Sync pipeline: this can run even every 1 min and would persist data somewhere (DB, S3, etc.)

  • Aggregate pipeline: this can run e.g. every 1h and would read from that source and store aggregated data again in some DB, e.g. a different DB schema

  • Cache pipeline: this could run daily and read data from that aggregated schema

So you can have 3 scheduled flows, running completely independently of each other and on different schedules.
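
As a rough sketch of how three independent jobs can cooperate only through storage, here is a dependency-free Python version using SQLite. The table names (`raw_logins`, `usage_agg`, `daily_report`), the columns, and the function signatures are hypothetical; in Prefect 2.0 each function would be its own scheduled flow, which is left out here so the example runs anywhere:

```python
import sqlite3


def init_db(conn):
    # Hypothetical tables: raw events, hourly aggregate, daily snapshot.
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS raw_logins (username TEXT, minutes INTEGER);
        CREATE TABLE IF NOT EXISTS usage_agg (username TEXT PRIMARY KEY, minutes INTEGER);
        CREATE TABLE IF NOT EXISTS daily_report (day TEXT, username TEXT, minutes INTEGER);
        """
    )


def sync_pipeline(conn, events):
    """Frequent job: append newly fetched (username, minutes) events."""
    with conn:
        conn.executemany("INSERT INTO raw_logins VALUES (?, ?)", events)


def aggregate_pipeline(conn):
    """Hourly job: recompute per-user totals from the raw table."""
    with conn:
        conn.execute("DELETE FROM usage_agg")
        conn.execute(
            "INSERT INTO usage_agg "
            "SELECT username, SUM(minutes) FROM raw_logins GROUP BY username"
        )


def cache_pipeline(conn, day):
    """Daily job: snapshot the aggregated totals under the given date."""
    with conn:
        conn.execute(
            "INSERT INTO daily_report "
            "SELECT ?, username, minutes FROM usage_agg",
            (day,),
        )
```

None of the functions calls another, so each can fail, retry, or change its schedule without touching the other two.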

Can you make it so that the sync pipeline starts the next day only if the cache pipeline was successful? Can you create dependencies across flows in Prefect 2.0?

You can have dependencies between subflows or you can trigger co-dependent flows using the orchestrator pattern:
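
To illustrate the orchestrator idea in general terms, here is a small plain-Python sketch in which one parent function runs the dependent steps in order and stops at the first failure. The helper name `run_orchestrator` and the step names are hypothetical; in Prefect 2.0 the parent and each step would be `@flow`-decorated functions, and that wiring is omitted so the sketch has no dependencies:

```python
def run_orchestrator(steps):
    """Run (name, callable) steps in order and stop at the first failure.

    Returns the names of the steps that completed successfully.
    """
    completed = []
    for name, step in steps:
        try:
            step()
        except Exception as exc:
            # A failed upstream step means nothing downstream should run.
            print(f"{name} failed ({exc!r}); skipping downstream steps")
            break
        completed.append(name)
    return completed
```

Passing the cache step before the sync step, e.g. `run_orchestrator([("cache", cache_step), ("sync", sync_step)])`, means the sync step only starts when the cache step finished without raising.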