[libvirt] [PATCH 00/27] Introduce sparse streams

Michal Privoznik mprivozn at redhat.com
Mon May 9 14:43:44 UTC 2016


On 05.05.2016 18:10, Daniel P. Berrange wrote:
> On Thu, Apr 28, 2016 at 12:04:47PM +0200, Michal Privoznik wrote:
>> <snip/>
> 
> While it looks ok-ish in the context of the RecvAll function,
> because you have skipFunc + recvFunc both being invoked
> asynchronously, this design feels quite odd when used
> in combination with the plain virStreamRecv().
> 
> Just for a minute let's consider
> 
>  st = virStreamNew(conn, 0);
>  virStreamRegisterSkip(st, skipFunc, &fd);
>  virStorageVolDownload(st, ...);
> 
>  while (1) {
>     char buf[4096];
>     virStreamRecv(st, buf, sizeof(buf));
> 
>     ..do something with buf..
>  }
> 
> I think it is quite unpleasant to have the skipFunc be called
> out of band to the code dealing with the Recv.
> 
> 
> 
> I think it is preferable that we have an explicit synchronous
> API for consuming the hole.
> 
> The same really applies from POV of the upload side - I think
> we should be able to synchronously push the hole in, rather
> than having the asynchronous InData callback.
> 
> IIUC, the reason you've chosen this async callback approach
> is because the virStream{Send,Recv,SendAll,RecvAll} methods
> are not extensible to cope with holes.
> 
> 
> I'm thinking though that we'd get a more attractive public API
> by creating hole-friendly versions of virStream{Send,Recv,etc}
> instead of going the extra callback route.
> 
> 
> The download side could be made to work if we had a flag for
> virStreamRecv that instructed it to only read data up to the
> next hole, and defined an explicit error code to use to indicate
> that we've hit a hole.
> 
> This would be used thus:
> 
>  while (1) {
>     char buf[4096];
> 
>     int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
>     if (ret == -1) {
>         virErrorPtr err = virGetLastError();
> 	if (err->code == VIR_ERR_STREAM_HOLE) {
> 	   ret = virStreamHoleSize(st, buf);

Could this be made even friendlier by returning a special value
(say -2) in case of a hole?
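
For illustration, here is roughly how the receive loop would look with a
dedicated return value. This is a standalone mock, not libvirt code:
mockStreamRecv(), mockStreamHoleSize() and MOCK_STREAM_HOLE are hypothetical
stand-ins for the proposed virStreamRecvFlags(), virStreamHoleSize() and the
-2 return value, and the "stream" is just an in-memory list of data and hole
segments.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical "stopped at a hole" return value; not a libvirt constant. */
#define MOCK_STREAM_HOLE (-2)

/* One run of the mock stream: literal data, or a hole of len bytes.
 * Segments are assumed to be non-empty. */
typedef struct {
    const char *data;           /* NULL marks a hole */
    unsigned long long len;
} mockSegment;

typedef struct {
    const mockSegment *segs;
    size_t nsegs;
    size_t cur;                 /* index of the current segment */
    unsigned long long off;     /* bytes consumed from the current data segment */
} mockStream;

/* Stand-in for the proposed receive call: returns the number of bytes
 * copied, 0 at end of stream, MOCK_STREAM_HOLE when positioned at a hole. */
static int
mockStreamRecv(mockStream *st, char *buf, size_t nbytes)
{
    if (st->cur == st->nsegs)
        return 0;
    const mockSegment *seg = &st->segs[st->cur];
    if (!seg->data)
        return MOCK_STREAM_HOLE;
    size_t want = (size_t)(seg->len - st->off);
    if (want > nbytes)
        want = nbytes;
    memcpy(buf, seg->data + st->off, want);
    st->off += want;
    if (st->off == seg->len) {
        st->cur++;
        st->off = 0;
    }
    return (int)want;
}

/* Stand-in for the proposed hole-size call: reports the hole length and
 * moves past the hole. Fails if the stream is not at a hole. */
static int
mockStreamHoleSize(mockStream *st, unsigned long long *len)
{
    if (st->cur == st->nsegs || st->segs[st->cur].data)
        return -1;
    *len = st->segs[st->cur].len;
    st->cur++;
    return 0;
}

/* The caller's loop: no virGetLastError() lookup, just a check of ret.
 * It only counts bytes; a real caller would write or seek instead. */
static int
mockDownload(mockStream *st,
             unsigned long long *dataBytes,
             unsigned long long *holeBytes)
{
    char buf[4];
    *dataBytes = *holeBytes = 0;
    for (;;) {
        int ret = mockStreamRecv(st, buf, sizeof(buf));
        if (ret == 0)
            return 0;
        if (ret == MOCK_STREAM_HOLE) {
            unsigned long long len;
            if (mockStreamHoleSize(st, &len) < 0)
                return -1;
            *holeBytes += len;      /* ...seek len bytes in target... */
        } else if (ret < 0) {
            return -1;
        } else {
            *dataBytes += ret;      /* ...write buf to target... */
        }
    }
}
```

The point is only that the hole case becomes a plain branch on the return
value instead of an error-object inspection.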

> 	   ...seek ret bytes in target...
> 	} else {
> 	   return -1;
> 	}
>     } else {
>         ...write buf to target...
>     }
>  }
> 

Okay, so imagine we have a STREAM_SKIP packet incoming. What should happen
if it is processed by a bare virStreamRecv()? IOW, the user requests sparse
streams but sticks to calling the old virStreamRecv().

One thing I'm worried about here is that the @flags of
virStreamRecvFlags() would not be transferred to the sender side as one
might expect.

> 
> To make this work with virStreamRecvAll, we would need to add
> a virStreamSparseRecvAll() which had two callbacks eg
> 
>   typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, unsigned long long len);
> 
>   virStreamSparseRecvAll(virStreamPtr stream,
>                          virStreamSinkFunc handler,
>                          virStreamSinkHoleFunc holeHandler,
>                          void *opaque) {
>     while (1) {
>       char buf[4096];
> 
>       int ret = virStreamRecvFlags(stream, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
>       if (ret == -1) {
>         virErrorPtr err = virGetLastError();
> 	if (err->code == VIR_ERR_STREAM_HOLE) {
> 	   ret = virStreamHoleSize(stream, buf);
> 	   holeHandler(stream, ret);
> 	} else {
> 	   return -1;
> 	}
>       } else {
>          handler(stream, buf, ret, opaque);
>       }
>       }
>    }
>  }
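
For a plain file target, the two sink callbacks could be sketched as below.
This is only a minimal sketch assuming a freshly created (or truncated)
target file on a POSIX system; fileSinkData(), fileSinkHole() and
demoSparseWrite() are hypothetical names, and they take the fd directly
instead of going through the virStreamPtr/opaque arguments of the proposed
typedefs.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Data callback: write the whole buffer, retrying on short writes. */
static int
fileSinkData(int fd, const char *buf, size_t nbytes)
{
    while (nbytes > 0) {
        ssize_t n = write(fd, buf, nbytes);
        if (n < 0)
            return -1;
        buf += n;
        nbytes -= (size_t)n;
    }
    return 0;
}

/* Hole callback: move the file offset forward by len bytes without
 * writing anything. If that lands past the current end of file, extend
 * the file so that a trailing hole is not silently dropped.
 * Assumes len fits into off_t. */
static int
fileSinkHole(int fd, unsigned long long len)
{
    struct stat sb;
    off_t pos = lseek(fd, (off_t)len, SEEK_CUR);

    if (pos == (off_t)-1)
        return -1;
    if (fstat(fd, &sb) < 0)
        return -1;
    if (pos > sb.st_size && ftruncate(fd, pos) < 0)
        return -1;
    return 0;
}

/* Usage example: data, hole, data, trailing hole. Returns the resulting
 * file size, or -1 on error. The file is removed again afterwards. */
static long long
demoSparseWrite(const char *path)
{
    struct stat sb;
    long long size = -1;
    int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0600);

    if (fd < 0)
        return -1;
    if (fileSinkData(fd, "abc", 3) == 0 &&
        fileSinkHole(fd, 4096) == 0 &&
        fileSinkData(fd, "xyz", 3) == 0 &&
        fileSinkHole(fd, 100) == 0 &&
        fstat(fd, &sb) == 0)
        size = (long long)sb.st_size;
    close(fd);
    unlink(path);
    return size;
}
```

Note that if the target already held data, the skipped range would keep its
old contents rather than read back as zeros, hence the fresh-file assumption.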
> 
> 
> Now considering the upload side of things, the virStreamSend
> function doesn't actually need changes, though it could do
> with a flags parameter for best practice. We just need the
> virStreamSkip() function you already add.
> 
>  while (1) {
>     char buf[4096];
>     if (..in hole...) {
>        ..get hole size...
>        virStreamSkip(st, len);
>     } else {
>        ...read N bytes...
>        virStreamSend(st, buf, len);
>     }
>  }
> 
> 
> The SendAll method is again more complicated as the callback we pass
> into it is insufficient to figure out if we have holes. We could
> add a virStreamSparseSendAll() which had two callbacks again:
> 
>  typedef int (*virStreamSourceHoleFunc)(virStreamPtr st);
> 
> This returns the length of the current hole, or 0 if not at
> a hole.
> 
>  virStreamSparseSendAll(virStreamPtr stream,
>                         virStreamSourceFunc handler,
>                         virStreamSourceHoleFunc holeHandler,
>                         void *opaque) {
>    while (1) {
>       char buf[4096];
>       int ret = holeHandler(stream);
>       if (ret > 0) {
>          virStreamSkip(stream, ret);
>       } else {
>          ret = handler(stream, buf, sizeof(buf), opaque);
>          virStreamSend(stream, buf, ret);
>       }
>       }
>    }
>  }
> 
> 
> So we would avoid the virStreamInData, virStreamRegisterInData,
> virStreamRegisterSkip and virStreamSkipCallback methods.

Okay. Makes sense. Let me see if I can reuse my RPC implementation,
because that was the hardest part of all to implement.

Michal



