[Prefect - Airbyte - Error] ssl.SSLSyscallError: Some I/O error occurred (_ssl.c:997) and anyio.BrokenResourceError

Hi, I’m running Prefect 2.0 with Prefect Cloud, a Agent in Docker Container (Image prefect 2.7, python 3.10) and Local Infras in order to trigger Airbyte Connection (Airbyte Locally).
And I encounter below problem occasionally, and don’t know how to fix it (it’s quite annoying):

  • During run-time, it pops up Crash Detected!
Crash detected! Request to https://api.prefect.cloud/api/accounts/a97f32b1-459d-44c5-a184-686af42acb05/workspaces/cf08ccbd-f54a-41b5-9c8f-a4d63a16cd42/task_runs/bcbae618-2cb0-4dd2-a5b4-600b03a24e2c/set_state failed: Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
    result = func(*args)
  File "/usr/local/lib/python3.10/ssl.py", line 975, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLSyscallError: Some I/O error occurred (_ssl.c:997)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 76, in start_tls
    raise exc
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 67, in start_tls
    ssl_stream = await anyio.streams.tls.TLSStream.wrap(
  File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 122, in wrap
    await wrapper._call_sslobject_method(ssl_object.do_handshake)
  File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 151, in _call_sslobject_method
    raise BrokenResourceError from exc
anyio.BrokenResourceError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
    stream = await self._connect(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 150, in _connect
    stream = await stream.start_tls(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 64, in start_tls
    with map_exceptions(exc_map):
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ConnectError

The above exception was the direct cause of the following exception:

httpx.ConnectError
  • With the same code, it run smoothly several days, then this problem occurs, then run smoothly some days, and then get crashed.

Even the task and flow in Prefect shows up as Crashed, the actual flow and task in Airbyte still run successfully.
Does anyone have any ideas to fix this?

Great writeup! Thanks for adding all the tags.

Could you say a bit more about your setup? When you say that you run your agent in a Docker container, does it mean you run everything in a single container? Assuming that your Airbyte probably runs in Docker (docker-compose setup?), it can make networking (and SSL error) issues less likely if you would run your agent in a local process instead of a Docker container.

You could also try adding those environment variables:

export CURL_CA_BUNDLE=''
export PYTHONHTTPSVERIFY='false'

if you’re on Windows, setting CURL_CA_BUNDLE=’’, PYTHONHTTPSVERIFY=‘false’ and the HTTP_PROXY and HTTPS_PROXY environment variables have helped some users before with similar issues

Thanks @anna_geller for replying,
Sorry about the tags, I just see them then choose.
Btw, to be honest, I’m not from tech field (just move backward to analytics engineer recently), so Prefect’s term about Agent and Infrastructure still confusing me a bit.
But I guess you’re right, I’m running everything in a single container. Here is what I setup for Agent:
Step 1: Build a custom image

FROM prefecthq/prefect:2.7.0-python3.10
RUN python -m pip install prefect-airbyte
CMD ["prefect", "agent", "start", "-q", "APP"]

Step 2: then docker-compose.yml to run in Container

version: "3.9"
services:

## Prefect Agent
  agent1:
    image: repo.volio.vn/phucdm/prefect-local:2022.12.06-1
    restart: always
    entrypoint: ["prefect", "agent", "start", "-q", "APP"]
    environment:
      - PREFECT_API_URL=https://api.prefect.cloud/api/accounts/a97f32b1-459d-44c5-a184-686af42acb05/workspaces/cf08ccbd-f54a-41b5-9c8f-a4d63a16cd42
      - PREFECT_API_KEY=${PREFECT_API_KEY}
    network_mode: "host" 
    profiles: ["agent"]

And now I have a Docker Agent that can listen to Prefect Cloud and execute the Deployment.

The reason, I want to use Agent in a Container is that, if using Agent in local machine directly, I need to open a CMD all the time (due to my limited knowledge, if it doesn’t need to open CMD all the time, please tell me). While Agent in a Container can run in background and restarted automatically if my local machine restart suddenly.

For Airbyte, I run it in Docker too, local host 8000.

About your suggestion - adding environment variables, I just need to add them in the custom image or in the environment variables in docker-compose.yml
For example, I assume just need to add in Step 2 like:
Step 2: then docker-compose.yml to run in Container

version: "3.9"
services:

## Prefect Agent
  agent1:
    image: repo.volio.vn/phucdm/prefect-local:2022.12.06-1
    restart: always
    entrypoint: ["prefect", "agent", "start", "-q", "APP"]
    environment:
      - PREFECT_API_URL=${PREFECT_API_URL}
      - PREFECT_API_KEY=${PREFECT_API_KEY}
      - export CURL_CA_BUNDLE=''
      - export PYTHONHTTPSVERIFY='false'
    network_mode: "host" 
    profiles: ["agent"]

Really hope to hear from you soon!

Don’t sorry, I gave you compliment for using them! Well done, please continue this

on the agent within the Dockerfile sounds more fitting

Do you use the same Docker network for the Prefect agent and the Airbyte server?

I’d say give those environment variables a try first and if this doesn’t work, maybe we can try to reproduce together

Feel free to share your entire setup incl. Airbyte (walking through all the steps you did to set this up) - this could help us reproduce and I’m also sure it would be useful to others

Yes, I run Prefect Agent with network_mode: “host” which help Prefect Agent be able to call API to other local host (Aibyte in my case)

Sure, I will try sharing each step!

1 Like

I tried using this setup, it seems to work (I will wait few days then confirm again), but another problem occurs :smiley:
In my code, I have a task for Slack to send a message through webhook to a channel when a flow is run completely.

""" basic task for slack noti"""
@task(name = "Slack Noti")
async def slack_noti(text_input):
    slack_webhook_block = await SlackWebhook.load("slack-prefect")
    await slack_webhook_block.notify(f"{text_input}")

Now, after set

export CURL_CA_BUNDLE=''
export PYTHONHTTPSVERIFY='false'

It stops sending noti :cry: . Even through in Prefect Cloud, it shows up as successfully
I check the Agent, and see this line of Warning after every execution of Slack Task

 | INFO    | Flow run 'gigantic-bettong' - Executing 'Slack Noti-59d2d3d8-8' immediately...
 | WARNING | apprise - An I/O error occurred while reading attachment.

Any ideas how to fix this :smiley:

To reproduce error (perhaps), here is all of my setting:

  1. Setup Airbyte Local:
    Just deploy Airbyte locallly by using docker. Deploy Airbyte | Airbyte Documentation
    P/s: I’m using Windown 10
  2. Set up prefect agent
  • Build a custom image with Dockerfile:
FROM prefecthq/prefect:2.7.0-python3.10
RUN python -m pip install prefect-airbyte
CMD export CURL_CA_BUNDLE='';export PYTHONHTTPSVERIFY='false'
  • Running container with docker-compose.yml
version: "3.9"
services:

## Prefect Agent
  agent1:
    image: ${your custom image name and tag for example: phucdm/prefect-local:2022.12.16-1}
    restart: always
    entrypoint: ["prefect", "agent", "start", "-q", "APP"]
    environment:
      - PREFECT_API_URL=${PREFECT_API_URL}
      - PREFECT_API_KEY=${PREFECT_API_KEY}
      - CURL_CA_BUNDLE=''
      - PYTHONHTTPSVERIFY='false'
    network_mode: "host" 
    profiles: ["agent"]
  1. Slack noti task
""" basic task for slack noti"""
@task(name = "Slack Noti")
async def slack_noti(text_input):
    slack_webhook_block = await SlackWebhook.load("slack-prefect")
    await slack_webhook_block.notify(f"{text_input}")
  1. Airbyte trigger flow
""" base flow realtime"""
@flow(name="realtime_app_cost",
      timeout_seconds=3600,
      retries=1,
      retry_delay_seconds=10
)
async def base_flow_realtime_1():
    airbyte_server = AirbyteServer()
    await trigger_sync(
        airbyte_server=airbyte_server,
        connection_id={xxxxxx},
        poll_interval_s=60,
        status_updates=False,
        timeout=3600
    )
  1. Put everthing together:
""" add sub_flow into one main flow"""
@flow(timeout_seconds=7200)
async def main_flow():
    await slack_noti("Flow start running")
    await base_flow_realtime_1()
    await slack_noti("Flow run successfully")

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
  1. Deploy and run

P/s: The SSL error usually occur with longtime run deployment. A short one (around 10 mins never has this issue)
P/s 2: After turn HTTP verify is False, Slack Noti Task doesn’t send message anymore :crying_cat_face:

1 Like

Hi, I just found solution for Slack Noti above.

In docker-compose.yml, remove two env variables:

      - CURL_CA_BUNDLE=''
      - PYTHONHTTPSVERIFY='false'

Now it works. Let check if the SSL problem still occurs after several days

1 Like

nice work! I was also curious why wouldn’t notification get sent if this is just a warning rather than error. Also the warning says something about attachments and you seem to send just text

I would replace that with ENV:

FROM prefecthq/prefect:2.7.0-python3.10
RUN python -m pip install prefect-airbyte
ENV CURL_CA_BUNDLE=''
ENV PYTHONHTTPSVERIFY='false'
1 Like

is it a special Christmas thing that your task is “naughty”? :santa: :smile:

great writeup btw! keep us posted

Haha! What a special gift =))))

I will try this way, thanks! CMD way seems not to working, still facing SSL 997.

1 Like

exactly, I wouldn’t set any CMD here, the example above I gave is a complete Dockerfile already - Prefect will set a custom entrypoint for a flow run as part of the Infrastructure block

1 Like

I did run for several days, and it works like a charm. Thank you so much!! @anna_geller

1 Like