github-stats/github_stats.py

543 lines
17 KiB
Python

#!/usr/bin/python3
import asyncio
import os
from typing import Dict, List, Optional, Set, Tuple, Any, cast
import aiohttp
import requests
###############################################################################
# Main Classes
###############################################################################
class Queries(object):
"""
Class with functions to query the GitHub GraphQL (v4) API and the REST (v3)
API. Also includes functions to dynamically generate GraphQL queries.
"""
def __init__(
self,
username: str,
access_token: str,
session: aiohttp.ClientSession,
max_connections: int = 10,
):
self.username = username
self.access_token = access_token
self.session = session
self.semaphore = asyncio.Semaphore(max_connections)
async def query(self, generated_query: str) -> Dict:
"""
Make a request to the GraphQL API using the authentication token from
the environment
:param generated_query: string query to be sent to the API
:return: decoded GraphQL JSON output
"""
headers = {
"Authorization": f"Bearer {self.access_token}",
}
try:
async with self.semaphore:
r_async = await self.session.post(
"https://api.github.com/graphql",
headers=headers,
json={"query": generated_query},
)
result = await r_async.json()
if result is not None:
return result
except:
print("aiohttp failed for GraphQL query")
# Fall back on non-async requests
async with self.semaphore:
r_requests = requests.post(
"https://api.github.com/graphql",
headers=headers,
json={"query": generated_query},
)
result = r_requests.json()
if result is not None:
return result
return dict()
async def query_rest(self, path: str, params: Optional[Dict] = None) -> Dict:
"""
Make a request to the REST API
:param path: API path to query
:param params: Query parameters to be passed to the API
:return: deserialized REST JSON output
"""
for _ in range(60):
headers = {
"Authorization": f"token {self.access_token}",
}
if params is None:
params = dict()
if path.startswith("/"):
path = path[1:]
try:
async with self.semaphore:
r_async = await self.session.get(
f"https://api.github.com/{path}",
headers=headers,
params=tuple(params.items()),
)
if r_async.status == 202:
# print(f"{path} returned 202. Retrying...")
print(f"A path returned 202. Retrying...")
await asyncio.sleep(2)
continue
result = await r_async.json()
if result is not None:
return result
except:
print("aiohttp failed for rest query")
# Fall back on non-async requests
async with self.semaphore:
r_requests = requests.get(
f"https://api.github.com/{path}",
headers=headers,
params=tuple(params.items()),
)
if r_requests.status_code == 202:
print(f"A path returned 202. Retrying...")
await asyncio.sleep(2)
continue
elif r_requests.status_code == 200:
return r_requests.json()
# print(f"There were too many 202s. Data for {path} will be incomplete.")
print("There were too many 202s. Data for this repository will be incomplete.")
return dict()
@staticmethod
def repos_overview(
contrib_cursor: Optional[str] = None, owned_cursor: Optional[str] = None
) -> str:
"""
:return: GraphQL query with overview of user repositories
"""
return f"""{{
viewer {{
login,
name,
repositories(
first: 100,
orderBy: {{
field: UPDATED_AT,
direction: DESC
}},
isFork: false,
after: {"null" if owned_cursor is None else '"'+ owned_cursor +'"'}
) {{
pageInfo {{
hasNextPage
endCursor
}}
nodes {{
nameWithOwner
stargazers {{
totalCount
}}
forkCount
languages(first: 10, orderBy: {{field: SIZE, direction: DESC}}) {{
edges {{
size
node {{
name
color
}}
}}
}}
}}
}}
repositoriesContributedTo(
first: 100,
includeUserRepositories: false,
orderBy: {{
field: UPDATED_AT,
direction: DESC
}},
contributionTypes: [
COMMIT,
PULL_REQUEST,
REPOSITORY,
PULL_REQUEST_REVIEW
]
after: {"null" if contrib_cursor is None else '"'+ contrib_cursor +'"'}
) {{
pageInfo {{
hasNextPage
endCursor
}}
nodes {{
nameWithOwner
stargazers {{
totalCount
}}
forkCount
languages(first: 10, orderBy: {{field: SIZE, direction: DESC}}) {{
edges {{
size
node {{
name
color
}}
}}
}}
}}
}}
}}
}}
"""
@staticmethod
def contrib_years() -> str:
"""
:return: GraphQL query to get all years the user has been a contributor
"""
return """
query {
viewer {
contributionsCollection {
contributionYears
}
}
}
"""
@staticmethod
def contribs_by_year(year: str) -> str:
"""
:param year: year to query for
:return: portion of a GraphQL query with desired info for a given year
"""
return f"""
year{year}: contributionsCollection(
from: "{year}-01-01T00:00:00Z",
to: "{int(year) + 1}-01-01T00:00:00Z"
) {{
contributionCalendar {{
totalContributions
}}
}}
"""
@classmethod
def all_contribs(cls, years: List[str]) -> str:
"""
:param years: list of years to get contributions for
:return: query to retrieve contribution information for all user years
"""
by_years = "\n".join(map(cls.contribs_by_year, years))
return f"""
query {{
viewer {{
{by_years}
}}
}}
"""
class Stats(object):
"""
Retrieve and store statistics about GitHub usage.
"""
def __init__(
self,
username: str,
access_token: str,
session: aiohttp.ClientSession,
exclude_repos: Optional[Set] = None,
exclude_langs: Optional[Set] = None,
ignore_forked_repos: bool = False,
):
self.username = username
self._ignore_forked_repos = ignore_forked_repos
self._exclude_repos = set() if exclude_repos is None else exclude_repos
self._exclude_langs = set() if exclude_langs is None else exclude_langs
self.queries = Queries(username, access_token, session)
self._name: Optional[str] = None
self._stargazers: Optional[int] = None
self._forks: Optional[int] = None
self._total_contributions: Optional[int] = None
self._languages: Optional[Dict[str, Any]] = None
self._repos: Optional[Set[str]] = None
self._lines_changed: Optional[Tuple[int, int]] = None
self._views: Optional[int] = None
async def to_str(self) -> str:
"""
:return: summary of all available statistics
"""
languages = await self.languages_proportional
formatted_languages = "\n - ".join(
[f"{k}: {v:0.4f}%" for k, v in languages.items()]
)
lines_changed = await self.lines_changed
return f"""Name: {await self.name}
Stargazers: {await self.stargazers:,}
Forks: {await self.forks:,}
All-time contributions: {await self.total_contributions:,}
Repositories with contributions: {len(await self.repos)}
Lines of code added: {lines_changed[0]:,}
Lines of code deleted: {lines_changed[1]:,}
Lines of code changed: {lines_changed[0] + lines_changed[1]:,}
Project page views: {await self.views:,}
Languages:
- {formatted_languages}"""
async def get_stats(self) -> None:
"""
Get lots of summary statistics using one big query. Sets many attributes
"""
self._stargazers = 0
self._forks = 0
self._languages = dict()
self._repos = set()
next_owned = None
next_contrib = None
while True:
raw_results = await self.queries.query(
Queries.repos_overview(
owned_cursor=next_owned, contrib_cursor=next_contrib
)
)
raw_results = raw_results if raw_results is not None else {}
self._name = raw_results.get("data", {}).get("viewer", {}).get("name", None)
if self._name is None:
self._name = (
raw_results.get("data", {})
.get("viewer", {})
.get("login", "No Name")
)
contrib_repos = (
raw_results.get("data", {})
.get("viewer", {})
.get("repositoriesContributedTo", {})
)
owned_repos = (
raw_results.get("data", {}).get("viewer", {}).get("repositories", {})
)
repos = owned_repos.get("nodes", [])
if not self._ignore_forked_repos:
repos += contrib_repos.get("nodes", [])
for repo in repos:
if repo is None:
continue
name = repo.get("nameWithOwner")
if name in self._repos or name in self._exclude_repos:
continue
self._repos.add(name)
self._stargazers += repo.get("stargazers").get("totalCount", 0)
self._forks += repo.get("forkCount", 0)
for lang in repo.get("languages", {}).get("edges", []):
name = lang.get("node", {}).get("name", "Other")
languages = await self.languages
if name in self._exclude_langs:
continue
if name in languages:
languages[name]["size"] += lang.get("size", 0)
languages[name]["occurrences"] += 1
else:
languages[name] = {
"size": lang.get("size", 0),
"occurrences": 1,
"color": lang.get("node", {}).get("color"),
}
if owned_repos.get("pageInfo", {}).get(
"hasNextPage", False
) or contrib_repos.get("pageInfo", {}).get("hasNextPage", False):
next_owned = owned_repos.get("pageInfo", {}).get(
"endCursor", next_owned
)
next_contrib = contrib_repos.get("pageInfo", {}).get(
"endCursor", next_contrib
)
else:
break
# TODO: Improve languages to scale by number of contributions to
# specific filetypes
langs_total = sum([v.get("size", 0) for v in self._languages.values()])
for k, v in self._languages.items():
v["prop"] = 100 * (v.get("size", 0) / langs_total)
@property
async def name(self) -> str:
"""
:return: GitHub user's name (e.g., Jacob Strieb)
"""
if self._name is not None:
return self._name
await self.get_stats()
assert self._name is not None
return self._name
@property
async def stargazers(self) -> int:
"""
:return: total number of stargazers on user's repos
"""
if self._stargazers is not None:
return self._stargazers
await self.get_stats()
assert self._stargazers is not None
return self._stargazers
@property
async def forks(self) -> int:
"""
:return: total number of forks on user's repos
"""
if self._forks is not None:
return self._forks
await self.get_stats()
assert self._forks is not None
return self._forks
@property
async def languages(self) -> Dict:
"""
:return: summary of languages used by the user
"""
if self._languages is not None:
return self._languages
await self.get_stats()
assert self._languages is not None
return self._languages
@property
async def languages_proportional(self) -> Dict:
"""
:return: summary of languages used by the user, with proportional usage
"""
if self._languages is None:
await self.get_stats()
assert self._languages is not None
return {k: v.get("prop", 0) for (k, v) in self._languages.items()}
@property
async def repos(self) -> Set[str]:
"""
:return: list of names of user's repos
"""
if self._repos is not None:
return self._repos
await self.get_stats()
assert self._repos is not None
return self._repos
@property
async def total_contributions(self) -> int:
"""
:return: count of user's total contributions as defined by GitHub
"""
if self._total_contributions is not None:
return self._total_contributions
self._total_contributions = 0
years = (
(await self.queries.query(Queries.contrib_years()))
.get("data", {})
.get("viewer", {})
.get("contributionsCollection", {})
.get("contributionYears", [])
)
by_year = (
(await self.queries.query(Queries.all_contribs(years)))
.get("data", {})
.get("viewer", {})
.values()
)
for year in by_year:
self._total_contributions += year.get("contributionCalendar", {}).get(
"totalContributions", 0
)
return cast(int, self._total_contributions)
@property
async def lines_changed(self) -> Tuple[int, int]:
"""
:return: count of total lines added, removed, or modified by the user
"""
if self._lines_changed is not None:
return self._lines_changed
additions = 0
deletions = 0
for repo in await self.repos:
r = await self.queries.query_rest(f"/repos/{repo}/stats/contributors")
for author_obj in r:
# Handle malformed response from the API by skipping this repo
if not isinstance(author_obj, dict) or not isinstance(
author_obj.get("author", {}), dict
):
continue
author = author_obj.get("author", {}).get("login", "")
if author not in [ self.username, os.environ.get("ALT_USERNAME", "") ]:
continue
for week in author_obj.get("weeks", []):
additions += week.get("a", 0)
deletions += week.get("d", 0)
self._lines_changed = (additions, deletions)
return self._lines_changed
@property
async def views(self) -> int:
"""
Note: only returns views for the last 14 days (as-per GitHub API)
:return: total number of page views the user's projects have received
"""
if self._views is not None:
return self._views
total = 0
for repo in await self.repos:
r = await self.queries.query_rest(f"/repos/{repo}/traffic/views")
for view in r.get("views", []):
total += view.get("count", 0)
self._views = total
return total
###############################################################################
# Main Function
###############################################################################
async def main() -> None:
"""
Used mostly for testing; this module is not usually run standalone
"""
access_token = os.getenv("ACCESS_TOKEN")
user = os.getenv("GITHUB_ACTOR")
if access_token is None or user is None:
raise RuntimeError(
"ACCESS_TOKEN and GITHUB_ACTOR environment variables cannot be None!"
)
async with aiohttp.ClientSession() as session:
s = Stats(user, access_token, session)
print(await s.to_str())
if __name__ == "__main__":
asyncio.run(main())