Hi,
I'm running into an issue where an agent on AWS EC2 is unable to download the flow code from S3.
Here are the details.
Error when running the deployment:
02:02:09 PM | prefect.flow_runs | Downloading flow code from storage at None
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 277, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/deployments.py", line 194, in load_flow_from_flow_run
    await storage_block.get_directory(from_path=deployment.path, local_path=".")
  File "/usr/local/lib/python3.10/dist-packages/prefect/filesystems.py", line 478, in get_directory
    return await self.filesystem.get_directory(
  File "/usr/local/lib/python3.10/dist-packages/prefect/filesystems.py", line 322, in get_directory
    return self.filesystem.get(from_path, local_path, recursive=True)
  File "/usr/local/lib/python3.10/dist-packages/fsspec/asyn.py", line 114, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/fsspec/asyn.py", line 99, in sync
    raise return_result
  File "/usr/local/lib/python3.10/dist-packages/fsspec/asyn.py", line 54, in _runner
    result[0] = await coro
  File "/usr/local/lib/python3.10/dist-packages/fsspec/asyn.py", line 531, in _get
    rpaths = await self._expand_path(rpath, recursive=recursive)
  File "/usr/local/lib/python3.10/dist-packages/fsspec/asyn.py", line 737, in _expand_path
    out = await self._expand_path([path], recursive, maxdepth)
  File "/usr/local/lib/python3.10/dist-packages/fsspec/asyn.py", line 761, in _expand_path
    raise FileNotFoundError(path)
FileNotFoundError: ['prefect-jp/dataflow-ops/prod']
I've confirmed the S3 block is valid and that the flow files are uploaded to S3 when the deployment is created.
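For reference, this is roughly how the bucket contents can be double-checked with s3fs, the same backend Prefect's S3 block drives through fsspec (a minimal sketch; the placeholder credentials stand in for the ones stored in the "prod" block):

# Sketch: list what s3fs actually sees under the block's bucket_path.
# The placeholder credentials below are assumptions, not real values.
import s3fs

fs = s3fs.S3FileSystem(
    key="<aws_access_key_id>",
    secret="<aws_secret_access_key>",
)
print(fs.ls("prefect-jp/dataflow-ops/prod"))  # should list the uploaded flow files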
Here is the deployment code (the CLI gives the same outcome; see the equivalent build command after the script):
from flows.healthcheck import healthcheck
from prefect.deployments import Deployment
from prefect.filesystems import S3

deployment = Deployment.build_from_flow(
    flow=healthcheck,
    entrypoint="flows/healthcheck.py",
    name="aws-s3-deploy",
    description="",
    version="v1",
    work_queue_name="aws-ec2",
    storage=S3.load("prod"),
    infra_overrides=dict(env={"PREFECT_LOGGING_LEVEL": "DEBUG"}),
    output="../deploy/aws-s3-deploy.yaml",
)

if __name__ == "__main__":
    deployment.apply()
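The equivalent CLI build (which produced the same outcome) looks roughly like this (flag names per the Prefect 2.x prefect deployment build command):

prefect deployment build flows/healthcheck.py:healthcheck \
    --name aws-s3-deploy \
    --version v1 \
    --work-queue aws-ec2 \
    --storage-block s3/prod \
    --output ../deploy/aws-s3-deploy.yaml
prefect deployment apply ../deploy/aws-s3-deploy.yaml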
Here is the deployment YAML file:
###
### A complete description of a Prefect Deployment for flow 'healthcheck'
###
name: aws-s3-deploy
description: null
version: v1
# The work queue that will handle this deployment's runs
work_queue_name: aws-ec2
work_pool_name: default-agent-pool
tags: []
parameters: {}
schedule: null
is_schedule_active: true
infra_overrides:
  env:
    PREFECT_LOGGING_LEVEL: DEBUG
###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: healthcheck
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  _block_document_id: c8a7a5b2-deec-452b-9990-a48455eada52
  _block_document_name: anonymous-aa60b665-2340-4cf4-8ebd-3a3fbb4a2877
  _is_anonymous: true
  block_type_slug: process
  _block_type_slug: process
storage:
  bucket_path: prefect-jp/dataflow-ops/prod/
  aws_access_key_id: '**********'
  aws_secret_access_key: '**********'
  _block_document_id: 2440d8c0-eb56-4137-9f53-a5a477e44072
  _block_document_name: prod
  _is_anonymous: false
  block_type_slug: s3
  _block_type_slug: s3
path: null
entrypoint: flows/healthcheck.py
parameter_openapi_schema:
  title: Parameters
  type: object
  properties: {}
  required: null
  definitions: null
timestamp: '2023-03-27T18:00:56.210487+00:00'
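Note the path: null line above, which seems to match the "Downloading flow code from storage at None" log message. In case it helps with diagnosis, the deployment can also be read back from the API to inspect that field (a sketch, assuming the Prefect 2.x client API):

# Sketch: read the deployment back from the API and inspect its path field.
import asyncio
from prefect import get_client

async def main():
    async with get_client() as client:
        # lookup format is "<flow_name>/<deployment_name>"
        d = await client.read_deployment_by_name("healthcheck/aws-s3-deploy")
        print(d.path)                 # None here would match the log line
        print(d.storage_document_id)  # should reference the "prod" S3 block

asyncio.run(main())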
Any help or tips would be appreciated. Thanks.