View in #prefect-server on Slack
Problem
@Shuchita_Tripathi: To run a flow from any code, we need to know the flow_id. How can we get the flow id programmatically?
Solution
@Kevin_Kho: prefect.context.get("flow_id")
to get it inside a task or flow run. Is that what you want?
Or do you mean you need to get the id from another process after the flow run is created?
Alternative options using GraphQL and FlowView
@Kyle_McChesney: If you are able to interact with the graphql API directly, you can use the following query
LATEST_FLOW_BY_NAME = gql(
    '''
    query LatestFlowByName($name: String) {
      flow(
        where: {name: {_eq: $name}},
        order_by: {version: desc},
        limit: 1,
      )
      {
        id
      }
    }
    ''',
)
you pass in the flow name, which should be known/constant, and it gives you the latest flow version id
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport

# PREFECT_API_ROOT is the URL of your Prefect GraphQL endpoint
transport = RequestsHTTPTransport(
    url=PREFECT_API_ROOT,
    verify=True,
    retries=3,
)
client = Client(
    transport=transport,
    fetch_schema_from_transport=True,
)
# name is the flow name to look up
result = client.execute(
    LATEST_FLOW_BY_NAME,
    variable_values={'name': name},
)
@Shuchita_Tripathi: Thank you Kevin.
Thank you Kyle. That’s exactly what I was looking for.
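For reference, a minimal sketch of pulling the id out of the query result. It assumes the returned data has the shape implied by the query above (a `flow` list with at most one entry holding `id`). The helper name `extract_flow_id` is hypothetical and not part of gql or Prefect.

```python
from typing import Any, Mapping, Optional


def extract_flow_id(data: Mapping[str, Any]) -> Optional[str]:
    """Return the id of the first flow in a LatestFlowByName result, or None."""
    # The query uses limit: 1, so the list holds zero or one entries.
    flows = data.get("flow") or []
    if not flows:
        return None  # no flow registered under that name
    return flows[0]["id"]
```

Returning None for an empty list lets the caller decide how to handle a flow name that has not been registered.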
@Anna_Geller: btw @Kyle_McChesney there is an easier way to execute queries without having to install third-party libraries like gql:
from prefect.client import Client

client = Client()
query = '''
query LatestFlowByName($name: String) {
  flow(
    where: {name: {_eq: $name}},
    order_by: {version: desc},
    limit: 1,
  )
  {
    id
  }
}
'''
# name is the flow name to look up; the query needs it as the $name variable
client.graphql(query, variables={"name": name})
@Kyle_McChesney: oh awesome. We have kind of a freak show deployment process: we make this call in a Lambda that doesn't actually have Prefect installed. But good to know it's right in there.
@Anna_Geller: For an AWS Lambda function, you can use urllib, which is included in Lambda's Python environment, so there is no need for any extra libraries:
import json
import logging
import os
import urllib.parse
import urllib.request

os.environ["API_KEY"] = "xxx"
os.environ["FLOW_VERSION_GROUP_ID"] = "339c86be-5c1c-48f0-b8d3-fe57654afe22"

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    logger.info("Received event: " + json.dumps(event))
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    s3_path = f"s3://{bucket}/{key}"
    create_mutation = """
    mutation($input: createFlowRunInput!){
        createFlowRun(input: $input){
            flow_run{
                id
            }
        }
    }
    """
    inputs = dict(
        versionGroupId=os.environ["FLOW_VERSION_GROUP_ID"],
        parameters=dict(s3_path=s3_path),
        idempotencyKey=event["Records"][0]["eventTime"],
    )
    variables = dict(input=inputs)
    data = json.dumps(
        dict(query=create_mutation, variables=json.dumps(variables))
    ).encode("utf-8")

    ## prep the request
    req = urllib.request.Request("https://api.prefect.io", data=data)
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", "Bearer {}".format(os.environ["API_KEY"]))

    ## send the request and return the response
    resp = urllib.request.urlopen(req)
    print(json.loads(resp.read().decode()))
In this case it is a mutation, but queries work the same way, since GraphQL requests are sent as POST requests anyway.
Obviously, these could be set in the environment variables section of the Lambda console instead of in the code:
os.environ["API_KEY"] = "xxx"
os.environ["FLOW_VERSION_GROUP_ID"] = "339c86be-5c1c-48f0-b8d3-fe57654afe22"
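To illustrate the point that queries work the same way, here is a sketch of the same urllib approach applied to the LatestFlowByName query from earlier in the thread. The helper name `build_graphql_request` and the flow name `my-flow` are hypothetical. The URL, the headers, and the JSON-encoded `variables` field mirror the Lambda example above. Only building the request is shown, since sending it needs a valid API key.

```python
import json
import urllib.request

LATEST_FLOW_BY_NAME = """
query LatestFlowByName($name: String) {
  flow(where: {name: {_eq: $name}}, order_by: {version: desc}, limit: 1) {
    id
  }
}
"""


def build_graphql_request(query, variables, api_key, url="https://api.prefect.io"):
    """Build (but do not send) a POST request carrying a GraphQL query."""
    # Same payload layout as the Lambda example: variables as a JSON string.
    payload = json.dumps(
        dict(query=query, variables=json.dumps(variables))
    ).encode("utf-8")
    # Supplying data makes urllib issue a POST instead of a GET.
    req = urllib.request.Request(url, data=payload)
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", "Bearer {}".format(api_key))
    return req


# "my-flow" and "xxx" are placeholders for a real flow name and API key.
req = build_graphql_request(LATEST_FLOW_BY_NAME, {"name": "my-flow"}, api_key="xxx")
# To send it: json.loads(urllib.request.urlopen(req).read().decode())
```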
@Kyle_McChesney: excellent!
@Michael_Adkins: There’s also FlowView.from_flow_name
https://docs.prefect.io/api/latest/backend/flow.html#flowview