Are there some packages that are not compatible with prefect2?

Using pikepdf I can open a PDF in about 0.5 seconds and read/decrypt/save it in 12 seconds. In the example below, if I run the underlying task function directly, it completes in …5 seconds. However, if I run it in prefect2 as a flow/task, it takes over 3 minutes. If I also try to save the file, it takes more than 10 minutes and the task still shows as running.

Are there some packages that are not compatible with prefect2 perhaps due to their use of threading or multiprocessing?

from prefect.flows import flow
from prefect.tasks import task
import pikepdf

import logging

log = logging.getLogger(__name__)


@flow
def testflow(path):
    decrypt(path)


@task
def decrypt(path):
    log.info("starting pike")
    pdf = pikepdf.Pdf.open(path)
    log.info("finished pike")

Can you share the PDF you’re trying to process?

I ran this:

!wget https://www.prefect.io/slate-data-case-study.pdf

from prefect import flow, task, get_run_logger

import logging
import pikepdf


@flow
def testflow(path):
    decrypt(path)


@task
def decrypt(path):
    log = get_run_logger()
    log.info("starting pike")
    pdf = pikepdf.Pdf.open(path)
    log.info("finished pike")

testflow("slate-data-case-study.pdf")

And it finished pretty fast:

10:32:10.576 | INFO    | prefect.engine - Created flow run 'spectral-orca' for flow 'testflow'
10:32:10.577 | INFO    | Flow run 'spectral-orca' - Using task runner 'ConcurrentTaskRunner'
10:32:11.176 | INFO    | Flow run 'spectral-orca' - Created task run 'decrypt-c5e96fd6-0' for task 'decrypt'
10:32:11.713 | INFO    | Task run 'decrypt-c5e96fd6-0' - starting pike
10:32:11.719 | INFO    | Task run 'decrypt-c5e96fd6-0' - finished pike
10:32:11.970 | INFO    | Task run 'decrypt-c5e96fd6-0' - Finished in state Completed()
10:32:12.147 | INFO    | Flow run 'spectral-orca' - Finished in state Completed('All states completed.')
Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=None, task_run_id=e35c3bfd-fc38-444a-823e-038dcb679ce1)], flow_run_id=8aaed321-e46e-481b-8539-f9ca168f70cc)

I tried it on your test file and it did complete, though it took 15 seconds versus your 6 milliseconds. My own PDF is 282 pages, so it is probably taking a long time rather than hanging. I wonder why yours is so much faster.

20:15:38.842 | INFO    | prefect.engine - Created flow run 'delectable-robin' for flow 'testflow'
20:15:38.843 | INFO    | Flow run 'delectable-robin' - Using task runner 'ConcurrentTaskRunner'
20:15:38.914 | WARNING | Flow run 'delectable-robin' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
20:15:39.168 | INFO    | Flow run 'delectable-robin' - Created task run 'decrypt-c5e96fd6-0' for task 'decrypt'
20:15:39.983 | INFO    | Task run 'decrypt-c5e96fd6-0' - starting pike
20:15:53.175 | INFO    | Task run 'decrypt-c5e96fd6-0' - finished pike
20:15:53.645 | INFO    | Task run 'decrypt-c5e96fd6-0' - Finished in state Completed()
20:15:53.747 | INFO    | Flow run 'delectable-robin' - Finished in state Completed('All states completed.')

Which version of prefect are you using?

Also, are you using a remote storage or local?

Thanks. It was the storage. I was in WSL2 using a Windows file path. Moving the file to the WSL2 local file system reduces it to 5 seconds. The remaining gap is probably because Orion is in a Docker container.

I did not realise how slow that storage link is :grinning:
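For anyone who wants to check the storage link on their own machine, here is a minimal sketch that times one sequential read of a file, so the same PDF can be compared on a Windows-mounted path and on the WSL2 local file system. The function name `time_read` and both example paths are hypothetical and only for illustration; a repeated read may be served from the OS cache, so the first measurement is the most telling.

```python
import os
import tempfile
import time


def time_read(path, chunk_size=1024 * 1024):
    """Return (bytes_read, seconds) for one sequential read of `path`."""
    start = time.perf_counter()
    total = 0
    with open(path, "rb") as fh:
        while True:
            chunk = fh.read(chunk_size)
            if not chunk:
                break
            total += len(chunk)
    return total, time.perf_counter() - start


if __name__ == "__main__":
    # Hypothetical locations: put the same PDF in both places before running.
    candidates = [
        "/mnt/c/Users/me/Documents/sample.pdf",  # Windows drive seen from WSL2
        os.path.expanduser("~/sample.pdf"),      # WSL2 local file system
    ]
    for path in candidates:
        if os.path.exists(path):
            size, seconds = time_read(path)
            print(f"{path}: {size} bytes in {seconds:.3f} s")
        else:
            print(f"{path}: not found, skipping")
```

This only measures raw read time. pikepdf may access the file in a different pattern, so treat the numbers as a rough indicator rather than a prediction of task duration.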


Whilst this is related to the file system being used, I am still unsure why this is slower in Prefect than running directly, as in both cases I am accessing files on the Windows file system from WSL.

Probably related: I have another task that loads some data. It takes 8 seconds to run the function directly; 9 seconds using the Prefect sequential task runner; 35 seconds on Dask (including setting up the Dask cluster); and 2 min 35 s using the concurrent task runner when it is the only task running.

I investigated the second task mentioned above further, and it was not due to the file system. The example below does not use the file system. The task runs its loop in 2 seconds using SequentialTaskRunner, but takes 30 seconds with ConcurrentTaskRunner. I wonder why?

from prefect.flows import flow
from prefect.tasks import task

from prefect.task_runners import ConcurrentTaskRunner as runner
#from prefect.task_runners import SequentialTaskRunner as runner

from prefect import get_run_logger
import spacy

@flow(task_runner=runner())
def testflow():
    testtask()

@task
def testtask():
    log = get_run_logger()
    nlp = spacy.load("en_core_web_sm")
    text = "the cat sat on the mat"
    for n in range(100):
        nlp(text)
        if n % 10 == 0:
            log.info(n)

if __name__ == "__main__":
    testflow()
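
One way to narrow this down outside Prefect is to time the same function in the main thread and in a worker thread. This is a minimal sketch under the assumption that the concurrent runner executes the task function in a worker thread (not confirmed in this thread). `busy_loop` is a hypothetical stand-in for the spaCy loop above, and `time_call` and `time_call_in_thread` are illustrative helper names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def busy_loop(n=200_000):
    """Stand-in workload; swap in the real function, e.g. the spaCy loop."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def time_call(fn, *args):
    """Run fn in the calling thread and return (result, seconds)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


def time_call_in_thread(fn, *args):
    """Run fn in a single worker thread and return (result, seconds)."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        start = time.perf_counter()
        result = pool.submit(fn, *args).result()
        return result, time.perf_counter() - start


if __name__ == "__main__":
    _, direct = time_call(busy_loop)
    _, threaded = time_call_in_thread(busy_loop)
    print(f"main thread:   {direct:.3f} s")
    print(f"worker thread: {threaded:.3f} s")
```

If the two timings are close for the real workload, the slowdown is more likely to come from the runner or from how the library sets up its own threads or processes than from threading as such.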

There are so many variables at play that it’s hard to pinpoint a single root cause.

We will be working in the next releases on streamlining the storage and deployment story - perhaps you can revisit this after GA at the end of the month?

You may then investigate various conditions and pick the task runner that best suits your use case.

:point_right: Note to anyone using DaskTaskRunner or RayTaskRunner:

From Prefect version 2.0b8 onwards, those task runners were moved to the respective Prefect Collections for better code dependency management (the core library no longer requires dask or ray as dependencies; those can now be installed separately when needed).

The correct imports are now:

from prefect_dask import DaskTaskRunner
from prefect_ray import RayTaskRunner

This issue was caused by conflicting multiprocessing.

This worked with spaCy. However, when I run a PyTorch model I have the same issue. It worked in a previous version of prefect2, but in the new version it is very slow even with SequentialTaskRunner running just a single task.

Also here, I would hold off on performance optimizations until the stable release next week.

All fixed now in b12 :grinning:

Looking forward to the release version
