[libvirt] [PATCH v2 14/38] Introduce virStreamSparseRecvAll

John Ferlan jferlan at redhat.com
Thu May 4 21:33:58 UTC 2017



On 04/20/2017 06:01 AM, Michal Privoznik wrote:
> This is just a wrapper over new functions that have been just
> introduced: virStreamRecvFlags(), virStreamHoleSize(). It's very
> similar to virStreamRecvAll() except it handles sparse streams
> well.

You could have some API name adjustments not only in the commit message
but also within code.

> 
> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
> ---
>  include/libvirt/libvirt-stream.h |  28 ++++++++-
>  src/libvirt-stream.c             | 119 +++++++++++++++++++++++++++++++++++++++
>  src/libvirt_public.syms          |   1 +
>  3 files changed, 145 insertions(+), 3 deletions(-)
> 
> diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
> index 23fcc26..e5f5126 100644
> --- a/include/libvirt/libvirt-stream.h
> +++ b/include/libvirt/libvirt-stream.h
> @@ -102,9 +102,9 @@ int virStreamSendAll(virStreamPtr st,
>   * @nbytes: size of the data array
>   * @opaque: optional application provided data
>   *
> - * The virStreamSinkFunc callback is used together
> - * with the virStreamRecvAll function for libvirt to
> - * provide the data that has been received.
> + * The virStreamSinkFunc callback is used together with the
> + * virStreamRecvAll or virStreamSparseRecvAll functions for
> + * libvirt to provide the data that has been received.
>   *
>   * The callback will be invoked multiple times,
>   * providing data in small chunks. The application
> @@ -127,6 +127,28 @@ int virStreamRecvAll(virStreamPtr st,
>                       virStreamSinkFunc handler,
>                       void *opaque);
>  
> +/**
> + * virStreamSinkHoleFunc:
> + * @st: the stream object
> + * @length: stream hole size
> + * @opaque: optional application provided data
> + *
> + * This callback is used together with the virStreamSparseRecvAll
> + * function for libvirt to provide the size of a hole that
> + * occurred in the stream.

The callback may be invoked multiple times as holes are found during
processing a stream. The application should create the hole in the
stream target and then return. A return value of -1 at any time will
abort the receive operation.

> + *
> + * Returns 0 on success,
> + *        -1 upon error
> + */
> +typedef int (*virStreamSinkHoleFunc)(virStreamPtr st,
> +                                     unsigned long long length,
> +                                     void *opaque);
> +
> +int virStreamSparseRecvAll(virStreamPtr stream,
> +                           virStreamSinkFunc handler,
> +                           virStreamSinkHoleFunc holeHandler,
> +                           void *opaque);
> +
>  typedef enum {
>      VIR_STREAM_EVENT_READABLE  = (1 << 0),
>      VIR_STREAM_EVENT_WRITABLE  = (1 << 1),
> diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
> index 1162d33..81190cc 100644
> --- a/src/libvirt-stream.c
> +++ b/src/libvirt-stream.c
> @@ -660,6 +660,125 @@ virStreamRecvAll(virStreamPtr stream,
>  
>  
>  /**
> + * virStreamSparseRecvAll:
> + * @stream: pointer to the stream object
> + * @handler: sink callback for writing data to application
> + * @holeHandler: stream hole callback for skipping holes
> + * @opaque: application defined data
> + *
> + * Receive the entire data stream, sending the data to the
> + * requested data sink. This is simply a convenient alternative

s/sink./sink @handler and calling the skip @holeHandler to generate
holes for sparse stream targets.

> + * to virStreamRecv, for apps that do blocking-I/O.

s/virStreamRecv/virStreamRecvAll/

> + *
> + * An example using this with a hypothetical file download
> + * API looks like:
> + *
> + *   int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) {
> + *       int *fd = opaque;
> + *
> + *       return write(*fd, buf, nbytes);
> + *   }
> + *
> + *   int myskip(virStreamPtr st, unsigned long long offset, void *opaque) {
> + *       int *fd = opaque;
> + *
> + *       return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0;
> + *   }
> + *
> + *   virStreamPtr st = virStreamNew(conn, 0);
> + *   int fd = open("demo.iso", O_WRONLY);
> + *
> + *   virConnectDownloadSparseFile(conn, st);
> + *   if (virStreamSparseRecvAll(st, mysink, myskip, &fd) < 0) {
> + *      ...report an error ...
> + *      goto done;
> + *   }
> + *   if (virStreamFinish(st) < 0)
> + *      ...report an error...
> + *   virStreamFree(st);
> + *   close(fd);
> + *
> + * Note that @opaque data is shared between both @handler and
> + * @holeHandler callbacks.
> + *
> + * Returns 0 if all the data was successfully received. The caller
> + * should invoke virStreamFinish(st) to flush the stream upon
> + * success and then virStreamFree

s/Free/Free().

> + *
> + * Returns -1 upon any error, with virStreamAbort() already
> + * having been called,  so the caller need only call
> + * virStreamFree()

s/,  so/, so/
s/Free()/Free()./

(e.g. lose the extra space and add a period)

Could virStreamFree(). go on the previous line?

> + */
> +int
> +virStreamSparseRecvAll(virStreamPtr stream,
> +                       virStreamSinkFunc handler,
> +                       virStreamSinkHoleFunc holeHandler,
> +                       void *opaque)
> +{
> +    char *bytes = NULL;
> +    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
> +    const unsigned int flags = VIR_STREAM_RECV_STOP_AT_HOLE;
> +    int ret = -1;
> +
> +    VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
> +              stream, handler, holeHandler, opaque);
> +
> +    virResetLastError();
> +
> +    virCheckStreamReturn(stream, -1);
> +    virCheckNonNullArgGoto(handler, cleanup);
> +    virCheckNonNullArgGoto(holeHandler, cleanup);
> +
> +    if (stream->flags & VIR_STREAM_NONBLOCK) {
> +        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
> +                       _("data sinks cannot be used for non-blocking streams"));
> +        goto cleanup;
> +    }
> +
> +    if (VIR_ALLOC_N(bytes, want) < 0)
> +        goto cleanup;
> +
> +    for (;;) {
> +        int got, offset = 0;
> +        unsigned long long holeLen;
> +        got = virStreamRecvFlags(stream, bytes, want, flags);
> +        if (got == -3) {
> +            if (virStreamHoleSize(stream, &holeLen) < 0) {
> +                virStreamAbort(stream);
> +                goto cleanup;
> +            }
> +
> +            if (holeHandler(stream, holeLen, opaque) < 0) {
> +                virStreamAbort(stream);
> +                goto cleanup;
> +            }

We could continue; here, right? Although, I suppose (0 < -3) would fail
anyway, but if someone changed that someday and didn't read this part...

An ACK ends up being a bit dependent upon previous patches... What's
here seems fine to me though.

John
> +        } else if (got < 0) {
> +            goto cleanup;
> +        } else if (got == 0) {
> +            break;
> +        }
> +        while (offset < got) {
> +            int done;
> +            done = (handler)(stream, bytes + offset, got - offset, opaque);
> +            if (done < 0) {
> +                virStreamAbort(stream);
> +                goto cleanup;
> +            }
> +            offset += done;
> +        }
> +    }
> +    ret = 0;
> +
> + cleanup:
> +    VIR_FREE(bytes);
> +
> +    if (ret != 0)
> +        virDispatchError(stream->conn);
> +
> +    return ret;
> +}
> +
> +/**
>   * virStreamEventAddCallback:
>   * @stream: pointer to the stream object
>   * @events: set of events to monitor
> diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
> index 0e34eee..008dc59 100644
> --- a/src/libvirt_public.syms
> +++ b/src/libvirt_public.syms
> @@ -764,6 +764,7 @@ LIBVIRT_3.3.0 {
>          virStreamHoleSize;
>          virStreamRecvFlags;
>          virStreamSkip;
> +        virStreamSparseRecvAll;
>  } LIBVIRT_3.1.0;
>  
>  # .... define new API here using predicted next version number ....
> 




More information about the libvir-list mailing list