Hello Team,
I am encountering an issue while running Prefect tasks concurrently in my pipeline (Prefect version: 2.16.5). Below is a brief explanation of the process I’m following:
Pipeline Overview:
- I run tasks in batches, with each batch containing 100 individual tasks.
- I execute 8 tasks concurrently using the concurrency_limit parameter.
- After a batch of 100 tasks is completed, the next 100 tasks begin processing.
- I have set the timeout_seconds parameter to 3600 seconds (60 minutes), expecting a TimeoutError to be raised if any task exceeds this duration (see the minimal sketch after this list).
- The core task function makes a POST request to an API running on a separate machine and returns the corresponding output.
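For clarity, this is the behavior I expect from timeout_seconds, shown as a minimal sketch (the 2-second limit and 5-second sleep are arbitrary illustration values):

import time
from prefect import flow, task

@task(timeout_seconds=2)
def slow_task():
    time.sleep(5)  # deliberately exceeds the 2-second limit
    return "done"

@flow
def timeout_demo():
    # Expectation: the task run is marked as TimedOut/Failed after ~2 seconds
    # instead of running until the sleep finishes.
    slow_task()

if __name__ == "__main__":
    timeout_demo()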
Issue:
Although the Prefect UI reflects the expected behavior (each flow run processes its batch of 100 tasks), in some cases one or more tasks get stuck at the point where the API call is made. These tasks continue to run for more than an hour and are not cancelled by the timeout, which in turn blocks the start of the next batch of tasks.
I have already verified that there are no issues with the API itself.
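For context, the task body issues a blocking POST roughly like the sketch below; the endpoint, payload, and use of the requests library are placeholders rather than the real call:

import requests

def process_tasks(doc):
    # Illustrative sketch only: the real endpoint, payload, and response
    # handling differ. requests uses no timeout by default, so the call
    # blocks until the API responds.
    response = requests.post("http://api-host:8000/process", json=doc)
    response.raise_for_status()
    return response.json()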
Current Workaround:
As a temporary solution, I manually cancel the entire process and rerun it. However, this approach is time-consuming and requires constant monitoring to detect when a task gets stuck.
Could you help identify why the timeout is not being enforced in this case? Below is the relevant code, along with a screenshot of the UI showing the stuck task.
Deployment Code
from prefect import flow, serve, get_run_logger, task

@flow(log_prints=True)
def process_flow():
    # dict_docs is prepared elsewhere (omitted here for brevity)
    process_docs(dict_docs, 100)

def run():
    deployments = [
        process_flow.to_deployment(name="process_flow/deployment_test_1"),
    ]
    serve(*deployments)

if __name__ == "__main__":
    run()
Task Execution and Batch Processing Code
import asyncio

from prefect import get_client, task

def process_tasks(doc):
    # Execute the API call with a POST request and return the response
    pass
# Submit one batch of documents as Prefect tasks
def process_batch(batch_docs, batch_fnames):
    append_tasks = []
    for doc, task_fname in zip(batch_docs, batch_fnames):
        # Wrap the API-calling function as a task with a per-run name and a
        # 60-minute timeout (an equivalent decorator-based setup is sketched
        # after this listing)
        task_run = task(
            process_tasks,
            task_run_name=task_fname,
            timeout_seconds=60 * 60,  # set the timeout to 60 minutes
            log_prints=True,
        )
        append_tasks.append(task_run.submit(doc))
    return append_tasks
def process_docs(dict_docs, que_batch_size=100):
    # Split documents into batches
    for i in range(0, len(dict_docs), que_batch_size):
        batch_docs = dict_docs[i : i + que_batch_size]
        batch_fnames = fnames[i : i + que_batch_size]  # fnames is prepared elsewhere (omitted)
        # Submit the current batch of tasks
        batch_tasks = process_batch(batch_docs, batch_fnames)
        # Wait for every task in the batch to finish before starting the next batch
        for batch_task in batch_tasks:
            try:
                batch_task.result()
            except Exception as e:
                print(f"Task failed with exception: {e}")
# Asynchronously trigger the deployment
async def arun_flow(deployment_name):
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(deployment_name)
        response = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            name="test_flow_run",
        )
        return response

if __name__ == "__main__":
    response = asyncio.run(arun_flow("process_flow/deployment_test_1"))
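For reference, I believe the same per-run settings (run name, timeout, log capture) can also be attached with with_options on a decorated task; a sketch under that assumption:

from prefect import task

@task(log_prints=True)
def process_tasks(doc):
    # Same API-calling body as above (omitted)
    ...

def process_batch(batch_docs, batch_fnames):
    append_tasks = []
    for doc, task_fname in zip(batch_docs, batch_fnames):
        # with_options returns a copy of the task with the overridden settings
        configured = process_tasks.with_options(
            task_run_name=task_fname,
            timeout_seconds=60 * 60,
        )
        append_tasks.append(configured.submit(doc))
    return append_tasks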
(Screenshot: Prefect UI showing a task still in the Running state well past the timeout.)
@Jeff_Hale_Prefect please share your thoughts