Prefect gives connection error when using Dask

Hi! I’m trying to run a flow with a DaskTaskRunner, but it fails when it tries to update the run state. It seems that the Prefect client cannot reach the Prefect server API for some reason. I think there is a problem with my configuration.

This is my Docker Compose stack:

version: "3.8"
services:
  ### PostgreSQL Database
  database:
    image: postgres:14.1-alpine
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=prefectpass
      - POSTGRES_DB=prefect
    expose:
      - 5432
    volumes: 
      - ./dockerdata/postgres:/var/lib/postgresql/data

  ### Prefect server
  prefect:
    image: prefecthq/prefect:2.18-python3.11
    volumes:
      - ./dockerdata/prefect:/prefect
    entrypoint: ["prefect", "server", "start"]
    environment:
      - PREFECT_SERVER_API_HOST=0.0.0.0
      - PREFECT_SERVER_API_PORT=4200
      - PREFECT_UI_API_URL=http://127.0.0.1:4200/api
      - PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://postgres:prefectpass@database:5432/prefect
      - PREFECT_SERVER_CSRF_PROTECTION_ENABLED=False
    ports:
      - 4200:4200
    expose:
      - 4200
    depends_on:
      - database
    networks:
      - default
      

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin4_container
    restart: always
    ports:
      - "8888:80"
    environment:
      PGADMIN_DEFAULT_EMAIL: test@test.it
      PGADMIN_DEFAULT_PASSWORD: admin
    volumes:
      - ./dockerdata/pgadmin-data:/var/lib/pgadmin

  
  scheduler:
    image: ghcr.io/dask/dask:2023.3.0
    hostname: scheduler
    ports:
      - "8786:8786"
      - "8787:8787"
    command: ["dask","scheduler"]
    networks:
      - default

  worker:
    # image: dask-worker-custom
    command: ["dask-worker", "tcp://scheduler:8786"]
    build:
      dockerfile: ./Dockerfile
    # For Docker Swarm you can specify multiple workers; this is ignored by `docker-compose up`
    deploy:
      replicas: 2
    
    networks:
      - default

networks:
  default:
    

  
volumes:
  prefect:
  postgres:

This is my client configuration. I disabled CSRF support because it was causing problems when accessing the API:

PREFECT_PROFILE='default'
PREFECT_API_URL='http://127.0.0.1:4200/api' (from profile)
PREFECT_CLIENT_CSRF_SUPPORT_ENABLED='False' (from profile)
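
One thing that may be worth checking in this configuration: `127.0.0.1` always refers to the machine or container that makes the request, and the failing requests in the log further down target that same address. The sketch below is only a hypothetical helper (`points_to_loopback` is not part of Prefect) that flags an API URL whose host is a loopback address. The second URL uses the Compose service name `prefect` purely as an illustrative assumption; I have not verified that it resolves from the workers.

```python
import ipaddress
from urllib.parse import urlsplit


def points_to_loopback(api_url: str) -> bool:
    """Return True if the URL's host is localhost or a loopback IP."""
    host = urlsplit(api_url).hostname or ""
    if host == "localhost":
        return True
    try:
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        # Not an IP literal (for example a Compose service name).
        return False


print(points_to_loopback("http://127.0.0.1:4200/api"))  # True
print(points_to_loopback("http://prefect:4200/api"))    # False
```

If the URL is a loopback address, a process running inside a worker container would be talking to itself rather than to the Prefect container.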

This is the error; it’s not very informative, to be honest:

+---------+----------------+----------------+----------------+
| Package | Client         | Scheduler      | Workers        |
+---------+----------------+----------------+----------------+
| python  | 3.8.10.final.0 | 3.8.16.final.0 | 3.8.16.final.0 |
+---------+----------------+----------------+----------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
09:49:55.520 | INFO    | prefect.engine - Created flow run 'tall-weasel' for flow 'get-repo-info'
09:49:55.525 | INFO    | Flow run 'tall-weasel' - View at http://127.0.0.1:4200/flow-runs/flow-run/69e7a92c-01f3-444d-9200-c74ab85bf470
09:49:55.526 | INFO    | prefect.task_runner.dask - Connecting to an existing Dask cluster at tcp://scheduler:8786
/home/vagrant/Desktop/Tesi/.venv/lib/python3.8/site-packages/distributed/client.py:1361: VersionMismatchWarning: Mismatched versions found

+---------+----------------+----------------+----------------+
| Package | Client         | Scheduler      | Workers        |
+---------+----------------+----------------+----------------+
| python  | 3.8.10.final.0 | 3.8.16.final.0 | 3.8.16.final.0 |
+---------+----------------+----------------+----------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
09:49:55.583 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://scheduler:8787/status
PrefectHQ/prefect repository statistics 🤓:
Stars 🌠 : 15130
Forks 🍴 : 1493
09:49:56.585 | INFO    | Flow run 'tall-weasel' - Created task run 'print_info-1' for task 'print_info'
09:49:59.953 | INFO    | Flow run 'tall-weasel' - Submitted task run 'print_info-1' for execution.
09:49:59.983 | INFO    | Flow run 'tall-weasel' - Created task run 'print_info-0' for task 'print_info'
09:50:00.057 | INFO    | Flow run 'tall-weasel' - Submitted task run 'print_info-0' for execution.
09:50:16.770 | INFO    | Task run 'print_info-0' - Crash detected! Request to http://127.0.0.1:4200/api/task_runs/19d51c96-c76f-4124-b8dc-09cb445171fe/set_state failed: ConnectError: All connection attempts failed.
09:50:16.943 | INFO    | Task run 'print_info-1' - Crash detected! Request to http://127.0.0.1:4200/api/task_runs/cc32312a-c918-4eb6-8f78-fd7399994ac6/set_state failed: ConnectError: All connection attempts failed.
09:50:17.144 | ERROR   | Flow run 'tall-weasel' - Finished in state Failed('2/2 states failed.')
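
To confirm whether the API is really unreachable from a given machine or container, a minimal stdlib-only check could look like the sketch below. It assumes the Prefect server answers a health request at `<PREFECT_API_URL>/health`; the helper names `health_url` and `api_reachable` are hypothetical and not part of Prefect.

```python
import os
import urllib.error
import urllib.request


def health_url(api_url: str) -> str:
    """Build the health-check URL from a Prefect API base URL."""
    return api_url.rstrip("/") + "/health"


def api_reachable(api_url: str, timeout: float = 3.0) -> bool:
    """Return True if an HTTP GET on the health endpoint answers with 200."""
    try:
        with urllib.request.urlopen(health_url(api_url), timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, DNS failure, timeout, or a non-2xx response.
        return False


if __name__ == "__main__":
    # Falls back to the URL from the profile above if the variable is unset.
    url = os.environ.get("PREFECT_API_URL", "http://127.0.0.1:4200/api")
    print(f"{url} reachable: {api_reachable(url)}")
```

Running it once on the host and once inside a worker container should show whether the two environments see the same API.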

And this is the simple code I would like to run in parallel with Dask:

# Sample flow: fetch GitHub repository statistics, then submit two tasks to Dask.
import dask.distributed
import httpx
from prefect import flow, task
from prefect_dask import DaskTaskRunner

# Connect to the Dask scheduler service defined in the compose file.
client = dask.distributed.Client(address="tcp://scheduler:8786")


@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def get_repo_info():
    url = "https://api.github.com/repos/PrefectHQ/prefect"
    response = httpx.get(url)
    response.raise_for_status()
    repo = response.json()
    print("PrefectHQ/prefect repository statistics 🤓:")
    print(f"Stars 🌠 : {repo['stargazers_count']}")
    print(f"Forks 🍴 : {repo['forks_count']}")
    # Each submit() schedules one task run on the Dask cluster.
    print_info.submit()
    print_info.submit()


@task()
def print_info(value: str = "Hello, world!"):
    # Print the argument instead of a hard-coded string.
    print(value)


if __name__ == "__main__":
    get_repo_info()

Thank you!!

Dario