Out-of-memory (OOM) error after all tasks completed

Hi everyone,

I’m running a flow which triggers about 50 subflows by looping over a date range that again consist of tasks reading data from a database, performing transformations and loading them back into another database.

While monitoring the flow execution of the agent, I noticed that the all subflow and their tasks run flawlessly an finish completed.

However, as soon as everything is done (except for the parent flow) the memory usage increases dramatically and fast up until I finally run out of memory.

The only message that is displayed in the agent says: __prefect.infrastructure.process - Process exited with Status Code: -9 __

Now, I’m wondering what kind of (clean up) actions are taking place before the parent flow is completes that can cause this error.

Many thanks for your help. I’m looking forward to get some advice.

Hi @Butcher, could you provide more info about the infrastructure you were using and how exactly did you conduct those benchmarks?

I suspect that by the end of a run, some results are written to the local storage which may cause more work on your infrastructure end, but it shouldn’t be that heavy

Hey @anna_geller

I’m using a relatively small Ubuntu 20.04 LTS OS in a virtualized environment with 8 cores and 16GB of RAM.

Unfortunately, prefect-memory-profiling is not yet available for version 2.0 to get better insights, so I only monitor it through the flow’s log and htop on the machien.

I also suspected that writing to the local storage causes the error. Is there a way to prove this? And get more details?

1 Like

You can configure those two variables:

PREFECT_HOME='~/.prefect' (from defaults)
PREFECT_LOCAL_STORAGE_PATH='${PREFECT_HOME}/storage' (from defaults)

but other than that, it’s not configurable yet, we are working on that

Could you perhaps share an MRE showing what your flows are doing? Perhaps checking that on the flow-level might be helpful. Do you get the same when you run this without Prefect (i.e., when you remove Prefect decorators)?

Hi Anna,

please see the MRE below.
I figured out that the issue occures from the read_data task in the subflow, when it’s assigned to df. For each task run, prefect will store the blob data in the storage path. At the end of the main flow, all of them are again packed into one big file to store the flow’s data. That’s when the OOM happens.

When I write a task that reads, transforms and writes within one function and call it in the subflow, ie not referencing to the data within the subflow, almost no data is written to the storage.

Am I misunsing the .result() method somehow?

import pandas as pd
from prefect import task, flow
from prefect.orion.schemas.states import Cancelled, Completed
from prefect.task_runners import SequentialTaskRunner


@task(name='Read Data')
def read_data(db_con, start_date, end_date):
    df = pd.read_sql(sql='SELECT * FROM TABLE_NAME WHERE DATE_COLUMN BETWEEN :START AND :END',
                     con=db_con,
                     params={'START': start_date, 'END': end_date})
    if df.empty:
        return Cancelled(message=f'No new records.')
    return df


@task(name='Transform Data')
def transform_data():
    """DO SOME PANDAS CLEANING AND TRANSFORMATIONS"""
    pass


@task(name='Write Data')
def write_data(db_con, df):
    df.to_sql(name='TABLE_NAME_2', con=db_con, if_exists='append')


@flow(name='Data Subflow', task_runner=SequentialTaskRunner())
def update_table(db_con, range_start, range_end):
    df = read_data(db_con=db_con, start_date=range_start, end_date=range_end, return_state=True)

    if df.is_cancelled():
        return Completed()

    df_transformed = transform_data(df.result())
    write_data(df=df_transformed)


@flow(name='Data Flow', task_runner=SequentialTaskRunner())
def main(start_date='2022-01-01', end_date='2022-09-01'):

    db_con = "some database connection"
    date_range = pd.date_range(start_date, end_date, freq='W-MON')

    for s, e in zip(date_range, date_range[1:]):
        update_table(db_con, range_start=s, range_end=e)


if __name__ == "__main__":
    main()
1 Like

Generally, you don’t need to worry about task runner and submitting tasks to it unless you need to – Prefect returns data by default rather than future or state (returned only when you do task.submit()).

I rewrote your MRE a bit by removing the task runner, .result() and moving the df.empty check to the flow - could you try it and see if this fixes the issue?

import pandas as pd
from prefect import task, flow
from prefect.orion.schemas.states import Cancelled, Completed
from prefect.task_runners import SequentialTaskRunner


@task(name="Read Data")
def read_data(db_con, start_date, end_date):
    df = pd.read_sql(
        sql="SELECT * FROM TABLE_NAME WHERE DATE_COLUMN BETWEEN :START AND :END",
        con=db_con,
        params={"START": start_date, "END": end_date},
    )
    return df


@task(name="Transform Data")
def transform_data():
    """DO SOME PANDAS CLEANING AND TRANSFORMATIONS"""
    pass


@task(name="Write Data")
def write_data(db_con, df):
    df.to_sql(name="TABLE_NAME_2", con=db_con, if_exists="append")


@flow(name="Data Subflow")
def update_table(db_con, range_start, range_end):
    df = read_data(
        db_con=db_con, start_date=range_start, end_date=range_end, return_state=True
    )
    if df.empty:
        return Completed(message=f"No new records.")

    df_transformed = transform_data(df)
    write_data(df=df_transformed)


@flow(name="Data Flow")
def main(start_date="2022-01-01", end_date="2022-09-01"):

    db_con = "some database connection"
    date_range = pd.date_range(start_date, end_date, freq="W-MON")

    for s, e in zip(date_range, date_range[1:]):
        update_table(db_con, range_start=s, range_end=e)


if __name__ == "__main__":
    main()
1 Like

Makes sense, thanks for clarification.

Unfortunately, this didn’t help to solve the issue.
I assume the following: In the states docu it’s mentioned that By default, running a task will return data. The same rule applies for a subflow. Hence, the main flow will persist all data from the subflows.

Doesn’t it make more sense to explicitly return a state at the end of each subflow (instead of the data)?

import pandas as pd
from prefect import task, flow
from prefect.orion.schemas.states import Cancelled, Completed
from prefect.task_runners import SequentialTaskRunner


@task(name="Read Data")
def read_data(db_con, start_date, end_date):
    df = pd.read_sql(
        sql="SELECT * FROM TABLE_NAME WHERE DATE_COLUMN BETWEEN :START AND :END",
        con=db_con,
        params={"START": start_date, "END": end_date},
    )
    return df


@task(name="Transform Data")
def transform_data():
    """DO SOME PANDAS CLEANING AND TRANSFORMATIONS"""
    pass


@task(name="Write Data")
def write_data(db_con, df):
    df.to_sql(name="TABLE_NAME_2", con=db_con, if_exists="append")


@flow(name="Data Subflow")
def update_table(db_con, range_start, range_end):
    df = read_data(
        db_con=db_con, start_date=range_start, end_date=range_end, return_state=True
    )
    if df.empty:
        return Completed(message=f"No new records.")

    df_transformed = transform_data(df)
    write_data(df=df_transformed)

    return Completed()


@flow(name="Data Flow")
def main(start_date="2022-01-01", end_date="2022-09-01"):

    db_con = "some database connection"
    date_range = pd.date_range(start_date, end_date, freq="W-MON")

    for s, e in zip(date_range, date_range[1:]):
        update_table(db_con, range_start=s, range_end=e)


if __name__ == "__main__":
    main()
1 Like

This is possible when you instead do:

your_task.submit()

btw configuring result behavior and disabling persisting results is on immediate roadmap - I’d expect it to be tackled by the end of the month

Thanks Anna, this helped. I’ll be looking forward to the new feature though.

1 Like