PicklingError when using a DaskTaskRunner

View in #prefect-community on Slack

Bob_Colner @Bob_Colner: So I’m migrating some 1.0 flows to 2.0. Loving the new @flow API; loops and native control flow are very nice to have. I’m hitting a strange PicklingError in a task with the Dask task runner. The same task works on 1.0 and (this is strange) was working on 2.0 before I tried to set up a deployment. Any ideas on how to debug this task? (see thread)
error message:

Traceback (most recent call last):
  File "/opt/miniconda3/envs/orion/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 40, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function asa_get_campaigns at 0x1087c4160>: it's not the same object as __main__.asa_get_campaigns

task def:

@task(
    retries=3,
    retry_delay_seconds=60,
    tags=['asa_api'],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f'fetching campaigns for: org: {org_id}, date: {date}')
    return asa_api.get_campaigns(org_id, date, access_token)

asa_api.get_campaigns def:

def get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    requestBody = {
        'startTime': date,
        'endTime': date,
        'selector': {
            'orderBy': [{'field': 'campaignId', 'sortOrder': 'ASCENDING'}],
            'conditions': [{'field': 'campaignStatus', 'operator': 'EQUALS', 'values': ['ENABLED']}]
        },
        'returnRowTotals': 'true'
    }
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {access_token}', 'X-AP-Context': f'orgId={org_id}'}
    response = requests.post(CAMPAIGNS_URL, data=json.dumps(requestBody, ensure_ascii=False), headers=headers)

    jsonResponse = response.json()
    if jsonResponse["data"] is None or jsonResponse['pagination']['totalResults'] == 0:
        return pd.DataFrame()

    rows = jsonResponse['data']['reportingDataResponse']['row']
    format_row = lambda row : {
        'spend_date': date, 
        'org_id': org_id,
        'app_name': row['metadata']['app']['appName'],
        'campaign_id': row['metadata']['campaignId'],
        'campaign_name': row['metadata']['campaignName'],
        'modification_at': row['metadata']['modificationTime'],
        'inserted_at': datetime.utcnow().isoformat(),
        'spend_amount': float(row['total']['localSpend']['amount']),
        'impressions': row['total']['impressions'],
        'taps': row['total']['taps'],
        'installs': row['total']['installs'],
        'country': row['metadata']['countriesOrRegions'][0],
        'json_payload': json.dumps(row)
    }
    formatted_rows = [*map(format_row, rows)]
    df = pd.DataFrame(formatted_rows).convert_dtypes()
    df['spend_amount'] = df['spend_amount'].astype(float)
    return df

My task does not have any client objects that I am aware of. It just makes an HTTP POST.

Michael_Adkins @Michael_Adkins: Hm, I’ve seen this before. Usually when you’re lacking the if __name__ == "__main__" guard before calling the flow

Bob_Colner @Bob_Colner: Interesting, I have the main guard in place in the flow file. Any other debug ideas?

Michael_Adkins @Michael_Adkins: Does it happen with a minimal example?

Bob_Colner @Bob_Colner: It was working for me. I’ll try to revert to some earlier git commit. Issue started when I tried to create a local deployment for the first time. Unless I somehow introduced a subtle bug at the same time

Michael_Adkins @Michael_Adkins: How did you define your deployment? Is the error happening when you just call the flow or does it happen when the deployment runs?

Bob_Colner @Bob_Colner: Local flow, default universal flow runner, GCP storage. FYI, changing flow runners is not well documented IMO.
I get the same pickle issue when running the flow file or ‘executing’ the deployment.

Michael_Adkins @Michael_Adkins: We’re designing some changes to flow runners atm so we’re hesitant to sink much time into the existing interface.
Can you share the full flow file?

Bob_Colner @Bob_Colner: Makes sense, yeah I’ll share the full flow file tomorrow when I’m in front of my computer. Thanks for your help
Here is my full task/flow .py file:

import time
import pandas as pd
from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner, ConcurrentTaskRunner, DaskTaskRunner
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_insert_stream
from apple_search_ads import asa_api


@task(
    retries=3,
    retry_delay_seconds=60,
    tags=['asa_api'],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f'fetching campaigns for: org: {org_id}, date: {date}')
    return asa_api.get_campaigns(org_id, date, access_token)


@task(
    retries=3,
    retry_delay_seconds=60,
    tags=['asa_api'],
)
def asa_get_adgroups(campaign_id: str, org_id: str, date: str, access_token: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f'fetching adgroups for: org: {org_id}, campaign: {campaign_id}, date: {date}')
    return asa_api.get_adgroups(campaign_id, org_id, date, access_token)


@task(
    retries=3,
    retry_delay_seconds=60,
    tags=['asa_api'],
)
def asa_get_keywords(campaign_id: str, org_id: str, date: str, access_token: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f'fetching keywords for: {org_id}, campaign: {campaign_id}, date: {date}')
    return asa_api.get_keywords(campaign_id, org_id, date, access_token)


dask_local = DaskTaskRunner(
    cluster_kwargs={
        'processes': True,
        'n_workers': 4,
        'threads_per_worker': 2,
        'dashboard_address': None,
    }
)

@flow(task_runner=dask_local)
def fetch_asa_data(start_date: str, end_date: str, orgs: list[str]=['117960', '2657930', '3737470']) -> str:
    start_at = time.time()
    logger = get_run_logger()
    asa_token = asa_api.get_access_token()
    gcp_credentials = GcpCredentials(project="emerald-skill-201716")
    date_range = [d.date().isoformat() for d in pd.date_range(start=start_date, end=end_date)]
    for date in date_range:
        for org_id in orgs:
            dfc_future = asa_get_campaigns(org_id, date, asa_token)
            dfc = dfc_future.result()
            if (dfc is None or len(dfc) == 0):
                logger.info(f'No campaign data for org: {org_id}, date: {date}')
                continue

            rows_c = dfc.to_dict('records')
            result = bigquery_insert_stream(
                dataset="apple_search_ads",
                table="campaigns_stream",
                records=rows_c,
                gcp_credentials=gcp_credentials,
            )
            for row in dfc.itertuples():
                dfa_future = asa_get_adgroups(row.campaign_id, org_id, date, asa_token)
                dfa = dfa_future.result()
                if (dfa is None or len(dfa) == 0):
                    logger.info(f'No adgroup data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}')
                    continue

                rows_a = dfa.to_dict('records')
                result = bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="adgroups_stream",
                    records=rows_a,
                    gcp_credentials=gcp_credentials
                )
                dfk_future = asa_get_keywords(row.campaign_id, org_id, date, asa_token)
                dfk = dfk_future.result()
                if (dfk is None or len(dfk) == 0):
                    logger.info(f'No keyword data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}')
                    continue

                rows_k = dfk.to_dict('records')
                result = bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="keywords_stream",
                    records=rows_k,
                    gcp_credentials=gcp_credentials
                )

    end_at = time.time()
    logger.info(f'flow run time (mins): {(end_at-start_at)/60}')
    return True


if __name__ == "__main__":
    fetch_asa_data(start_date='2022-05-06', end_date='2022-05-06')

Michael_Adkins @Michael_Adkins: Hm. If I throw some mocks in there for the external items this runs fine

import time
from prefect import task, flow, get_run_logger
from prefect.task_runners import (
    SequentialTaskRunner,
    ConcurrentTaskRunner,
    DaskTaskRunner,
)

from typing import List
from unittest.mock import MagicMock

pd = MagicMock()
GcpCredentials = MagicMock()
bigquery_insert_stream = MagicMock()
asa_api = MagicMock()


@task(
    retries=3,
    retry_delay_seconds=60,
    tags=["asa_api"],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"fetching campaigns for: org: {org_id}, date: {date}")
    return asa_api.get_campaigns(org_id, date, access_token)


@task(
    retries=3,
    retry_delay_seconds=60,
    tags=["asa_api"],
)
def asa_get_adgroups(
    campaign_id: str, org_id: str, date: str, access_token: str
) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(
        f"fetching adgroups for: org: {org_id}, campaign: {campaign_id}, date: {date}"
    )
    return asa_api.get_adgroups(campaign_id, org_id, date, access_token)


@task(
    retries=3,
    retry_delay_seconds=60,
    tags=["asa_api"],
)
def asa_get_keywords(
    campaign_id: str, org_id: str, date: str, access_token: str
) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(
        f"fetching keywords for: {org_id}, campaign: {campaign_id}, date: {date}"
    )
    return asa_api.get_keywords(campaign_id, org_id, date, access_token)


dask_local = DaskTaskRunner(
    cluster_kwargs={
        "processes": True,
        "n_workers": 4,
        "threads_per_worker": 2,
        "dashboard_address": None,
    }
)


@flow(task_runner=dask_local)
def fetch_asa_data(
    start_date: str, end_date: str, orgs: List[str] = ["117960", "2657930", "3737470"]
) -> str:
    start_at = time.time()
    logger = get_run_logger()
    asa_token = asa_api.get_access_token()
    gcp_credentials = GcpCredentials(project="emerald-skill-201716")
    date_range = [
        d.date().isoformat() for d in pd.date_range(start=start_date, end=end_date)
    ]
    for date in date_range:
        for org_id in orgs:
            dfc_future = asa_get_campaigns(org_id, date, asa_token)
            dfc = dfc_future.result()
            if dfc is None or len(dfc) == 0:
                logger.info(f"No campaign data for org: {org_id}, date: {date}")
                continue

            rows_c = dfc.to_dict("records")
            result = bigquery_insert_stream(
                dataset="apple_search_ads",
                table="campaigns_stream",
                records=rows_c,
                gcp_credentials=gcp_credentials,
            )
            for row in dfc.itertuples():
                dfa_future = asa_get_adgroups(row.campaign_id, org_id, date, asa_token)
                dfa = dfa_future.result()
                if dfa is None or len(dfa) == 0:
                    logger.info(
                        f"No adgroup data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}"
                    )
                    continue

                rows_a = dfa.to_dict("records")
                result = bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="adgroups_stream",
                    records=rows_a,
                    gcp_credentials=gcp_credentials,
                )
                dfk_future = asa_get_keywords(row.campaign_id, org_id, date, asa_token)
                dfk = dfk_future.result()
                if dfk is None or len(dfk) == 0:
                    logger.info(
                        f"No keyword data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}"
                    )
                    continue

                rows_k = dfk.to_dict("records")
                result = bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="keywords_stream",
                    records=rows_k,
                    gcp_credentials=gcp_credentials,
                )

    end_at = time.time()
    logger.info(f"flow run time (mins): {(end_at-start_at)/60}")
    return True


if __name__ == "__main__":
    fetch_asa_data(start_date="2022-05-06", end_date="2022-05-06")

Bob_Colner @Bob_Colner: Interesting, so something in my module must not be pickle-friendly? I’m using the same module in 1.0 and it works with Dask. In general, debugging this stuff is a huge mystery to me.

Michael_Adkins @Michael_Adkins: The error you’re getting doesn’t entirely suggest that, but I’m not sure what’s happening.
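
Note on the error message: in plain Python, "it's not the same object as __main__.<name>" is what pickle reports when it tries to serialize a function by reference and the module-level name no longer points at that function, for example because a decorator replaced it with a wrapper object. The sketch below reproduces only that situation with the standard library. `Wrapper` and `wrap` are hypothetical stand-ins for a decorator, not Prefect's implementation, and this is not a confirmed diagnosis of the issue in this thread.

```python
import pickle


class Wrapper:
    """Hypothetical stand-in for an object returned by a decorator."""

    def __init__(self, fn):
        self.fn = fn  # keep a reference to the original function


def wrap(fn):
    # Replace the decorated function with a Wrapper instance.
    return Wrapper(fn)


@wrap
def get_campaigns():
    return "ok"


# The module-level name `get_campaigns` is now a Wrapper, while the original
# function still carries the qualified name "get_campaigns".
original = get_campaigns.fn

try:
    # pickle serializes functions by reference: it looks the name up in the
    # defining module and checks that it finds the very same object.
    pickle.dumps(original)
    outcome = "pickled"
except pickle.PicklingError as exc:
    outcome = f"PicklingError: {exc}"

print(outcome)
```

Run as a script, this prints a PicklingError message because the lookup by name finds the wrapper rather than the original function.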

Note to anyone using DaskTaskRunner or RayTaskRunner:

From Prefect version 2.0b8 onwards, those task runners were moved to the respective Prefect Collections for better dependency management (the core library no longer requires dask or ray as dependencies; those can now be installed separately when needed).

The correct imports are now:

from prefect_dask import DaskTaskRunner
from prefect_ray import RayTaskRunner
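
If a flow file has to run on both sides of that move, one option is to try the new import location first and fall back to the old one. The helper below is a minimal sketch of that idea using only the standard library; `import_first` is a hypothetical name, and the commented usage line assumes the two module paths mentioned in this thread.

```python
import importlib


def import_first(candidates, attr):
    """Return `attr` from the first importable module in `candidates`."""
    for module_name in candidates:
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            continue  # module not installed; try the next candidate
        if hasattr(module, attr):
            return getattr(module, attr)
    raise ImportError(f"{attr} not found in any of: {', '.join(candidates)}")


# Assumed usage, based on the import paths quoted in this thread:
# DaskTaskRunner = import_first(
#     ["prefect_dask", "prefect.task_runners"], "DaskTaskRunner"
# )
```

Listing the newer location first means an environment that has both installed picks up the collection's task runner.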