@Jake: We have a parameter that gets passed to some of our tasks (like which DB endpoint to point to); when this value changes, re-registration won’t happen (since it isn’t part of the flow metadata), but how can I make it so that this change does trigger a re-registration?
@Anna_Geller: Can you share more about this YAML file process? Not sure how this works.
I’d be curious to see how you pass the parameter values for the flow run - are you attaching parameters to your clock and schedule?
@Jake: we have a function:
def get_flow(general_configs):
    with Flow(...) as flow:
        some_task(general_configs)
    return flow
where general_configs is an object that is generated from the YAML file.
We call this get_flow function when trying to register.
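For context, a minimal sketch of how this pattern could fit together end to end; the flow name, the load_general_configs helper, the config.yml path, and the db_endpoint key are illustrative assumptions, not the actual code from the thread:

import yaml
from prefect import Flow, task

@task
def some_task(general_configs):
    # e.g. read the DB endpoint out of the loaded config dict
    print(general_configs["db_endpoint"])

def load_general_configs(path="config.yml"):
    # hypothetical helper: parse the YAML file into a plain dict
    with open(path) as f:
        return yaml.safe_load(f)

def get_flow(general_configs):
    with Flow("example-flow") as flow:
        some_task(general_configs)
    return flow

flow = get_flow(load_general_configs())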
@Anna_Geller: And the YAML file? So far, this function doesn’t tell me anything about how parameters are passed to it, so we can’t yet say how to force registration upon a change to those parameter values.
And perhaps do you mean something else when you say “parameters”? When I hear Parameter, I’m thinking of a Prefect Parameter task.
> We call this get_flow function when trying to register
Can you share how you do it? This would be important to see before we can make any recommendations about the registration process
@Jake: The YAML file is not really relevant; it is read before we start building the flow, and the flow’s tasks only receive values read from the file. When I say parameter, I mean something like:
@task
def add(x, y=1):
    return x + y
so x and y would be parameters (sorry if this is the wrong term to be using).
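For reference, a short sketch of the distinction being drawn here: plain Python arguments baked into the flow at build time versus a Prefect (1.x) Parameter task whose value can be set per flow run. The flow name and values are illustrative only:

from prefect import Flow, Parameter, task

@task
def add(x, y=1):
    return x + y

with Flow("add-example") as flow:
    x = Parameter("x", default=1)  # Prefect Parameter task: value can change per run
    add(x)                         # wired to the Parameter
    add(10, y=2)                   # plain Python values fixed at build/registration time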
@Anna_Geller: and really if you do:
flow = get_flow("/path/to/config.yaml")
flow.register("project_name")
then by default this will trigger a flow registration and bump up the flow version - always, any time you run it, regardless of whether something in the YAML changed or not.
@Jake: In the function that is using get_flow, we then do:
flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
    idempotency_key=flow.serialized_hash(),
)
@Anna_Geller: throw out this argument and you’re done
idempotency_key=flow.serialized_hash(),
Doing this instead will always register the flow and bump up the flow version:
flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
)
@Jake: ideally I do not want it to re-register every time (and bump the version). Is this possible?
@Anna_Geller: ok, sorry, now I get it. You need to compute the hash of your config dictionary instead of your flow and pass it as the idempotency_key. This way, if anything in your config has changed since the last registration, the flow version will be bumped; if nothing changed, the version stays the same.
Is general_configs just a path or a dictionary? If it’s a dict, you could do:
import json
import hashlib

flow_idempotency_key = hashlib.sha256(
    json.dumps(general_configs, sort_keys=True).encode()
).hexdigest()

flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
    idempotency_key=flow_idempotency_key,
)
@Jake: that could work!! Basically make that dict part of the hash, right?
@Anna_Geller: correct! instead of using the hash of your serialized flow as an idempotency key, you are using the hash of your config dict. And you can build the idempotency key as you wish - you could even combine it with other flow metadata you care about and build a single dictionary of all arguments that you consider relevant as part of a flow version
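A rough sketch of that idea, assuming the same general_configs, prefect_config, and flow objects as above; the docker_image field is just a hypothetical example of extra metadata you might fold in:

import hashlib
import json

# everything that should force a new flow version goes into one dictionary
version_relevant = {
    "general_configs": general_configs,
    "labels": prefect_config.get("labels"),
    "docker_image": prefect_config.get("image"),  # hypothetical extra field
}
flow_idempotency_key = hashlib.sha256(
    json.dumps(version_relevant, sort_keys=True, default=str).encode()
).hexdigest()

flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
    idempotency_key=flow_idempotency_key,
)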
btw in Orion, versioning will get even easier since you will be able to provide a custom flow version as you wish, e.g.
@flow(name="versioned_flow", version=os.getenv("GIT_COMMIT_SHA"))
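Written out as a small runnable sketch, assuming Prefect 2 (Orion) and a GIT_COMMIT_SHA environment variable set by your CI:

import os
from prefect import flow

@flow(name="versioned_flow", version=os.getenv("GIT_COMMIT_SHA"))
def versioned_flow():
    # the version is recorded for each flow run
    print("running version:", os.getenv("GIT_COMMIT_SHA"))

if __name__ == "__main__":
    versioned_flow()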