[Libguestfs] [PATCH] output/rhv-upload-plugin: Support extents

Nir Soffer nsoffer at redhat.com
Sun Dec 19 07:14:03 UTC 2021


Add extents support using imageio /extents API:
https://ovirt.github.io/ovirt-imageio/images.html#extents

Imageio does not support yet partial extents, so we cache entire image
extents, and search for the requested range. The search can probably be
faster using binary search, but it seems good enough as is.

Calling pwrite() and zero() invalidates the cached extents.

This can be useful for checking if an image is zero before copy, or
calculating the allocated size after the copy.

There is one caveat - imageio reports holes using qemu:allocation-depth,
not using base:allocation. So qcow2 images report holes for unallocated
areas, and raw images never reports holes. To get the actual allocation
in RHV, oVirt API should be used.

Testing is complicated:

1. Start ovirt-imageio server
2. Add ticket for test image using nbd socket
3. Start qemu-nbd serving the nbd socket
4. Run nbdkit with rhv-upload-plugin with and imageio https url

I tested various images by getting extents with nbdinfo and qemu-img map
and copying image from rhv-upload-plugin with qemu-img convert.

To get extents using nbdinfo, we need a fix in nbdkit python plugin:
https://listman.redhat.com/archives/libguestfs/2021-December/msg00196.html
---
 output/rhv-upload-plugin.py | 79 +++++++++++++++++++++++++++++++++++++
 1 file changed, 79 insertions(+)

diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
index 1cb837dd..4f0c02a5 100644
--- a/output/rhv-upload-plugin.py
+++ b/output/rhv-upload-plugin.py
@@ -42,20 +42,25 @@ url = None
 cafile = None
 insecure = False
 is_ovirt_host = False
 
 # List of options read from imageio server.
 options = None
 
 # Pool of HTTP connections.
 pool = None
 
+# Cached extents for entire image. The imageio server does not support yet
+# getting partial extents, and getting all extents may be expensive, so we
+# cache the entire image extents.
+cached_extents = None
+
 
 # Parse parameters.
 def config(key, value):
     global cafile, url, is_ovirt_host, insecure, size
 
     if key == "cafile":
         cafile = value
     elif key == "insecure":
         insecure = value.lower() in ['true', '1']
     elif key == "is_ovirt_host":
@@ -123,20 +128,24 @@ def can_fua(h):
     return options['can_flush']
 
 
 def can_multi_conn(h):
     # We can always handle multiple connections, and the number of NBD
     # connections is independent of the number of HTTP clients in the
     # pool.
     return True
 
 
+def can_extents(h):
+    return options["can_extents"]
+
+
 def get_size(h):
     return size
 
 
 # Any unexpected HTTP response status from the server will end up calling this
 # function which logs the full error, and raises a RuntimeError exception.
 def request_failed(r, msg):
     status = r.status
     reason = r.reason
     try:
@@ -184,48 +193,54 @@ def pread(h, buf, offset, flags):
             while got < count:
                 n = r.readinto(view[got:])
                 if n == 0:
                     request_failed(r,
                                    "short read offset %d size %d got %d" %
                                    (offset, count, got))
                 got += n
 
 
 def pwrite(h, buf, offset, flags):
+    global cached_extents
+
     count = len(buf)
 
     flush = "y" if (options['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n"
 
     with http_context(pool) as http:
         http.putrequest("PUT", url.path + "?flush=" + flush)
         # The oVirt server only uses the first part of the range, and the
         # content-length.
         http.putheader("Content-Range", "bytes %d-%d/*" %
                        (offset, offset + count - 1))
         http.putheader("Content-Length", str(count))
         http.endheaders()
 
         try:
             http.send(buf)
         except BrokenPipeError:
             pass
 
+        cached_extents = None
+
         r = http.getresponse()
         if r.status != 200:
             request_failed(r,
                            "could not write sector offset %d size %d" %
                            (offset, count))
 
         r.read()
 
 
 def zero(h, count, offset, flags):
+    global cached_extents
+
     # Unlike the trim and flush calls, there is no 'can_zero' method
     # so nbdkit could call this even if the server doesn't support
     # zeroing.  If this is the case we must emulate.
     if not options['can_zero']:
         emulate_zero(h, count, offset, flags)
         return
 
     flush = bool(options['can_flush'] and (flags & nbdkit.FLAG_FUA))
 
     # Construct the JSON request for zeroing.
@@ -233,39 +248,45 @@ def zero(h, count, offset, flags):
                       'offset': offset,
                       'size': count,
                       'flush': flush}).encode()
 
     headers = {"Content-Type": "application/json",
                "Content-Length": str(len(buf))}
 
     with http_context(pool) as http:
         http.request("PATCH", url.path, body=buf, headers=headers)
 
+        cached_extents = None
+
         r = http.getresponse()
         if r.status != 200:
             request_failed(r,
                            "could not zero sector offset %d size %d" %
                            (offset, count))
 
         r.read()
 
 
 def emulate_zero(h, count, offset, flags):
+    global cached_extents
+
     flush = "y" if (options['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n"
 
     with http_context(pool) as http:
         http.putrequest("PUT", url.path + "?flush=" + flush)
         http.putheader("Content-Range",
                        "bytes %d-%d/*" % (offset, offset + count - 1))
         http.putheader("Content-Length", str(count))
         http.endheaders()
 
+        cached_extents = None
+
         try:
             buf = bytearray(128 * 1024)
             while count > len(buf):
                 http.send(buf)
                 count -= len(buf)
             http.send(memoryview(buf)[:count])
         except BrokenPipeError:
             pass
 
         r = http.getresponse()
@@ -289,20 +310,76 @@ def flush(h, flags):
     for http in iter_http_pool(pool):
         http.request("PATCH", url.path, body=buf, headers=headers)
 
         r = http.getresponse()
         if r.status != 200:
             request_failed(r, "could not flush")
 
         r.read()
 
 
+def extents(h, count, offset, flags):
+    global cached_extents
+
+    if cached_extents is None:
+        cached_extents = get_all_extents()
+
+    end = offset + count
+
+    for ext in cached_extents:
+        start = ext["start"]
+        length = ext["length"]
+
+        # Stop when extent exceeds the requested range.
+        if start >= end:
+            return
+
+        # Skip over extents before the requested range.
+        if start + length <= offset:
+            continue
+
+        extent_type = 0
+
+        if ext["zero"]:
+            extent_type |= nbdkit.EXTENT_ZERO
+
+        # Old imageio server did not report holes. Note that imageio reports
+        # holes only for unallocated area in qcow2 image, so this flag may not
+        # be very useful.
+        if ext.get("hole"):
+            extent_type |= nbdkit.EXTENT_HOLE
+
+        # The first extent may start before the requested range.
+        if start < offset:
+            length -= offset - start
+            start = offset
+
+        # The last extent may end after the requested range.
+        if start + length > end:
+            length = end - start
+
+        yield start, length, extent_type
+
+
+def get_all_extents():
+    with http_context(pool) as http:
+        http.request("GET", url.path + "/extents")
+
+        r = http.getresponse()
+        if r.status != 200:
+            request_failed(r, "could not get extents")
+
+        data = r.read()
+
+    return json.loads(data)
+
+
 # Modify http.client.HTTPConnection to work over a Unix domain socket.
 # Derived from uhttplib written by Erik van Zijst under an MIT license.
 # (https://pypi.org/project/uhttplib/)
 # Ported to Python 3 by Irit Goihman.
 class UnixHTTPConnection(HTTPConnection):
     def __init__(self, path, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
         self.path = path
         HTTPConnection.__init__(self, "localhost", timeout=timeout)
 
     def connect(self):
@@ -425,28 +502,30 @@ def get_options(http, url):
     http.request("OPTIONS", url.path)
     r = http.getresponse()
     data = r.read()
 
     if r.status == 200:
         j = json.loads(data)
         features = j["features"]
         return {
             "can_flush": "flush" in features,
             "can_zero": "zero" in features,
+            "can_extents": "extents" in features,
             "unix_socket": j.get('unix_socket'),
             "max_readers": j.get("max_readers", 1),
             "max_writers": j.get("max_writers", 1),
         }
 
     elif r.status == 405 or r.status == 204:
         # Old imageio servers returned either 405 Method Not Allowed or
         # 204 No Content (with an empty body).
         return {
             "can_flush": False,
             "can_zero": False,
+            "can_extents": False,
             "unix_socket": None,
             "max_readers": 1,
             "max_writers": 1,
         }
     else:
         raise RuntimeError("could not use OPTIONS request: %d: %s" %
                            (r.status, r.reason))
-- 
2.33.1




More information about the Libguestfs mailing list