From adfce693971c819ab8eaac179da083c41d4bcf6a Mon Nov 28 16:35:09 2022 From: Aleksandr Kazakov Date: Mon, 28 Nov 2022 16:35:09 +0000 Subject: [PATCH] Backport multi-threaded zstd to 4.14.x to support multi-threaded zstd compression on centos 8 Signed-off-by: Aleksandr Kazakov --- configure.ac | 2 +- macros.in | 1 + rpmio/rpmio.c | 82 +++++++++++++++++++++++++++++++++++---------------- 3 files changed, 58 insertions(+), 27 deletions(-) diff --git a/configure.ac b/configure.ac index 47327bd..2fa5daf 100644 --- a/configure.ac +++ b/configure.ac @@ -214,7 +214,7 @@ AC_ARG_ENABLE([zstd], [enable_zstd=auto]) AS_IF([test "x$enable_zstd" != "xno"], [ - PKG_CHECK_MODULES([ZSTD], [libzstd], [have_zstd=yes], [have_zstd=no]) + PKG_CHECK_MODULES([ZSTD], [libzstd >= 1.5.0], [have_zstd=yes], [have_zstd=no]) AS_IF([test "$enable_zstd" = "yes"], [ if test "$have_zstd" = "no"; then AC_MSG_ERROR([--enable-zstd specified, but not available]) diff --git a/macros.in b/macros.in index 9b9fe23..832b60a 100644 --- a/macros.in +++ b/macros.in @@ -394,6 +394,7 @@ package or when debugging this package.\ # "w9.bzdio" bzip2 level 9. # "w6.xzdio" xz level 6, xz's default. # "w7T16.xzdio" xz level 7 using 16 thread (xz only) +# "w19T8.zstdio" zstd level 19 using 8 threads # "w6.lzdio" lzma-alone level 6, lzma's default # #%_source_payload w9.gzdio diff --git a/rpmio/rpmio.c b/rpmio/rpmio.c index 09b5d02..d030a9c 100644 --- a/rpmio/rpmio.c +++ b/rpmio/rpmio.c @@ -1070,6 +1070,7 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode) char *t = stdio; char *te = t + sizeof(stdio) - 2; int c; + int threads = 0; switch ((c = *s++)) { case 'a': @@ -1098,7 +1099,14 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode) flags &= ~O_ACCMODE; flags |= O_RDWR; continue; - break; + case 'T': + if (*s >= '0' && *s <= '9') { + threads = strtol(s, (char **)&s, 10); + /* T0 means automatic detection */ + if (threads == 0) + threads = sysconf(_SC_NPROCESSORS_ONLN); + } + continue; default: if (c >= (int)'0' && c <= (int)'9') { level = strtol(s-1, (char **)&s, 10); @@ -1132,10 +1140,16 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode) } nb = ZSTD_DStreamInSize(); } else { /* compressing */ - if ((_stream = (void *) ZSTD_createCStream()) == NULL - || ZSTD_isError(ZSTD_initCStream(_stream, level))) { + if ((_stream = (void *) ZSTD_createCCtx()) == NULL + || ZSTD_isError(ZSTD_CCtx_setParameter(_stream, ZSTD_c_compressionLevel, level))) { goto err; } + + rpmlog(RPMLOG_DEBUG, "using %i threads in zstd compression\n", threads); + if (threads > 0) { + if (ZSTD_isError (ZSTD_CCtx_setParameter(_stream, ZSTD_c_nbWorkers, threads))) + rpmlog(RPMLOG_WARNING, "zstd library does not support multi-threading\n"); + } nb = ZSTD_CStreamOutSize(); } @@ -1155,7 +1169,7 @@ err: if ((flags & O_ACCMODE) == O_RDONLY) ZSTD_freeDStream(_stream); else - ZSTD_freeCStream(_stream); + ZSTD_freeCCtx(_stream); return NULL; } @@ -1181,16 +1195,24 @@ assert(zstd); rc = 0; } else { /* compressing */ /* close frame */ - zstd->zob.dst = zstd->b; - zstd->zob.size = zstd->nb; - zstd->zob.pos = 0; - int xx = ZSTD_flushStream(zstd->_stream, &zstd->zob); - if (ZSTD_isError(xx)) - fps->errcookie = ZSTD_getErrorName(xx); - else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) - fps->errcookie = "zstdFlush fwrite failed."; - else - rc = 0; + int xx; + do { + ZSTD_inBuffer zib = { NULL, 0, 0 }; + zstd->zob.dst = zstd->b; + zstd->zob.size = zstd->nb; + zstd->zob.pos = 0; + xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_flush); + if (ZSTD_isError(xx)) { + fps->errcookie = ZSTD_getErrorName(xx); + break; + } + else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) { + fps->errcookie = "zstdClose fwrite failed."; + break; + } + else + rc = 0; + } while (xx != 0); } return rc; } @@ -1235,7 +1257,7 @@ assert(zstd); zstd->zob.pos = 0; /* Compress next chunk. */ - int xx = ZSTD_compressStream(zstd->_stream, &zstd->zob, &zib); + int xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_continue); if (ZSTD_isError(xx)) { fps->errcookie = ZSTD_getErrorName(xx); return -1; @@ -1264,17 +1286,25 @@ assert(zstd); ZSTD_freeDStream(zstd->_stream); } else { /* compressing */ /* close frame */ - zstd->zob.dst = zstd->b; - zstd->zob.size = zstd->nb; - zstd->zob.pos = 0; - int xx = ZSTD_endStream(zstd->_stream, &zstd->zob); - if (ZSTD_isError(xx)) - fps->errcookie = ZSTD_getErrorName(xx); - else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) - fps->errcookie = "zstdClose fwrite failed."; - else - rc = 0; - ZSTD_freeCStream(zstd->_stream); + int xx; + do { + ZSTD_inBuffer zib = { NULL, 0, 0 }; + zstd->zob.dst = zstd->b; + zstd->zob.size = zstd->nb; + zstd->zob.pos = 0; + xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_end); + if (ZSTD_isError(xx)) { + fps->errcookie = ZSTD_getErrorName(xx); + break; + } + else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) { + fps->errcookie = "zstdClose fwrite failed."; + break; + } + else + rc = 0; + } while (xx != 0); + ZSTD_freeCCtx(zstd->_stream); } if (zstd->fp && fileno(zstd->fp) > 2) -- 2.38.1