[libvirt] [PATCH v2 30/38] virNetClientStream: Wire up VIR_NET_STREAM_SKIP
Michal Privoznik
mprivozn at redhat.com
Tue May 9 08:17:05 UTC 2017
On 05/05/2017 05:43 PM, John Ferlan wrote:
>
>
> On 04/20/2017 06:01 AM, Michal Privoznik wrote:
>> Whenever server sends a client stream packet (either regular with
>> actual data or stream skip one) it is queued on @st->rx. So the
>> list is a mixture of both types of stream packets. So now that we
>> have all the helpers needed we can wire their processing up. But
>> since virNetClientStreamRecvPacket doesn't support
>> VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn all received
>> skips into zeroes repeating requested times.
>>
>
> Up to this point - I thought I had a good idea of what was going on, but
> this patch loses me.
>
> I thought there was an "ordered" message queue to be processed...
> ordered in so much as we're "reading" a file and sending either a 'data'
> segment w/ data or a 'skip' segment with some number of bytes to skip.
> Where it could be:
>
> start...skip...data...[skip...data...]end
> start...data...skip...[data...skip...]end
> start...data...end
> start...skip...end
>
> So why does the code process and sum up the skips?
Because our streams are more generic than just a file transfer. I mean,
we use them for any kind of data transfer. Therefore a stream could even
look like this:
start -> data -> skip -> skip -> data -> end.
You won't see this with regular files stored on a disk, but then again,
virStream can be used (and is used) for generic data transfer.
vol-download and vol-upload are just a subset of its usage.
Now, with that example, it makes sense to merge those skips into one
big skip, doesn't it? But I agree, it looks a bit weird. I can drop the
merging - it's not a performance issue. I was just trying to save the
caller a couple of calls to our APIs.
>
> Is this because of some implementation detail (that I already forgot) of
> the message queue where signals are done after each "data..." or
> "skip...", so it won't matter?
>
> Why not have everything you need in place before you wire this up -
> patch 25 and 30 seem to be able to go after "most of" patch 31.
>
> John
>
> I'm going to stop here.
Fair enough. I'm gonna send v2 with most of your review points worked
in. Except, for now I'm still gonna use Skip() and HoleSize() - changing
that now would be a non-trivial piece of work and thus I'd like to do it
just once, when we have clear agreement on naming. Hope you understand that.
>
>> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
>> ---
>> src/rpc/virnetclientstream.c | 44 ++++++++++++++++++++++++++++++++++++++++++--
>> 1 file changed, 42 insertions(+), 2 deletions(-)
>>
>> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
>> index c773524..ff35137 100644
>> --- a/src/rpc/virnetclientstream.c
>> +++ b/src/rpc/virnetclientstream.c
>> @@ -296,6 +296,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
>>
>> virObjectLock(st);
>>
>> + /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP
>> + * here just yet. We want in order processing! */
>> virNetMessageQueuePush(&st->rx, tmp_msg);
>>
>> virNetClientStreamEventTimerUpdate(st);
>> @@ -359,7 +361,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
>> }
>>
>>
>> -static int ATTRIBUTE_UNUSED
>> +static int
>> virNetClientStreamHandleSkip(virNetClientPtr client,
>> virNetClientStreamPtr st)
>> {
>> @@ -435,6 +437,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>> virCheckFlags(0, -1);
>>
>> virObjectLock(st);
>> +
>> + reread:
>> if (!st->rx && !st->incomingEOF) {
>> virNetMessagePtr msg;
>> int ret;
>> @@ -466,8 +470,44 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>> }
>>
>> VIR_DEBUG("After IO rx=%p", st->rx);
>> +
>> + while (st->rx &&
>> + st->rx->header.type == VIR_NET_STREAM_SKIP) {
>> + /* Handle skip sent to us by server. */
>> +
>> + if (virNetClientStreamHandleSkip(client, st) < 0)
>> + goto cleanup;
You can see my reasoning from above in effect here.
>> + }
>> +
>> + if (!st->rx && !st->incomingEOF && !st->skipLength) {
>> + if (nonblock) {
>> + VIR_DEBUG("Non-blocking mode and no data available");
>> + rv = -2;
>> + goto cleanup;
>> + }
>> +
>> + /* We have consumed all packets from incoming queue but those
>> + * were only skip packets, no data. Read the stream again. */
>> + goto reread;
>> + }
>> +
>> want = nbytes;
>> - while (want && st->rx) {
>> +
>> + if (st->skipLength) {
>> + /* Pretend skipLength zeroes was read from stream. */
>> + size_t len = want;
>> +
>> + if (len > st->skipLength)
>> + len = st->skipLength;
>> +
>> + memset(data, 0, len);
>> + st->skipLength -= len;
>> + want -= len;
>> + }
>> +
>> + while (want &&
>> + st->rx &&
>> + st->rx->header.type == VIR_NET_STREAM) {
>> virNetMessagePtr msg = st->rx;
>> size_t len = want;
>>
>>
Michal