[libvirt] [PATCH v2 34/38] daemon: Don't call virStreamInData so often

Michal Privoznik mprivozn at redhat.com
Thu Apr 20 10:02:03 UTC 2017


While virStreamInData() should be a small and quick function, in
our implementation it seeks multiple times. Moreover, it is
called even if we know that we are in data. Well, quite. If we
track its return values and do some basic math with them, we can
end up calling virStreamInData right at the boundaries of a data
section or a hole and nowhere else.

Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
---
 src/storage/storage_util.c |   4 +-
 src/util/virfdstream.c     | 236 ++++++++++++++++++++++++++++++++++++++++-----
 src/util/virfdstream.h     |   1 +
 3 files changed, 217 insertions(+), 24 deletions(-)

diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c
index a2d89af..3576435 100644
--- a/src/storage/storage_util.c
+++ b/src/storage/storage_util.c
@@ -2427,7 +2427,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
     /* Not using O_CREAT because the file is required to already exist at
      * this point */
     ret = virFDStreamOpenBlockDevice(stream, target_path,
-                                     offset, len, O_WRONLY);
+                                     offset, len, false, O_WRONLY);
 
  cleanup:
     VIR_FREE(path);
@@ -2465,7 +2465,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
     }
 
     ret = virFDStreamOpenBlockDevice(stream, target_path,
-                                     offset, len, O_RDONLY);
+                                     offset, len, false, O_RDONLY);
 
  cleanup:
     VIR_FREE(path);
diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 0350494..8203680 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -51,6 +51,7 @@ VIR_LOG_INIT("fdstream");
 
 typedef enum {
     VIR_FDSTREAM_MSG_TYPE_DATA,
+    VIR_FDSTREAM_MSG_TYPE_SKIP,
 } virFDStreamMsgType;
 
 typedef struct _virFDStreamMsg virFDStreamMsg;
@@ -66,6 +67,9 @@ struct _virFDStreamMsg {
             size_t len;
             size_t offset;
         } data;
+        struct {
+            size_t len;
+        } skip;
     } stream;
 };
 
@@ -198,6 +202,9 @@ virFDStreamMsgFree(virFDStreamMsgPtr msg)
     case VIR_FDSTREAM_MSG_TYPE_DATA:
         VIR_FREE(msg->stream.data.buf);
         break;
+    case VIR_FDSTREAM_MSG_TYPE_SKIP:
+        /* nada */
+        break;
     }
 
     VIR_FREE(msg);
@@ -385,6 +392,7 @@ struct _virFDStreamThreadData {
     virStreamPtr st;
     size_t length;
     bool doRead;
+    bool sparse;
     int fdin;
     char *fdinname;
     int fdout;
@@ -407,34 +415,68 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
 
 static ssize_t
 virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+                        bool sparse,
                         const int fdin,
                         const int fdout,
                         const char *fdinname,
                         const char *fdoutname,
+                        size_t *dataLen,
                         size_t buflen)
 {
     virFDStreamMsgPtr msg = NULL;
+    int inData = 0;
+    unsigned long long sectionLen = 0;
     char *buf = NULL;
     ssize_t got;
 
+    if (sparse && *dataLen == 0) {
+        if (virFileInData(fdin, &inData, &sectionLen) < 0)
+            goto error;
+
+        if (inData)
+            *dataLen = sectionLen;
+    }
+
     if (VIR_ALLOC(msg) < 0)
         goto error;
 
-    if (VIR_ALLOC_N(buf, buflen) < 0)
-        goto error;
-
-    if ((got = saferead(fdin, buf, buflen)) < 0) {
-        virReportSystemError(errno,
-                             _("Unable to read %s"),
-                             fdinname);
-        goto error;
+    if (sparse && *dataLen == 0) {
+        msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP;
+        msg->stream.skip.len = sectionLen;
+        got = sectionLen;
+
+        /* HACK. The message queue is one directional. So caller
+         * cannot make us skip the hole. Do that for them instead. */
+        if (sectionLen &&
+            lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) {
+            virReportSystemError(errno,
+                                 _("unable to seek in %s"),
+                                 fdinname);
+            goto error;
+        }
+    } else {
+        if (sparse &&
+            buflen > *dataLen)
+            buflen = *dataLen;
+
+        if (VIR_ALLOC_N(buf, buflen) < 0)
+            goto error;
+
+        if ((got = saferead(fdin, buf, buflen)) < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to read %s"),
+                                 fdinname);
+            goto error;
+        }
+
+        msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+        msg->stream.data.buf = buf;
+        msg->stream.data.len = got;
+        buf = NULL;
+        if (sparse)
+            *dataLen -= got;
     }
 
-    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
-    msg->stream.data.buf = buf;
-    msg->stream.data.len = got;
-    buf = NULL;
-
     virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
     msg = NULL;
 
@@ -449,6 +491,7 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
 
 static ssize_t
 virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+                         bool sparse,
                          const int fdin,
                          const int fdout,
                          const char *fdinname,
@@ -456,6 +499,7 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
 {
     ssize_t got;
     virFDStreamMsgPtr msg = fdst->msg;
+    off_t off;
     bool pop = false;
 
     switch (msg->type) {
@@ -474,6 +518,32 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
 
         pop = msg->stream.data.offset == msg->stream.data.len;
         break;
+
+    case VIR_FDSTREAM_MSG_TYPE_SKIP:
+        if (!sparse) {
+            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                           _("unexpected stream skip"));
+            return -1;
+        }
+
+        got = msg->stream.skip.len;
+        off = lseek(fdout, got, SEEK_CUR);
+        if (off == (off_t) -1) {
+            virReportSystemError(errno,
+                                 _("unable to seek in %s"),
+                                 fdoutname);
+            return -1;
+        }
+
+        if (ftruncate(fdout, off) < 0) {
+            virReportSystemError(errno,
+                                 _("unable to truncate %s"),
+                                 fdoutname);
+            return -1;
+        }
+
+        pop = true;
+        break;
     }
 
     if (pop) {
@@ -491,6 +561,7 @@ virFDStreamThread(void *opaque)
     virFDStreamThreadDataPtr data = opaque;
     virStreamPtr st = data->st;
     size_t length = data->length;
+    bool sparse = data->sparse;
     int fdin = data->fdin;
     char *fdinname = data->fdinname;
     int fdout = data->fdout;
@@ -499,6 +570,7 @@ virFDStreamThread(void *opaque)
     bool doRead = fdst->threadDoRead;
     size_t buflen = 256 * 1024;
     size_t total = 0;
+    size_t dataLen = 0;
 
     virObjectRef(fdst);
     virObjectLock(fdst);
@@ -533,12 +605,12 @@ virFDStreamThread(void *opaque)
         }
 
         if (doRead)
-            got = virFDStreamThreadDoRead(fdst,
+            got = virFDStreamThreadDoRead(fdst, sparse,
                                           fdin, fdout,
                                           fdinname, fdoutname,
-                                          buflen);
+                                          &dataLen, buflen);
         else
-            got = virFDStreamThreadDoWrite(fdst,
+            got = virFDStreamThreadDoWrite(fdst, sparse,
                                            fdin, fdout,
                                            fdinname, fdoutname);
 
@@ -808,6 +880,14 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
             }
         }
 
+        /* Shortcut, if the stream is in the trailing hole,
+         * return 0 immediately. */
+        if (msg->type == VIR_FDSTREAM_MSG_TYPE_SKIP &&
+            msg->stream.skip.len == 0) {
+            ret = 0;
+            goto cleanup;
+        }
+
         if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
             /* Nope, nope, I'm outta here */
             virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -858,11 +938,120 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
 }
 
 
+static int
+virFDStreamSkip(virStreamPtr st,
+                unsigned long long length)
+{
+    virFDStreamDataPtr fdst = st->privateData;
+    virFDStreamMsgPtr msg = NULL;
+    off_t off;
+    int ret = -1;
+
+    virObjectLock(fdst);
+    if (fdst->length) {
+        if (length > fdst->length - fdst->offset)
+            length = fdst->length - fdst->offset;
+        fdst->offset += length;
+    }
+
+    if (fdst->thread) {
+        /* Things are a bit complicated here. But bear with me. If FDStream is
+         * in a read mode, then if the message at the queue head is SKIP, just
+         * pop it. The thread has lseek()-ed anyway. If however, the FDStream
+         * is in write mode, then tell the thread to do the lseek() for us.
+         * Under no circumstances we can do the lseek() ourselves here. We
+         * might mess up file position for the thread. */
+        if (fdst->threadDoRead) {
+            msg = fdst->msg;
+            if (msg->type != VIR_FDSTREAM_MSG_TYPE_SKIP) {
+                virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                               _("Invalid stream skip"));
+                goto cleanup;
+            }
+
+            virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
+        } else {
+            if (VIR_ALLOC(msg) < 0)
+                goto cleanup;
+
+            msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP;
+            msg->stream.skip.len = length;
+            virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");
+            msg = NULL;
+        }
+    } else {
+        off = lseek(fdst->fd, length, SEEK_CUR);
+        if (off == (off_t) -1) {
+            virReportSystemError(errno, "%s",
+                                 _("unable to seek"));
+            goto cleanup;
+        }
+
+        if (ftruncate(fdst->fd, off) < 0) {
+            virReportSystemError(errno, "%s",
+                                 _("unable to truncate"));
+            goto cleanup;
+        }
+    }
+
+    ret = 0;
+ cleanup:
+    virObjectUnlock(fdst);
+    virFDStreamMsgFree(msg);
+    return ret;
+}
+
+
+static int
+virFDStreamInData(virStreamPtr st,
+                  int *inData,
+                  unsigned long long *length)
+{
+    virFDStreamDataPtr fdst = st->privateData;
+    int ret = -1;
+
+    virObjectLock(fdst);
+
+    if (fdst->thread) {
+        virFDStreamMsgPtr msg;
+
+        while (!(msg = fdst->msg)) {
+            if (fdst->threadQuit) {
+                *inData = *length = 0;
+                ret = 0;
+                goto cleanup;
+            } else {
+                virObjectUnlock(fdst);
+                virCondSignal(&fdst->threadCond);
+                virObjectLock(fdst);
+            }
+        }
+
+        if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) {
+            *inData = 1;
+            *length = msg->stream.data.len - msg->stream.data.offset;
+        } else {
+            *inData = 0;
+            *length = msg->stream.skip.len;
+        }
+        ret = 0;
+    } else {
+        ret = virFileInData(fdst->fd, inData, length);
+    }
+
+ cleanup:
+    virObjectUnlock(fdst);
+    return ret;
+}
+
+
 static virStreamDriver virFDStreamDrv = {
     .streamSend = virFDStreamWrite,
     .streamRecv = virFDStreamRead,
     .streamFinish = virFDStreamClose,
     .streamAbort = virFDStreamAbort,
+    .streamSkip = virFDStreamSkip,
+    .streamInData = virFDStreamInData,
     .streamEventAddCallback = virFDStreamAddCallback,
     .streamEventUpdateCallback = virFDStreamUpdateCallback,
     .streamEventRemoveCallback = virFDStreamRemoveCallback
@@ -1003,7 +1192,8 @@ virFDStreamOpenFileInternal(virStreamPtr st,
                             unsigned long long length,
                             int oflags,
                             int mode,
-                            bool forceIOHelper)
+                            bool forceIOHelper,
+                            bool sparse)
 {
     int fd = -1;
     int pipefds[2] = { -1, -1 };
@@ -1070,6 +1260,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
 
         threadData->st = virObjectRef(st);
         threadData->length = length;
+        threadData->sparse = sparse;
 
         if ((oflags & O_ACCMODE) == O_RDONLY) {
             threadData->fdin = fd;
@@ -1119,7 +1310,7 @@ int virFDStreamOpenFile(virStreamPtr st,
     }
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
-                                       oflags, 0, false);
+                                       oflags, 0, false, false);
 }
 
 int virFDStreamCreateFile(virStreamPtr st,
@@ -1132,7 +1323,7 @@ int virFDStreamCreateFile(virStreamPtr st,
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
                                        oflags | O_CREAT, mode,
-                                       false);
+                                       false, false);
 }
 
 #ifdef HAVE_CFMAKERAW
@@ -1148,7 +1339,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
     if (virFDStreamOpenFileInternal(st, path,
                                     offset, length,
                                     oflags | O_CREAT, 0,
-                                    false) < 0)
+                                    false, false) < 0)
         return -1;
 
     fdst = st->privateData;
@@ -1185,7 +1376,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
                                        oflags | O_CREAT, 0,
-                                       false);
+                                       false, false);
 }
 #endif /* !HAVE_CFMAKERAW */
 
@@ -1193,11 +1384,12 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
                                const char *path,
                                unsigned long long offset,
                                unsigned long long length,
+                               bool sparse,
                                int oflags)
 {
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
-                                       oflags, 0, true);
+                                       oflags, 0, true, sparse);
 }
 
 int virFDStreamSetInternalCloseCb(virStreamPtr st,
diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h
index 34c4c3f..887c991 100644
--- a/src/util/virfdstream.h
+++ b/src/util/virfdstream.h
@@ -59,6 +59,7 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
                                const char *path,
                                unsigned long long offset,
                                unsigned long long length,
+                               bool sparse,
                                int oflags);
 
 int virFDStreamSetInternalCloseCb(virStreamPtr st,
-- 
2.10.2




More information about the libvir-list mailing list