26ba25
From 0636b876de2af6bb06bbc7ec6dd55a234c591a95 Mon Sep 17 00:00:00 2001
26ba25
From: Kevin Wolf <kwolf@redhat.com>
26ba25
Date: Tue, 26 Jun 2018 09:48:16 +0200
26ba25
Subject: [PATCH 108/268] job: Move single job finalisation to Job
26ba25
26ba25
RH-Author: Kevin Wolf <kwolf@redhat.com>
26ba25
Message-id: <20180626094856.6924-34-kwolf@redhat.com>
26ba25
Patchwork-id: 81070
26ba25
O-Subject: [RHV-7.6 qemu-kvm-rhev PATCH v2 33/73] job: Move single job finalisation to Job
26ba25
Bugzilla: 1513543
26ba25
RH-Acked-by: Jeffrey Cody <jcody@redhat.com>
26ba25
RH-Acked-by: Max Reitz <mreitz@redhat.com>
26ba25
RH-Acked-by: Fam Zheng <famz@redhat.com>
26ba25
26ba25
This moves the finalisation of a single job from BlockJob to Job.
26ba25
26ba25
Some part of this code depends on job transactions, and job transactions
26ba25
call this code, we introduce some temporary calls from Job functions to
26ba25
BlockJob ones. This will be fixed once transactions move to Job, too.
26ba25
26ba25
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
26ba25
Reviewed-by: Max Reitz <mreitz@redhat.com>
26ba25
(cherry picked from commit 4ad351819b974d724e926fd23cdd66bec3c9768e)
26ba25
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
26ba25
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
26ba25
---
26ba25
 block/backup.c               |  22 +++----
26ba25
 block/commit.c               |   2 +-
26ba25
 block/mirror.c               |   2 +-
26ba25
 blockjob.c                   | 142 ++++++++-----------------------------------
26ba25
 include/block/blockjob.h     |   9 ---
26ba25
 include/block/blockjob_int.h |  36 -----------
26ba25
 include/qemu/job.h           |  53 +++++++++++++++-
26ba25
 job.c                        | 100 +++++++++++++++++++++++++++++-
26ba25
 qemu-img.c                   |   2 +-
26ba25
 tests/test-blockjob.c        |  10 +--
26ba25
 10 files changed, 194 insertions(+), 184 deletions(-)
26ba25
26ba25
diff --git a/block/backup.c b/block/backup.c
26ba25
index 4d011d5..bd31282 100644
26ba25
--- a/block/backup.c
26ba25
+++ b/block/backup.c
26ba25
@@ -207,25 +207,25 @@ static void backup_cleanup_sync_bitmap(BackupBlockJob *job, int ret)
26ba25
     }
26ba25
 }
26ba25
 
26ba25
-static void backup_commit(BlockJob *job)
26ba25
+static void backup_commit(Job *job)
26ba25
 {
26ba25
-    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
26ba25
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common.job);
26ba25
     if (s->sync_bitmap) {
26ba25
         backup_cleanup_sync_bitmap(s, 0);
26ba25
     }
26ba25
 }
26ba25
 
26ba25
-static void backup_abort(BlockJob *job)
26ba25
+static void backup_abort(Job *job)
26ba25
 {
26ba25
-    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
26ba25
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common.job);
26ba25
     if (s->sync_bitmap) {
26ba25
         backup_cleanup_sync_bitmap(s, -1);
26ba25
     }
26ba25
 }
26ba25
 
26ba25
-static void backup_clean(BlockJob *job)
26ba25
+static void backup_clean(Job *job)
26ba25
 {
26ba25
-    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
26ba25
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common.job);
26ba25
     assert(s->target);
26ba25
     blk_unref(s->target);
26ba25
     s->target = NULL;
26ba25
@@ -530,10 +530,10 @@ static const BlockJobDriver backup_job_driver = {
26ba25
         .free                   = block_job_free,
26ba25
         .user_resume            = block_job_user_resume,
26ba25
         .start                  = backup_run,
26ba25
+        .commit                 = backup_commit,
26ba25
+        .abort                  = backup_abort,
26ba25
+        .clean                  = backup_clean,
26ba25
     },
26ba25
-    .commit                 = backup_commit,
26ba25
-    .abort                  = backup_abort,
26ba25
-    .clean                  = backup_clean,
26ba25
     .attached_aio_context   = backup_attached_aio_context,
26ba25
     .drain                  = backup_drain,
26ba25
 };
26ba25
@@ -678,8 +678,8 @@ BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs,
26ba25
         bdrv_reclaim_dirty_bitmap(bs, sync_bitmap, NULL);
26ba25
     }
26ba25
     if (job) {
26ba25
-        backup_clean(&job->common);
26ba25
-        block_job_early_fail(&job->common);
26ba25
+        backup_clean(&job->common.job);
26ba25
+        job_early_fail(&job->common.job);
26ba25
     }
26ba25
 
26ba25
     return NULL;
26ba25
diff --git a/block/commit.c b/block/commit.c
26ba25
index 7a6ae59..e53b2d7 100644
26ba25
--- a/block/commit.c
26ba25
+++ b/block/commit.c
26ba25
@@ -385,7 +385,7 @@ fail:
26ba25
     if (commit_top_bs) {
26ba25
         bdrv_replace_node(commit_top_bs, top, &error_abort);
26ba25
     }
26ba25
-    block_job_early_fail(&s->common);
26ba25
+    job_early_fail(&s->common.job);
26ba25
 }
26ba25
 
26ba25
 
26ba25
diff --git a/block/mirror.c b/block/mirror.c
26ba25
index 5091e72..e9a90ea 100644
26ba25
--- a/block/mirror.c
26ba25
+++ b/block/mirror.c
26ba25
@@ -1257,7 +1257,7 @@ fail:
26ba25
 
26ba25
         g_free(s->replaces);
26ba25
         blk_unref(s->target);
26ba25
-        block_job_early_fail(&s->common);
26ba25
+        job_early_fail(&s->common.job);
26ba25
     }
26ba25
 
26ba25
     bdrv_child_try_set_perm(mirror_top_bs->backing, 0, BLK_PERM_ALL,
26ba25
diff --git a/blockjob.c b/blockjob.c
26ba25
index 05d7921..34c57da 100644
26ba25
--- a/blockjob.c
26ba25
+++ b/blockjob.c
26ba25
@@ -127,7 +127,7 @@ void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
26ba25
     block_job_txn_ref(txn);
26ba25
 }
26ba25
 
26ba25
-static void block_job_txn_del_job(BlockJob *job)
26ba25
+void block_job_txn_del_job(BlockJob *job)
26ba25
 {
26ba25
     if (job->txn) {
26ba25
         QLIST_REMOVE(job, txn_list);
26ba25
@@ -262,101 +262,12 @@ const BlockJobDriver *block_job_driver(BlockJob *job)
26ba25
     return job->driver;
26ba25
 }
26ba25
 
26ba25
-static void block_job_decommission(BlockJob *job)
26ba25
-{
26ba25
-    assert(job);
26ba25
-    job->job.busy = false;
26ba25
-    job->job.paused = false;
26ba25
-    job->job.deferred_to_main_loop = true;
26ba25
-    block_job_txn_del_job(job);
26ba25
-    job_state_transition(&job->job, JOB_STATUS_NULL);
26ba25
-    job_unref(&job->job);
26ba25
-}
26ba25
-
26ba25
-static void block_job_do_dismiss(BlockJob *job)
26ba25
-{
26ba25
-    block_job_decommission(job);
26ba25
-}
26ba25
-
26ba25
-static void block_job_conclude(BlockJob *job)
26ba25
-{
26ba25
-    job_state_transition(&job->job, JOB_STATUS_CONCLUDED);
26ba25
-    if (job->job.auto_dismiss || !job_started(&job->job)) {
26ba25
-        block_job_do_dismiss(job);
26ba25
-    }
26ba25
-}
26ba25
-
26ba25
-static void block_job_update_rc(BlockJob *job)
26ba25
-{
26ba25
-    if (!job->ret && job_is_cancelled(&job->job)) {
26ba25
-        job->ret = -ECANCELED;
26ba25
-    }
26ba25
-    if (job->ret) {
26ba25
-        job_state_transition(&job->job, JOB_STATUS_ABORTING);
26ba25
-    }
26ba25
-}
26ba25
-
26ba25
 static int block_job_prepare(BlockJob *job)
26ba25
 {
26ba25
-    if (job->ret == 0 && job->driver->prepare) {
26ba25
-        job->ret = job->driver->prepare(job);
26ba25
-    }
26ba25
-    return job->ret;
26ba25
-}
26ba25
-
26ba25
-static void block_job_commit(BlockJob *job)
26ba25
-{
26ba25
-    assert(!job->ret);
26ba25
-    if (job->driver->commit) {
26ba25
-        job->driver->commit(job);
26ba25
-    }
26ba25
-}
26ba25
-
26ba25
-static void block_job_abort(BlockJob *job)
26ba25
-{
26ba25
-    assert(job->ret);
26ba25
-    if (job->driver->abort) {
26ba25
-        job->driver->abort(job);
26ba25
-    }
26ba25
-}
26ba25
-
26ba25
-static void block_job_clean(BlockJob *job)
26ba25
-{
26ba25
-    if (job->driver->clean) {
26ba25
-        job->driver->clean(job);
26ba25
+    if (job->job.ret == 0 && job->driver->prepare) {
26ba25
+        job->job.ret = job->driver->prepare(job);
26ba25
     }
26ba25
-}
26ba25
-
26ba25
-static int block_job_finalize_single(BlockJob *job)
26ba25
-{
26ba25
-    assert(job_is_completed(&job->job));
26ba25
-
26ba25
-    /* Ensure abort is called for late-transactional failures */
26ba25
-    block_job_update_rc(job);
26ba25
-
26ba25
-    if (!job->ret) {
26ba25
-        block_job_commit(job);
26ba25
-    } else {
26ba25
-        block_job_abort(job);
26ba25
-    }
26ba25
-    block_job_clean(job);
26ba25
-
26ba25
-    if (job->cb) {
26ba25
-        job->cb(job->opaque, job->ret);
26ba25
-    }
26ba25
-
26ba25
-    /* Emit events only if we actually started */
26ba25
-    if (job_started(&job->job)) {
26ba25
-        if (job_is_cancelled(&job->job)) {
26ba25
-            job_event_cancelled(&job->job);
26ba25
-        } else {
26ba25
-            job_event_completed(&job->job);
26ba25
-        }
26ba25
-    }
26ba25
-
26ba25
-    block_job_txn_del_job(job);
26ba25
-    block_job_conclude(job);
26ba25
-    return 0;
26ba25
+    return job->job.ret;
26ba25
 }
26ba25
 
26ba25
 static void block_job_cancel_async(BlockJob *job, bool force)
26ba25
@@ -424,8 +335,8 @@ static int block_job_finish_sync(BlockJob *job,
26ba25
     while (!job_is_completed(&job->job)) {
26ba25
         aio_poll(qemu_get_aio_context(), true);
26ba25
     }
26ba25
-    ret = (job_is_cancelled(&job->job) && job->ret == 0)
26ba25
-          ? -ECANCELED : job->ret;
26ba25
+    ret = (job_is_cancelled(&job->job) && job->job.ret == 0)
26ba25
+          ? -ECANCELED : job->job.ret;
26ba25
     job_unref(&job->job);
26ba25
     return ret;
26ba25
 }
26ba25
@@ -466,7 +377,7 @@ static void block_job_completed_txn_abort(BlockJob *job)
26ba25
             assert(job_is_cancelled(&other_job->job));
26ba25
             block_job_finish_sync(other_job, NULL, NULL);
26ba25
         }
26ba25
-        block_job_finalize_single(other_job);
26ba25
+        job_finalize_single(&other_job->job);
26ba25
         aio_context_release(ctx);
26ba25
     }
26ba25
 
26ba25
@@ -478,6 +389,11 @@ static int block_job_needs_finalize(BlockJob *job)
26ba25
     return !job->job.auto_finalize;
26ba25
 }
26ba25
 
26ba25
+static int block_job_finalize_single(BlockJob *job)
26ba25
+{
26ba25
+    return job_finalize_single(&job->job);
26ba25
+}
26ba25
+
26ba25
 static void block_job_do_finalize(BlockJob *job)
26ba25
 {
26ba25
     int rc;
26ba25
@@ -516,7 +432,7 @@ static void block_job_completed_txn_success(BlockJob *job)
26ba25
         if (!job_is_completed(&other_job->job)) {
26ba25
             return;
26ba25
         }
26ba25
-        assert(other_job->ret == 0);
26ba25
+        assert(other_job->job.ret == 0);
26ba25
     }
26ba25
 
26ba25
     block_job_txn_apply(txn, block_job_transition_to_pending, false);
26ba25
@@ -601,14 +517,14 @@ void block_job_dismiss(BlockJob **jobptr, Error **errp)
26ba25
         return;
26ba25
     }
26ba25
 
26ba25
-    block_job_do_dismiss(job);
26ba25
+    job_do_dismiss(&job->job);
26ba25
     *jobptr = NULL;
26ba25
 }
26ba25
 
26ba25
 void block_job_cancel(BlockJob *job, bool force)
26ba25
 {
26ba25
     if (job->job.status == JOB_STATUS_CONCLUDED) {
26ba25
-        block_job_do_dismiss(job);
26ba25
+        job_do_dismiss(&job->job);
26ba25
         return;
26ba25
     }
26ba25
     block_job_cancel_async(job, force);
26ba25
@@ -691,8 +607,8 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
26ba25
     info->status    = job->job.status;
26ba25
     info->auto_finalize = job->job.auto_finalize;
26ba25
     info->auto_dismiss  = job->job.auto_dismiss;
26ba25
-    info->has_error = job->ret != 0;
26ba25
-    info->error     = job->ret ? g_strdup(strerror(-job->ret)) : NULL;
26ba25
+    info->has_error = job->job.ret != 0;
26ba25
+    info->error     = job->job.ret ? g_strdup(strerror(-job->job.ret)) : NULL;
26ba25
     return info;
26ba25
 }
26ba25
 
26ba25
@@ -729,8 +645,8 @@ static void block_job_event_completed(Notifier *n, void *opaque)
26ba25
         return;
26ba25
     }
26ba25
 
26ba25
-    if (job->ret < 0) {
26ba25
-        msg = strerror(-job->ret);
26ba25
+    if (job->job.ret < 0) {
26ba25
+        msg = strerror(-job->job.ret);
26ba25
     }
26ba25
 
26ba25
     qapi_event_send_block_job_completed(job_type(&job->job),
26ba25
@@ -787,7 +703,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
26ba25
     }
26ba25
 
26ba25
     job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk),
26ba25
-                     flags, errp);
26ba25
+                     flags, cb, opaque, errp);
26ba25
     if (job == NULL) {
26ba25
         blk_unref(blk);
26ba25
         return NULL;
26ba25
@@ -799,8 +715,6 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
26ba25
 
26ba25
     job->driver        = driver;
26ba25
     job->blk           = blk;
26ba25
-    job->cb            = cb;
26ba25
-    job->opaque        = opaque;
26ba25
 
26ba25
     job->finalize_cancelled_notifier.notify = block_job_event_cancelled;
26ba25
     job->finalize_completed_notifier.notify = block_job_event_completed;
26ba25
@@ -828,7 +742,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
26ba25
 
26ba25
         block_job_set_speed(job, speed, &local_err);
26ba25
         if (local_err) {
26ba25
-            block_job_early_fail(job);
26ba25
+            job_early_fail(&job->job);
26ba25
             error_propagate(errp, local_err);
26ba25
             return NULL;
26ba25
         }
26ba25
@@ -847,20 +761,14 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
26ba25
     return job;
26ba25
 }
26ba25
 
26ba25
-void block_job_early_fail(BlockJob *job)
26ba25
-{
26ba25
-    assert(job->job.status == JOB_STATUS_CREATED);
26ba25
-    block_job_decommission(job);
26ba25
-}
26ba25
-
26ba25
 void block_job_completed(BlockJob *job, int ret)
26ba25
 {
26ba25
     assert(job && job->txn && !job_is_completed(&job->job));
26ba25
     assert(blk_bs(job->blk)->job == job);
26ba25
-    job->ret = ret;
26ba25
-    block_job_update_rc(job);
26ba25
-    trace_block_job_completed(job, ret, job->ret);
26ba25
-    if (job->ret) {
26ba25
+    job->job.ret = ret;
26ba25
+    job_update_rc(&job->job);
26ba25
+    trace_block_job_completed(job, ret, job->job.ret);
26ba25
+    if (job->job.ret) {
26ba25
         block_job_completed_txn_abort(job);
26ba25
     } else {
26ba25
         block_job_completed_txn_success(job);
26ba25
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
26ba25
index aef0629..3f405d1 100644
26ba25
--- a/include/block/blockjob.h
26ba25
+++ b/include/block/blockjob.h
26ba25
@@ -76,9 +76,6 @@ typedef struct BlockJob {
26ba25
     /** Rate limiting data structure for implementing @speed. */
26ba25
     RateLimit limit;
26ba25
 
26ba25
-    /** The completion function that will be called when the job completes.  */
26ba25
-    BlockCompletionFunc *cb;
26ba25
-
26ba25
     /** Block other operations when block job is running */
26ba25
     Error *blocker;
26ba25
 
26ba25
@@ -94,12 +91,6 @@ typedef struct BlockJob {
26ba25
     /** BlockDriverStates that are involved in this block job */
26ba25
     GSList *nodes;
26ba25
 
26ba25
-    /** The opaque value that is passed to the completion function.  */
26ba25
-    void *opaque;
26ba25
-
26ba25
-    /** ret code passed to block_job_completed. */
26ba25
-    int ret;
26ba25
-
26ba25
     BlockJobTxn *txn;
26ba25
     QLIST_ENTRY(BlockJob) txn_list;
26ba25
 } BlockJob;
26ba25
diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
26ba25
index 88639f7..bf2b762 100644
26ba25
--- a/include/block/blockjob_int.h
26ba25
+++ b/include/block/blockjob_int.h
26ba25
@@ -54,34 +54,6 @@ struct BlockJobDriver {
26ba25
      */
26ba25
     int (*prepare)(BlockJob *job);
26ba25
 
26ba25
-    /**
26ba25
-     * If the callback is not NULL, it will be invoked when all the jobs
26ba25
-     * belonging to the same transaction complete; or upon this job's
26ba25
-     * completion if it is not in a transaction. Skipped if NULL.
26ba25
-     *
26ba25
-     * All jobs will complete with a call to either .commit() or .abort() but
26ba25
-     * never both.
26ba25
-     */
26ba25
-    void (*commit)(BlockJob *job);
26ba25
-
26ba25
-    /**
26ba25
-     * If the callback is not NULL, it will be invoked when any job in the
26ba25
-     * same transaction fails; or upon this job's failure (due to error or
26ba25
-     * cancellation) if it is not in a transaction. Skipped if NULL.
26ba25
-     *
26ba25
-     * All jobs will complete with a call to either .commit() or .abort() but
26ba25
-     * never both.
26ba25
-     */
26ba25
-    void (*abort)(BlockJob *job);
26ba25
-
26ba25
-    /**
26ba25
-     * If the callback is not NULL, it will be invoked after a call to either
26ba25
-     * .commit() or .abort(). Regardless of which callback is invoked after
26ba25
-     * completion, .clean() will always be called, even if the job does not
26ba25
-     * belong to a transaction group.
26ba25
-     */
26ba25
-    void (*clean)(BlockJob *job);
26ba25
-
26ba25
     /*
26ba25
      * If the callback is not NULL, it will be invoked before the job is
26ba25
      * resumed in a new AioContext.  This is the place to move any resources
26ba25
@@ -156,14 +128,6 @@ void block_job_yield(BlockJob *job);
26ba25
 int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n);
26ba25
 
26ba25
 /**
26ba25
- * block_job_early_fail:
26ba25
- * @bs: The block device.
26ba25
- *
26ba25
- * The block job could not be started, free it.
26ba25
- */
26ba25
-void block_job_early_fail(BlockJob *job);
26ba25
-
26ba25
-/**
26ba25
  * block_job_completed:
26ba25
  * @job: The job being completed.
26ba25
  * @ret: The status code.
26ba25
diff --git a/include/qemu/job.h b/include/qemu/job.h
26ba25
index 14d9377..3e817be 100644
26ba25
--- a/include/qemu/job.h
26ba25
+++ b/include/qemu/job.h
26ba25
@@ -29,6 +29,7 @@
26ba25
 #include "qapi/qapi-types-block-core.h"
26ba25
 #include "qemu/queue.h"
26ba25
 #include "qemu/coroutine.h"
26ba25
+#include "block/aio.h"
26ba25
 
26ba25
 typedef struct JobDriver JobDriver;
26ba25
 
26ba25
@@ -105,6 +106,15 @@ typedef struct Job {
26ba25
     /** True if this job should automatically dismiss itself */
26ba25
     bool auto_dismiss;
26ba25
 
26ba25
+    /** ret code passed to block_job_completed. */
26ba25
+    int ret;
26ba25
+
26ba25
+    /** The completion function that will be called when the job completes.  */
26ba25
+    BlockCompletionFunc *cb;
26ba25
+
26ba25
+    /** The opaque value that is passed to the completion function.  */
26ba25
+    void *opaque;
26ba25
+
26ba25
     /** Notifiers called when a cancelled job is finalised */
26ba25
     NotifierList on_finalize_cancelled;
26ba25
 
26ba25
@@ -151,6 +161,35 @@ struct JobDriver {
26ba25
      */
26ba25
     void (*user_resume)(Job *job);
26ba25
 
26ba25
+    /**
26ba25
+     * If the callback is not NULL, it will be invoked when all the jobs
26ba25
+     * belonging to the same transaction complete; or upon this job's
26ba25
+     * completion if it is not in a transaction. Skipped if NULL.
26ba25
+     *
26ba25
+     * All jobs will complete with a call to either .commit() or .abort() but
26ba25
+     * never both.
26ba25
+     */
26ba25
+    void (*commit)(Job *job);
26ba25
+
26ba25
+    /**
26ba25
+     * If the callback is not NULL, it will be invoked when any job in the
26ba25
+     * same transaction fails; or upon this job's failure (due to error or
26ba25
+     * cancellation) if it is not in a transaction. Skipped if NULL.
26ba25
+     *
26ba25
+     * All jobs will complete with a call to either .commit() or .abort() but
26ba25
+     * never both.
26ba25
+     */
26ba25
+    void (*abort)(Job *job);
26ba25
+
26ba25
+    /**
26ba25
+     * If the callback is not NULL, it will be invoked after a call to either
26ba25
+     * .commit() or .abort(). Regardless of which callback is invoked after
26ba25
+     * completion, .clean() will always be called, even if the job does not
26ba25
+     * belong to a transaction group.
26ba25
+     */
26ba25
+    void (*clean)(Job *job);
26ba25
+
26ba25
+
26ba25
     /** Called when the job is freed */
26ba25
     void (*free)(Job *job);
26ba25
 };
26ba25
@@ -174,10 +213,12 @@ typedef enum JobCreateFlags {
26ba25
  * @driver: The class object for the newly-created job.
26ba25
  * @ctx: The AioContext to run the job coroutine in.
26ba25
  * @flags: Creation flags for the job. See @JobCreateFlags.
26ba25
+ * @cb: Completion function for the job.
26ba25
+ * @opaque: Opaque pointer value passed to @cb.
26ba25
  * @errp: Error object.
26ba25
  */
26ba25
 void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
26ba25
-                 int flags, Error **errp);
26ba25
+                 int flags, BlockCompletionFunc *cb, void *opaque, Error **errp);
26ba25
 
26ba25
 /**
26ba25
  * Add a reference to Job refcnt, it will be decreased with job_unref, and then
26ba25
@@ -300,6 +341,10 @@ Job *job_get(const char *id);
26ba25
  */
26ba25
 int job_apply_verb(Job *job, JobVerb verb, Error **errp);
26ba25
 
26ba25
+/** The @job could not be started, free it. */
26ba25
+void job_early_fail(Job *job);
26ba25
+
26ba25
+
26ba25
 typedef void JobDeferToMainLoopFn(Job *job, void *opaque);
26ba25
 
26ba25
 /**
26ba25
@@ -322,5 +367,11 @@ void job_state_transition(Job *job, JobStatus s1);
26ba25
 void coroutine_fn job_do_yield(Job *job, uint64_t ns);
26ba25
 bool job_should_pause(Job *job);
26ba25
 bool job_started(Job *job);
26ba25
+void job_do_dismiss(Job *job);
26ba25
+int job_finalize_single(Job *job);
26ba25
+void job_update_rc(Job *job);
26ba25
+
26ba25
+typedef struct BlockJob BlockJob;
26ba25
+void block_job_txn_del_job(BlockJob *job);
26ba25
 
26ba25
 #endif
26ba25
diff --git a/job.c b/job.c
26ba25
index 817c3b4..64b64da 100644
26ba25
--- a/job.c
26ba25
+++ b/job.c
26ba25
@@ -85,7 +85,7 @@ void job_state_transition(Job *job, JobStatus s1)
26ba25
 {
26ba25
     JobStatus s0 = job->status;
26ba25
     assert(s1 >= 0 && s1 <= JOB_STATUS__MAX);
26ba25
-    trace_job_state_transition(job, /* TODO re-enable: job->ret */ 0,
26ba25
+    trace_job_state_transition(job, job->ret,
26ba25
                                JobSTT[s0][s1] ? "allowed" : "disallowed",
26ba25
                                JobStatus_str(s0), JobStatus_str(s1));
26ba25
     assert(JobSTT[s0][s1]);
26ba25
@@ -182,7 +182,7 @@ static void job_sleep_timer_cb(void *opaque)
26ba25
 }
26ba25
 
26ba25
 void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
26ba25
-                 int flags, Error **errp)
26ba25
+                 int flags, BlockCompletionFunc *cb, void *opaque, Error **errp)
26ba25
 {
26ba25
     Job *job;
26ba25
 
26ba25
@@ -214,6 +214,8 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
26ba25
     job->pause_count   = 1;
26ba25
     job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
26ba25
     job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
26ba25
+    job->cb            = cb;
26ba25
+    job->opaque        = opaque;
26ba25
 
26ba25
     notifier_list_init(&job->on_finalize_cancelled);
26ba25
     notifier_list_init(&job->on_finalize_completed);
26ba25
@@ -449,6 +451,100 @@ void job_user_resume(Job *job, Error **errp)
26ba25
     job_resume(job);
26ba25
 }
26ba25
 
26ba25
+void job_do_dismiss(Job *job)
26ba25
+{
26ba25
+    assert(job);
26ba25
+    job->busy = false;
26ba25
+    job->paused = false;
26ba25
+    job->deferred_to_main_loop = true;
26ba25
+
26ba25
+    /* TODO Don't assume it's a BlockJob */
26ba25
+    block_job_txn_del_job((BlockJob*) job);
26ba25
+
26ba25
+    job_state_transition(job, JOB_STATUS_NULL);
26ba25
+    job_unref(job);
26ba25
+}
26ba25
+
26ba25
+void job_early_fail(Job *job)
26ba25
+{
26ba25
+    assert(job->status == JOB_STATUS_CREATED);
26ba25
+    job_do_dismiss(job);
26ba25
+}
26ba25
+
26ba25
+static void job_conclude(Job *job)
26ba25
+{
26ba25
+    job_state_transition(job, JOB_STATUS_CONCLUDED);
26ba25
+    if (job->auto_dismiss || !job_started(job)) {
26ba25
+        job_do_dismiss(job);
26ba25
+    }
26ba25
+}
26ba25
+
26ba25
+void job_update_rc(Job *job)
26ba25
+{
26ba25
+    if (!job->ret && job_is_cancelled(job)) {
26ba25
+        job->ret = -ECANCELED;
26ba25
+    }
26ba25
+    if (job->ret) {
26ba25
+        job_state_transition(job, JOB_STATUS_ABORTING);
26ba25
+    }
26ba25
+}
26ba25
+
26ba25
+static void job_commit(Job *job)
26ba25
+{
26ba25
+    assert(!job->ret);
26ba25
+    if (job->driver->commit) {
26ba25
+        job->driver->commit(job);
26ba25
+    }
26ba25
+}
26ba25
+
26ba25
+static void job_abort(Job *job)
26ba25
+{
26ba25
+    assert(job->ret);
26ba25
+    if (job->driver->abort) {
26ba25
+        job->driver->abort(job);
26ba25
+    }
26ba25
+}
26ba25
+
26ba25
+static void job_clean(Job *job)
26ba25
+{
26ba25
+    if (job->driver->clean) {
26ba25
+        job->driver->clean(job);
26ba25
+    }
26ba25
+}
26ba25
+
26ba25
+int job_finalize_single(Job *job)
26ba25
+{
26ba25
+    assert(job_is_completed(job));
26ba25
+
26ba25
+    /* Ensure abort is called for late-transactional failures */
26ba25
+    job_update_rc(job);
26ba25
+
26ba25
+    if (!job->ret) {
26ba25
+        job_commit(job);
26ba25
+    } else {
26ba25
+        job_abort(job);
26ba25
+    }
26ba25
+    job_clean(job);
26ba25
+
26ba25
+    if (job->cb) {
26ba25
+        job->cb(job->opaque, job->ret);
26ba25
+    }
26ba25
+
26ba25
+    /* Emit events only if we actually started */
26ba25
+    if (job_started(job)) {
26ba25
+        if (job_is_cancelled(job)) {
26ba25
+            job_event_cancelled(job);
26ba25
+        } else {
26ba25
+            job_event_completed(job);
26ba25
+        }
26ba25
+    }
26ba25
+
26ba25
+    /* TODO Don't assume it's a BlockJob */
26ba25
+    block_job_txn_del_job((BlockJob*) job);
26ba25
+    job_conclude(job);
26ba25
+    return 0;
26ba25
+}
26ba25
+
26ba25
 
26ba25
 typedef struct {
26ba25
     Job *job;
26ba25
diff --git a/qemu-img.c b/qemu-img.c
26ba25
index 843dc6a..91b3151 100644
26ba25
--- a/qemu-img.c
26ba25
+++ b/qemu-img.c
26ba25
@@ -883,7 +883,7 @@ static void run_block_job(BlockJob *job, Error **errp)
26ba25
     if (!job_is_completed(&job->job)) {
26ba25
         ret = block_job_complete_sync(job, errp);
26ba25
     } else {
26ba25
-        ret = job->ret;
26ba25
+        ret = job->job.ret;
26ba25
     }
26ba25
     job_unref(&job->job);
26ba25
     aio_context_release(aio_context);
26ba25
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
26ba25
index 8bb0aa8..1fe6803 100644
26ba25
--- a/tests/test-blockjob.c
26ba25
+++ b/tests/test-blockjob.c
26ba25
@@ -128,11 +128,11 @@ static void test_job_ids(void)
26ba25
     job[1] = do_test_id(blk[1], "id0", false);
26ba25
 
26ba25
     /* But once job[0] finishes we can reuse its ID */
26ba25
-    block_job_early_fail(job[0]);
26ba25
+    job_early_fail(&job[0]->job);
26ba25
     job[1] = do_test_id(blk[1], "id0", true);
26ba25
 
26ba25
     /* No job ID specified, defaults to the backend name ('drive1') */
26ba25
-    block_job_early_fail(job[1]);
26ba25
+    job_early_fail(&job[1]->job);
26ba25
     job[1] = do_test_id(blk[1], NULL, true);
26ba25
 
26ba25
     /* Duplicate job ID */
26ba25
@@ -145,9 +145,9 @@ static void test_job_ids(void)
26ba25
     /* This one is valid */
26ba25
     job[2] = do_test_id(blk[2], "id_2", true);
26ba25
 
26ba25
-    block_job_early_fail(job[0]);
26ba25
-    block_job_early_fail(job[1]);
26ba25
-    block_job_early_fail(job[2]);
26ba25
+    job_early_fail(&job[0]->job);
26ba25
+    job_early_fail(&job[1]->job);
26ba25
+    job_early_fail(&job[2]->job);
26ba25
 
26ba25
     destroy_blk(blk[0]);
26ba25
     destroy_blk(blk[1]);
26ba25
-- 
26ba25
1.8.3.1
26ba25