I'm getting storage errors when running flows from deployments using DockerFlowRunner (FileNotFoundError: [Errno 2] No such file or directory: '/var/folders/tb/xxx/T/prefect/deployment_UUID')

I tried following this:

The first time, I didn’t run prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

The second time, I ran that but used local storage, and then I saw this line:
" * A remote Storage configuration, not Local Storage or Temporary Local Storage."

The third time, I ran with GCS and meticulously followed the tutorial, but I still get the following:

10:26:02.698 | INFO    | prefect.flow_runner.docker - Flow run 'adamant-longhorn' has container settings = {'image': 'prefecthq/prefect:2.0a13-python3.9', 'network': None, 'command': ['python', '-m', 'prefect.engine', '2b297873-9449-4a9f-b4a1-22921851b594'], 'environment': {'PREFECT_API_URL': 'http://host.docker.internal:4200/api'}, 'auto_remove': False, 'labels': {'io.prefect.flow-run-id': '2b297873-9449-4a9f-b4a1-22921851b594'}, 'extra_hosts': None, 'name': 'adamant-longhorn', 'volumes': []}
10:26:02.902 | INFO    | prefect.agent - Completed submission of flow run '2b297873-9449-4a9f-b4a1-22921851b594'
10:26:02.918 | INFO    | prefect.flow_runner.docker - Flow run container 'adamant-longhorn' has status 'running'
17:26:11.331 | ERROR   | prefect.engine - Engine execution of flow run '2b297873-9449-4a9f-b4a1-22921851b594' exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 951, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 134, in enter_flow_run_engine_from_subprocess
    return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 81, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 199, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_deployment(deployment, client=client)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 81, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 329, in load_flow_from_deployment
    maybe_flow = await client.resolve_datadoc(deployment.flow_data)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 1695, in resolve_datadoc
    return await resolve_inner(datadoc)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 1688, in resolve_inner
    data = await self.retrieve_data(data)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 1259, in retrieve_data
    return await storage_block.read(embedded_datadoc)
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/storage.py", line 118, in read
    async with await anyio.open_file(storage_path, mode="rb") as fp:
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_fileio.py", line 156, in open_file
    fp = await to_thread.run_sync(open, file, mode, buffering, encoding, errors, newline,
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
FileNotFoundError: [Errno 2] No such file or directory: '/var/folders/tb/fksvh8ms2hl5xnzwrftnx66r0000gq/T/prefect/b52c01e1-972d-46eb-b82b-bce53d68baaa'
10:26:11.800 | INFO    | prefect.flow_runner.docker - Flow run container 'adamant-longhorn' has status 'exited'

b52... is my deployment UUID I think.

Also, to make getting started less tedious, I feel like there should be a prefect orion quick-start that encompasses all the steps below. There seem to be a lot of steps just to get started with a deployment for Docker.

  1. prefect orion start
  2. prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
  3. prefect storage create
  4. prefect work-queue create tut-work-queue
  5. prefect agent start ‘’

Regarding the error message you got, the problem occurs since your flow code is stored locally on your machine, but the flow run is deployed within an independent container that doesn’t have access to the host machine disk, unless you bind mount a specific volume.
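
To make the bind-mount point concrete, here is a minimal, stdlib-only sketch of how a host path relates to a container path. The helper name container_path_for and the mount target /opt/flows are hypothetical and chosen only for illustration; the "host_dir:container_dir" strings follow Docker's bind-mount syntax. With 'volumes': [] (as in the container settings logged above), no host path is visible inside the container.

```python
from pathlib import PurePosixPath
from typing import List, Optional


def container_path_for(host_path: str, volumes: List[str]) -> Optional[str]:
    """Map a host path to its in-container path, or None if it is not mounted.

    `volumes` holds Docker-style bind mounts: "host_dir:container_dir[:mode]".
    """
    target = PurePosixPath(host_path)
    for spec in volumes:
        host_dir, container_dir = spec.split(":")[:2]
        try:
            relative = target.relative_to(host_dir)
        except ValueError:
            continue  # host_path does not live under this mount
        return str(PurePosixPath(container_dir) / relative)
    return None


# Path shape taken from the error message in the question.
flow_file = "/var/folders/tb/xxx/T/prefect/deployment_UUID"

# No volumes: the container cannot see the file at all.
print(container_path_for(flow_file, []))

# Hypothetical bind mount of the temp directory to /opt/flows.
print(container_path_for(flow_file, ["/var/folders/tb/xxx/T/prefect:/opt/flows"]))
```

Note that even with a mount, the file shows up under the container-side directory, so a path recorded on the host would only resolve unchanged if both sides of the mount use the same directory.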

So you have two options:

  1. Build your custom Dockerfile and COPY your flow code into the Docker image, then set flow_location to match the path you specified in your Docker image. This allows you to use local storage.
  2. Use object storage such as S3, Azure or GCS, but then make sure to bind mount the credentials for the cloud provider on your DockerFlowRunner - here is an example for AWS S3:

And regarding the prefect orion quick-start, I’ll forward the feedback to the team :slight_smile:


@ahuang11 thanks for bringing this up. In broad strokes what you’re running into is related to the relationship between API URL, the Orion database used by the API you’re pointing at via the API URL, and the storage you’ve created, which is defined on that database. As a result, order of operations is important.

We haven’t done as good a job as I’d like covering this in the docs. It’s somewhat covered in the concepts docs, but needs to be more explicit both there and in the tutorials. This is on my roadmap.

In the meantime, I’d suggest the following order of operations:

  1. Set PREFECT_API_URL if not already configured for the server you want to use or Cloud.
  2. Configure storage if you haven’t already done so for this server. You may want to run prefect storage ls to make sure the expected storage is available.
  3. Create your deployment.
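
The order of operations above could be scripted roughly as follows. This is only a sketch: the helper names setup_commands and run_setup are made up for illustration, the CLI commands are the ones quoted in this thread, and the storage step only lists existing storage because prefect storage create prompts interactively and is better run by hand. By default it prints the commands without executing anything.

```python
import shlex
import subprocess
from typing import List


def setup_commands(api_url: str, deployment_file: str) -> List[List[str]]:
    """Return the CLI calls in the order that matters: API URL, storage, deployment."""
    return [
        # 1. Point the CLI at the server you want to use.
        ["prefect", "config", "set", f"PREFECT_API_URL={api_url}"],
        # 2. Check that the expected storage exists on that server.
        ["prefect", "storage", "ls"],
        # 3. Only then create the deployment.
        ["prefect", "deployment", "create", deployment_file],
    ]


def run_setup(commands: List[List[str]], dry_run: bool = True) -> None:
    """Print each command; execute it as well when dry_run is False."""
    for argv in commands:
        print("$ " + shlex.join(argv))
        if not dry_run:
            subprocess.run(argv, check=True)  # stop at the first failing step


commands = setup_commands("http://127.0.0.1:4200/api", "./docker-deployment.py")
run_setup(commands)  # dry run: prints the three commands in order
```

Calling run_setup(commands, dry_run=False) would actually invoke the prefect CLI, which assumes it is installed and on your PATH.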

@anna_geller feel free to correct me if I’m missing any critical details of this scenario. I’m mostly trying to clarify that the tutorials need a little more work to make this particular aspect clear.


Thanks for the reply; I think those make sense, but I am still encountering some issues.

For #1, I created a Dockerfile

FROM prefecthq/prefect:latest

WORKDIR /app

ADD docker-deployment.py .

and then ran
docker build . -t test:latest

Updated my DeploymentSpec

from prefect import flow, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner

@flow
def my_docker_flow():
    logger = get_run_logger()
    logger.info("Hello from Docker!")

DeploymentSpec(
    name="docker-example",
    flow=my_docker_flow,
    image="test:latest",
    flow_runner=DockerFlowRunner()
)

However, when I run prefect deployment create ./docker-deployment.py, I encounter

ValidationError: 1 validation error for DeploymentSpec
image
  extra fields not permitted (type=value_error.extra)
Encountered exception while loading specifications from 'docker-deployment.py'

Any ideas appreciated!

Oops, I should be putting the image keyword under DockerFlowRunner()


The image is an argument of DockerFlowRunner rather than DeploymentSpec:

from prefect import flow, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner

@flow
def my_docker_flow():
    logger = get_run_logger()
    logger.info("Hello from Docker!")

DeploymentSpec(
    name="docker-example",
    flow=my_docker_flow,
    flow_runner=DockerFlowRunner(image="test:latest")
)

Also, I would use COPY rather than ADD (super subjective, both should work here, but COPY makes the intent clearer, especially given that you don’t have any tar archives that would need to be extracted):

COPY docker-deployment.py .

I think I’m getting closer, but still running into issues with Docker (and I think it’s due to my minimal knowledge of using Docker). Thanks for guiding me through!

First issue; I don’t think I can just use a local image, i.e. I need to docker push test:latest?

docker.errors.ImageNotFound: 404 Client Error for http+docker://localhost/v1.41/images/create?tag=latest&fromImage=test: Not Found ("pull access denied for test, repository does not exist or may require 'docker login': denied: requested access to the resource is denied")

But I am encountering denied: requested access to the resource is denied

The second issue is that I am not sure how to access the Orion image (prefect v2). In Dockerfile I tried using
FROM prefecthq/prefect:2.0b2
But I can’t find the image, so I think I should just start from the latest image and do a pip install -U prefect==2.0b2?

A couple of things to make it work:

Create a Dockerfile

Here is a super simple Dockerfile example:

FROM prefecthq/prefect:2.0b2-python3.9
RUN pip install requests awswrangler

Build a local image

Name your image as you like; here we use the name orion-aws:

docker build -t orion-aws .

Set image pull policy to NEVER and set PREFECT_API_KEY environment variable

To make sure that Prefect doesn’t try to pull the image from a public Docker Hub repository, set image_pull_policy="NEVER". Also, add an environment variable with your Cloud 2.0 API key so that your container can communicate with the Orion API in the cloud.
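
As a rough mental model of what a pull policy decides, consider the toy sketch below. It is not Prefect's implementation: PullPolicy and should_pull are hypothetical names, and only the NEVER value appears in this thread; ALWAYS and IF_NOT_PRESENT are assumed here as the conventional counterparts.

```python
from enum import Enum


class PullPolicy(Enum):
    """Illustrative policy values; only NEVER is taken from this thread."""

    ALWAYS = "ALWAYS"
    IF_NOT_PRESENT = "IF_NOT_PRESENT"
    NEVER = "NEVER"


def should_pull(policy: PullPolicy, image_is_local: bool) -> bool:
    """Decide whether a registry pull would be attempted before starting the container."""
    if policy is PullPolicy.ALWAYS:
        return True
    if policy is PullPolicy.IF_NOT_PRESENT:
        return not image_is_local
    return False  # NEVER: rely on the locally built image


# A locally built image such as orion-aws:latest exists on the machine.
for policy in PullPolicy:
    print(policy.value, should_pull(policy, image_is_local=True))
```

Any policy that triggers a pull for an image that exists only locally leads to the "pull access denied" error shown earlier, which is why NEVER is the safe choice for images that were never pushed. The full deployment file from this reply follows.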

from prefect import flow, task, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner
import platform


@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello from Docker!")
    import awswrangler as wr

    logger.info(f"{wr.__version__}")  # to show dependencies from the Docker image


@task
def print_platform_info():
    logger = get_run_logger()
    logger.info(
        "Platform information: IP = %s, Python = %s, Platform type = %s, OS Version = %s",
        platform.node(),
        platform.python_version(),
        platform.platform(),
        platform.version(),
    )


@flow
def orion_s3_docker():
    hi = say_hi()
    print_platform_info(wait_for=[hi])


DeploymentSpec(
    name="docker_custom_local_image",
    flow=orion_s3_docker,
    tags=["local"],
    flow_runner=DockerFlowRunner(
        image="orion-aws:latest",
        image_pull_policy="NEVER",
        volumes=["/Users/anna/.aws:/root/.aws"],
        env=dict(PREFECT_API_KEY="YOUR_API_KEY",),
    ),
)

This should give you output as follows:



Woohoo!! I finally got it to run on GCP! (I didn’t run AWS because I don’t have an account yet)

13:53:32.176 | INFO    | prefect.flow_runner.docker - Flow run container 'splendid-flamingo' has status 'running'
20:53:41.327 | INFO    | Flow run 'splendid-flamingo' - Using task runner 'ConcurrentTaskRunner'
20:53:41.396 | INFO    | Flow run 'splendid-flamingo' - Created task run 'say_hi-93ad9977-0' for task 'say_hi'
20:53:41.519 | INFO    | Flow run 'splendid-flamingo' - Created task run 'print_platform_info-340ece69-0' for task 'print_platform_info'
20:53:42.044 | INFO    | Task run 'say_hi-93ad9977-0' - Hello from Docker!
20:54:03.577 | INFO    | Task run 'say_hi-93ad9977-0' - Finished in state Completed(None)
20:54:03.691 | INFO    | Task run 'print_platform_info-340ece69-0' - Platform information: IP = ce1f8a84e806, Python = 3.9.11, Platform type = Linux-5.10.104-linuxkit-x86_64-with-glibc2.31, OS Version = #1 SMP PREEMPT Wed Mar 9 19:01:25 UTC 2022
20:54:05.579 | INFO    | Task run 'print_platform_info-340ece69-0' - Finished in state Completed(None)
20:54:06.650 | INFO    | Flow run 'splendid-flamingo' - Finished in state Completed('All states completed.')

My Dockerfile:

FROM prefecthq/prefect:2.0b2-python3.9
RUN pip install requests

My flow code:

from prefect import flow, task, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner
import platform


@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello from Docker!")


@task
def print_platform_info():
    logger = get_run_logger()
    logger.info(
        "Platform information: IP = %s, Python = %s, Platform type = %s, OS Version = %s",
        platform.node(),
        platform.python_version(),
        platform.platform(),
        platform.version(),
    )


@flow
def orion_test_docker():
    hi = say_hi()
    print_platform_info(wait_for=[hi])


DeploymentSpec(
    name="docker_custom_local_image",
    flow=orion_test_docker,
    tags=["local"],
    flow_runner=DockerFlowRunner(
        image="orion-test:latest",
        image_pull_policy="NEVER",
        volumes=["/Users/andrew/.secrets/gcp.json:/root/gcp-docker.json"],
        env=dict(GOOGLE_APPLICATION_CREDENTIALS="/root/gcp-docker.json")
    ),
)

Nice work, and thanks for bringing this all up! We definitely need to build more examples; this will land in a repo with deployment recipes for Prefect 2! :tada:

By the way, I was wondering what the PREFECT_API_KEY is used for in your example. Also, where do you retrieve it?

I was using Cloud 2.0 API rather than running Orion API locally.

You can read more about Cloud Beta on this docs page:

and here: