[libvirt] [PATCH 01/11] Add public API definition for data stream handling

Daniel P. Berrange berrange at redhat.com
Fri Sep 25 11:25:50 UTC 2009


On Fri, Sep 25, 2009 at 12:09:34PM +0200, Daniel Veillard wrote:
> On Mon, Aug 24, 2009 at 09:51:04PM +0100, Daniel P. Berrange wrote:
> > @@ -1448,6 +1466,81 @@ void virEventRegisterImpl(virEventAddHandleFunc addHandle,
> >                            virEventAddTimeoutFunc addTimeout,
> >                            virEventUpdateTimeoutFunc updateTimeout,
> >                            virEventRemoveTimeoutFunc removeTimeout);
> > +
> > +enum {
> > +    VIR_STREAM_NONBLOCK = (1 << 0),
> > +};
> > +
> > +virStreamPtr virStreamNew(virConnectPtr conn,
> > +                          unsigned int flags);
> 
>  Would flags be sufficient if we were to encode some priorities to
> streams? If we end up limiting the number of active streams, giving
> higher priority or some reserved slots for quicker operations may
> be important, flags should be sufficient for this if the need arise
> so I don't see a problem but I raise the point.

I guess it would be sufficient if you wanted todo LOW/MEDIUM/HIGH
static priorties.

If we wanted to get more advanced, we could always introduce an
extra API, since there is always a point between virStreamNew()
and then using it, eg in virConnectUploadFile(), when you can
do more setup calls. We already allow event callbacks to be
registered this way, so we could in future add a 

   virStreamSetPriority(virStreamPtr st, ....options ...);

if we needed more than just plain flags.


On the subject of flags though, I've never been entirely sure
about whether it would be worth mandating the use of a flag(s)
for indicating I/O direction at time of stream creation. 
Currently this is implicit base on the API that the stream is
later used with,

eg, currently

     virStreamPtr st = virStreamNew(conn, 0);

     virConnectUploadFile(conn, st, filename);

implicitly configures the stream for writing, but I constantly
wonder whether we ought to make it explicit via a flag like

    virStreamPtr st = virStreamNew(conn, VIR_STREAM_WRITE);

    virConnectUploadFile(conn, st, filename);

and require that the call of virStreamNew() provide either
VIR_STREAM_WRITE, or VIR_STREAM_READ, or both. ANd then also
have the methods using streams like virConnectUploadFile
check that the flags match.

If we wanted to mandate use of READ/WRITE flags for stream
creation, we'd obviously need todo it from teh start, since
we couldn't add that as a mandatory flag once the API is
released & in use by apps.

> 
> > +int virStreamRef(virStreamPtr st);
> > +
> > +int virStreamSend(virStreamPtr st,
> > +                  const char *data,
> > +                  size_t nbytes);
> > +
> > +int virStreamRecv(virStreamPtr st,
> > +                  char *data,
> > +                  size_t nbytes);
> > +
> > +
> > +typedef int (*virStreamSourceFunc)(virStreamPtr st,
> > +                                   char *data,
> > +                                   size_t nbytes,
> > +                                   void *opaque);
> 
>   I think the signature is fine but we need to document all the
> arguments and the return values.
> 
> > +int virStreamSendAll(virStreamPtr st,
> > +                     virStreamSourceFunc handler,
> > +                     void *opaque);
> 
>   Hum, I had to look at the comment to really understand. I'm not
> 100% sure, maybe we should allow for handler() to be called multiple
> time, not just once.
>   For example I would allow virStreamSendAll() code to call handler
> multiple time, until the handler like a read() returns 0 (or -1 on
> error), in any case the signature should be documented fully.

This method is essentially just a convenient way to call the
virStreamSend() in a loop, for apps that are happy todo blocking
data I/O. As such the handler() will definitely be called multiple
times to fetch the data to be sent.  You can see how it was used
later on in this patch, where virStreamSendAll is implemented.

I expect most apps would use virStreamSend() though, to allow
them todo interruptible & non-blocking I/O


> > +typedef int (*virStreamSinkFunc)(virStreamPtr st,
> > +                                 const char *data,
> > +                                 size_t nbytes,
> > +                                 void *opaque);
> 
>   Same thing do we allow a sink function to be called repeatedly ?
> If we want to allow this in some ways we will need an extra argument
> to indicate the end of the stream. Even if we don't plan this yet, I
> would suggest to add a flags to allow for this possibility in the
> future. With a chunk size of 256K at the protocol level it may not
> be a good idea to keep the full data in memory, so I would allow
> for this interface to call the sink multiple times. And IMHO it's best
> to pass the indication of end of transfer directly at the sink level
> rather than wait for the virStreamFree() coming from the user.
> 
> > +int virStreamRecvAll(virStreamPtr st,
> > +                     virStreamSinkFunc handler,
> > +                     void *opaque);
>
>   Okay

Same as for SendAll, this API will invoke the handler multilpe
times to write out data that is being received. In both cases
the implementation is invoking the handler with 64kb buffers
to avoid pulling lots of data into memory.

> > +typedef enum {
> > +    VIR_STREAM_EVENT_READABLE  = (1 << 0),
> > +    VIR_STREAM_EVENT_WRITABLE  = (1 << 1),
> > +    VIR_STREAM_EVENT_ERROR     = (1 << 2),
> > +    VIR_STREAM_EVENT_HANGUP    = (1 << 3),
> > +} virStreamEventType;
> > +
> > +
> > +/**
> > + * virStreamEventCallback:
> > + *
> > + * @stream: stream on which the event occurred
> > + * @events: bitset of events from virEventHandleType constants
> > + * @opaque: user data registered with handle
> > + *
> > + * Callback for receiving stream events. The callback will
> > + * be invoked once for each event which is pending.
> > + */
> > +typedef void (*virStreamEventCallback)(virStreamPtr stream, int events, void *opaque);
> > +
> > +int virStreamEventAddCallback(virStreamPtr stream,
> > +                              int events,
> > +                              virStreamEventCallback cb,
> > +                              void *opaque,
> > +                              virFreeCallback ff);
> > +
> > +int virStreamEventUpdateCallback(virStreamPtr stream,
> > +                                 int events);
> > +
> > +int virStreamEventRemoveCallback(virStreamPtr stream);
> > +
> > +
> > +int virStreamFinish(virStreamPtr st);
> > +int virStreamAbort(virStreamPtr st);
> 
> For those 2 maybe add a flag, to allow for example background disconnection.
> Even if the stream wasn't created ASYNC, we may want sometime to
> abruptly end without waiting.

The virStreamAbort() operation is always asynchronous, regardless of
whether the stream is non-blocking.  The virStreamFinish() operation
has to be synchronous, because for it  to be useful you need to do
a round-trip to flush any error being sent back from the server. So
I don't think we need any flags here. If you want to close it abruptly
then virStreamAbort() is suitable, if you want to close it cleanly
then virStreamFinish() is suitable.

> > +/**
> > + * virStreamSendAll:
> > + * @stream: pointer to the stream object
> > + * @handler: source callback for reading data from application
> > + * @opaque: application defined data
> > + *
> > + * Send the entire data stream, reading the data from the
> > + * requested data source. This is simply a convenient alternative
> > + * to virStreamSend, for apps that do blocking-I/o.
> > + *
> > + * A example using this with a hypothetical file upload
> > + * API looks like
> > + *
> > + *   int mysource(virStreamPtr st, char *buf, int nbytes, void *opaque) {
> > + *       int *fd = opaque;
> > + *
> > + *       return read(*fd, buf, nbytes);
> > + *   }
> > + *
> > + *   virStreamPtr st = virStreamNew(conn, 0);
> > + *   int fd = open("demo.iso", O_RDONLY)
> > + *
> > + *   virConnectUploadFile(conn, st);
> > + *   virStreamSendAll(st, mysource, &fd);
> > + *   virStreamFree(st);
> > + *   close(fd);
> 
> 
>  Hum, the clasic example of blocking I/Os is if people use multithreaded
> apps.
>   What is a second thread calls calls virStreamAbort() while
> virStreamSendAll() is in progress, e.g. the user clicked on an abort
> button from the UI ?
>   Same question for virStreamRecvAll() ?

If you want to abort a stream part way  through, then you should not
be using virStreamSendAll/RecvAll. These APIs are optional convenience
APIs for apps which are happy todo a blocking operation. Apps which
need ability to abort part way through can use the virStreamRecv()
and virStreamSend() APIs instead.

That all said, it is safe for another thread to call virStreamAbort(),
it will cause this Send/RecvAll method to return an error on the next 
virStreamSend/Recv call it makes

> 
> > + * Returns 0 if all the data was succesfully sent. The stream
> > + * will be marked as finished on success, so the caller need
> > + * only call virStreamFree().
> > + *
> > + * Returns -1 upon any error, with the stream being marked as
> > + * aborted, so the caller need only call virStreamFree()
> > + */
> > +int virStreamSendAll(virStreamPtr stream,
> > +                     virStreamSourceFunc handler,
> > +                     void *opaque)
> 
> 
> > + *   virConnectUploadFile(conn, st);
> > + *   virStreamRecvAll(st, mysink, &fd);
> > + *   virStreamFree(st);
> > + *   close(fd);
> 
>  Would the current API allow for the sink callback to close the fd()
> at the end of the transfer ? Right now, I don't think so because we
> don't know what is the last callback (assuming multiple ones).

The sink/source callbacks do not need to close the FD since this is easily
done with RecvAll/SendAll returns control to the application. THis is
in fact important, because it is not until RecvAll/SendAll returns that
you can call virStreamFinish to check for success. If it did not suceed
then you may want do other cleanup before closing the FD, such as 
unlinking the file

Regards,
Daniel
-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|




More information about the libvir-list mailing list