Hi,
I have created a task and inside a flow & using the map function.(based on this Tasks - Prefect Docs)
-
I would like to change my Task name dynamically based on the input i give to my Task.
Here I gave dict as an input to the task & i want to use a key from the dict to update its
task_run_name
.
how can i achieve this ? -
Also, I have 1000’s of dicts and I if i run this, all the 1000 dict run as separate task, which is difficult to manage & the visualization in UI will not be appealing. So Can we do it batch based on the resource available in my machine, so that i can see the first few batches running in UI and after one job finished take the next task and so on …?
Please note that, I don’t want to change my below code format, since it breaks many things.
Code example
class Pipeline:
def __init__(self):
pass
def run(self, doc):
# do something with doc
# & update the task name from doc
task_run_name = doc['id']
pass
class PrefectRunner:
def __init__(self, pipeline: Pipeline):
self.pipeline = pipeline
def run(self):
# contains list of dict
dict_docs = [
{"doc": "doc1", "id": 1},
{"doc": "doc2", "id": 2},
{"doc": "doc3", "id": 3},
]
task_run = task(self.create_task_run_from_flow)
task_run.map(dict_docs)
def create_task_run_from_flow(self, doc):
return self.pipeline.run(doc)