[libvirt] [PATCH v2 6/6] libvirtaio: add .drain() coroutine

Daniel P. Berrange berrange at redhat.com
Mon Sep 25 13:53:01 UTC 2017


On Thu, Sep 14, 2017 at 02:41:12AM +0200, Wojtek Porczyk wrote:
> The intended use is to ensure that the implementation is empty, which is
> one way to ensure that all connections were properly closed and file
> descriptors reclaimed.
> 
> Signed-off-by: Wojtek Porczyk <woju at invisiblethingslab.com>
> ---
>  libvirtaio.py | 36 ++++++++++++++++++++++++++++++++++--
>  1 file changed, 34 insertions(+), 2 deletions(-)
> 
> diff --git a/libvirtaio.py b/libvirtaio.py
> index 97a7f6c..1c432dd 100644
> --- a/libvirtaio.py
> +++ b/libvirtaio.py
> @@ -269,10 +269,27 @@ class virEventAsyncIOImpl(object):
>          self.descriptors = DescriptorDict(self)
>          self.log = logging.getLogger(self.__class__.__name__)
>  
> +        # NOTE invariant: _finished.is_set() iff _pending == 0
> +        self._pending = 0
> +        self._finished = asyncio.Event(loop=loop)
> +        self._finished.set()
> +
>      def __repr__(self):
>          return '<{} callbacks={} descriptors={}>'.format(
>              type(self).__name__, self.callbacks, self.descriptors)
>  
> +    def _pending_inc(self):
> +        '''Increase the count of pending affairs. Do not use directly.'''
> +        self._pending += 1
> +        self._finished.clear()
> +
> +    def _pending_dec(self):
> +        '''Decrease the count of pending affairs. Do not use directly.'''
> +        assert self._pending > 0
> +        self._pending -= 1
> +        if self._pending == 0:
> +            self._finished.set()
> +
>      def register(self):
>          '''Register this instance as event loop implementation'''
>          # pylint: disable=bad-whitespace
> @@ -293,7 +310,20 @@ class virEventAsyncIOImpl(object):
>          This is a coroutine.
>          '''
>          self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
> -        return libvirt.virEventInvokeFreeCallback(opaque)
> +        ret = libvirt.virEventInvokeFreeCallback(opaque)
> +        self._pending_dec()
> +        return ret
> +
> +    @asyncio.coroutine
> +    def drain(self):
> +        '''Wait for the implementation to become idle.
> +
> +        This is a coroutine.
> +        '''
> +        self.log.debug('drain()')
> +        if self._pending:
> +            yield from self._finished.wait()
> +        self.log.debug('drain ended')

What is responsible for calling 'drain'?

>  
>      def is_idle(self):
>          '''Returns False if there are leftovers from a connection
> @@ -301,7 +331,7 @@ class virEventAsyncIOImpl(object):
>          Those may happen if there are sematical problems while closing
>          a connection. For example, not deregistered events before .close().
>          '''
> -        return not self.callbacks
> +        return not self.callbacks and not self._pending
>  
>      def _add_handle(self, fd, event, cb, opaque):
>          '''Register a callback for monitoring file handle events
> @@ -324,6 +354,7 @@ class virEventAsyncIOImpl(object):
>                  fd, event, callback.iden)
>          self.callbacks[callback.iden] = callback
>          self.descriptors[fd].add_handle(callback)
> +        self._pending_inc()
>          return callback.iden
>  
>      def _update_handle(self, watch, event):
> @@ -378,6 +409,7 @@ class virEventAsyncIOImpl(object):
>                  timeout, callback.iden)
>          self.callbacks[callback.iden] = callback
>          callback.update(timeout=timeout)
> +        self._pending_inc()
>          return callback.iden
>  
>      def _update_timeout(self, timer, timeout):

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|




More information about the libvir-list mailing list