View in #prefect-community on Slack
Problem
@Nick_Hart: I’m looking to run create_flow_runs using threading for a custom module I’m writing and for some reason I’m getting an attribute error. Below is my test code and the error. Would you know how to fix this? Also, I’m assuming this is not a prefect problem and more of a threading problem, but I was hoping someone would be able to help! Thanks in advance
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading
def thread_flows(flowname):
print("Running thread for: ",flowname)
flow_id = create_flow_run.run(flow_name=flowname)
flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)#
if __name__ == "__main__":
flow_list = ["FlowA", "FlowB", "FlowC"]
threads = []
for flowname in flow_list:
x = threading.Thread(target = thread_flows, args=(flowname,))
threads.append(x)
x.start()
for thread in threads:
thread.join()
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
flow_id = create_flow_run.run(flow_name=flowname)
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 123, in create_flow_run
logger = prefect.context.logger
AttributeError: 'Context' object has no attribute 'logger'
@Michael_Adkins: Hey Nick, create_flow_run
is intended to be called from a Flow
@Nick_Hart: I’ve also tried this code but I get a new error:
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading
def thread_flows(flowname):
print("Running thread for: ",flowname)
with prefect.context(logger = prefect.context.get("logger")):
flow_id = create_flow_run.run(flow_name=flowname)
flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)#
if __name__ == "__main__":
flow_list = ["FlowA", "FlowB", "FlowC"]
threads = []
for flowname in flow_list:
x = threading.Thread(target = thread_flows, args=(flowname,))
threads.append(x)
x.start()
for thread in threads:
thread.join()
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 8, in thread_flows
flow_id = create_flow_run.run(flow_name=flowname)
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 124, in create_flow_run
logger.debug("Looking up flow metadata...")
AttributeError: 'NoneType' object has no attribute 'debug'
@Michael_Adkins: I see you’re using .run
to call the task outside a flow. I think the issue here is that the context does not pass data across threads and the logger is only present in the main thread.
You’d have to send the logger as an argument to your thread and retrieve it in your main block not in the thread target function.
@Nick_Hart: @Michael_Adkins I usually like to call create_flow_run within a flow but for our module we do not want it to be a flow and how would I go about sending the logger as an argument to my thread?
Solution
@Michael_Adkins:
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading
def thread_flows(flowname, logger):
print("Running thread for: ", flowname)
with prefect.context(logger=logger):
flow_id = create_flow_run.run(flow_name=flowname)
flow_run = wait_for_flow_run.run(flow_id, stream_logs=True) #
if __name__ == "__main__":
flow_list = ["FlowA", "FlowB", "FlowC"]
threads = []
for flowname in flow_list:
x = threading.Thread(
target=thread_flows, args=(flowname, prefect.context.logger)
)
threads.append(x)
x.start()
for thread in threads:
thread.join()
@Anna_Geller: this may also be helpful
@Nick_Hart: Oh wow! that worked I didn’t even think of passing it as a variable Thanks @Michael_Adkins and thank you @Anna_Geller for the other example!