[Libguestfs] [PATCH virt-v2v 1/2] v2v: -o rhv-upload: Split plugin functions from create/finalize

Richard W.M. Jones rjones at redhat.com
Tue Aug 3 12:10:01 UTC 2021


The existing plugin created a new disk in open() and tried to finalize
it in close().  A correct NBD client may make several connections
(eg. connecting just to query for multi-conn or other metadata).  This
could result in multiple disks being created.

Let's fix this by using Nir's suggestion to split out the create and
finalize functions.  The nbdkit plugin becomes a much more
straightforward plugin - it no longer depends on the oVirt SDK at all
so could in theory be rewritten in another programming language.  The
other functions have been moved into new scripts ("transfer",
"finalize", "cancel") which are run separately.

Some subtler points about this commit:

 - We no longer use or need the @failing decorator.  I have extended
   the cleanup code so as well as deleting disks it also cancels the
   transfers.  Any failure detected by qemu-img convert will result in
   virt-v2v exiting and cancelling the transfer, which is much cleaner
   and more robust.

 - We still require the HTTP pool.  The reason is a bit subtle: Since
   we are using the parallel thread model, pread and pwrite (etc) can
   be called at any time, even overlapping, so storing an HTTP handle
   per NBD connection does not work as the handle gets reused (which
   results in a fatal error).

 - In the plugin we now use the standard nbdkit_debug (Python
   nbdkit.debug) function instead of writing debug to stderr.

 - This is not noticeably slower or faster than before.

Thanks: Nir Soffer
---
 v2v/Makefile.am                               |  22 +-
 v2v/output_rhv_upload.ml                      |  79 ++-
 ...li => output_rhv_upload_cancel_source.mli} |   0
 v2v/output_rhv_upload_finalize_source.mli     |  19 +
 v2v/output_rhv_upload_transfer_source.mli     |  19 +
 ...ad-deletedisks.py => rhv-upload-cancel.py} |  19 +-
 v2v/rhv-upload-finalize.py                    | 168 +++++
 v2v/rhv-upload-plugin.py                      | 618 +++---------------
 v2v/rhv-upload-transfer.py                    | 278 ++++++++
 9 files changed, 655 insertions(+), 567 deletions(-)

diff --git a/v2v/Makefile.am b/v2v/Makefile.am
index 58656de87..dbb75dc55 100644
--- a/v2v/Makefile.am
+++ b/v2v/Makefile.am
@@ -18,10 +18,12 @@
 include $(top_srcdir)/subdir-rules.mk
 
 BUILT_SOURCES = \
+	output_rhv_upload_cancel_source.ml \
 	output_rhv_upload_createvm_source.ml \
-	output_rhv_upload_deletedisks_source.ml \
+	output_rhv_upload_finalize_source.ml \
 	output_rhv_upload_plugin_source.ml \
 	output_rhv_upload_precheck_source.ml \
+	output_rhv_upload_transfer_source.ml \
 	output_rhv_upload_vmcheck_source.ml
 
 CONFIGURE_GENERATED_ML = \
@@ -32,10 +34,12 @@ EXTRA_DIST = \
 	$(filter-out $(CONFIGURE_GENERATED_ML) $(BUILT_SOURCES),$(SOURCES_ML)) \
 	$(SOURCES_C) \
 	embed.sh \
+	rhv-upload-cancel.py \
 	rhv-upload-createvm.py \
-	rhv-upload-deletedisks.py \
+	rhv-upload-finalize.py \
 	rhv-upload-plugin.py \
 	rhv-upload-precheck.py \
+	rhv-upload-transfer.py \
 	rhv-upload-vmcheck.py \
 	test-v2v-python-syntax.sh \
 	var_expander_tests.ml \
@@ -82,10 +86,12 @@ SOURCES_MLI = \
 	output_qemu.mli \
 	output_rhv.mli \
 	output_rhv_upload.mli \
+	output_rhv_upload_cancel_source.mli \
 	output_rhv_upload_createvm_source.mli \
-	output_rhv_upload_deletedisks_source.mli \
+	output_rhv_upload_finalize_source.mli \
 	output_rhv_upload_plugin_source.mli \
 	output_rhv_upload_precheck_source.mli \
+	output_rhv_upload_transfer_source.mli \
 	output_rhv_upload_vmcheck_source.mli \
 	output_vdsm.mli \
 	parse_ova.mli \
@@ -151,10 +157,12 @@ SOURCES_ML = \
 	output_local.ml \
 	output_qemu.ml \
 	output_rhv.ml \
+	output_rhv_upload_cancel_source.ml \
 	output_rhv_upload_createvm_source.ml \
-	output_rhv_upload_deletedisks_source.ml \
+	output_rhv_upload_finalize_source.ml \
 	output_rhv_upload_plugin_source.ml \
 	output_rhv_upload_precheck_source.ml \
+	output_rhv_upload_transfer_source.ml \
 	output_rhv_upload_vmcheck_source.ml \
 	output_rhv_upload.ml \
 	output_vdsm.ml \
@@ -170,14 +178,18 @@ SOURCES_C = \
 	qemuopts-c.c
 
 # These files are generated and contain *.py embedded as an OCaml string.
+output_rhv_upload_cancel_source.ml: $(srcdir)/rhv-upload-cancel.py
+	$(srcdir)/embed.sh code $^ $@
 output_rhv_upload_createvm_source.ml: $(srcdir)/rhv-upload-createvm.py
 	$(srcdir)/embed.sh code $^ $@
-output_rhv_upload_deletedisks_source.ml: $(srcdir)/rhv-upload-deletedisks.py
+output_rhv_upload_finalize_source.ml: $(srcdir)/rhv-upload-finalize.py
 	$(srcdir)/embed.sh code $^ $@
 output_rhv_upload_plugin_source.ml: $(srcdir)/rhv-upload-plugin.py
 	$(srcdir)/embed.sh code $^ $@
 output_rhv_upload_precheck_source.ml: $(srcdir)/rhv-upload-precheck.py
 	$(srcdir)/embed.sh code $^ $@
+output_rhv_upload_transfer_source.ml: $(srcdir)/rhv-upload-transfer.py
+	$(srcdir)/embed.sh code $^ $@
 output_rhv_upload_vmcheck_source.ml: $(srcdir)/rhv-upload-vmcheck.py
 	$(srcdir)/embed.sh code $^ $@
 
diff --git a/v2v/output_rhv_upload.ml b/v2v/output_rhv_upload.ml
index d44301ce2..b99050dd2 100644
--- a/v2v/output_rhv_upload.ml
+++ b/v2v/output_rhv_upload.ml
@@ -169,12 +169,18 @@ class output_rhv_upload output_conn
   let plugin_script =
     Python_script.create ~name:"rhv-upload-plugin.py"
       Output_rhv_upload_plugin_source.code in
+  let transfer_script =
+    Python_script.create ~name:"rhv-upload-transfer.py"
+      Output_rhv_upload_transfer_source.code in
+  let finalize_script =
+    Python_script.create ~name:"rhv-upload-finalize.py"
+      Output_rhv_upload_finalize_source.code in
+  let cancel_script =
+    Python_script.create ~name:"rhv-upload-cancel.py"
+      Output_rhv_upload_cancel_source.code in
   let createvm_script =
     Python_script.create ~name:"rhv-upload-createvm.py"
       Output_rhv_upload_createvm_source.code in
-  let deletedisks_script =
-    Python_script.create ~name:"rhv-upload-deletedisks.py"
-      Output_rhv_upload_deletedisks_source.code in
 
   (* JSON parameters which are invariant between disks. *)
   let json_params = [
@@ -214,16 +220,17 @@ class output_rhv_upload output_conn
     else
       nbdkit_cmd in
 
-  (* Delete disks.
+  (* Cancel the transfer and delete disks.
    *
    * This ignores errors since the only time we are doing this is on
    * the failure path.
    *)
-  let delete_disks uuids =
-    let ids = List.map (fun uuid -> JSON.String uuid) uuids in
-    let json_params =
-      ("disk_uuids", JSON.List ids) :: json_params in
-    ignore (Python_script.run_command deletedisks_script json_params [])
+  let cancel transfer_ids disk_uuids =
+    let ids = List.map (fun id -> JSON.String id) transfer_ids in
+    let json_params = ("transfer_ids", JSON.List ids) :: json_params in
+    let ids = List.map (fun uuid -> JSON.String uuid) disk_uuids in
+    let json_params = ("disk_uuids", JSON.List ids) :: json_params in
+    ignore (Python_script.run_command cancel_script json_params [])
   in
 
 object
@@ -237,8 +244,10 @@ object
   val mutable rhv_cluster_cpu_architecture = None
   (* List of disk UUIDs. *)
   val mutable disk_uuids = []
-  (* If we didn't finish successfully, delete on exit. *)
-  val mutable delete_disks_on_exit = true
+  (* List of transfer IDs. *)
+  val mutable transfer_ids = []
+  (* If we didn't finish successfully, cancel and delete disks on exit. *)
+  val mutable cancel_on_exit = true
 
   method precheck () =
     Python_script.error_unless_python_interpreter_found ();
@@ -320,9 +329,9 @@ object
     (* Set up an at-exit handler so we delete the orphan disks on failure. *)
     at_exit (
       fun () ->
-        if delete_disks_on_exit then (
+        if cancel_on_exit then (
           if disk_uuids <> [] then
-            delete_disks disk_uuids
+            cancel transfer_ids disk_uuids
         )
     );
 
@@ -359,8 +368,36 @@ object
           json_param_file
           (fun chan -> output_string chan (JSON.string_of_doc json_params));
 
-        (* Add common arguments to per-target arguments. *)
-        let cmd = Nbdkit.add_arg nbdkit_cmd "params" json_param_file in
+        (* Start the transfer. *)
+        let transfer_json = tmpdir // "v2vtransfer.json" in
+        let fd = Unix.openfile transfer_json [O_WRONLY; O_CREAT] 0o600 in
+        if Python_script.run_command ~stdout_fd:fd
+             transfer_script json_params [] <> 0 then
+          error (f_"failed to start transfer, see earlier errors");
+        let json = JSON_parser.json_parser_tree_parse_file transfer_json in
+        debug "transfer output parsed as: %s"
+          (JSON.string_of_doc ~fmt:JSON.Indented ["", json]);
+        let destination_url =
+          JSON_parser.object_get_string "destination_url" json in
+        let transfer_id =
+          JSON_parser.object_get_string "transfer_id" json in
+        transfer_ids <- transfer_ids @ [transfer_id];
+        let is_host = JSON_parser.object_get_bool "is_host" json in
+
+        (* Create the nbdkit instance. *)
+        let cmd = nbdkit_cmd in
+        let cmd = Nbdkit.add_arg cmd "size" (Int64.to_string disk_size) in
+        let cmd = Nbdkit.add_arg cmd "destination_url" destination_url in
+        let cmd =
+          match rhv_options.rhv_cafile with
+          | None -> cmd
+          | Some cafile -> Nbdkit.add_arg cmd "cafile" cafile in
+        let cmd =
+          if not (rhv_options.rhv_verifypeer) then
+            Nbdkit.add_arg cmd "insecure" "true"
+          else cmd in
+        let cmd =
+          if is_host then Nbdkit.add_arg cmd "is_host" "true" else cmd in
         let sock, _ = Nbdkit.run_unix cmd in
 
         if have_selinux then (
@@ -387,6 +424,16 @@ object
     ) (List.combine overlays disk_uuids)
 
   method create_metadata source inspect target_meta targets =
+    (* Finalize all the transfers. *)
+    let json_params =
+      let ids = List.map (fun id -> JSON.String id) transfer_ids in
+      let json_params = ("transfer_ids", JSON.List ids) :: json_params in
+      let ids = List.map (fun uuid -> JSON.String uuid) disk_uuids in
+      let json_params = ("disk_uuids", JSON.List ids) :: json_params in
+      json_params in
+    if Python_script.run_command finalize_script json_params [] <> 0 then
+      error (f_"failed to finalize the transfers, see earlier errors");
+
     (* The storage domain UUID. *)
     let sd_uuid =
       match rhv_storagedomain_uuid with
@@ -416,7 +463,7 @@ object
       error (f_"failed to create virtual machine, see earlier errors");
 
     (* Successful so don't delete on exit. *)
-    delete_disks_on_exit <- false
+    cancel_on_exit <- false
 
 end
 
diff --git a/v2v/output_rhv_upload_deletedisks_source.mli b/v2v/output_rhv_upload_cancel_source.mli
similarity index 100%
rename from v2v/output_rhv_upload_deletedisks_source.mli
rename to v2v/output_rhv_upload_cancel_source.mli
diff --git a/v2v/output_rhv_upload_finalize_source.mli b/v2v/output_rhv_upload_finalize_source.mli
new file mode 100644
index 000000000..aa33bc548
--- /dev/null
+++ b/v2v/output_rhv_upload_finalize_source.mli
@@ -0,0 +1,19 @@
+(* virt-v2v
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ *)
+
+val code : string
diff --git a/v2v/output_rhv_upload_transfer_source.mli b/v2v/output_rhv_upload_transfer_source.mli
new file mode 100644
index 000000000..aa33bc548
--- /dev/null
+++ b/v2v/output_rhv_upload_transfer_source.mli
@@ -0,0 +1,19 @@
+(* virt-v2v
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ *)
+
+val code : string
diff --git a/v2v/rhv-upload-deletedisks.py b/v2v/rhv-upload-cancel.py
similarity index 80%
rename from v2v/rhv-upload-deletedisks.py
rename to v2v/rhv-upload-cancel.py
index 8ed8cb88b..f08713b8c 100644
--- a/v2v/rhv-upload-deletedisks.py
+++ b/v2v/rhv-upload-cancel.py
@@ -1,6 +1,6 @@
 # -*- python -*-
-# oVirt or RHV upload delete disks used by ‘virt-v2v -o rhv-upload’
-# Copyright (C) 2019 Red Hat Inc.
+# oVirt or RHV upload cancel used by ‘virt-v2v -o rhv-upload’
+# Copyright (C) 2019-2021 Red Hat Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -57,13 +57,22 @@ connection = sdk.Connection(
 )
 
 system_service = connection.system_service()
+image_transfers_service = system_service.image_transfers_service()
+
+# Try to cancel the transfers.  This will probably delete the disk.
+for id in params['transfer_ids']:
+    try:
+        transfer_service = image_transfers_service.image_transfer_service(id)
+        transfer_service.cancel()
+    except sdk.NotFoundError:
+        pass
+
 disks_service = system_service.disks_service()
 
+# As a last resort, try to remove the disks.
 for uuid in params['disk_uuids']:
-    # Try to get and remove the disk, however do not fail
-    # if it does not exist (maybe removed in the meanwhile).
     try:
         disk_service = disks_service.disk_service(uuid)
         disk_service.remove()
-    except sdk.NotFoundError:
+    except (sdk.NotFoundError, sdk.Error):
         pass
diff --git a/v2v/rhv-upload-finalize.py b/v2v/rhv-upload-finalize.py
new file mode 100644
index 000000000..d712dd0ab
--- /dev/null
+++ b/v2v/rhv-upload-finalize.py
@@ -0,0 +1,168 @@
+# -*- python -*-
+# oVirt or RHV upload finalize used by ‘virt-v2v -o rhv-upload’
+# Copyright (C) 2018-2021 Red Hat Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import json
+import logging
+import sys
+import time
+from urllib.parse import urlparse
+
+import ovirtsdk4 as sdk
+import ovirtsdk4.types as types
+
+# Timeout to wait for oVirt disks to change status, or the transfer
+# object to finish initializing [seconds].
+timeout = 5 * 60
+
+def debug(s):
+    if params['verbose']:
+        print(s, file=sys.stderr)
+        sys.stderr.flush()
+
+def finalize_transfer(connection, transfer_id, disk_id):
+    """
+    Finalize a transfer, making the transfer disk available.
+
+    If finalizing succeeds, the transfer's disk status will change to OK
+    and transfer's phase will change to FINISHED_SUCCESS. Unfortunately,
+    the disk status is modified before the transfer finishes, and oVirt
+    may still hold a lock on the disk at this point.
+
+    The only way to make sure that the disk is unlocked, is to wait
+    until the transfer phase switches to FINISHED_SUCCESS. Unfortunately
+    oVirt makes this hard to use because the transfer is removed shortly
+    after switching the phase to the final phase. However if the
+    transfer was removed, we can be sure that the disk is not locked,
+    since oVirt releases the locks before removing the transfer.
+
+    On errors, the transfer's phase will change to FINISHED_FAILURE and
+    the disk status will change to ILLEGAL and it will be removed. Again
+    the transfer will be removed shortly after that.
+
+    If oVirt fails to finalize the transfer, transfer's phase will
+    change to PAUSED_SYSTEM. In this case the disk's status will change
+    to ILLEGAL and it will not be removed.
+
+    oVirt 4.4.7 made waiting for transfer easier by keeping transfers
+    after they complete, but we must support older versions so we have
+    generic code that works with any version.
+
+    For more info see:
+    - http://ovirt.github.io/ovirt-engine-api-model/4.4/#services/image_transfer
+    - http://ovirt.github.io/ovirt-engine-sdk/master/types.m.html#ovirtsdk4.types.ImageTransfer
+    """
+    debug("finalizing transfer %s" % transfer_id)
+    transfer_service = (connection.system_service()
+                        .image_transfers_service()
+                        .image_transfer_service(transfer_id))
+
+    start = time.time()
+
+    transfer_service.finalize()
+
+    while True:
+        time.sleep(1)
+        try:
+            transfer = transfer_service.get()
+        except sdk.NotFoundError:
+            # Transfer was removed (ovirt < 4.4.7). We need to check the
+            # disk status to understand if the transfer was successful.
+            # Due to the way oVirt does locking, we know that the disk
+            # is unlocked at this point so we can check only once.
+            # NB: 'transfer' may never have been bound; use transfer_id.
+            debug("transfer %s was removed, checking disk %s status"
+                  % (transfer_id, disk_id))
+
+            disk_service = (connection.system_service()
+                            .disks_service()
+                            .disk_service(disk_id))
+
+            try:
+                disk = disk_service.get()
+            except sdk.NotFoundError:
+                raise RuntimeError(
+                    "transfer %s failed: disk %s was removed"
+                    % (transfer_id, disk_id))
+
+            debug("disk %s is %s" % (disk_id, disk.status))
+
+            if disk.status == types.DiskStatus.OK:
+                break
+
+            raise RuntimeError(
+                "transfer %s failed: disk is %s" % (transfer_id, disk.status))
+        else:
+            # Transfer exists, check if it reached one of the final
+            # phases, or we timed out.
+
+            debug("transfer %s is %s" % (transfer.id, transfer.phase))
+
+            if transfer.phase == types.ImageTransferPhase.FINISHED_SUCCESS:
+                break
+
+            if transfer.phase == types.ImageTransferPhase.FINISHED_FAILURE:
+                raise RuntimeError(
+                    "transfer %s has failed" % (transfer.id,))
+
+            if transfer.phase == types.ImageTransferPhase.PAUSED_SYSTEM:
+                raise RuntimeError(
+                    "transfer %s was paused by system" % (transfer.id,))
+
+            if time.time() > start + timeout:
+                raise RuntimeError(
+                    "timed out waiting for transfer %s to finalize, "
+                    "transfer is %s"
+                    % (transfer.id, transfer.phase))
+
+    debug("transfer %s finalized in %.3f seconds"
+          % (transfer_id, time.time() - start))
+
+# Parameters are passed in via a JSON doc from the OCaml code.
+# Because this Python code ships embedded inside virt-v2v there
+# is no formal API here.
+params = None
+
+if len(sys.argv) != 2:
+    raise RuntimeError("incorrect number of parameters")
+
+# Parameters are passed in via a JSON document.
+with open(sys.argv[1], 'r') as fp:
+    params = json.load(fp)
+
+# What is passed in is a password file, read the actual password.
+with open(params['output_password'], 'r') as fp:
+    output_password = fp.read()
+output_password = output_password.rstrip()
+
+# Parse out the username from the output_conn URL.
+parsed = urlparse(params['output_conn'])
+username = parsed.username or "admin@internal"
+
+# Connect to the server.
+connection = sdk.Connection(
+    url=params['output_conn'],
+    username=username,
+    password=output_password,
+    ca_file=params['rhv_cafile'],
+    log=logging.getLogger(),
+    insecure=params['insecure'],
+)
+
+# Finalize all the transfers.
+for (transfer_id, disk_id) in zip(params['transfer_ids'], params['disk_uuids']):
+    finalize_transfer(connection, transfer_id, disk_id)
diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py
index 1155cf38d..559ec5062 100644
--- a/v2v/rhv-upload-plugin.py
+++ b/v2v/rhv-upload-plugin.py
@@ -1,6 +1,6 @@
 # -*- python -*-
 # oVirt or RHV upload nbdkit plugin used by ‘virt-v2v -o rhv-upload’
-# Copyright (C) 2018 Red Hat Inc.
+# Copyright (C) 2018-2021 Red Hat Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -16,16 +16,10 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
-import builtins
-import functools
-import inspect
 import json
-import logging
 import queue
 import socket
 import ssl
-import sys
-import time
 
 from contextlib import contextmanager
 from http.client import HTTPSConnection, HTTPConnection
@@ -33,9 +27,6 @@ from urllib.parse import urlparse
 
 import nbdkit
 
-import ovirtsdk4 as sdk
-import ovirtsdk4.types as types
-
 # Using version 2 supporting the buffer protocol for better performance.
 API_VERSION = 2
 
@@ -43,31 +34,47 @@ API_VERSION = 2
 # client, this give best performance.
 MAX_CONNECTIONS = 4
 
-# Timeout to wait for oVirt disks to change status, or the transfer
-# object to finish initializing [seconds].
-timeout = 5 * 60
-
-# Parameters are passed in via a JSON doc from the OCaml code.
-# Because this Python code ships embedded inside virt-v2v there
-# is no formal API here.
-params = None
-
+cafile = destination_url = size = None  # unset until config() runs
+insecure = False
+is_host = False
 
+# Parse parameters.
 def config(key, value):
-    global params
+    global cafile, destination_url, is_host, insecure, size
 
-    if key == "params":
-        with builtins.open(value, 'r') as fp:
-            params = json.load(fp)
-        debug("using params: %s" % params)
+    if key == "cafile":
+        cafile = value
+    elif key == "destination_url":
+        destination_url = urlparse(value)
+    elif key == "is_host":
+        is_host = value.lower() in ['true', '1']
+    elif key == "insecure":
+        insecure = value.lower() in ['true', '1']
+    elif key == "size":
+        size = int(value)
     else:
         raise RuntimeError("unknown configuration key '%s'" % key)
 
-
 def config_complete():
-    if params is None:
-        raise RuntimeError("missing configuration parameters")
+    # These parameters are required.
+    if destination_url is None:
+        raise RuntimeError("destination_url parameter was not set")
+    if size is None:
+        raise RuntimeError("size parameter was not set")
 
+def after_fork():
+    global options, pool
+
+    http = create_http(destination_url)
+    options = get_options(http, destination_url)
+    http.close()
+
+    nbdkit.debug("imageio features: flush=%(can_flush)r "
+                 "zero=%(can_zero)r unix_socket=%(unix_socket)r "
+                 "max_readers=%(max_readers)r max_writers=%(max_writers)r"
+                 % options)
+
+    pool = create_http_pool(destination_url, options)
 
 def thread_model():
     """
@@ -76,119 +83,21 @@ def thread_model():
     """
     return nbdkit.THREAD_MODEL_PARALLEL
 
-
-def debug(s):
-    if params['verbose']:
-        print(s, file=sys.stderr)
-        sys.stderr.flush()
-
-
-def read_password():
-    """
-    Read the password from file.
-    """
-    with builtins.open(params['output_password'], 'r') as fp:
-        data = fp.read()
-    return data.rstrip()
-
-
-def parse_username():
-    """
-    Parse out the username from the output_conn URL.
-    """
-    parsed = urlparse(params['output_conn'])
-    return parsed.username or "admin@internal"
-
-
-def failing(func):
-    """
-    Decorator marking the handle as failed if any exception is raised in the
-    decorated function.  This is used in close() to cleanup properly after
-    failures.
-    """
-    @functools.wraps(func)
-    def wrapper(h, *args):
-        try:
-            return func(h, *args)
-        except:
-            h['failed'] = True
-            raise
-
-    return wrapper
-
-
 def open(readonly):
-    connection = sdk.Connection(
-        url=params['output_conn'],
-        username=parse_username(),
-        password=read_password(),
-        ca_file=params['rhv_cafile'],
-        log=logging.getLogger(),
-        insecure=params['insecure'],
-    )
+    return 1
 
-    # Use the local host is possible.
-    host = find_host(connection) if params['rhv_direct'] else None
-    disk = create_disk(connection)
-
-    transfer = create_transfer(connection, disk, host)
-    try:
-        destination_url = parse_transfer_url(transfer)
-        http = create_http(destination_url)
-        options = get_options(http, destination_url)
-
-        # Close the initial connection to imageio server. When qemu-img will
-        # try to access the server, HTTPConnection will reconnect
-        # automatically. If we keep this connection idle and qemu-img is too
-        # slow getting image extents, imageio server may close the connection,
-        # and the import will fail on the first write.
-        # See https://bugzilla.redhat.com/1916176.
-        http.close()
-
-        pool = create_http_pool(destination_url, host, options)
-    except:
-        cancel_transfer(connection, transfer)
-        raise
-
-    debug("imageio features: flush=%(can_flush)r "
-          "zero=%(can_zero)r unix_socket=%(unix_socket)r "
-          "max_readers=%(max_readers)r max_writers=%(max_writers)r"
-          % options)
-
-    # Save everything we need to make requests in the handle.
-    return {
-        'can_flush': options['can_flush'],
-        'can_zero': options['can_zero'],
-        'connection': connection,
-        'disk_id': disk.id,
-        'transfer': transfer,
-        'failed': False,
-        'pool': pool,
-        'connections': pool.qsize(),
-        'path': destination_url.path,
-    }
-
-
-@failing
 def can_trim(h):
     return False
 
-
-@failing
 def can_flush(h):
-    return h['can_flush']
+    return options['can_flush']
 
-
-@failing
 def can_fua(h):
     # imageio flush feature is is compatible with NBD_CMD_FLAG_FUA.
-    return h['can_flush']
+    return options['can_flush']
 
-
-@failing
 def get_size(h):
-    return params['disk_size']
-
+    return size
 
 # Any unexpected HTTP response status from the server will end up calling this
 # function which logs the full error, and raises a RuntimeError exception.
@@ -201,28 +110,25 @@ def request_failed(r, msg):
         body = "(Unable to read response body: %s)" % e
 
     # Log the full error if we're verbose.
-    debug("unexpected response from imageio server:")
-    debug(msg)
-    debug("%d: %s" % (status, reason))
-    debug(body)
+    nbdkit.debug("unexpected response from imageio server:")
+    nbdkit.debug(msg)
+    nbdkit.debug("%d: %s" % (status, reason))
+    nbdkit.debug(body)
 
     # Only a short error is included in the exception.
     raise RuntimeError("%s: %d %s: %r" % (msg, status, reason, body[:200]))
 
-
 # For documentation see:
 # https://github.com/oVirt/ovirt-imageio/blob/master/docs/random-io.md
 # For examples of working code to read/write from the server, see:
 # https://github.com/oVirt/ovirt-imageio/blob/master/daemon/test/server_test.py
 
-
-@failing
 def pread(h, buf, offset, flags):
     count = len(buf)
     headers = {"Range": "bytes=%d-%d" % (offset, offset + count - 1)}
 
-    with http_context(h) as http:
-        http.request("GET", h['path'], headers=headers)
+    with http_context(pool) as http:
+        http.request("GET", destination_url.path, headers=headers)
 
         r = http.getresponse()
         # 206 = HTTP Partial Content.
@@ -248,15 +154,13 @@ def pread(h, buf, offset, flags):
                                    (offset, count, got))
                 got += n
 
-
-@failing
 def pwrite(h, buf, offset, flags):
     count = len(buf)
 
-    flush = "y" if (h['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n"
+    flush = "y" if (options['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n"
 
-    with http_context(h) as http:
-        http.putrequest("PUT", h['path'] + "?flush=" + flush)
+    with http_context(pool) as http:
+        http.putrequest("PUT", destination_url.path + "?flush=" + flush)
         # The oVirt server only uses the first part of the range, and the
         # content-length.
         http.putheader("Content-Range", "bytes %d-%d/*" %
@@ -277,17 +181,15 @@ def pwrite(h, buf, offset, flags):
 
         r.read()
 
-
-@failing
 def zero(h, count, offset, flags):
     # Unlike the trim and flush calls, there is no 'can_zero' method
     # so nbdkit could call this even if the server doesn't support
     # zeroing.  If this is the case we must emulate.
-    if not h['can_zero']:
+    if not options['can_zero']:
         emulate_zero(h, count, offset, flags)
         return
 
-    flush = bool(h['can_flush'] and (flags & nbdkit.FLAG_FUA))
+    flush = bool(options['can_flush'] and (flags & nbdkit.FLAG_FUA))
 
     # Construct the JSON request for zeroing.
     buf = json.dumps({'op': "zero",
@@ -298,8 +200,8 @@ def zero(h, count, offset, flags):
     headers = {"Content-Type": "application/json",
                "Content-Length": str(len(buf))}
 
-    with http_context(h) as http:
-        http.request("PATCH", h['path'], body=buf, headers=headers)
+    with http_context(pool) as http:
+        http.request("PATCH", destination_url.path, body=buf, headers=headers)
 
         r = http.getresponse()
         if r.status != 200:
@@ -309,12 +211,11 @@ def zero(h, count, offset, flags):
 
         r.read()
 
-
 def emulate_zero(h, count, offset, flags):
-    flush = "y" if (h['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n"
+    flush = "y" if (options['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n"
 
-    with http_context(h) as http:
-        http.putrequest("PUT", h['path'] + "?flush=" + flush)
+    with http_context(pool) as http:
+        http.putrequest("PUT", destination_url.path + "?flush=" + flush)
         http.putheader("Content-Range",
                        "bytes %d-%d/*" % (offset, offset + count - 1))
         http.putheader("Content-Length", str(count))
@@ -337,8 +238,6 @@ def emulate_zero(h, count, offset, flags):
 
         r.read()
 
-
-@failing
 def flush(h, flags):
     # Construct the JSON request for flushing.
     buf = json.dumps({'op': "flush"}).encode()
@@ -346,11 +245,10 @@ def flush(h, flags):
     headers = {"Content-Type": "application/json",
                "Content-Length": str(len(buf))}
 
-    # Wait until all inflight requests are completed, and send a flush request
-    # for all imageio connections.
-
-    for http in iter_http_pool(h):
-        http.request("PATCH", h['path'], body=buf, headers=headers)
+    # Wait until all inflight requests are completed, and send a flush
+    # request for all imageio connections.
+    for http in iter_http_pool(pool):
+        http.request("PATCH", destination_url.path, body=buf, headers=headers)
 
         r = http.getresponse()
         if r.status != 200:
@@ -358,50 +256,14 @@ def flush(h, flags):
 
         r.read()
 
-
 def close(h):
-    connection = h['connection']
-    transfer = h['transfer']
-    disk_id = h['disk_id']
-
-    # This is sometimes necessary because python doesn't set up
-    # sys.stderr to be line buffered and so debug, errors or
-    # exceptions printed previously might not be emitted before the
-    # plugin exits.
-    sys.stderr.flush()
-
-    close_http_pool(h)
-
-    # If the connection failed earlier ensure we cancel the transfer. Canceling
-    # the transfer will delete the disk.
-    if h['failed']:
-        try:
-            cancel_transfer(connection, transfer)
-        finally:
-            connection.close()
-        return
-
-    # Try to finalize the transfer. On errors the transfer may be paused by the
-    # system, and we need to cancel the transfer to remove the disk.
-    try:
-        finalize_transfer(connection, transfer, disk_id)
-    except:
-        cancel_transfer(connection, transfer)
-        raise
-    else:
-        # Write the disk ID file.  Only do this on successful completion.
-        with builtins.open(params['diskid_file'], 'w') as fp:
-            fp.write(disk_id)
-    finally:
-        connection.close()
-
+    close_http_pool(pool)
 
 # Modify http.client.HTTPConnection to work over a Unix domain socket.
 # Derived from uhttplib written by Erik van Zijst under an MIT license.
 # (https://pypi.org/project/uhttplib/)
 # Ported to Python 3 by Irit Goihman.
 
-
 class UnixHTTPConnection(HTTPConnection):
     def __init__(self, path, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
         self.path = path
@@ -414,318 +276,18 @@ class UnixHTTPConnection(HTTPConnection):
         self.sock.connect(self.path)
 
 
-# oVirt SDK operations
 
-
-def find_host(connection):
-    """Return the current host object or None."""
-    try:
-        with builtins.open("/etc/vdsm/vdsm.id") as f:
-            vdsm_id = f.readline().strip()
-    except Exception as e:
-        # This is most likely not an oVirt host.
-        debug("cannot read /etc/vdsm/vdsm.id, using any host: %s" % e)
-        return None
-
-    debug("hw_id = %r" % vdsm_id)
-
-    system_service = connection.system_service()
-    storage_name = params['output_storage']
-    data_centers = system_service.data_centers_service().list(
-        search='storage.name=%s' % storage_name,
-        case_sensitive=True,
-    )
-    if len(data_centers) == 0:
-        # The storage domain is not attached to a datacenter
-        # (shouldn't happen, would fail on disk creation).
-        debug("storange domain (%s) is not attached to a DC" % storage_name)
-        return None
-
-    datacenter = data_centers[0]
-    debug("datacenter = %s" % datacenter.name)
-
-    hosts_service = system_service.hosts_service()
-    hosts = hosts_service.list(
-        search="hw_id=%s and datacenter=%s and status=Up"
-               % (vdsm_id, datacenter.name),
-        case_sensitive=True,
-    )
-    if len(hosts) == 0:
-        # Couldn't find a host that's fulfilling the following criteria:
-        # - 'hw_id' equals to 'vdsm_id'
-        # - Its status is 'Up'
-        # - Belongs to the storage domain's datacenter
-        debug("cannot find a running host with hw_id=%r, "
-              "that belongs to datacenter '%s', "
-              "using any host" % (vdsm_id, datacenter.name))
-        return None
-
-    host = hosts[0]
-    debug("host.id = %r" % host.id)
-
-    return types.Host(id=host.id)
-
-
-def create_disk(connection):
-    """
-    Create a new disk for the transfer and wait until the disk is ready.
-
-    Returns disk object.
-    """
-    system_service = connection.system_service()
-    disks_service = system_service.disks_service()
-
-    if params['disk_format'] == "raw":
-        disk_format = types.DiskFormat.RAW
-    else:
-        disk_format = types.DiskFormat.COW
-
-    disk = disks_service.add(
-        disk=types.Disk(
-            id=params.get('disk_uuid'),
-            name=params['disk_name'],
-            description="Uploaded by virt-v2v",
-            format=disk_format,
-            # XXX For qcow2 disk on block storage, we should use the estimated
-            # size, based on qemu-img measure of the overlay.
-            initial_size=params['disk_size'],
-            provisioned_size=params['disk_size'],
-            # Handling this properly will be complex, see:
-            # https://www.redhat.com/archives/libguestfs/2018-March/msg00177.html
-            sparse=True,
-            storage_domains=[
-                types.StorageDomain(
-                    name=params['output_storage'],
-                )
-            ],
-        )
-    )
-
-    debug("disk.id = %r" % disk.id)
-
-    # Wait till the disk moved from LOCKED state to OK state, as the transfer
-    # can't start if the disk is locked.
-
-    disk_service = disks_service.disk_service(disk.id)
-    endt = time.time() + timeout
-    while True:
-        time.sleep(1)
-        disk = disk_service.get()
-        if disk.status == types.DiskStatus.OK:
-            break
-        if time.time() > endt:
-            raise RuntimeError(
-                "timed out waiting for disk %s to become unlocked" % disk.id)
-
-    return disk
-
-
-def create_transfer(connection, disk, host):
-    """
-    Create image transfer and wait until the transfer is ready.
-
-    Returns a transfer object.
-    """
-    system_service = connection.system_service()
-    transfers_service = system_service.image_transfers_service()
-
-    extra = {}
-    if transfer_supports_format():
-        extra["format"] = types.DiskFormat.RAW
-
-    transfer = transfers_service.add(
-        types.ImageTransfer(
-            disk=types.Disk(id=disk.id),
-            host=host,
-            inactivity_timeout=3600,
-            **extra,
-        )
-    )
-
-    # At this point the transfer owns the disk and will delete the disk if the
-    # transfer is canceled, or if finalizing the transfer fails.
-
-    debug("transfer.id = %r" % transfer.id)
-
-    # Get a reference to the created transfer service.
-    transfer_service = transfers_service.image_transfer_service(transfer.id)
-
-    # Wait until transfer's phase change from INITIALIZING to TRANSFERRING. On
-    # errors transfer's phase can change to PAUSED_SYSTEM or FINISHED_FAILURE.
-    # If the transfer was paused, we need to cancel it to remove the disk,
-    # otherwise the system will remove the disk and transfer shortly after.
-
-    endt = time.time() + timeout
-    while True:
-        time.sleep(1)
-        try:
-            transfer = transfer_service.get()
-        except sdk.NotFoundError:
-            # The system has removed the disk and the transfer.
-            raise RuntimeError("transfer %s was removed" % transfer.id)
-
-        if transfer.phase == types.ImageTransferPhase.FINISHED_FAILURE:
-            # The system will remove the disk and the transfer soon.
-            raise RuntimeError(
-                "transfer %s has failed" % transfer.id)
-
-        if transfer.phase == types.ImageTransferPhase.PAUSED_SYSTEM:
-            transfer_service.cancel()
-            raise RuntimeError(
-                "transfer %s was paused by system" % transfer.id)
-
-        if transfer.phase == types.ImageTransferPhase.TRANSFERRING:
-            break
-
-        if transfer.phase != types.ImageTransferPhase.INITIALIZING:
-            transfer_service.cancel()
-            raise RuntimeError(
-                "unexpected transfer %s phase %s"
-                % (transfer.id, transfer.phase))
-
-        if time.time() > endt:
-            transfer_service.cancel()
-            raise RuntimeError(
-                "timed out waiting for transfer %s" % transfer.id)
-
-    return transfer
-
-
-def cancel_transfer(connection, transfer):
-    """
-    Cancel a transfer, removing the transfer disk.
-    """
-    debug("canceling transfer %s" % transfer.id)
-    transfer_service = (connection.system_service()
-                        .image_transfers_service()
-                        .image_transfer_service(transfer.id))
-    transfer_service.cancel()
-
-
-def finalize_transfer(connection, transfer, disk_id):
-    """
-    Finalize a transfer, making the transfer disk available.
-
-    If finalizing succeeds, the transfer's disk status will change to OK
-    and transfer's phase will change to FINISHED_SUCCESS. Unfortunately,
-    the disk status is modified before the transfer finishes, and oVirt
-    may still hold a lock on the disk at this point.
-
-    The only way to make sure that the disk is unlocked, is to wait
-    until the transfer phase switches FINISHED_SUCCESS. Unfortunately
-    oVirt makes this hard to use because the transfer is removed shortly
-    after switching the phase to the final phase. However if the
-    transfer was removed, we can be sure that the disk is not locked,
-    since oVirt releases the locks before removing the transfer.
-
-    On errors, the transfer's phase will change to FINISHED_FAILURE and
-    the disk status will change to ILLEGAL and it will be removed. Again
-    the transfer will be removed shortly after that.
-
-    If oVirt fails to finalize the transfer, transfer's phase will
-    change to PAUSED_SYSTEM. In this case the disk's status will change
-    to ILLEGAL and it will not be removed.
-
-    oVirt 4.4.7 made waiting for transfer easier by keeping transfers
-    after they complete, but we must support older versions so we have
-    generic code that work with any version.
-
-    For more info see:
-    - http://ovirt.github.io/ovirt-engine-api-model/4.4/#services/image_transfer
-    - http://ovirt.github.io/ovirt-engine-sdk/master/types.m.html#ovirtsdk4.types.ImageTransfer
-    """
-    debug("finalizing transfer %s" % transfer.id)
-    transfer_service = (connection.system_service()
-                        .image_transfers_service()
-                        .image_transfer_service(transfer.id))
-
-    start = time.time()
-
-    transfer_service.finalize()
-
-    while True:
-        time.sleep(1)
-        try:
-            transfer = transfer_service.get()
-        except sdk.NotFoundError:
-            # Transfer was removed (ovirt < 4.4.7). We need to check the
-            # disk status to understand if the transfer was successful.
-            # Due to the way oVirt does locking, we know that the disk
-            # is unlocked at this point so we can check only once.
-
-            debug("transfer %s was removed, checking disk %s status"
-                  % (transfer.id, disk_id))
-
-            disk_service = (connection.system_service()
-                            .disks_service()
-                            .disk_service(disk_id))
-
-            try:
-                disk = disk_service.get()
-            except sdk.NotFoundError:
-                raise RuntimeError(
-                    "transfer %s failed: disk %s was removed"
-                    % (transfer.id, disk_id))
-
-            debug("disk %s is %s" % (disk_id, disk.status))
-
-            if disk.status == types.DiskStatus.OK:
-                break
-
-            raise RuntimeError(
-                "transfer %s failed: disk is %s" % (transfer.id, disk.status))
-        else:
-            # Transfer exists, check if it reached one of the final
-            # phases, or we timed out.
-
-            debug("transfer %s is %s" % (transfer.id, transfer.phase))
-
-            if transfer.phase == types.ImageTransferPhase.FINISHED_SUCCESS:
-                break
-
-            if transfer.phase == types.ImageTransferPhase.FINISHED_FAILURE:
-                raise RuntimeError(
-                    "transfer %s has failed" % (transfer.id,))
-
-            if transfer.phase == types.ImageTransferPhase.PAUSED_SYSTEM:
-                raise RuntimeError(
-                    "transfer %s was paused by system" % (transfer.id,))
-
-            if time.time() > start + timeout:
-                raise RuntimeError(
-                    "timed out waiting for transfer %s to finalize, "
-                    "transfer is %s"
-                    % (transfer.id, transfer.phase))
-
-    debug("transfer %s finalized in %.3f seconds"
-          % (transfer.id, time.time() - start))
-
-
-def transfer_supports_format():
-    """
-    Return True if transfer supports the "format" argument, enabing the NBD
-    bakend on imageio side, which allows uploading to qcow2 images.
-
-    This feature was added in ovirt 4.3. We assume that the SDK version matches
-    engine version.
-    """
-    sig = inspect.signature(types.ImageTransfer)
-    return "format" in sig.parameters
-
-
-# Connection pool managment
-
-
-def create_http_pool(url, host, options):
+# Connection pool.
+def create_http_pool(url, options):
     pool = queue.Queue()
 
     count = min(options["max_readers"],
                 options["max_writers"],
                 MAX_CONNECTIONS)
 
-    debug("creating http pool connections=%d" % count)
+    nbdkit.debug("creating http pool connections=%d" % count)
 
-    unix_socket = options["unix_socket"] if host is not None else None
+    unix_socket = options["unix_socket"] if is_host else None
 
     for i in range(count):
         http = create_http(url, unix_socket=unix_socket)
@@ -733,22 +295,19 @@ def create_http_pool(url, host, options):
 
     return pool
 
-
 @contextmanager
-def http_context(h):
+def http_context(pool):
     """
     Context manager yielding an imageio http connection from the pool. Blocks
     until a connection is available.
     """
-    pool = h["pool"]
     http = pool.get()
     try:
         yield http
     finally:
         pool.put(http)
 
-
-def iter_http_pool(h):
+def iter_http_pool(pool):
     """
     Wait until all inflight requests are done, and iterate on imageio
     connections.
@@ -756,11 +315,10 @@ def iter_http_pool(h):
     The pool is empty during iteration. New requests issued during iteration
     will block until iteration is done.
     """
-    pool = h["pool"]
     locked = []
 
     # Lock the pool by taking the connection out.
-    while len(locked) < h["connections"]:
+    while len(locked) < pool.qsize():
         locked.append(pool.get())
 
     try:
@@ -771,44 +329,23 @@ def iter_http_pool(h):
         for http in locked:
             pool.put(http)
 
-
-def close_http_pool(h):
+def close_http_pool(pool):
     """
     Wait until all inflight requests are done, close all connections and remove
     them from the pool.
 
     No request can be served by the pool after this call.
     """
-    debug("closing http pool")
+    nbdkit.debug("closing http pool")
 
-    pool = h["pool"]
     locked = []
 
-    while len(locked) < h["connections"]:
+    for _ in range(pool.qsize()):  # size read once; it shrinks on each get()
         locked.append(pool.get())
 
     for http in locked:
         http.close()
 
 
-
-# oVirt imageio operations
-
-
-def parse_transfer_url(transfer):
-    """
-    Returns a parsed transfer url, preferring direct transfer if possible.
-    """
-    if params['rhv_direct']:
-        if transfer.transfer_url is None:
-            raise RuntimeError("direct upload to host not supported, "
-                               "requires ovirt-engine >= 4.2 and only works "
-                               "when virt-v2v is run within the oVirt/RHV "
-                               "environment, eg. on an oVirt node.")
-        return urlparse(transfer.transfer_url)
-    else:
-        return urlparse(transfer.proxy_url)
-
-
 def create_http(url, unix_socket=None):
     """
     Create http connection for transfer url.
@@ -816,32 +353,31 @@ def create_http(url, unix_socket=None):
     Returns HTTPConnection.
     """
     if unix_socket:
-        debug("creating unix http connection socket=%r" % unix_socket)
+        nbdkit.debug("creating unix http connection socket=%r" % unix_socket)
         try:
             return UnixHTTPConnection(unix_socket)
         except Exception as e:
             # Very unlikely, but we can recover by using https.
-            debug("cannot create unix socket connection: %s" % e)
+            nbdkit.debug("cannot create unix socket connection: %s" % e)
 
     if url.scheme == "https":
         context = \
             ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
-                                       cafile=params['rhv_cafile'])
-        if params['insecure']:
+                                       cafile=cafile)
+        if insecure:
             context.check_hostname = False
             context.verify_mode = ssl.CERT_NONE
 
-        debug("creating https connection host=%s port=%s" %
-              (url.hostname, url.port))
+        nbdkit.debug("creating https connection host=%s port=%s" %
+                     (url.hostname, url.port))
         return HTTPSConnection(url.hostname, url.port, context=context)
     elif url.scheme == "http":
-        debug("creating http connection host=%s port=%s" %
-              (url.hostname, url.port))
+        nbdkit.debug("creating http connection host=%s port=%s" %
+                     (url.hostname, url.port))
         return HTTPConnection(url.hostname, url.port)
     else:
         raise RuntimeError("unknown URL scheme (%s)" % url.scheme)
 
-
 def get_options(http, url):
     """
     Send OPTIONS request to imageio server and return options dict.
diff --git a/v2v/rhv-upload-transfer.py b/v2v/rhv-upload-transfer.py
new file mode 100644
index 000000000..5abbfe23f
--- /dev/null
+++ b/v2v/rhv-upload-transfer.py
@@ -0,0 +1,278 @@
+# -*- python -*-
+# oVirt or RHV upload start transfer used by ‘virt-v2v -o rhv-upload’
+# Copyright (C) 2018-2021 Red Hat Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import inspect
+import json
+import logging
+import sys
+import time
+from urllib.parse import urlparse
+
+import ovirtsdk4 as sdk
+import ovirtsdk4.types as types
+
+# Timeout to wait for oVirt disks to change status, or the transfer
+# object to finish initializing [seconds].
+timeout = 5 * 60
+
+def debug(s):
+    if params['verbose']:
+        print(s, file=sys.stderr)
+        sys.stderr.flush()
+
+def find_host(connection):
+    """Return the current host object or None."""
+    try:
+        with open("/etc/vdsm/vdsm.id") as f:
+            vdsm_id = f.readline().strip()
+    except Exception as e:
+        # This is most likely not an oVirt host.
+        debug("cannot read /etc/vdsm/vdsm.id, using any host: %s" % e)
+        return None
+
+    debug("hw_id = %r" % vdsm_id)
+
+    system_service = connection.system_service()
+    storage_name = params['output_storage']
+    data_centers = system_service.data_centers_service().list(
+        search='storage.name=%s' % storage_name,
+        case_sensitive=True,
+    )
+    if len(data_centers) == 0:
+        # The storage domain is not attached to a datacenter
+        # (shouldn't happen, would fail on disk creation).
+        debug("storange domain (%s) is not attached to a DC" % storage_name)
+        return None
+
+    datacenter = data_centers[0]
+    debug("datacenter = %s" % datacenter.name)
+
+    hosts_service = system_service.hosts_service()
+    hosts = hosts_service.list(
+        search="hw_id=%s and datacenter=%s and status=Up"
+               % (vdsm_id, datacenter.name),
+        case_sensitive=True,
+    )
+    if len(hosts) == 0:
+        # Couldn't find a host that's fulfilling the following criteria:
+        # - 'hw_id' equals to 'vdsm_id'
+        # - Its status is 'Up'
+        # - Belongs to the storage domain's datacenter
+        debug("cannot find a running host with hw_id=%r, "
+              "that belongs to datacenter '%s', "
+              "using any host" % (vdsm_id, datacenter.name))
+        return None
+
+    host = hosts[0]
+    debug("host.id = %r" % host.id)
+
+    return types.Host(id=host.id)
+
+def create_disk(connection):
+    """
+    Create a new disk for the transfer and wait until the disk is ready.
+
+    Returns disk object.
+    """
+    system_service = connection.system_service()
+    disks_service = system_service.disks_service()
+
+    if params['disk_format'] == "raw":
+        disk_format = types.DiskFormat.RAW
+    else:
+        disk_format = types.DiskFormat.COW
+
+    disk = disks_service.add(
+        disk=types.Disk(
+            id=params.get('disk_uuid'),
+            name=params['disk_name'],
+            description="Uploaded by virt-v2v",
+            format=disk_format,
+            # XXX For qcow2 disk on block storage, we should use the estimated
+            # size, based on qemu-img measure of the overlay.
+            initial_size=params['disk_size'],
+            provisioned_size=params['disk_size'],
+            # Handling this properly will be complex, see:
+            # https://www.redhat.com/archives/libguestfs/2018-March/msg00177.html
+            sparse=True,
+            storage_domains=[
+                types.StorageDomain(
+                    name=params['output_storage'],
+                )
+            ],
+        )
+    )
+
+    debug("disk.id = %r" % disk.id)
+
+    # Wait till the disk moved from LOCKED state to OK state, as the transfer
+    # can't start if the disk is locked.
+
+    disk_service = disks_service.disk_service(disk.id)
+    endt = time.time() + timeout
+    while True:
+        time.sleep(1)
+        disk = disk_service.get()
+        if disk.status == types.DiskStatus.OK:
+            break
+        if time.time() > endt:
+            raise RuntimeError(
+                "timed out waiting for disk %s to become unlocked" % disk.id)
+
+    return disk
+
+def create_transfer(connection, disk, host):
+    """
+    Create image transfer and wait until the transfer is ready.
+
+    Returns a transfer object.
+    """
+    system_service = connection.system_service()
+    transfers_service = system_service.image_transfers_service()
+
+    extra = {}
+    if transfer_supports_format():
+        extra["format"] = types.DiskFormat.RAW
+
+    transfer = transfers_service.add(
+        types.ImageTransfer(
+            disk=types.Disk(id=disk.id),
+            host=host,
+            inactivity_timeout=3600,
+            **extra,
+        )
+    )
+
+    # At this point the transfer owns the disk and will delete the disk if the
+    # transfer is canceled, or if finalizing the transfer fails.
+
+    debug("transfer.id = %r" % transfer.id)
+
+    # Get a reference to the created transfer service.
+    transfer_service = transfers_service.image_transfer_service(transfer.id)
+
+    # Wait until transfer's phase change from INITIALIZING to TRANSFERRING. On
+    # errors transfer's phase can change to PAUSED_SYSTEM or FINISHED_FAILURE.
+    # If the transfer was paused, we need to cancel it to remove the disk,
+    # otherwise the system will remove the disk and transfer shortly after.
+
+    endt = time.time() + timeout
+    while True:
+        time.sleep(1)
+        try:
+            transfer = transfer_service.get()
+        except sdk.NotFoundError:
+            # The system has removed the disk and the transfer.
+            raise RuntimeError("transfer %s was removed" % transfer.id)
+
+        if transfer.phase == types.ImageTransferPhase.FINISHED_FAILURE:
+            # The system will remove the disk and the transfer soon.
+            raise RuntimeError(
+                "transfer %s has failed" % transfer.id)
+
+        if transfer.phase == types.ImageTransferPhase.PAUSED_SYSTEM:
+            transfer_service.cancel()
+            raise RuntimeError(
+                "transfer %s was paused by system" % transfer.id)
+
+        if transfer.phase == types.ImageTransferPhase.TRANSFERRING:
+            break
+
+        if transfer.phase != types.ImageTransferPhase.INITIALIZING:
+            transfer_service.cancel()
+            raise RuntimeError(
+                "unexpected transfer %s phase %s"
+                % (transfer.id, transfer.phase))
+
+        if time.time() > endt:
+            transfer_service.cancel()
+            raise RuntimeError(
+                "timed out waiting for transfer %s" % transfer.id)
+
+    return transfer
+
+def transfer_supports_format():
+    """
+    Return True if transfer supports the "format" argument, enabling the NBD
+    backend on imageio side, which allows uploading to qcow2 images.
+
+    This feature was added in ovirt 4.3. We assume that the SDK version matches
+    engine version.
+    """
+    sig = inspect.signature(types.ImageTransfer)
+    return "format" in sig.parameters
+
+def get_transfer_url(transfer):
+    """
+    Returns the transfer url, preferring direct transfer if possible.
+    """
+    if params['rhv_direct']:
+        if transfer.transfer_url is None:
+            raise RuntimeError("direct upload to host not supported, "
+                               "requires ovirt-engine >= 4.2 and only works "
+                               "when virt-v2v is run within the oVirt/RHV "
+                               "environment, eg. on an oVirt node.")
+        return transfer.transfer_url
+    else:
+        return transfer.proxy_url
+
+# Parameters are passed in via a JSON doc from the OCaml code.
+# Because this Python code ships embedded inside virt-v2v there
+# is no formal API here.
+params = None
+
+if len(sys.argv) != 2:
+    raise RuntimeError("incorrect number of parameters")
+
+# Parameters are passed in via a JSON document.
+with open(sys.argv[1], 'r') as fp:
+    params = json.load(fp)
+
+# What is passed in is a password file, read the actual password.
+with open(params['output_password'], 'r') as fp:
+    output_password = fp.read()
+output_password = output_password.rstrip()
+
+# Parse out the username from the output_conn URL.
+parsed = urlparse(params['output_conn'])
+username = parsed.username or "admin@internal"
+
+connection = sdk.Connection(
+    url=params['output_conn'],
+    username=username,
+    password=output_password,
+    ca_file=params['rhv_cafile'],
+    log=logging.getLogger(),
+    insecure=params['insecure'],
+)
+
+# Use the local host if possible.
+host = find_host(connection) if params['rhv_direct'] else None
+disk = create_disk(connection)
+
+transfer = create_transfer(connection, disk, host)
+destination_url = get_transfer_url(transfer)
+
+# Send the destination URL, transfer ID, and host flag back to OCaml code.
+results = {
+    "transfer_id": transfer.id,
+    "destination_url": destination_url,
+    "is_host": host is not None,
+}
+
+json.dump(results, sys.stdout)
-- 
2.32.0




More information about the Libguestfs mailing list