Hi all, I’m trying to run a deployment in a private K3s cluster, with MinIO running in the same cluster.
I’m having issues running my first flow in the cluster.
I have something as simple as:
from prefect import flow, task, get_run_logger

@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello from the Health Check Flow! 👋")

@flow
def healthcheck():
    hi = say_hi()
I’m confused about how to create the deployment in Prefect 2. If I use Deployment.build_from_flow, I can’t use remote storage.
For reference, this is my setup:
- prefect-server (in k3s cluster)
- prefect-worker (in k3s cluster); it appears in Work Pools with "ready" status
- Remote File System block / MinIO as my private S3 storage
Using the following code:
from prefect.deployments import Deployment
from prefect.filesystems import RemoteFileSystem

deployment = Deployment.build_from_flow(
    flow=healthcheck,
    name="healthcheck",
    work_pool_name="myworker",
    storage=RemoteFileSystem.load("minio"),
    infra_overrides=dict(env={"PREFECT_LOGGING_LEVEL": "DEBUG"}),
)

if __name__ == "__main__":
    deployment.apply()
The deployment is associated with the worker, and when I try to run it, the worker logs multiple lines like:
INFO Worker 'KubernetesWorker bd0663b7-84dc-42ba-81c4-ce5518d6eac4' submitting flow run '854d4829-7e57-4bd3-bed6-2dee6a63ac43' 02:13:16 AM prefect.flow_runs.worker
INFO Worker 'KubernetesWorker bd0663b7-84dc-42ba-81c4-ce5518d6eac4' submitting flow run '854d4829-7e57-4bd3-bed6-2dee6a63ac43' 02:13:16 AM prefect.flow_runs.worker
No pods or jobs are created in my cluster.
Does anyone know what I may be doing wrong?
UPDATE 1:
It looks like some checks are failing:
02:21:58.963 | INFO | prefect.flow_runs.worker - Worker 'KubernetesWorker 0c8452f8-7f2f-48e9-bafe-2c9a39650949' submitting flow run '854d4829-7e57-4bd3-bed6-2dee6a63ac43'
02:21:59.011 | ERROR | prefect.worker.kubernetes.kubernetesworker 0c8452f8-7f2f-48e9-bafe-2c9a39650949 - Flow run 854d4829-7e57-4bd3-bed6-2dee6a63ac43 did not pass checks and will not be submitted for execution
Traceback (most recent call last):
File "/Users/agustinvinao/dev/tlon/kaizen-ui/portfolio-analysis/.venv/lib/python3.12/site-packages/prefect/workers/base.py", line 859, in _submit_run
await self._check_flow_run(flow_run)
File "/Users/agustinvinao/dev/tlon/kaizen-ui/portfolio-analysis/.venv/lib/python3.12/site-packages/prefect/workers/base.py", line 844, in _check_flow_run
raise ValueError(
ValueError: Flow run UUID('854d4829-7e57-4bd3-bed6-2dee6a63ac43') was created from deployment 'healthcheck' which is configured with a storage block. Please use an agent to execute this flow run.
I’m trying to identify why I get:
02:21:59.011 | ERROR | prefect.worker.kubernetes.kubernetesworker 0c8452f8-7f2f-48e9-bafe-2c9a39650949 - Flow run 854d4829-7e57-4bd3-bed6-2dee6a63ac43 did not pass checks and will not be submitted for execution
UPDATE 2:
It looks like I’m mixing the V1 and V2 ways of doing deployments:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 859, in _submit_run
await self._check_flow_run(flow_run)
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 844, in _check_flow_run
raise ValueError(
ValueError: Flow run UUID('2a06a32d-3d22-4676-bff4-f36f83dda42e') was created from deployment 'healthcheck' which is configured with a storage block. Please use an agent to execute this flow run.
I’m trying to use a storage block, and this may not be the right way for Kubernetes:
was created from deployment 'healthcheck' which is configured with a storage block. Please use an agent to execute this flow run.
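To check my understanding of the error, here is a minimal sketch of what the worker's check seems to do. It is inferred only from the message above: DeploymentInfo, storage_document_id, check_flow_run and can_submit are hypothetical stand-ins, and this is not Prefect's actual source code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeploymentInfo:
    """Hypothetical stand-in for the deployment record a worker reads."""
    name: str
    # Assumed field: non-None when a storage block is attached to the deployment.
    storage_document_id: Optional[str] = None


def check_flow_run(deployment: DeploymentInfo) -> None:
    """Reject deployments that carry a storage block, as the worker appears to do."""
    if deployment.storage_document_id is not None:
        raise ValueError(
            f"Flow run was created from deployment {deployment.name!r} which is "
            "configured with a storage block. Please use an agent to execute "
            "this flow run."
        )


def can_submit(deployment: DeploymentInfo) -> bool:
    """Return True if the (modelled) worker would submit the run."""
    try:
        check_flow_run(deployment)
    except ValueError:
        return False
    return True
```

If that reading is right, any deployment built with storage=... would be refused by a worker regardless of which storage block is used, which matches what I see.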
I will try adding an agent to the cluster just to see if it runs the deployment. I saw that workers are replacements for agents, but I couldn’t find good documentation on how to do the deployment for Prefect 2 + Kubernetes + MinIO.
FINAL UPDATE:
I can confirm that after adding an agent to the cluster with the same pool name, the job runs.
My question then is simple: using Prefect 2 + MinIO in a private cluster, how can I set up the deployment with remote storage?
When I use:
deployment = Deployment.build_from_flow(
    flow=healthcheck,
    name="healthcheck",
    work_pool_name="myworker",
    storage=RemoteFileSystem.load("minio"),
    infra_overrides=dict(env={"PREFECT_LOGGING_LEVEL": "DEBUG"}),
)

if __name__ == "__main__":
    deployment.apply()
It requires an agent as soon as it finds storage=RemoteFileSystem.load("minio").
Any help is welcome, thanks.