Run Flow in venv

I’m trying to run a flow in a virtual environment (created with python -m venv environmentname). I saw a Discourse thread that explains how to run a flow in a conda virtual environment by changing the “command” parameter in the deployment.yaml file.

Here: How to run my deployment in a virtual environment such as conda?

When I generated the deployment.yaml file, “command” was null, and the flow didn’t run in that state. It failed with the error below (even though I don’t reference “C:/Program” anywhere in the script or deployment file).

09:48:10.305 | INFO    | prefect.agent - Submitting flow run '8c6bd613-7c3e-4ba3-bd68-bbb95fad270b'
09:48:10.578 | INFO    | prefect.infrastructure.process - Opening process 'industrious-shellfish'...
09:48:10.650 | INFO    | prefect.agent - Completed submission of flow run '8c6bd613-7c3e-4ba3-bd68-bbb95fad270b'
'C:\Program' is not recognized as an internal or external command,
operable program or batch file.
09:48:10.708 | ERROR   | prefect.infrastructure.process - Process 'industrious-shellfish' exited with status code: 1
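
One plausible reading of this error, which the thread itself does not confirm: the interpreter path contains a space, and somewhere the command reaches the Windows shell as a single unquoted string, so the shell treats everything after C:\Program as arguments. The Python sketch below only illustrates that splitting behaviour. The path is a made-up example, and nothing here reproduces Prefect's actual code.

```python
import subprocess

# Hypothetical interpreter path containing a space (not taken from the thread).
argv = [r"C:\Program Files\Python39\python.exe", "-m", "prefect.engine"]

# Naive join: a shell reads the text up to the first space as the program name.
naive = " ".join(argv)
first_token = naive.split(" ")[0]

# list2cmdline applies Windows quoting rules, so the path stays one argument.
quoted = subprocess.list2cmdline(argv)

print(first_token)  # the fragment a shell would try to execute
print(quoted)       # the same command with the path safely quoted
```

If this is what happens, it would also explain why wrapping the path in an extra pair of double quotes gets past this particular error.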

So, I changed it to see if it’d run under my virtual environment…

  command:
  - '"C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\Scripts\python.exe"'
  - -m
  - prefect.engine

However, with that, it gives a different error:

09:29:06.930 | ERROR   | Flow run 'resilient-coua' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "testflow.py", line 27, in <module>
    my_favorite_function()
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\flows.py", line 439, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\engine.py", line 150, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\lib\asyncio\base_events.py", line 642, in run_until_complete
    return future.result()
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\engine.py", line 222, in create_then_begin_flow_run
    state = await begin_flow_run(
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\engine.py", line 349, in begin_flow_run
    flow_run_context.result_factory = await ResultFactory.from_flow(
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\results.py", line 161, in from_flow
    return await cls.default_factory(
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\results.py", line 123, in default_factory
    return await cls.from_settings(**kwargs, client=client)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\results.py", line 229, in from_settings
    storage_block_id, storage_block = await cls.resolve_storage_block(
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\results.py", line 257, in resolve_storage_block
    or await storage_block._save(is_anonymous=True, overwrite=True)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\blocks\core.py", line 751, in _save
    await self.register_type_and_schema(client=client)
TypeError: object NoneType can't be used in 'await' expression

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\engine.py", line 247, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\deployments.py", line 174, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(import_object, str(import_path))
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\utilities\asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\utilities\importtools.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenv\prefectenv\lib\site-packages\prefect\utilities\importtools.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'testflow.py' encountered an exception
09:29:08.358 | INFO    | prefect.infrastructure.process - Process 'resilient-coua' exited cleanly.

For reference… here’s my Prefect version:

Version:             2.6.5
API version:         0.8.3
Python version:      3.9.7
Git commit:          9fc2658f
Built:               Thu, Oct 27, 2022 2:24 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.35.5

Here’s the flow:

from prefect import flow, task
import sys


@task(name="print-fav-number")
def print_fav_number():
    print("What is your favorite number?")

@task(name="print-99")
def print_99():
    print("99")
    return 99

@task(name="print-environment")
def print_env():
    if(sys.prefix is None):
        print("None")
    else:
        print(sys.prefix)

@flow(name="testflow")
def my_favorite_function():
    print_fav_number()
    print_99()
    print_env()

my_favorite_function()

… and here’s the full deployment.yaml:

###
### A complete description of a Prefect Deployment for flow 'testflow'
###
name: Test Deployment Name 2
description: null
version: beb696a16301c83f6ea868325f8d72ab
# The work queue that will handle this deployment's runs
work_queue_name: default
tags: []
parameters: {}
schedule: null
infra_overrides: {}

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: testflow
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command:
  - '"C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python39_64\pyvenvs\prefectenv\python.exe"'
  - -m
  - prefect.engine
  stream_output: true
  working_dir: null
  _block_document_id: c467ec15-0737-4366-a358-d0eede78e455
  _block_document_name: anonymous-6e7141c8-7082-490a-9c6c-406e7fe69c51
  _is_anonymous: true
  block_type_slug: process
  _block_type_slug: process
storage: null
path: E:\Prefect\test
entrypoint: testflow.py:my_favorite_function
parameter_openapi_schema:
  title: Parameters
  type: object
  properties: {}
  required: null
  definitions: null

I understand the problem. Let me open an issue to add a custom infrastructure block that allows setting up virtual environments.

@doc I just cross-checked this, and it should already be possible. The only thing you need to be careful about is to provide the command as a list.

This issue comment explains it in more detail and shows an example: Add support for virtual environments on the `Process` infrastructure block · Issue #7379 · PrefectHQ/prefect · GitHub

I also updated this Discourse topic; it should now give more helpful guidance on how to configure this.


Thanks for the info, @anna_geller! My bug actually ended up being that my flow file called the flow function at the bottom (the my_favorite_function() line). D’oh! So when the file was loaded, it was like running the flow within the actual flow run or something. Anyway, I fixed it by removing that call from the end of the flow file.
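
To illustrate why that trailing call matters: the traceback above passes through exec_module and load_script_as_module, which suggests the flow file is imported as a module rather than run as a script, so any module-level call executes during the import. Below is a minimal sketch using only the standard library, with no Prefect involved; load_script and the two script strings are hypothetical stand-ins for testflow.py. It also shows the usual alternative to deleting the call, an `if __name__ == "__main__":` guard.

```python
import importlib.util
import tempfile
from pathlib import Path

# Stand-in for a flow file that calls its function at module level.
UNGUARDED = '''
calls = []

def my_favorite_function():
    calls.append("ran")

my_favorite_function()
'''

# Same file, but the call only happens when run directly as a script.
GUARDED = '''
calls = []

def my_favorite_function():
    calls.append("ran")

if __name__ == "__main__":
    my_favorite_function()
'''

def load_script(source, module_name):
    """Write source to a temporary file and import it as a module."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / f"{module_name}.py"
        path.write_text(source)
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # executes all top-level code
        return module

unguarded = load_script(UNGUARDED, "unguarded_flow")
guarded = load_script(GUARDED, "guarded_flow")

print(unguarded.calls)  # ['ran']: the function ran during import
print(guarded.calls)    # []: the guard skipped the call
```

With the guard in place, the file can still be run directly for a quick local test, while importing it only defines the function.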

Thanks so much for reporting it and nice work!

Hey, sorry if I missed this. Is there any way to make an agent enter the necessary venv before running a flow?

I have several pre-made virtual environments created via “python -m venv {flow_name_here}_env”.
These virtual environments contain the necessary dependencies for each flow.
I want agents to go into the correct venv before running a flow.

Is there a way to do this? I don’t really understand the examples I see, and I’m not sure why they’re referencing prefect.engine.

I have attempted to use full paths and relative paths, but the agent can’t see the directory even though it’s right next to the flow.

  command:
  - '"./env/bin/activate"'
  - -m
  - prefect.engine

Output:

FileNotFoundError: [Errno 2] No such file or directory: '"./env/bin/activate"'
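
A hedged note on this error: activate is a shell script meant to be sourced into a shell, not a program that accepts -m prefect.engine, and the inner double quotes end up as part of the file name, as the message shows. The approach discussed earlier in this thread is to make the first element of the command list the virtual environment's own Python interpreter. The -m prefect.engine part then appears to be the module that executes the flow run inside that interpreter (the traceback earlier in the thread passes through prefect\engine.py). Here is a minimal sketch of building such a list, assuming the standard venv layout (bin/python on POSIX, Scripts\python.exe on Windows). venv_python and engine_command are hypothetical helper names, not part of Prefect, and the paths are examples only.

```python
import os
from pathlib import PurePosixPath, PureWindowsPath

def venv_python(venv_dir, os_name=os.name):
    """Return the interpreter path inside a venv created with `python -m venv`."""
    if os_name == "nt":
        return PureWindowsPath(venv_dir) / "Scripts" / "python.exe"
    return PurePosixPath(venv_dir) / "bin" / "python"

def engine_command(venv_dir, os_name=os.name):
    """Build the command as a list: interpreter first, then -m prefect.engine."""
    return [str(venv_python(venv_dir, os_name)), "-m", "prefect.engine"]

# Example with a made-up absolute path; a relative path would be resolved
# against whatever working directory the process happens to start in.
for part in engine_command("/opt/flows/my_flow_env", os_name="posix"):
    print("- " + part)
```

Each printed line corresponds to one item under command: in deployment.yaml. Whether extra quoting is needed for paths with spaces on Windows may depend on the Prefect version, so treat the exact YAML form as something to verify against the linked issue.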