"""
The following script builds a report of stars, open pull requests, and open
issues for a set of GitHub repositories and posts it to a Slack channel.

Requires:
- prefect (`pip install "prefect>=2.0.0"`)
- prefect-github
  (`pip install prefect-github && prefect block register -m prefect_github.credentials`)
- a GitHubCredentials block document, created in the UI, named `github-token`
- a SlackWebhook block document, created in the UI, named `channel-webhook`

Additional ideas:
- daily deployment
- retries
- concurrency
"""
from textwrap import shorten
from typing import Any, Dict, List
import pendulum
from prefect import flow, get_run_logger, task
from prefect.blocks.notifications import SlackWebhook
from prefect_github import GitHubCredentials
from prefect_github.repository import (
query_repository_issue,
query_repository_issues,
query_repository_pull_request,
query_repository_pull_requests,
query_repository_stargazers,
)
# Timestamp captured once, when this module is imported. It is reused for the
# report header and for computing how long ago each issue/PR was created, so
# it does not advance while the process keeps running.
NOW = pendulum.now()
# Repositories reported on when this file is run as a script. Every name is
# "prefect-" followed by one of the suffixes listed here.
PREFECT_COLLECTIONS = [
    f"prefect-{suffix}"
    for suffix in (
        "collection-template",
        "airbyte",
        "aws",
        "azure",
        "dask",
        "databricks",
        "dbt",
        "email",
        "gcp",
        "github",
        "great-expectations",
        "ray",
        "shell",
        "slack",
        "snowflake",
        "sqlalchemy",
        "twitter",
    )
]
@task
async def add_body_section(node, repository_kwargs, is_pr=False):
    """Build the report line for a single pull request or issue.

    Args:
        node: Mapping with a ``"number"`` key identifying the issue or PR.
        repository_kwargs: Keyword arguments forwarded unchanged to the
            underlying query function.
        is_pr: When true the item is looked up as a pull request and
            labelled ``PR``; otherwise it is looked up as an issue and
            labelled ``Issue``.

    Returns:
        A string starting with a newline and a tab, holding the label and
        number, a ``<url|*title*>`` link and the item's age.

    The query result must provide ``title``, ``createdAt``, ``number`` and
    ``url``; the last two are consumed by the template through ``**details``.
    """
    label, query_task = (
        ("PR", query_repository_pull_request)
        if is_pr
        else ("Issue", query_repository_issue)
    )
    # Call the undecorated function behind the task (``.fn``) and await it.
    details = await query_task.fn(number=node["number"], **repository_kwargs)

    title = shorten(details["title"], width=70, placeholder="...")

    # Keep only the first two words of the humanized interval
    # (presumably something like "2 weeks") and append "ago".
    age = NOW - pendulum.parse(details["createdAt"])
    age_words = age.in_words().split()[:2]

    template = "\n\t[{label} #{number}] <{url}|*{short_title}*> (_{duration}_)"
    return template.format(
        label=label,
        short_title=title,
        duration=" ".join(age_words) + " ago",
        **details,
    )
@flow
def discover_github_issues_and_slack(
    repository_names: List[str], repository_owner: str = "PrefectHQ"
) -> Dict[str, Any]:
    """Report stars, open PRs and open issues for repositories to Slack.

    For every repository (processed in sorted name order) the flow queries
    the open pull requests, the open issues and the stargazer count, adds a
    header line plus one line per PR/issue to the message, and finally sends
    the assembled message through the ``channel-webhook`` SlackWebhook block.
    GitHub access uses the ``github-token`` GitHubCredentials block.

    Args:
        repository_names: Names of the repositories to report on.
        repository_owner: Owner (user or organisation) of the repositories.

    Returns:
        Whatever ``SlackWebhook.notify`` returns.
        NOTE(review): the annotation says ``Dict[str, Any]``; confirm that
        this matches the value ``notify`` actually returns.
    """
    logger = get_run_logger()
    logger.info(
        f"Reporting for {len(repository_names)} repos in {repository_owner}"
    )
    github_credentials_block = GitHubCredentials.load("github-token")

    total_count = {
        "star": 0,
        "pull_request": 0,
        "issue": 0,
    }
    # Listing options shared by the PR and issue queries. Counts below are
    # len() of the returned nodes, so with first=100 they presumably top out
    # at 100 per repository -- TODO confirm whether pagination is needed.
    listing_kwargs = dict(
        first=100, labels=None, states=["OPEN"], return_fields=["number"]
    )

    # Collect message fragments and join once instead of repeated `+=`.
    body_parts = []
    for repository_name in sorted(repository_names):
        logger.info(f"Processing {repository_name}")
        repository_kwargs = dict(
            owner=repository_owner,
            name=repository_name,
            github_credentials=github_credentials_block,
        )
        # Fresh dict per repository rather than mutating a shared one.
        query_kwargs = {**listing_kwargs, **repository_kwargs}

        pull_request_nodes = query_repository_pull_requests(**query_kwargs)[
            "nodes"
        ]
        issue_nodes = query_repository_issues(**query_kwargs)["nodes"]
        star_count = query_repository_stargazers(**repository_kwargs)[
            "totalCount"
        ]

        pull_request_count = len(pull_request_nodes)
        issue_count = len(issue_nodes)
        total_count["star"] += star_count
        total_count["pull_request"] += pull_request_count
        total_count["issue"] += issue_count

        # "\u2022" is the bullet character; written as an escape so that it
        # cannot be corrupted by a wrong source-file encoding again.
        body_parts.append(
            f"\n\n\u2022 *{repository_name}* "
            f"(_{star_count} Stars / {pull_request_count} PRs / "
            f"{issue_count} Issues_):"
        )
        body_parts.extend(
            add_body_section(node, repository_kwargs, is_pr=True)
            for node in pull_request_nodes
        )
        body_parts.extend(
            add_body_section(node, repository_kwargs) for node in issue_nodes
        )

    body = (
        "\n{day_datetime}\n"
        "*Prefect Collections Report* "
        "(_{star} Stars / {pull_request} PRs / {issue} Issues_) "
        ":spiral_note_pad:"
        "{body}\n"
    ).format(
        day_datetime=NOW.to_day_datetime_string(),
        body="".join(body_parts),
        **total_count,
    )

    slack_webhook_block = SlackWebhook.load("channel-webhook")
    return slack_webhook_block.notify(body=body)
# Script entry point: run the report for the default list of collections,
# leaving the repository owner at the flow's default.
if __name__ == "__main__":
    discover_github_issues_and_slack(repository_names=PREFECT_COLLECTIONS)