How to create Azure Blob Storage event-driven Prefect 2 flows with Azure Functions

This post will describe how to run a Prefect flow in an Azure Function in response to new or updated files in Azure Blob Storage.

Prerequisites

  • A trial or paid Azure account.
  • An Azure Storage account with a blob container your function can watch for events.
  • An Azure Resource Group in which you can create a new Function App.
  • The latest version of the Azure CLI installed for your operating system and signed in to your Azure account.
  • The latest version of the Azure Functions Core Tools.
  • A Prefect Cloud account, or self-hosted Prefect Orion running in Azure and visible to your Azure Functions app. Self-hosting Orion and making it visible to your Functions app will require the creation of a virtual network and is beyond the scope of this post.
    • If you want to use Prefect Cloud but don’t yet have an account, follow these instructions to sign up, create an API key, and sign the Prefect CLI into your cloud account.

Creating an Azure Function App triggered by Blob Storage events

If you’d like to develop your function app in a Python virtual environment or Anaconda environment, create and activate the environment now.

Next, create a local Azure Functions app by opening a terminal and running:

func init prefect-azure-blob-demo --python

Feel free to change the app name to suit your use case. Next, navigate into the directory created by the func command:

cd prefect-azure-blob-demo

You’ll see four files:

.
├── getting_started.md
├── host.json
├── local.settings.json
└── requirements.txt

Open requirements.txt and add the following line at the bottom of the file:

prefect >= 2.1.1

Leave the default settings in the remaining files. You now have an Azure Functions app, but there are no functions in it. To add one, run:

func new --name PrefectBlobTrigger --template "Azure Blob Storage trigger"

This command creates a new subdirectory called PrefectBlobTrigger with the following contents:

.
├── __init__.py
├── function.json
└── readme.md

Open function.json in your favorite text editor. You will see:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "myblob",
      "type": "blobTrigger",
      "direction": "in",
      "path": "samples-workitems/{name}",
      "connection": "BlobConnectionString"
    }
  ]
}

The JSON name property is the name of the parameter that will represent the added or updated blob in your function’s code. You can change it to match your use case, but if you do you will also need to change it in the code samples below.

In the path value, change samples-workitems to the name of the Azure Blob Storage container you want to use as a trigger to run your function. Note that the {name} value will be available in your function and will contain the name of the new or changed file that triggered the function call. See the Azure documentation for additional filename filtering options.
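
As a rough illustration of how the path pattern surfaces in Python: with the blob trigger used here, myblob.name is generally reported as the container name followed by the blob path (for example samples-workitems/report.csv). The helper below is a minimal sketch for separating the two parts. split_blob_path is a hypothetical name of my own and is not part of azure.functions.

```python
from typing import Tuple


def split_blob_path(full_name: str) -> Tuple[str, str]:
    """Split 'container/path/to/blob' into (container, blob_name).

    Hypothetical helper: assumes the trigger reports the blob name
    prefixed with its container, as in 'samples-workitems/report.csv'.
    """
    container, sep, blob_name = full_name.partition("/")
    if not sep or not container or not blob_name:
        raise ValueError(f"Expected 'container/blob', got {full_name!r}")
    return container, blob_name


# Only the first slash separates the container; the rest is the blob path.
print(split_blob_path("samples-workitems/2023/report.csv"))
```

Inside main you could call split_blob_path(myblob.name) if your flow only needs the file name without the container prefix.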

The connection value, BlobConnectionString, is the name of the app configuration key where you will save your storage account’s connection string later in the tutorial.
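
To double-check the three values discussed above before deploying, a small script along these lines can summarize the trigger binding. This is only a sketch using the standard library. describe_blob_trigger is a hypothetical helper name, and the sample JSON simply mirrors the default function.json shown earlier.

```python
import json


def describe_blob_trigger(function_json_text: str) -> dict:
    """Summarize the single blobTrigger binding in a function.json document."""
    config = json.loads(function_json_text)
    triggers = [b for b in config.get("bindings", []) if b.get("type") == "blobTrigger"]
    if len(triggers) != 1:
        raise ValueError(f"Expected exactly one blobTrigger binding, found {len(triggers)}")
    trigger = triggers[0]
    return {
        # Must match the parameter name of main() in __init__.py.
        "parameter": trigger["name"],
        # Everything before the first slash in "path" is the container.
        "container": trigger["path"].split("/", 1)[0],
        # Name of the app setting that holds the connection string.
        "connection_setting": trigger["connection"],
    }


SAMPLE = """
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "myblob",
      "type": "blobTrigger",
      "direction": "in",
      "path": "samples-workitems/{name}",
      "connection": "BlobConnectionString"
    }
  ]
}
"""

print(describe_blob_trigger(SAMPLE))
```

In practice you would read the text from PrefectBlobTrigger/function.json instead of the inline SAMPLE string.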

Next, open __init__.py. You’ll see the following:

import logging

import azure.functions as func

def main(myblob: func.InputStream):
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes")

This is where you will put the Prefect flow you want to run when a file is added or changed in the blob container you specified earlier.

If you want to test the blob trigger before adding your own flow code, delete the existing text in __init__.py and replace it with:

import logging

import azure.functions as func
from prefect import flow, get_run_logger

# Entry point called by the blob trigger; "myblob" must match the name in function.json.
async def main(myblob: func.InputStream):
    logging.info("Running Prefect flow!")
    await test_aci_flow(myblob.name, myblob.length)

# The flow is declared async so that main() can await it.
@flow(name="test_azure_functions_flow")
async def test_aci_flow(file_name: str, file_length: int):
    logger = get_run_logger()
    logger.info("Hello from Azure Functions!")
    logger.info(f"Processing blob file {file_name} with length {file_length}")

Before you can deploy your code, you will need to create a new Function App on Azure. You can do so with the following Azure CLI command:

bash:

az functionapp create \
--name PrefectTestFlow \
--storage-account YOUR_STORAGE_ACCOUNT_NAME \
--consumption-plan-location eastus \
--resource-group YOUR_RESOURCE_GROUP \
--functions-version 4 \
--runtime python \
--runtime-version 3.9 \
--os-type linux

PowerShell:

az functionapp create `
--name PrefectTestFlow `
--storage-account YOUR_STORAGE_ACCOUNT_NAME `
--consumption-plan-location eastus `
--resource-group YOUR_RESOURCE_GROUP `
--functions-version 4 `
--runtime python `
--runtime-version 3.9 `
--os-type linux

You can change the name parameter to match your app’s use case. Next, make sure you update the storage-account and resource-group parameters to match your storage account and resource group.

If you don’t yet have a storage account or resource group, see the Azure CLI documentation on how to create resource groups and storage accounts.

For an up-to-date list of locations where you can run your Azure Function App, run:

az functionapp list-consumption-locations -o table

Next, you must set three app configuration settings on your newly-created Function App.

First, you will set Prefect’s API URL and (if applicable) API key. If you are running a self-hosted Prefect Orion instance, you’ll already know the URL you need to use.

If you are using Prefect Cloud, the easiest way to find your API URL is by running:

prefect config view

and the CLI output will be similar to:

PREFECT_PROFILE='your-profile'
PREFECT_API_KEY='your-api-key' (from profile)
PREFECT_API_URL='https://api.prefect.cloud/api/accounts/abc-123-456/workspaces/xyz-987-654' (from profile)
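
If you want a quick sanity check that the value you copied really is an API URL (and not, say, the API key), something like the following can help. It is a minimal sketch that assumes the Prefect Cloud URL shape shown in the sample output above. A self-hosted Orion URL will look different and would not pass this check. parse_cloud_api_url is a hypothetical helper, not a Prefect function.

```python
from urllib.parse import urlparse


def parse_cloud_api_url(url: str) -> dict:
    """Extract account and workspace IDs from a Prefect Cloud style API URL.

    Assumed shape (taken from the sample output above):
    https://<host>/api/accounts/<account_id>/workspaces/<workspace_id>
    """
    parts = urlparse(url)
    segments = [s for s in parts.path.split("/") if s]
    looks_right = (
        parts.scheme == "https"
        and len(segments) == 5
        and segments[0] == "api"
        and segments[1] == "accounts"
        and segments[3] == "workspaces"
    )
    if not looks_right:
        raise ValueError(f"Not a Prefect Cloud style API URL: {url!r}")
    return {
        "host": parts.netloc,
        "account_id": segments[2],
        "workspace_id": segments[4],
    }


print(parse_cloud_api_url(
    "https://api.prefect.cloud/api/accounts/abc-123-456/workspaces/xyz-987-654"
))
```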

You’ll also need a connection string for your storage account, which you can obtain by running the following with your own resource group and storage account names:

az storage account show-connection-string -g YOUR_RESOURCE_GROUP -n YOUR_STORAGE_ACCOUNT_NAME -o tsv 

or by following these instructions.
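
Storage connection strings are semicolon-separated key=value pairs, and the account key often ends in = padding, which is worth keeping in mind when you paste the value into a shell command. The sketch below splits a connection string into its parts so you can confirm nothing was truncated. parse_connection_string is a hypothetical helper and the sample values are made up.

```python
def parse_connection_string(conn_str: str) -> dict:
    """Split an Azure Storage connection string into a dict of its parts."""
    settings = {}
    for part in conn_str.strip().split(";"):
        if not part:
            continue  # tolerate a trailing semicolon
        # Split on the first "=" only, so "=" padding in the key survives.
        key, sep, value = part.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment: {part!r}")
        settings[key] = value
    return settings


# Made-up example values, for illustration only.
example = (
    "DefaultEndpointsProtocol=https;AccountName=demoaccount;"
    "AccountKey=abc+/def==;EndpointSuffix=core.windows.net"
)
parsed = parse_connection_string(example)
print(parsed["AccountName"], parsed["AccountKey"])
```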

Now that you know the three values you need to set for your Function App, run:

bash:

az functionapp config appsettings set \
--name PrefectTestFlow \
--resource-group YOUR_RESOURCE_GROUP \
--settings "BlobConnectionString=YOUR_STORAGE_CONNECTION_STRING" "PREFECT_API_URL=YOUR_PREFECT_API_URL" "PREFECT_API_KEY=YOUR_PREFECT_API_KEY"

PowerShell:

az functionapp config appsettings set `
--name PrefectTestFlow `
--resource-group YOUR_RESOURCE_GROUP `
--settings "BlobConnectionString=YOUR_STORAGE_CONNECTION_STRING" "PREFECT_API_URL=YOUR_PREFECT_API_URL" "PREFECT_API_KEY=YOUR_PREFECT_API_KEY"

Make sure you insert your own configuration values instead of the placeholders. If you are self-hosting Prefect Orion, you can omit the PREFECT_API_KEY setting.
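
Azure Functions exposes app settings to your code as environment variables, and Prefect reads PREFECT_API_URL and PREFECT_API_KEY from the environment. A check like the one below, run at the top of your function, can make a missing setting easier to spot in the logs. It is a sketch only. missing_settings and its using_cloud flag are hypothetical names I made up for this example.

```python
import os
from typing import List, Mapping, Optional

# Setting names used in this tutorial.
REQUIRED = ("BlobConnectionString", "PREFECT_API_URL")
CLOUD_ONLY = ("PREFECT_API_KEY",)  # not needed for self-hosted Orion


def missing_settings(
    env: Optional[Mapping[str, str]] = None, using_cloud: bool = True
) -> List[str]:
    """Return the names of expected settings that are absent or blank."""
    env = os.environ if env is None else env
    names = REQUIRED + (CLOUD_ONLY if using_cloud else ())
    return [name for name in names if not env.get(name, "").strip()]


# Demo with an explicit mapping instead of the real environment.
demo_env = {"PREFECT_API_URL": "https://example.invalid/api"}
print(missing_settings(demo_env))
```

Inside main you could log the result of missing_settings() as a warning before starting the flow.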

In a production app, consider storing secrets like your API key and connection string in Azure Key Vault and referencing Key Vault from your application settings instead of storing the values there directly.

After you have created your Function App and added your app settings, open a terminal in the base directory of the app (not the subdirectory containing the function) and run:

func azure functionapp publish PrefectTestFlow

If you used a different app name when creating the Function App, enter that instead of PrefectTestFlow.

You have successfully published a Prefect flow to Azure Functions that will run in response to file additions or updates in Azure Blob Storage!

To test your flow, all you need to do is add a file to the blob container your function is watching. You can add a file manually via the Azure Portal UI, or via the CLI by running:

bash:

az storage blob upload \
    --account-name YOUR_STORAGE_ACCOUNT_NAME \
    --container-name YOUR_BLOB_CONTAINER_NAME \
    --name your_file_name.ext \
    --file your_file_name.ext \
    --auth-mode login

PowerShell:

az storage blob upload `
    --account-name YOUR_STORAGE_ACCOUNT_NAME `
    --container-name YOUR_BLOB_CONTAINER_NAME `
    --name your_file_name.ext `
    --file your_file_name.ext `
    --auth-mode login

Replace the placeholder account, container, and file names with your own, and your file will be uploaded to the blob container you specified.

Then, your Azure Function will run in response to the blob upload.

If you check the Prefect UI, you should see a successful flow run for each file you add to the blob container.

And that’s it! You now know all the steps needed to run Azure Blob event-driven Prefect flows using Azure Functions!


At the stage of publishing your function code to the app, I’m getting a ‘Usage Error: --settings | --slot-settings’ when I run the PowerShell script. Any idea what this means?

There might be special characters in your settings that need to be escaped. Maybe + characters in the connection string? If so, you’d need to put a \ in front of them.

Or, you could try surrounding the settings with ' instead of ".

It could be something else, but that is the first thing I would check.


Yea I had to write the script as one line (i.e. az functionapp config appsettings set --name PrefectTestFlow --resource-group my_resource_group --settings…)


Hi,

I followed your tutorial, but my Azure Function is unable to connect to Prefect. I get an error message that looks like this:

Result: Failure Exception: RuntimeError: Cannot create flow run. Failed to reach API at pnu_REDACTED/. Stack: File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 461, in _handle__invocation_request call_result = await self._run_async_func( 
File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 739, in _run_async_func return await ExtensionManager.get_async_invocation_wrapper( File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/extension.py", line 147, in get_async_invocation_wrapper result = await function(**args) 
File "/home/site/wwwroot/PrefectBlobTrigger/__init__.py", line 9, in main await test_aci_flow(myblob.name) 
File "/home/site/wwwroot/.python_packages/lib/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread return call.result() 
File "/home/site/wwwroot/.python_packages/lib/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result return self.future.result(timeout=timeout) 
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result return self.__get_result() 
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result raise self._exception 
File "/home/site/wwwroot/.python_packages/lib/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async result = await coro 
File "/home/site/wwwroot/.python_packages/lib/site-packages/prefect/client/utilities.py", line 40, in with_injected_client return await fn(*args, **kwargs) 
File "/home/site/wwwroot/.python_packages/lib/site-packages/prefect/engine.py", line 223, in create_then_begin_flow_run await check_api_reachable(client, "Cannot create flow run") File "/home/site/wwwroot/.python_packages/lib/site-packages/prefect/engine.py", line 2136, in check_api_reachable raise RuntimeError( 

Do you have any idea where that would come from?