From 735e2e59c45751d43594a148055d169f6c75dc0b Mon Sep 17 00:00:00 2001
From: Matteo Croce <teknoraver@meta.com>
Date: Mon, 17 Feb 2025 21:53:02 +0100
Subject: [PATCH] Add support for rpm2extents transcoder
Two related parts:
1. If the `LIBREPO_TRANSCODE_RPMS` environment variable is set to a program (with parameters), then downloads of `.rpm` files are piped through it.
2. Transcoded RPMs will, by definition, not have the same bits on disk as were downloaded. This is inherent. The transcoder is tasked with measuring the bits that enter its stdin and storing a copy of the digest(s) seen in the footer. `librepo` can then use these stored digests instead if the environment variable is set.
This is part of changes described in https://fedoraproject.org/wiki/Changes/RPMCoW
Co-authored-by: Matthew Almond <malmond@meta.com>
---
librepo/checksum.c | 111 +++++++++++++++++++++++++++++++-
librepo/downloader.c | 149 ++++++++++++++++++++++++++++++++++++++++++-
librepo/rcodes.h | 2 +
3 files changed, 258 insertions(+), 4 deletions(-)
diff --git a/librepo/checksum.c b/librepo/checksum.c
index 199e40e84..c1e0c476d 100644
--- a/librepo/checksum.c
+++ b/librepo/checksum.c
@@ -40,6 +40,9 @@
#define BUFFER_SIZE 2048
#define MAX_CHECKSUM_NAME_LEN 7
+/* magic value (64 bits) in the last 8 bytes of a file that indicates this is a transcoded rpm */
+#define MAGIC 3472329499408095051
+
LrChecksumType
lr_checksum_type(const char *type)
{
@@ -103,6 +106,154 @@ lr_checksum_type_to_str(LrChecksumType type)
return NULL;
}
+/** Read exactly len bytes from fd into buf.
+ * read() is repeated until the request is satisfied, so that a short read
+ * is not mistaken for a truncated file.
+ * @param fd        File descriptor to read from.
+ * @param buf       Destination buffer, at least len bytes long.
+ * @param len       Number of bytes wanted.
+ * @return          TRUE if len bytes were read, FALSE on a read error or
+ *                  if the end of the file was reached first.
+ */
+static gboolean
+cow_read_exact(int fd, void *buf, size_t len)
+{
+    char *dst = buf;
+
+    while (len > 0) {
+        ssize_t got = read(fd, dst, len);
+        if (got <= 0)
+            return FALSE;
+        dst += got;
+        len -= (size_t) got;
+    }
+    return TRUE;
+}
+
+/** Fetch a checksum that a transcoder stored in the footer of a file.
+ * Nothing is read unless the LIBREPO_TRANSCODE_RPMS environment variable
+ * is set. The last bytes of the file must hold a csum_offset/magic pair;
+ * csum_offset locates a table of (algorithm name, digest) entries that is
+ * scanned for the first name that lr_checksum_type() maps to type.
+ * The file offset of fd is moved and not restored.
+ * @param type      Checksum type to look for.
+ * @param fd        Descriptor of the file to inspect; lseek() is used on it.
+ * @param err       GError **, set when the footer or the table cannot be
+ *                  read. It can therefore be set when NULL is returned.
+ * @return          Hex digest allocated with lr_malloc0(), or NULL when
+ *                  transcoding is not enabled, the file is not transcoded,
+ *                  the type is not in the table, or an error occurred.
+ */
+char *
+lr_checksum_cow_fd(LrChecksumType type, int fd, GError **err)
+{
+    /* Lengths come from the file and are untrusted: refuse values larger
+     * than these limits before allocating buffers for them. */
+    enum { COW_MAX_ALGO_LEN = 255, COW_MAX_DIGEST_LEN = 1024 };
+    static const char hexdigits[] = "0123456789abcdef";
+
+    struct __attribute__ ((__packed__)) csum_offset_magic {
+        off64_t csum_offset;
+        uint64_t magic;
+    };
+    struct __attribute__ ((__packed__)) orig_size_algos_len {
+        ssize_t orig_size;
+        uint32_t algos_len;
+    };
+    struct __attribute__ ((__packed__)) algo_len_digest_len {
+        uint32_t algo_len;
+        uint32_t digest_len;
+    };
+
+    struct csum_offset_magic csum_offset_magic;
+    struct orig_size_algos_len orig_size_algos_len;
+
+    if (g_getenv("LIBREPO_TRANSCODE_RPMS") == NULL) {
+        g_debug("Transcoding not enabled, skipping path");
+        return NULL;
+    }
+    /* Negate as off_t: negating the unsigned sizeof wraps around to a large
+     * positive offset wherever size_t is narrower than off_t. */
+    if (lseek(fd, -(off_t) sizeof(csum_offset_magic), SEEK_END) == -1) {
+        g_warning("seek for transcode failed, probably too small");
+        return NULL;
+    }
+    if (!cow_read_exact(fd, &csum_offset_magic, sizeof(csum_offset_magic))) {
+        g_set_error(err, LR_CHECKSUM_ERROR, LRE_TRANSCODE,
+                    "Cannot read csum_offset, magic. size = %" G_GSIZE_FORMAT,
+                    (gsize) sizeof(csum_offset_magic));
+        return NULL;
+    }
+    if (csum_offset_magic.magic != MAGIC) {
+        g_debug("Not transcoded");
+        return NULL;
+    }
+    g_debug("Is transcoded");
+    if (lseek(fd, csum_offset_magic.csum_offset, SEEK_SET) == -1) {
+        g_set_error(err, LR_CHECKSUM_ERROR, LRE_TRANSCODE,
+                    "seek for transcode csum_offset failed");
+        return NULL;
+    }
+    if (!cow_read_exact(fd, &orig_size_algos_len,
+                        sizeof(orig_size_algos_len))) {
+        g_set_error(err, LR_CHECKSUM_ERROR, LRE_TRANSCODE,
+                    "Cannot read orig_size_algos_len");
+        return NULL;
+    }
+    for (uint32_t left = orig_size_algos_len.algos_len; left > 0; left--) {
+        struct algo_len_digest_len entry;
+        char *algo;
+        unsigned char *digest;
+        gboolean wanted;
+
+        if (!cow_read_exact(fd, &entry, sizeof(entry))) {
+            g_set_error(err, LR_CHECKSUM_ERROR, LRE_TRANSCODE,
+                        "Cannot read algo_len_digest_len");
+            return NULL;
+        }
+        if (entry.algo_len > COW_MAX_ALGO_LEN ||
+            entry.digest_len > COW_MAX_DIGEST_LEN) {
+            g_set_error(err, LR_CHECKSUM_ERROR, LRE_TRANSCODE,
+                        "Implausible algo_len (%" G_GUINT32_FORMAT
+                        ") or digest_len (%" G_GUINT32_FORMAT ")",
+                        entry.algo_len, entry.digest_len);
+            return NULL;
+        }
+
+        /* one extra byte so algo stays NUL-terminated (buffer presumably zero-filled) */
+        algo = lr_malloc0(entry.algo_len + 1);
+        if (!cow_read_exact(fd, algo, entry.algo_len)) {
+            g_set_error(err, LR_CHECKSUM_ERROR, LRE_TRANSCODE,
+                        "Cannot read algo");
+            lr_free(algo);
+            return NULL;
+        }
+        digest = lr_malloc0(entry.digest_len);
+        if (!cow_read_exact(fd, digest, entry.digest_len)) {
+            g_set_error(err, LR_CHECKSUM_ERROR, LRE_TRANSCODE,
+                        "Cannot read digest");
+            lr_free(algo);
+            lr_free(digest);
+            return NULL;
+        }
+
+        wanted = (lr_checksum_type(algo) == type);
+        lr_free(algo);
+        if (wanted) {
+            /* found it: hex-encode the digest, two lowercase digits per byte */
+            char *checksum = lr_malloc0((size_t) entry.digest_len * 2 + 1);
+            for (size_t x = 0; x < entry.digest_len; x++) {
+                checksum[x * 2] = hexdigits[digest[x] >> 4];
+                checksum[x * 2 + 1] = hexdigits[digest[x] & 0x0f];
+            }
+            lr_free(digest);
+            return checksum;
+        }
+        lr_free(digest);
+    }
+    return NULL;
+}
+
char *
lr_checksum_fd(LrChecksumType type, int fd, GError **err)
{
@@ -263,9 +360,17 @@ lr_checksum_fd_compare(LrChecksumType type,
}
}
- char *checksum = lr_checksum_fd(type, fd, err);
- if (!checksum)
- return FALSE;
+ char *checksum = lr_checksum_cow_fd(type, fd, err);
+ if (checksum) {
+ // if checksum is found in CoW package, do not cache it in xattr
+ // because looking this up is nearly constant time (cheap) but
+ // is not valid when CoW is not enabled in RPM.
+ caching = FALSE;
+ } else {
+ checksum = lr_checksum_fd(type, fd, err);
+ if (!checksum)
+ return FALSE;
+ }
*matches = (strcmp(expected, checksum)) ? FALSE : TRUE;
diff --git a/librepo/downloader.c b/librepo/downloader.c
index 78f6e4de9..ef07e464c 100644
--- a/librepo/downloader.c
+++ b/librepo/downloader.c
@@ -32,6 +32,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
+#include <sys/wait.h>
#include <sys/xattr.h>
#include <fcntl.h>
#include <curl/curl.h>
@@ -151,6 +152,10 @@ typedef struct {
FILE *f; /*!<
fdopened file descriptor from LrDownloadTarget and used
in curl_handle. */
+ FILE *writef; /*!<
+ the fd to write data to. Could be a subprocess. */
+ pid_t pid; /*!<
+ the pid of a transcoder. */
char errorbuffer[CURL_ERROR_SIZE]; /*!<
Error buffer used in curl handle */
GSList *tried_mirrors; /*!<
@@ -619,7 +624,7 @@ lr_writecb(char *ptr, size_t size, size_t nmemb, void *userdata)
if (range_start <= 0 && range_end <= 0) {
// Write everything curl give to you
target->writecb_recieved += all;
- return fwrite(ptr, size, nmemb, target->f);
+ return fwrite(ptr, size, nmemb, target->writef);
}
/* Deal with situation when user wants only specific byte range of the
@@ -1434,6 +1439,175 @@ open_target_file(LrTarget *target, GError **err)
return f;
}
+/** Maybe transcode the file
+ * When the LIBREPO_TRANSCODE_RPMS environment variable is set and the path
+ * of the target ends in ".rpm", its value is split on spaces and executed
+ * as a child process that reads a pipe on stdin and writes to target->f on
+ * stdout. target->writef is set to the stream downloaded data has to be
+ * written to: the write end of that pipe, or target->f itself when no
+ * transcoder is started.
+ * @param target    Target whose target->f is already open.
+ * @param err       GError **, set on failure; target->writef is NULL then.
+ */
+void
+maybe_transcode(LrTarget *target, GError **err)
+{
+    const char *e = g_getenv("LIBREPO_TRANSCODE_RPMS");
+    int transcoder_stdin[2], fd;
+    pid_t pid;
+    FILE *out;
+    _cleanup_strv_free_ gchar **args = NULL;
+    target->writef = NULL;
+    if (!e) {
+        g_debug("Not transcoding");
+        target->writef = target->f;
+        return;
+    }
+    if (g_str_has_suffix(target->target->path, ".rpm") == FALSE) {
+        g_debug("Not transcoding %s due to name", target->target->path);
+        target->writef = target->f;
+        return;
+    }
+    g_debug("Transcoding %s", target->target->path);
+    args = g_strsplit(e, " ", -1);
+    if (args[0] == NULL) {
+        g_set_error(err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                    "transcode env empty");
+        return;
+    }
+    /* Look the output descriptor up before forking so that a failure is
+     * reported by the parent rather than lost in the child. */
+    fd = fileno(target->f);
+    if (fd == -1) {
+        g_set_error(err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                    "fileno for target failed");
+        return;
+    }
+    if (pipe(transcoder_stdin) != 0) {
+        g_set_error(err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                    "input pipe creation failed: %s",
+                    g_strerror(errno));
+        return;
+    }
+    /** librepo collects the 'write' ends of the pipes. We must mark these as
+     * FD_CLOEXEC so a second download/transcode does not inherit them and
+     * hold them open, as it'll prevent an EOF and cause a deadlock.
+     */
+    if (fcntl(transcoder_stdin[1], F_SETFD, FD_CLOEXEC) != 0) {
+        g_set_error(err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                    "input pipe write close-on-fork failed: %s",
+                    g_strerror(errno));
+        close(transcoder_stdin[0]);
+        close(transcoder_stdin[1]);
+        return;
+    }
+    pid = fork();
+    if (pid == -1) {
+        g_set_error(err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                    "fork failed: %s",
+                    g_strerror(errno));
+        close(transcoder_stdin[0]);
+        close(transcoder_stdin[1]);
+        return;
+    }
+    if (pid == 0) {
+        /* child: it must never return into the caller, otherwise a second
+         * copy of the calling program keeps running. A GError set here would
+         * not reach the parent either, so failures are reported through the
+         * exit status, which cleanup_transcode() picks up via waitpid(). */
+        if (dup2(transcoder_stdin[0], STDIN_FILENO) == -1)
+            _exit(127);
+        close(transcoder_stdin[0]);
+        close(transcoder_stdin[1]);
+        if (dup2(fd, STDOUT_FILENO) == -1)
+            _exit(127);
+        execv(args[0], args);
+        /* only reached when execv failed */
+        _exit(127);
+    }
+    /* parent */
+    close(transcoder_stdin[0]);
+    out = fdopen(transcoder_stdin[1], "w");
+    if (out == NULL) {
+        g_set_error(err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                    "fdopen failed: %s",
+                    g_strerror(errno));
+        /* closing the write end gives the child EOF; then reap it so no
+         * zombie is left behind */
+        close(transcoder_stdin[1]);
+        while (waitpid(pid, NULL, 0) == -1 && errno == EINTR)
+            ;
+        return;
+    }
+    target->pid = pid;
+    target->writef = out;
+    /* resuming a transcode is not yet implemented */
+    target->resume = FALSE;
+}
+
+/** Finish a transcode started by maybe_transcode()
+ * Closes the pipe to the transcoder so it sees EOF, waits for it to exit
+ * and turns a failed exit into an error. No process is waited for when
+ * target->writef is NULL or just an alias of target->f.
+ * @param target        Target to clean up.
+ * @param transfer_err  Where to report a transcoder failure. May be NULL,
+ *                      and may point to an error that is already set (the
+ *                      transfer itself failed); in both cases nothing is
+ *                      reported, we don't need to pile on a second failure
+ *                      reason.
+ */
+void
+cleanup_transcode(LrTarget *target, GError **transfer_err)
+{
+    int wstatus, trc, close_errno = 0;
+    pid_t reaped;
+    if (!target->writef) {
+        return;
+    }
+    if (target->writef == target->f) {
+        /* Drop the alias: callers fclose() target->f afterwards, and a
+         * stale writef would make a later cleanup close it again. */
+        target->writef = NULL;
+        return;
+    }
+    /* g_set_error() must not be used on a GError that is already set */
+    if (transfer_err && *transfer_err) {
+        transfer_err = NULL;
+    }
+    /* fclose() flushes what is still buffered; remember a failure, but let
+     * the exit status of the transcoder take precedence below */
+    if (fclose(target->writef) != 0) {
+        close_errno = errno ? errno : EIO;
+    }
+    /* pid is only valid if writef is not NULL */
+    target->writef = NULL;
+    do {
+        reaped = waitpid(target->pid, &wstatus, 0);
+    } while (reaped == -1 && errno == EINTR);
+    if (reaped == -1) {
+        g_set_error(transfer_err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                    "transcode waitpid failed: %s", g_strerror(errno));
+    } else if (WIFEXITED(wstatus)) {
+        trc = WEXITSTATUS(wstatus);
+        if (trc != 0) {
+            g_set_error(transfer_err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                        "transcode process non-zero exit code %d", trc);
+        } else if (close_errno != 0) {
+            g_set_error(transfer_err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                        "transcode pipe close failed: %s",
+                        g_strerror(close_errno));
+        }
+    } else if (WIFSIGNALED(wstatus)) {
+        g_set_error(transfer_err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                    "transcode process was terminated with a signal: %d",
+                    WTERMSIG(wstatus));
+    } else {
+        /* don't think this can happen, but covering all bases */
+        g_set_error(transfer_err, LR_DOWNLOADER_ERROR, LRE_TRANSCODE,
+                    "transcode unhandled circumstance in waitpid");
+    }
+}
+
/** Prepare next transfer
*/
static gboolean
@@ -1515,6 +1654,9 @@ prepare_next_transfer(LrDownload *dd, gboolean *candidatefound, GError **err)
target->f = open_target_file(target, err);
if (!target->f)
goto fail;
+ maybe_transcode(target, err);
+ if (!target->writef)
+ goto fail;
target->writecb_recieved = 0;
target->writecb_required_range_written = FALSE;
@@ -1690,6 +1832,7 @@ prepare_next_transfer(LrDownload *dd, gboolean *candidatefound, GError **err)
curl_easy_cleanup(target->curl_handle);
target->curl_handle = NULL;
}
+ cleanup_transcode(target, NULL);
if (target->f != NULL) {
fclose(target->f);
target->f = NULL;
@@ -2257,6 +2400,8 @@ check_transfer_statuses(LrDownload *dd, GError **err)
if (!ret) // Error
return FALSE;
+ cleanup_transcode(target, &transfer_err);
+
if (transfer_err) // Transfer was unsuccessful
goto transfer_error;
@@ -2354,6 +2499,7 @@ check_transfer_statuses(LrDownload *dd, GError **err)
target->curl_handle = NULL;
g_free(target->headercb_interrupt_reason);
target->headercb_interrupt_reason = NULL;
+ cleanup_transcode(target, NULL);
fclose(target->f);
target->f = NULL;
if (target->curl_rqheaders) {
@@ -2757,6 +2903,7 @@ lr_download(GSList *targets,
curl_multi_remove_handle(dd.multi_handle, target->curl_handle);
curl_easy_cleanup(target->curl_handle);
target->curl_handle = NULL;
+ cleanup_transcode(target, NULL);
fclose(target->f);
target->f = NULL;
g_free(target->headercb_interrupt_reason);
diff --git a/librepo/rcodes.h b/librepo/rcodes.h
index dcbeb4137..fd65bd604 100644
--- a/librepo/rcodes.h
+++ b/librepo/rcodes.h
@@ -125,6 +125,8 @@ typedef enum {
key/group not found, ...) */
LRE_ZCK, /*!<
(41) Zchunk error (error reading zchunk file, ...) */
+ LRE_TRANSCODE, /*!<
+ (42) Transcode error (env empty, ...) */
LRE_UNKNOWNERROR, /*!<
(xx) unknown error - sentinel of error codes enum */
} LrRc; /*!< Return codes */