View in #prefect-community on Slack
@Bob_Colner: So I’m migrating some 1.0 flows to 2.0. Loving the new @flow API — loops and native control flow are very nice to have. I’m hitting a strange PicklingError in one task with the Dask task runner. The same task works on 1.0 and (this is strange) was working on 2.0 before I tried to set up a deployment. Any ideas on how to debug this task? (see thread)
error message:
Traceback (most recent call last):
File "/opt/miniconda3/envs/orion/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 40, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function asa_get_campaigns at 0x1087c4160>: it's not the same object as __main__.asa_get_campaigns
task def:
@task(
    retries=3,
    retry_delay_seconds=60,
    tags=['asa_api'],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    """Prefect task wrapping ``asa_api.get_campaigns`` for one org and date.

    The task is declared with ``retries=3`` and ``retry_delay_seconds=60`` and
    is tagged ``asa_api``.

    Args:
        org_id: Organisation id forwarded to ``asa_api.get_campaigns``.
        date: Date string forwarded to ``asa_api.get_campaigns``.
        access_token: Access token forwarded to ``asa_api.get_campaigns``.

    Returns:
        Whatever ``asa_api.get_campaigns`` returns (annotated as a DataFrame).
    """
    logger = get_run_logger()
    logger.info(f'fetching campaigns for: org: {org_id}, date: {date}')
    return asa_api.get_campaigns(org_id, date, access_token)
asa_api.get_campaigns
def:
def get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    """Fetch the enabled-campaign report for one org and one day.

    POSTs a report request to ``CAMPAIGNS_URL`` whose ``startTime`` and
    ``endTime`` are both ``date``, restricted to campaigns whose
    ``campaignStatus`` equals ``ENABLED`` and ordered by ``campaignId``
    ascending. Each returned row is flattened into one DataFrame record.

    Args:
        org_id: Sent as ``orgId`` in the ``X-AP-Context`` header and copied
            into the ``org_id`` column.
        date: Used as both start and end of the report window and copied into
            the ``spend_date`` column.
        access_token: Bearer token placed in the ``Authorization`` header.

    Returns:
        An empty DataFrame when the response's ``data`` is ``None`` or
        ``pagination.totalResults`` is 0; otherwise one row per reporting row,
        with ``spend_amount`` cast to float.
    """
    request_body = {
        'startTime': date,
        'endTime': date,
        'selector': {
            'orderBy': [{'field': 'campaignId', 'sortOrder': 'ASCENDING'}],
            'conditions': [{'field': 'campaignStatus', 'operator': 'EQUALS', 'values': ['ENABLED']}]
        },
        'returnRowTotals': 'true'
    }
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}',
        'X-AP-Context': f'orgId={org_id}',
    }
    # NOTE(review): no timeout and no HTTP status check here; an error response
    # is only noticed if its body is not the expected JSON shape -- confirm
    # whether that is intended.
    response = requests.post(
        CAMPAIGNS_URL,
        data=json.dumps(request_body, ensure_ascii=False),
        headers=headers,
    )
    payload = response.json()
    if payload['data'] is None or payload['pagination']['totalResults'] == 0:
        return pd.DataFrame()

    def format_row(row):
        """Flatten one reporting row into a flat record."""
        metadata = row['metadata']
        total = row['total']
        return {
            'spend_date': date,
            'org_id': org_id,
            'app_name': metadata['app']['appName'],
            'campaign_id': metadata['campaignId'],
            'campaign_name': metadata['campaignName'],
            'modification_at': metadata['modificationTime'],
            'inserted_at': datetime.utcnow().isoformat(),
            'spend_amount': float(total['localSpend']['amount']),
            'impressions': total['impressions'],
            'taps': total['taps'],
            'installs': total['installs'],
            # Only the first entry of countriesOrRegions is kept.
            'country': metadata['countriesOrRegions'][0],
            # The full raw row is kept alongside the extracted columns.
            'json_payload': json.dumps(row),
        }

    rows = payload['data']['reportingDataResponse']['row']
    df = pd.DataFrame([format_row(row) for row in rows]).convert_dtypes()
    # Re-cast after convert_dtypes() so spend_amount ends up as plain float.
    df['spend_amount'] = df['spend_amount'].astype(float)
    return df
my task does not have any client objects that I am aware of. Just makes an HTTP post
@Michael_Adkins: Hm, I’ve seen this before. It usually happens when you’re lacking the if __name__ == "__main__" guard before calling the flow.
@Bob_Colner: Interesting, I have the main guard in place in the flow file. Any other debug ideas?
@Michael_Adkins: Does it happen with a minimal example?
@Bob_Colner: It was working for me. I’ll try to revert to some earlier git commit. Issue started when I tried to create a local deployment for the first time. Unless I somehow introduced a subtle bug at the same time
@Michael_Adkins: How did you define your deployment? Is the error happening when you just call the flow or does it happen when the deployment runs?
@Bob_Colner: Local flow, default universal flow runner, GCP storage. FYI, changing flow runners is not well documented IMO.
I get the same pickle issue when running the flow file or ‘executing’ the deployment
@Michael_Adkins: We’re designing some changes to flow runners atm so we’re hesitant to sink much time into the existing interface.
Can you share the full flow file?
@Bob_Colner: Makes sense, yeah I’ll share the full flow file tomorrow when I’m in front of my computer. Thanks for your help
here is my full task/flow .py file
import time
import pandas as pd
from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner, ConcurrentTaskRunner, DaskTaskRunner
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_insert_stream
from apple_search_ads import asa_api
@task(
    retries=3,
    retry_delay_seconds=60,
    tags=['asa_api'],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    """Prefect task wrapping ``asa_api.get_campaigns`` for one org and date.

    Declared with ``retries=3``, ``retry_delay_seconds=60`` and the
    ``asa_api`` tag. Logs the org and date, then returns the result of
    ``asa_api.get_campaigns(org_id, date, access_token)`` unchanged.
    """
    logger = get_run_logger()
    logger.info(f'fetching campaigns for: org: {org_id}, date: {date}')
    return asa_api.get_campaigns(org_id, date, access_token)
@task(
    retries=3,
    retry_delay_seconds=60,
    tags=['asa_api'],
)
def asa_get_adgroups(campaign_id: str, org_id: str, date: str, access_token: str) -> pd.DataFrame:
    """Prefect task wrapping ``asa_api.get_adgroups`` for one campaign.

    Declared with ``retries=3``, ``retry_delay_seconds=60`` and the
    ``asa_api`` tag. Logs the org, campaign and date, then returns the result
    of ``asa_api.get_adgroups(campaign_id, org_id, date, access_token)``
    unchanged.
    """
    logger = get_run_logger()
    logger.info(f'fetching adgroups for: org: {org_id}, campaign: {campaign_id}, date: {date}')
    return asa_api.get_adgroups(campaign_id, org_id, date, access_token)
@task(
    retries=3,
    retry_delay_seconds=60,
    tags=['asa_api'],
)
def asa_get_keywords(campaign_id: str, org_id: str, date: str, access_token: str) -> pd.DataFrame:
    """Prefect task wrapping ``asa_api.get_keywords`` for one campaign.

    Declared with ``retries=3``, ``retry_delay_seconds=60`` and the
    ``asa_api`` tag. Logs the org, campaign and date, then returns the result
    of ``asa_api.get_keywords(campaign_id, org_id, date, access_token)``
    unchanged.
    """
    logger = get_run_logger()
    logger.info(f'fetching keywords for: {org_id}, campaign: {campaign_id}, date: {date}')
    return asa_api.get_keywords(campaign_id, org_id, date, access_token)
# Task runner handed to the flow below via @flow(task_runner=dask_local).
# cluster_kwargs are passed through to the Dask cluster that Prefect creates
# (presumably a distributed.LocalCluster -- confirm against the Prefect docs).
dask_local = DaskTaskRunner(
    cluster_kwargs={
        'processes': True,
        'n_workers': 4,
        'threads_per_worker': 2,
        'dashboard_address': None,
    }
)
@flow(task_runner=dask_local)
def fetch_asa_data(start_date: str, end_date: str, orgs: list[str]=['117960', '2657930', '3737470']) -> bool:
    """Fetch campaign, adgroup and keyword data and stream it into BigQuery.

    For every date produced by ``pd.date_range(start_date, end_date)`` and
    every org in ``orgs``, fetches campaigns, then for each campaign row
    fetches its adgroups and keywords. Each non-empty result is passed to
    ``bigquery_insert_stream`` for the matching ``*_stream`` table in the
    ``apple_search_ads`` dataset.

    Args:
        start_date: Start of the date range, passed to ``pd.date_range``.
        end_date: End of the date range, passed to ``pd.date_range``.
        orgs: Org ids to process. The default list is only iterated, never
            mutated.

    Returns:
        ``True`` once all dates and orgs have been processed.
    """
    start_at = time.time()
    logger = get_run_logger()
    asa_token = asa_api.get_access_token()
    gcp_credentials = GcpCredentials(project="emerald-skill-201716")
    date_range = [d.date().isoformat() for d in pd.date_range(start=start_date, end=end_date)]
    for date in date_range:
        for org_id in orgs:
            dfc_future = asa_get_campaigns(org_id, date, asa_token)
            # Block on the campaigns result: the campaign ids drive the
            # adgroup and keyword fetches below.
            dfc = dfc_future.result()
            if dfc is None or len(dfc) == 0:
                logger.info(f'No campaign data for org: {org_id}, date: {date}')
                continue
            rows_c = dfc.to_dict('records')
            bigquery_insert_stream(
                dataset="apple_search_ads",
                table="campaigns_stream",
                records=rows_c,
                gcp_credentials=gcp_credentials,
            )
            for row in dfc.itertuples():
                dfa_future = asa_get_adgroups(row.campaign_id, org_id, date, asa_token)
                dfa = dfa_future.result()
                if dfa is None or len(dfa) == 0:
                    logger.info(f'No adgroup data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}')
                    # NOTE(review): this also skips the keyword fetch for the
                    # campaign -- confirm that is intended.
                    continue
                rows_a = dfa.to_dict('records')
                bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="adgroups_stream",
                    records=rows_a,
                    gcp_credentials=gcp_credentials
                )
                dfk_future = asa_get_keywords(row.campaign_id, org_id, date, asa_token)
                dfk = dfk_future.result()
                if dfk is None or len(dfk) == 0:
                    logger.info(f'No keyword data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}')
                    continue
                rows_k = dfk.to_dict('records')
                bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="keywords_stream",
                    records=rows_k,
                    gcp_credentials=gcp_credentials
                )
    end_at = time.time()
    logger.info(f'flow run time (mins): {(end_at-start_at)/60}')
    return True
# Run the flow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    fetch_asa_data(start_date='2022-05-06', end_date='2022-05-06')
@Michael_Adkins: Hm. If I throw some mocks in there for the external items this runs fine
import time
from prefect import task, flow, get_run_logger
from prefect.task_runners import (
SequentialTaskRunner,
ConcurrentTaskRunner,
DaskTaskRunner,
)
from typing import List
from unittest.mock import MagicMock
# Stand-ins for the external dependencies (pandas, prefect_gcp and the
# asa_api module) so the script can run without them.
pd = MagicMock()
GcpCredentials = MagicMock()
bigquery_insert_stream = MagicMock()
asa_api = MagicMock()
@task(
    retries=3,
    retry_delay_seconds=60,
    tags=["asa_api"],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    """Prefect task wrapping ``asa_api.get_campaigns`` for one org and date.

    Declared with ``retries=3``, ``retry_delay_seconds=60`` and the
    ``asa_api`` tag. Logs the org and date, then returns the result of
    ``asa_api.get_campaigns(org_id, date, access_token)`` unchanged.
    """
    logger = get_run_logger()
    logger.info(f"fetching campaigns for: org: {org_id}, date: {date}")
    return asa_api.get_campaigns(org_id, date, access_token)
@task(
    retries=3,
    retry_delay_seconds=60,
    tags=["asa_api"],
)
def asa_get_adgroups(
    campaign_id: str, org_id: str, date: str, access_token: str
) -> pd.DataFrame:
    """Prefect task wrapping ``asa_api.get_adgroups`` for one campaign.

    Declared with ``retries=3``, ``retry_delay_seconds=60`` and the
    ``asa_api`` tag. Logs the org, campaign and date, then returns the result
    of ``asa_api.get_adgroups(campaign_id, org_id, date, access_token)``
    unchanged.
    """
    logger = get_run_logger()
    logger.info(
        f"fetching adgroups for: org: {org_id}, campaign: {campaign_id}, date: {date}"
    )
    return asa_api.get_adgroups(campaign_id, org_id, date, access_token)
@task(
    retries=3,
    retry_delay_seconds=60,
    tags=["asa_api"],
)
def asa_get_keywords(
    campaign_id: str, org_id: str, date: str, access_token: str
) -> pd.DataFrame:
    """Prefect task wrapping ``asa_api.get_keywords`` for one campaign.

    Declared with ``retries=3``, ``retry_delay_seconds=60`` and the
    ``asa_api`` tag. Logs the org, campaign and date, then returns the result
    of ``asa_api.get_keywords(campaign_id, org_id, date, access_token)``
    unchanged.
    """
    logger = get_run_logger()
    logger.info(
        f"fetching keywords for: {org_id}, campaign: {campaign_id}, date: {date}"
    )
    return asa_api.get_keywords(campaign_id, org_id, date, access_token)
# Task runner handed to the flow below via @flow(task_runner=dask_local).
# cluster_kwargs are passed through to the Dask cluster that Prefect creates
# (presumably a distributed.LocalCluster -- confirm against the Prefect docs).
dask_local = DaskTaskRunner(
    cluster_kwargs={
        "processes": True,
        "n_workers": 4,
        "threads_per_worker": 2,
        "dashboard_address": None,
    }
)
@flow(task_runner=dask_local)
def fetch_asa_data(
    start_date: str, end_date: str, orgs: List[str] = ["117960", "2657930", "3737470"]
) -> bool:
    """Fetch campaign, adgroup and keyword data and stream it into BigQuery.

    For every date produced by ``pd.date_range(start_date, end_date)`` and
    every org in ``orgs``, fetches campaigns, then for each campaign row
    fetches its adgroups and keywords. Each non-empty result is passed to
    ``bigquery_insert_stream`` for the matching ``*_stream`` table in the
    ``apple_search_ads`` dataset.

    Args:
        start_date: Start of the date range, passed to ``pd.date_range``.
        end_date: End of the date range, passed to ``pd.date_range``.
        orgs: Org ids to process. The default list is only iterated, never
            mutated.

    Returns:
        ``True`` once all dates and orgs have been processed.
    """
    start_at = time.time()
    logger = get_run_logger()
    asa_token = asa_api.get_access_token()
    gcp_credentials = GcpCredentials(project="emerald-skill-201716")
    date_range = [
        d.date().isoformat() for d in pd.date_range(start=start_date, end=end_date)
    ]
    for date in date_range:
        for org_id in orgs:
            dfc_future = asa_get_campaigns(org_id, date, asa_token)
            # Block on the campaigns result: the campaign ids drive the
            # adgroup and keyword fetches below.
            dfc = dfc_future.result()
            if dfc is None or len(dfc) == 0:
                logger.info(f"No campaign data for org: {org_id}, date: {date}")
                continue
            rows_c = dfc.to_dict("records")
            bigquery_insert_stream(
                dataset="apple_search_ads",
                table="campaigns_stream",
                records=rows_c,
                gcp_credentials=gcp_credentials,
            )
            for row in dfc.itertuples():
                dfa_future = asa_get_adgroups(row.campaign_id, org_id, date, asa_token)
                dfa = dfa_future.result()
                if dfa is None or len(dfa) == 0:
                    logger.info(
                        f"No adgroup data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}"
                    )
                    # NOTE(review): this also skips the keyword fetch for the
                    # campaign -- confirm that is intended.
                    continue
                rows_a = dfa.to_dict("records")
                bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="adgroups_stream",
                    records=rows_a,
                    gcp_credentials=gcp_credentials,
                )
                dfk_future = asa_get_keywords(row.campaign_id, org_id, date, asa_token)
                dfk = dfk_future.result()
                if dfk is None or len(dfk) == 0:
                    logger.info(
                        f"No keyword data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}"
                    )
                    continue
                rows_k = dfk.to_dict("records")
                bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="keywords_stream",
                    records=rows_k,
                    gcp_credentials=gcp_credentials,
                )
    end_at = time.time()
    logger.info(f"flow run time (mins): {(end_at-start_at)/60}")
    return True
# Run the flow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    fetch_asa_data(start_date="2022-05-06", end_date="2022-05-06")
@Bob_Colner: Interesting, so something in my module must not be pickle-friendly? I’m using the same module in 1.0 and it works with Dask. In general, debugging this stuff is a huge mystery to me.
@Michael_Adkins: The error you’re getting doesn’t entirely suggest that, but I’m not sure what’s happening.