# -*- coding: utf-8 -*-
# utils.py - a module with support methods for centpkg
#
# Copyright (C) 2021 Red Hat Inc.
# Author(s): Ondrej Nosek <onosek@redhat.com>
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version. See http://www.gnu.org/copyleft/gpl.html for
# the full text of the license.
import git
import json
import logging
import os
import re
import requests
import sys
from collections import namedtuple
from datetime import date, datetime
from http import HTTPStatus
from pyrpkg import rpkgError
from requests.exceptions import ConnectionError, HTTPError
from configparser import NoOptionError, NoSectionError
from urllib.parse import quote_plus, urlparse
import yaml
import git as gitpython
dist_git_config = None
# RHEL Product Pages Phase Identifiers
pp_phase_name_lookup = dict()
# Phase 230 is "Planning / Development / Testing" (AKA DevTestDoc)
pp_phase_devtestdoc = 230
pp_phase_name_lookup[pp_phase_devtestdoc] = "DevTestDoc"
# Phase 450 is "Stabilization" (AKA Exception Phase)
pp_phase_stabilization = 450
pp_phase_name_lookup[pp_phase_stabilization] = "Stabilization"
# Phase 600 is "Maintenance" (AKA Z-stream Phase)
pp_phase_maintenance = 600
pp_phase_name_lookup[pp_phase_maintenance] = "Maintenance"
# Default lookup location for unsynced packages
default_distrobaker_config = "https://gitlab.cee.redhat.com/osci/distrobaker_config/-/raw/rhel9/distrobaker.yaml?ref_type=heads"
rhel_state_nt = namedtuple(
"RHELState",
[
"latest_version",
"target_version",
"rule_branch",
"phase",
"rhel_target_default",
"enforcing",
"synced",
],
)
# Super-class for errors related to internal RHEL infrastructure
class RHELError(Exception):
pass
logger = logging.getLogger(__name__)
def do_fork(logger, base_url, token, repo_name, namespace, cli_name):
"""
Creates a fork of the project.
:param logger: A logger object
:param base_url: a string of the URL repository
:param token: a string of the API token that has rights to make a fork
:param repo_name: a string of the repository name
:param namespace: a string determines a type of the repository
:param cli_name: string of the CLI's name (e.g. centpkg)
:return: a tuple consisting of whether the fork needed to be created (bool)
and the fork path (string)
"""
api_url = "{0}/api/v4".format(base_url.rstrip("/"))
project_id = quote_plus("redhat/centos-stream/{0}/{1}".format(namespace, repo_name))
fork_url = "{0}/projects/{1}/fork".format(api_url, project_id)
headers = {
"PRIVATE-TOKEN": token,
"Accept": "application/json",
"Content-Type": "application/json",
}
# define a new repository name/path to avoid collision with other projects
safe_name = "centos_{0}_{1}".format(namespace, repo_name)
payload = json.dumps(
{
"name": safe_name, # name of the project after forking
"path": safe_name,
}
)
try:
rv = requests.post(fork_url, headers=headers, data=payload, timeout=60)
except ConnectionError as error:
error_msg = (
"The connection to API failed while trying to "
"create a new fork. The error was: {0}".format(str(error))
)
raise rpkgError(error_msg)
try:
# Extract response json for debugging
rv_json = rv.json()
logger.debug("GitLab API response: '{0}'".format(rv_json))
except Exception:
pass
if rv.ok:
fork_id = rv.json()["id"]
try:
# Unprotect c9s in fork
rv = requests.delete(
"{0}/projects/{1}/protected_branches/{2}".format(
api_url, fork_id, "c9s"
),
headers=headers,
)
except ConnectionError as error:
error_msg = (
"The connection to API failed while trying to unprotect c9s branch"
"in the fork. The error was: {0}".format(str(error))
)
raise rpkgError(error_msg)
try:
# Reprotect c9s to disable pushes
# Only maintainers in gitlab are allowed to push with the following config
# In CS, every pkg maintainer is considered as a developer in gitlab
data = {
"id": fork_id,
"name": "c9s",
"allowed_to_push": [{"access_level": 40}],
"allowed_to_merge": [{"access_level": 40}],
}
rv = requests.post(
"{0}/projects/{1}/protected_branches".format(api_url, fork_id),
json=data,
headers=headers,
)
except ConnectionError as error:
error_msg = (
"The connection to API failed while trying to reprotect c9s branch"
"in the fork fork. The error was: {0}".format(str(error))
)
raise rpkgError(error_msg)
base_error_msg = "The following error occurred while creating a new fork: {0}"
if not rv.ok:
# fork was already created
if rv.status_code == 409 or rv.reason == "Conflict":
# When the repo already exists, the return doesn't contain the repo
# path or username. Make one more API call to get the username of
# the token to construct the repo path.
rv = requests.get("{0}/user".format(api_url), headers=headers)
username = rv.json()["username"]
return False, "{0}/{1}".format(username, safe_name)
# show hint for invalid, expired or revoked token
elif rv.status_code == 401 or rv.reason == "Unauthorized":
base_error_msg += (
"\nFor invalid or expired token refer to "
'"{0} fork -h" to set a token in your user '
"configuration.".format(cli_name)
)
raise rpkgError(base_error_msg.format(rv.text))
return True, rv_json["path_with_namespace"]
def do_add_remote(base_url, remote_base_url, repo, repo_path, remote_name):
"""
Adds remote tracked repository
:param base_url: a string of the URL repository
:param remote_base_url: a string of the remote tracked repository
:param repo: object, current project git repository
:param repo_path: a string of the repository path
:param remote_name: a string of the remote name
:return: a bool; True if remote was created, False when already exists
"""
parsed_url = urlparse(remote_base_url)
remote_url = "{0}://{1}/{2}.git".format(
parsed_url.scheme,
parsed_url.netloc,
repo_path,
)
# check already existing remote
for remote in repo.remotes:
if remote.name == remote_name:
return False
try:
repo.create_remote(remote_name, url=remote_url)
except git.exc.GitCommandError as e:
error_msg = "During create remote:\n {0}\n {1}".format(
" ".join(e.command), e.stderr
)
raise rpkgError(error_msg)
return True
def config_get_safely(config, section, option):
"""
Returns option from the user's configuration file. In case of missing
section or option method throws an exception with a human-readable
warning and a possible hint.
The method should be used especially in situations when there are newly
added sections/options into the config. In this case, there is a risk that
the user's config wasn't properly upgraded.
:param config: ConfigParser object
:param section: section name in the config
:param option: name of the option
:return: option value from the right section
:rtype: str
"""
hint = (
"First (if possible), refer to the help of the current command "
"(-h/--help).\n"
"There also might be a new version of the config after upgrade.\n"
"Hint: you can check if you have 'centpkg.conf.rpmnew' or "
"'centpkg.conf.rpmsave' in the config directory. If yes, try to merge "
"your changes to the config with the maintainer provided version "
"(or replace centpkg.conf file with 'centpkg.conf.rpmnew')."
)
try:
return config.get(section, option)
except NoSectionError:
msg = "Missing section '{0}' in the config file.".format(section)
raise rpkgError("{0}\n{1}".format(msg, hint))
except NoOptionError:
msg = "Missing option '{0}' in the section '{1}' of the config file.".format(
option, section
)
raise rpkgError("{0}\n{1}".format(msg, hint))
except Exception:
raise
def get_canonical_repo_name(config, repo_url):
"""
Check whether the current repo is a fork and if so, retrieve the parent
fork to get the proper name.
"""
# Look up the repo and query for forked_from_project
cli_name = config_get_safely(dist_git_config, "__default", "cli_name")
distgit_section = "{0}.distgit".format(cli_name)
distgit_api_base_url = config_get_safely(
dist_git_config, distgit_section, "apibaseurl"
)
parsed_repo_url = urlparse(repo_url)
if not parsed_repo_url.scheme and repo_url.startswith("git@"):
# Some git checkouts are in the form of git@gitlab.com/...
# If it's missing the scheme, it will treat the entire URL as the path
# so we'll fake up the scheme for this situation
# https://www.git-scm.com/book/en/v2/Git-Basics-Getting-a-Git-Repository
# implies that no scheme is equivalent to git+ssh://
# When making that conversion, we also have to replace the leading ':'
# with a slash.
faked_url = "git+ssh://{0}".format(repo_url.replace(":", "/", 1))
parsed_repo_url = urlparse(faked_url)
try:
distgit_token = config_get_safely(dist_git_config, distgit_section, "token")
api_url = "{0}/api/v4".format(distgit_api_base_url.rstrip("/"))
project_url = "{0}/projects/{1}".format(
api_url, quote_plus(parsed_repo_url.path.lstrip("/"))
)
headers = {
"PRIVATE-TOKEN": distgit_token,
"Accept": "application/json",
"Content-Type": "application/json",
}
rv = requests.get(project_url, headers=headers)
rv.raise_for_status()
# Extract response json for debugging
rv_json = rv.json()
canonical_repo_name = rv_json["forked_from_project"]["name"]
except HTTPError as e:
# We got a 4xx or 5xx error code from the URL lookup
if e.response.status_code == HTTPStatus.FORBIDDEN:
raise rpkgError("Insufficient Gitlab API permissions. Missing token?")
# Other errors are unexpected, so re-raise them
raise
except KeyError as e:
# There was no 'forked_from_project' key, likely meaning the
# user lacked permissions to read the API. Usually this means
# they haven't supplied a token or it is expired.
raise rpkgError("Insufficient Gitlab API permissions. Missing token?")
# Chop off a trailing .git if any
return canonical_repo_name.rsplit(".git", 1)[0]
def get_repo_name(name, org="rpms"):
"""
Try to parse the repository name in case it is a git url.
Parameters
----------
name: str
The repository name, including the org name.
It will try to retrieve both repository name and org in case "name" is an url.
org: str
The org to use in case name parsing is needed.
Returns
-------
str
A string containing the repository name: $ORG/$REPO`.
It will return the original `name` parameter in case of regex match failure.
"""
if name.startswith(org):
return name
# This is probably a renamed fork, so try to find the fork's parent
repo_name = get_canonical_repo_name(dist_git_config, name)
return "%s/%s" % (org, repo_name)
def stream_mapping(csname):
"""
Given a CentOS Stream name, map it to the corresponding RHEL name.
Parameters
----------
csname: str
The CentOS Stream name.
Returns
-------
str
Corresponding RHEL name.
"""
if csname == "c8s" or csname == "cs8":
return 8, "rhel-8"
if csname == "c9s" or csname == "cs9":
return 9, "rhel-9"
if csname == "c10s" or csname == "cs10":
return 10, "rhel-10"
if csname == "c11s" or csname == "cs11":
return 11, "rhel-11"
return None
def does_branch_exist(rhel_dist_git, namespace, repo_name, branch):
# Determine if the Y-1 branch exists for this repo
g = gitpython.cmd.Git()
try:
g.ls_remote(
"--exit-code",
os.path.join(rhel_dist_git, namespace, repo_name),
branch,
)
branch_exists = True
except gitpython.GitCommandError as e:
t, v, tb = sys.exc_info()
# `git ls-remote --exit-code` returns "2" if it cannot find the ref
if e.status == 2:
branch_exists = False
else:
raise
return branch_exists
def _datesplit(isodate):
date_string_tuple = isodate.split("-")
return [int(x) for x in date_string_tuple]
# Certain packages are not synced to RHEL, and will always use the 'cXs' branch
# rules. This list is maintained in the distrobaker configuration:
def get_unsynced_projects(distrobaker_config, namespace):
res = requests.get(distrobaker_config, timeout=60)
res.raise_for_status()
payload = yaml.safe_load(res.content.decode("utf-8"))
return payload["configuration"]["control"]["exclude"][namespace]
def parse_rhel_shortname(shortname):
# The shortname is in the form rhel-9-1.0 or rhel-10.0[.beta]
m = re.match(
"rhel-(?P<major>[0-9]+)[.-](?P<minor>[0-9]+)([.]0|[.](?P<extra>.*))?", shortname
)
if not m:
raise RuntimeError("Could not parse version from {}".format(shortname))
major_version = int(m.group("major"))
minor_version = int(m.group("minor"))
extra_version = m.group("extra") or None
return major_version, minor_version, extra_version
def parse_rhel_branchname(shortname):
# The branchname is in the form rhel-9-1.0 or rhel-10.0[-beta]
m = re.match(
"rhel-(?P<major>[0-9]+)[.-](?P<minor>[0-9]+)([.]0|[-](?P<extra>.*))?", shortname
)
if not m:
raise RuntimeError("Could not parse version from {}".format(shortname))
major_version = int(m.group("major"))
minor_version = int(m.group("minor"))
extra_version = m.group("extra") or None
return major_version, minor_version, extra_version
def query_package_pages(api_url, request_params):
"""
api_url: A URL to the API endpoing of the Product Pages (e.g.
"https://example.com/pp/api/")
request_params: A set of python-requests-compatible URL parameters to
focus the query.
"""
res = requests.get(
os.path.join(api_url, "latest", "releases"),
params=request_params,
timeout=60,
)
res.raise_for_status()
payload = json.loads(res.text)
logger.debug("Response from PP API: {}".format(json.dumps(payload, indent=2)))
return payload
def format_branch(x_version, y_version, is_beta):
if x_version <= 9:
# 9.x and older releases include an excess .0 in the branch name
if is_beta:
branch = "rhel-{}.{}.0-beta".format(x_version, y_version)
else:
branch = "rhel-{}.{}.0".format(x_version, y_version)
else:
# Starting with RHEL 10, the branch names have dropped the extra .0
if is_beta:
branch = "rhel-{}.{}-beta".format(x_version, y_version)
else:
branch = "rhel-{}.{}".format(x_version, y_version)
return branch
def determine_rhel_state(
rhel_dist_git, namespace, repo_name, cs_branch, pp_api_url, distrobaker_config
):
"""
Arguments:
* rhel_dist_git: an https URL to the RHEL dist-git. Used for determining
the presence of the prior release's Z-stream branch.
* namespace: The dist-git namespace (rpms, containers, modules, etc.).
Used for determining the presence of the prior release's Z-stream
branch.
* repo_name: The name of the repo in the namespace from which we will
determine status. Used for determining the presence of the prior
release's Z-stream branch.
* cs_branch: The CentOS Stream branch for this repo. Used to determine the
RHEL major release.
* pp_api_url: The URL to the RHEL Product Pages API. Used for determining
the current development phase.
* distrobaker_config: The URL to the DistroBaker configuration. Used for
identifying packages that are not synced to RHEL.
Returns: a namedtuple containing key information about the RHEL release
associated with this CentOS Stream branch. It has the following members:
* latest_version: The most recent major and minor release of RHEL. This
is a presentation string and its format is not guaranteed.
* target_version: The major and minor release of RHEL that is currently
targeted by this CentOS Stream branch. This is a presentation string
and its format is not guaranteed.
* rule_branch: The branch to be used for check-tickets rules (str)
* rhel_target_default: The default `--rhel-target` (str) or
None (NoneType). The possible values if not None are "latest",
"zstream" or "none" (distinctive from NoneType)
* enforcing: Whether ticket approvals should be enforced. (bool)
* synced: Whether this package is synced to RHEL. False means it is a
CentOS Stream-only package. (bool)
"""
# First, check if this package has a RHEL counterpart or is CentOS Stream
# only.
try:
if repo_name in get_unsynced_projects(distrobaker_config, namespace):
# We don't need to do any looking up, because it will always use the
# stream version and never enforce tickets. It will return
# rhel_target_default="none" to instruct distrobaker not to attempt
# to build on RHEL.
return rhel_state_nt(
latest_version=cs_branch,
target_version=cs_branch,
rule_branch=cs_branch,
phase=pp_phase_devtestdoc,
rhel_target_default="none",
enforcing=False,
synced=False,
)
except (ConnectionError, HTTPError) as e:
raise RHELError("Could not retrieve distrobaker config. Are you on the VPN?")
x_version, rhel_version = stream_mapping(cs_branch)
# Query the "package pages" API for the current active Y-stream release
request_params = {
"phase__in": "{},{},{}".format(
pp_phase_devtestdoc, pp_phase_stabilization, pp_phase_maintenance
),
"product__shortname": "rhel",
"relgroup__shortname": rhel_version,
"format": "json",
}
try:
pp_response = query_package_pages(
api_url=pp_api_url, request_params=request_params
)
except (ConnectionError, HTTPError) as e:
raise RHELError("Could not contact Product Pages. Are you on the VPN?")
if len(pp_response) < 1:
# Received zero potential release matches
logger.warning("Didn't match any active releases. Assuming pre-Beta.")
# Fake up a Beta payload
pp_response = [
{
"shortname": "{}.0-beta".format(rhel_version),
"phase": pp_phase_devtestdoc,
}
]
active_y_version = -1
beta = False
phase_lookup = dict()
for entry in pp_response:
shortname = entry["shortname"]
# The shortname is in the form rhel-9-1.0 or rhel-10.0[.beta]
# Extract the active Y-stream version
x_version, y_version, extra_version = parse_rhel_shortname(shortname)
entry_is_beta = bool(extra_version and "beta" in extra_version)
# Enable looking up the phase later
branch_name = format_branch(x_version, y_version, entry_is_beta)
phase_lookup[branch_name] = entry["phase"]
if y_version > active_y_version or (
y_version == active_y_version and beta and not entry_is_beta
):
# Replace the saved values with a higher Y version if we
# see one. Also check whether we have the same Y version
# but without the Beta indicator
active_y_version = y_version
beta = entry_is_beta
if beta:
latest_version = "{}.{} Beta".format(x_version, active_y_version)
else:
latest_version = "{}.{}".format(x_version, active_y_version)
logger.debug("Latest version: {}".format(latest_version))
# Next we need to find out if we're actually USING the latest version or
# the previous one, by asking RHEL dist-git if the rhel-X.(Y-1).0 branch
# exists. (Or rhel-X.Y.0-beta in the special case of Y=0)
# If the latest release is the Beta, we can skip checking for a prior
# release branch, since none can exist and we know it cannot be in
# the Stabilization Phase yet. Thus, we return the CS branch and
# --rhel-target=latest
if beta:
return rhel_state_nt(
latest_version=latest_version,
target_version=latest_version,
rule_branch=cs_branch,
phase=pp_phase_devtestdoc,
rhel_target_default="latest",
enforcing=False,
synced=True,
)
# First, check if this is the special case of Y=0
# Note: since this is being written during the 10.0 Beta timeframe, there
# is no need to special case older formats like 9.0.0-beta. We can just
# use rhel-X.0-beta instead.
if active_y_version == 0:
prior_release_branch = format_branch(x_version, active_y_version, is_beta=True)
else:
prior_release_branch = format_branch(
x_version, active_y_version - 1, is_beta=False
)
current_release_branch = format_branch(x_version, active_y_version, is_beta=False)
logger.debug("Prior release branch: {}".format(prior_release_branch))
try:
branch_exists = does_branch_exist(
rhel_dist_git, namespace, repo_name, prior_release_branch
)
except gitpython.GitCommandError as e:
raise RHELError("Could not read from RHEL dist-git. Are you on the VPN?")
if branch_exists:
# The branch is there, so work on the active Y-stream, which is always
# in either DevTestDoc Phase or Maintenance Phase (in the case of an
# end-of-life CentOS Stream)
phase = phase_lookup[current_release_branch]
check_tickets_branch = cs_branch
rhel_target_default = "latest"
target_version = latest_version
if phase == pp_phase_maintenance:
enforcing = True
else:
enforcing = False
else:
# The branch is not present, so we'll work on the prior Y-stream
check_tickets_branch = prior_release_branch
target_x, target_y, target_extra = parse_rhel_branchname(prior_release_branch)
target_version = "{}.{}{}".format(
target_x,
target_y,
" Beta" if target_extra and "beta" in target_extra else "",
)
# The prior Y-stream is always in either Stabilization or Maintenance
# phase, so it always enforces.
enforcing = True
# Determine which phase the prior release is in:
phase = phase_lookup[prior_release_branch]
if phase == pp_phase_stabilization:
# We're in the Stabilization phase, so we can't automatically determine
# between the "zstream" and "exception" targets.
rhel_target_default = None
else:
# We must be in Maintenance phase
rhel_target_default = "zstream"
return rhel_state_nt(
latest_version=latest_version,
target_version=target_version,
rule_branch=check_tickets_branch,
phase=phase,
rhel_target_default=rhel_target_default,
enforcing=enforcing,
synced=True,
)
def format_current_state_message(rhel_state):
"""
Returns a human-readable string providing actionable information about the
current state of this repository. Useful for `centpkg current-state` and
the check-tickets function in merge requests
"""
if not rhel_state.synced:
return f"This component is not synced to RHEL"
message = (
f"Current RHEL status:\n"
f"\tThe latest active Y-stream release is RHEL {rhel_state.latest_version}\n"
f"\tThis project is targeting RHEL {rhel_state.target_version}\n"
)
if rhel_state.latest_version != rhel_state.target_version:
zstream_active_msg = (
f"\t\tThe latest and targeted versions differ.\n"
f"\t\tIf this is not intentional, please see\n"
f"\t\thttps://one.redhat.com/rhel-development-guide/#proc_centos-stream-first_assembly_rhel-9-development\n"
f"\t\tfor details on how to unlock Y-stream development by creating the {rhel_state.rule_branch} branch.\n"
)
message = "".join((message, zstream_active_msg))
target_phase = pp_phase_name_lookup[rhel_state.phase]
message = "".join(
(
message,
f"\tThe {rhel_state.target_version} release is currently in {target_phase} phase\n",
)
)
if rhel_state.phase == pp_phase_stabilization:
message = "".join(
(message, f"\t\tThe --rhel-target argument must be used when building.\n")
)
message = "".join(
(
message,
f"\tTicket approvals are {'' if rhel_state.enforcing else 'not '}currently required for merge request approval.",
)
)
return message