How to set a list as parameter values in DeploymentSpec so that it can be used for parallel workflows?

My goal is to process forecasts for each station ID separately, but I think the deployment just passed the entire list:
AttributeError: 'list' object has no attribute 'upper'

How do I process the forecast for each station ID separately (perhaps in parallel)?

STATION_IDS = ["KCMI", "KBDU", "KSEA"]
DeploymentSpec(
    flow=process_forecast,
    name="process-forecast-hourly",
    parameters={"stid": STATION_IDS},
    tags=["nws", "forecast"],
    schedule=IntervalSchedule(interval=timedelta(hours=1)),
)

Could you share your process_forecast flow? It could be that you didn’t provide type annotations to your flow parameters and Prefect assumed by default that this is a string.

Sure, see below (and see forecast_verification_flow/store_nws.py at main · ahuang11/forecast_verification_flow · GitHub for the full code). I am trying to map each item of the parameter list to process_forecast:

e.g.

for stid in STATION_IDS:
    process_forecast(stid)

But right now it’s doing:

process_forecast(STATION_IDS)

Code:

@flow
def process_forecast(stid: str):
    stid = stid.upper()

    meta_df = retrieve_meta()
    lon, lat = get_station_coords(meta_df, stid).result()

    data = retrieve_data(lon, lat)
    root = get_root(data)
    initialization_time = extract_initialization_time(root)
    exists = check_if_exists(initialization_time)
    if not exists:
        mapping = extract_params(root)
        df = create_df(mapping, initialization_time)
        to_database(stid, df)

As a workaround, I can replace @flow with @task on process_forecast and add a new flow named process_forecasts that does:

@flow
def process_forecasts(stids: List):
    for stid in stids:
        process_forecast(stid)

That’s exactly how I would approach it - given that we don’t have a mapping feature in Prefect 2 yet, the for loop executed within the @flow will have the same effect as mapping.

Correction: process_forecast cannot be a @task, because that raises RuntimeError: Tasks cannot be called from within tasks. Did you mean to call this task in a flow? So I have to leave process_forecast as a plain Python function.

Also, a downside to this is that I have no idea which task run corresponds to which station ID.

You don't have to make it a task; you can just move the entire logic into the flow, i.e. move the for loop and the rest of the functionality directly into the flow body.

Remember in Orion you can run any normal Python code so the sky is the limit here - you can design it as you wish. :smile:


btw, nice flow! It worked fine for me but the SQLite file nws_forecast.db was not generated - did I do something wrong?

import sqlite3
import xml.etree.cElementTree as et
from collections import defaultdict
from io import StringIO
from datetime import timedelta
from typing import List
import pandas as pd
import requests
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.orion.schemas.schedules import IntervalSchedule
from prefect.deployments import DeploymentSpec


META_URL = (
    "https://mesonet.agron.iastate.edu/sites/networks.php?"
    "network=_ALL_&format=csv&nohtml=on"
)
DATA_URL_FMT = (
    "https://forecast.weather.gov/MapClick.php?lat={lat}&lon={lon}&FcstType=digitalDWML"
)
TEMPORAL_COLS = ("start_valid_time", "initialization_time", "forecast_hour")
DATABASE_NAME = "nws_forecast.db"
STATION_IDS = ("KSEA", "KBDU", "KORD", "KCMI", "KMRY", "KSAN", "KNYC", "KIND", "KHOU")


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=365))
def retrieve_meta():
    meta_df = pd.read_csv(META_URL)
    return meta_df


@task(cache_key_fn=task_input_hash)
def get_station_coords(meta_df, stid):
    lon, lat = meta_df.loc[meta_df["stid"] == stid, ["lon", "lat"]].iloc[0]
    return lon, lat


@task
def retrieve_data(lon, lat):
    url = DATA_URL_FMT.format(lon=lon, lat=lat)
    with requests.get(url) as resp:
        data = resp.content.decode()
    return data


@task
def get_root(data):
    with StringIO(data) as buf:
        tree = et.parse(buf)
    root = tree.getroot()
    return root


@task
def extract_initialization_time(root):
    for head in root.findall("head"):
        for product in head.findall("product"):
            for creation_date in product.findall("creation-date"):
                initialization_time = [creation_date.text]
                return initialization_time


@task
def check_if_exists(initialization_time):
    return False


@task
def extract_params(root):
    mapping = defaultdict(lambda: [])
    for data in root.findall("data"):
        for time_layout in data.findall("time-layout"):
            for start_valid_time in time_layout.findall("start-valid-time"):
                mapping["start_valid_time"].append(start_valid_time.text)

        for parameters in data.findall("parameters"):
            for parameter in parameters:
                tag = parameter.tag.replace("-", "_")
                type_ = parameter.attrib.get("type")

                for value in parameter.findall("value"):
                    if type_ is None:
                        break
                    mapping[f"{tag}_{type_}"].append(value.text)

                for weather_conditions in parameter.findall("weather-conditions"):
                    if weather_conditions.attrib:  # is nil
                        mapping["weather"].append("")
                    else:
                        for i, value in enumerate(weather_conditions.findall("value")):
                            text = " ".join(value.attrib.values())
                            if i == 0:
                                mapping["weather"].append(text)
                            else:
                                mapping["weather"][-1] += f" and {text}"
    return mapping


@task
def create_df(mapping, initialization_time):
    df = (
        pd.DataFrame(mapping)
        .pipe(
            lambda df: df.assign(
                **{
                    "initialization_time": pd.to_datetime(initialization_time),
                    "start_valid_time": pd.to_datetime(df["start_valid_time"]),
                }
            )
        )
        .pipe(
            lambda df: df.assign(
                **{
                    "forecast_hour": (
                        df["initialization_time"] - df["start_valid_time"]
                    ).dt.total_seconds()
                    / 3600
                }
            )
        )
        .set_index(["start_valid_time", "initialization_time"])
        .apply(pd.to_numeric, errors="ignore")
        .reset_index()
    )
    return df


@task
def to_database(stid, df):
    with sqlite3.connect(DATABASE_NAME) as con:
        df.to_sql(stid, con, index=False, if_exists="append")
        for col in TEMPORAL_COLS:
            # include the station ID in the index name (index names are global in SQLite)
            # and skip creation if the index already exists on later appends
            con.execute(f"CREATE INDEX IF NOT EXISTS {stid}_{col}_index ON {stid}({col});")


@flow
def process_forecasts(stids: List[str]):
    for stid in stids:
        stid = stid.upper()

        meta_df = retrieve_meta()
        lon, lat = get_station_coords(meta_df, stid).result()

        data = retrieve_data(lon, lat)
        root = get_root(data)
        initialization_time = extract_initialization_time(root)
        exists = check_if_exists(initialization_time)
        if not exists:
            mapping = extract_params(root)
            df = create_df(mapping, initialization_time)
            to_database(stid, df)


if __name__ == "__main__":
    process_forecasts(stids=STATION_IDS)

DeploymentSpec(
    flow=process_forecasts,
    name="process-forecast-hourly",
    parameters={"stids": STATION_IDS},
    tags=["nws", "forecast"],
    schedule=IntervalSchedule(interval=timedelta(hours=1)),
)

Another idea is to create a flow of subflows.
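E.g. a minimal sketch of that idea, reusing the process_forecast flow defined earlier in the thread so that each station gets its own subflow run:

from typing import List

from prefect import flow


@flow
def process_forecasts(stids: List[str]):
    for stid in stids:
        # each call to the @flow-decorated process_forecast creates a subflow run
        process_forecast(stid)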

As for the db not being generated, I'm wondering about that too. Is it because it's generated in Orion's temporary storage and so gets wiped right after?

I'm also still trying to figure out how to rename flow run prefixes; I was able to rename flows, but not flow run names. For example:

    for stid in stids:
        process_forecast.with_options(name=f"{stid}_flow")(stid)

Renaming flow runs will work differently. In Prefect 1 we had a special task for it called RenameFlowRun.
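For reference, that looked roughly like this in Prefect 1 (from memory, so treat the exact import path and signature as an assumption):

# Prefect 1 only (not Orion): rename the currently running flow run from inside the flow
from prefect.tasks.prefect import RenameFlowRun

RenameFlowRun().run(flow_run_name="process_forecast_KSEA")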

A subflow process_forecast will work, but you would need to call it without options:

    for stid in stids:
        process_forecast(stid)

I think I can call it with with_options:

from prefect import flow

@flow()
def compute(x):
    return x + 1


@flow
def a_flow():
    compute.with_options(name="compute_flowo")(1)


a_flow()

But the logging is confusing:

15:21:59.129 | INFO    | prefect.engine - Created flow run 'stylish-guppy' for flow 'a-flow'
15:21:59.129 | INFO    | Flow run 'stylish-guppy' - Using task runner 'ConcurrentTaskRunner'
15:21:59.203 | INFO    | Flow run 'stylish-guppy' - Created subflow run 'puzzling-panda' for flow 'compute_flowo'
15:21:59.243 | INFO    | Flow run 'puzzling-panda' - Finished in state Completed(None)
15:21:59.282 | INFO    | Flow run 'stylish-guppy' - Finished in state Completed('All states completed.')
Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=2, task_run_id=f62b14e9-90e4-4d80-9f3d-e7d7a39164ea)], flow_run_id=95381dd0-e745-4a2c-a615-e39dcc0c061b)

I was hoping the flow run name

15:21:59.203 | INFO    | Flow run 'stylish-guppy' - Created subflow run 'puzzling-panda' for flow 'compute_flowo'

Would be

15:21:59.203 | INFO    | Flow run 'compute_flowo_run01' - Created subflow run 'compute_flowo_run01' for flow 'compute_flowo'

You brought up a really good point - the flow name and the flow run name are two different things. While you can rename the flow (that's what you did), you can't rename a flow run this way, because flow run names are by default just random adjective + animal combinations :smile: We had a strong debate about it, and I voted for the option you mentioned, i.e. that the run name should be the actual flow name plus some random string, but most people voted to keep the animals because it's simply funny :slight_smile: and our users love it.

Aw, that makes the logs a bit confusing to follow, especially with concurrent flows, because users now have to keep a mental map from flow name → flow run name and search for the flow run name :no_mouth:


Woohooo got it deployed successfully (updated forecast_verification_flow/store_nws.py at main · ahuang11/forecast_verification_flow · GitHub)!

About "the SQLite file nws_forecast.db was not generated": that was because of my misconception covered in the topic "Do I need to call result() on a task call returning a bool".
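For context, the fix is along these lines (a sketch of the relevant change, not the exact diff): the bare task call returns a future, which is always truthy, so the value has to be resolved before branching on it.

        exists = check_if_exists(initialization_time).result()  # resolve the future first
        if not exists:  # otherwise this branch never runs and nothing gets written to SQLite
            mapping = extract_params(root)
            df = create_df(mapping, initialization_time)
            to_database(stid, df)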

To get this to run successfully, in one terminal:

prefect orion start 

In another terminal:

prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
prefect work-queue create process-forecasts-work-queue
prefect agent start <UUID>

In yet another terminal:

prefect deployment create store_nws.py
prefect deployment run process-forecasts/hourly-deployment

Now visit http://127.0.0.1:4200/ to view sweet success!

I am wondering, though, whether I can see exactly when the flow runs are scheduled?

And I do wish the log names had the corresponding station IDs!

Nice work! :clap:

Perhaps you can use the log formatter to configure that? I haven't tried it yet, but it may be possible:

PREFECT_LOGGING_FORMATTERS_TASK_RUNS_FORMAT="%(asctime)s.%(msecs)03d | %(levelname)-7s | Task run %(task_run_name)r - %(message)s"

I think that if you include the station ID in the task_run_name, e.g. set via with_options, then it should show up in your logs.
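For example, a hypothetical sketch (assuming tasks support with_options(name=...) the same way the flows above do), inside the per-station loop of process_forecasts:

        # hypothetical: rename the task per station so the task-run log format above
        # surfaces the station ID via %(task_run_name)r
        to_database.with_options(name=f"to_database_{stid}")(stid, df)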

But I wouldn't override the log settings for every flow - that would be rather tedious. Thoughts? :smile:

This blog post explains it better:
https://www.prefect.io/blog/logs-the-prefect-way/