[libvirt] [PATCH 2/3] libvirtaio: add allow for moving callbacks to other event loop

Daniel P. Berrange berrange at redhat.com
Fri Sep 1 09:08:18 UTC 2017


On Thu, Aug 31, 2017 at 09:40:23PM +0200, Wojtek Porczyk wrote:
> The virEvent implementation is tied to a particular loop. When spinning
> another loop, the callbacks have to be moved to another implementation,
> so they will have a chance to be invoked, should they be scheduled.
> Otherwise, file descriptors will leak.
> 
> Signed-off-by: Wojtek Porczyk <woju at invisiblethingslab.com>
> ---
>  libvirtaio.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++------------
>  1 file changed, 51 insertions(+), 13 deletions(-)
> 
> diff --git a/libvirtaio.py b/libvirtaio.py
> index fc868bd..d161cd1 100644
> --- a/libvirtaio.py
> +++ b/libvirtaio.py
> @@ -195,9 +195,10 @@ class FDCallback(Callback):
>          return '<{} iden={} fd={} event={}>'.format(
>              self.__class__.__name__, self.iden, self.descriptor.fd, self.event)
>  
> -    def update(self, event):
> +    def update(self, event=None):
>          '''Update the callback and fix descriptor's watchers'''
> -        self.event = event
> +        if event is not None:
> +            self.event = event
>          self.descriptor.update()
>  
>  #
> @@ -238,20 +239,21 @@ class TimeoutCallback(Callback):
>              self.cb(self.iden, self.opaque)
>              self.impl.log.debug('timer %r callback ended', self.iden)
>  
> -    def update(self, timeout):
> +    def update(self, timeout=None):
>          '''Start or the timer, possibly updating timeout'''
> -        self.timeout = timeout
> -
> -        if self.timeout >= 0 and self._task is None:
> -            self.impl.log.debug('timer %r start', self.iden)
> -            self._task = ensure_future(self._timer(),
> -                loop=self.impl.loop)
> +        if timeout is not None:
> +            self.timeout = timeout
>  
> -        elif self.timeout < 0 and self._task is not None:
> +        if self._task is not None:
>              self.impl.log.debug('timer %r stop', self.iden)
>              self._task.cancel()  # pylint: disable=no-member
>              self._task = None
>  
> +        if self.timeout >= 0:
> +            self.impl.log.debug('timer %r start', self.iden)
> +            self._task = ensure_future(self._timer(),
> +                loop=self.impl.loop)
> +
>      def close(self):
>          '''Stop the timer and call ff callback'''
>          super(TimeoutCallback, self).close()
> @@ -274,6 +276,7 @@ class virEventAsyncIOImpl(object):
>          self.callbacks = {}
>          self.descriptors = DescriptorDict(self)
>          self.log = logging.getLogger(self.__class__.__name__)
> +        self.pending_tasks = set()
>  
>      def register(self):
>          '''Register this instance as event loop implementation'''
> @@ -284,9 +287,30 @@ class virEventAsyncIOImpl(object):
>              self._add_timeout, self._update_timeout, self._remove_timeout)
>          return self
>  
> +    def takeover(self, other):
> +        '''Take over other implementation, probably registered on another loop
> +
> +        :param virEventAsyncIOImpl other: other implementation to be taken over
> +        '''
> +        self.log.warning('%r taking over %r', self, other)
> +
> +        while other.callbacks:
> +            iden, callback = other.callbacks.popitem()
> +            self.log.debug('  takeover %d %r', iden, callback)
> +            assert callback.iden == iden
> +            callback.impl = self
> +            self.callbacks[iden] = callback
> +
> +            if isinstance(callback, FDCallback):
> +                fd = callback.descriptor.fd
> +                assert callback is other.descriptors[fd].remove_handle(iden)
> +                self.descriptors[fd].add_handle(callback)
> +
>      def schedule_ff_callback(self, iden, opaque):
>          '''Schedule a ff callback from one of the handles or timers'''
> -        ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
> +        fut = ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
> +        self.pending_tasks.add(fut)
> +        fut.add_done_callback(self.pending_tasks.remove)
>  
>      @asyncio.coroutine
>      def _ff_callback(self, iden, opaque):
> @@ -297,13 +321,19 @@ class virEventAsyncIOImpl(object):
>          self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
>          return libvirt.virEventInvokeFreeCallback(opaque)
>  
> +    @asyncio.coroutine
> +    def drain(self):
> +        self.log.debug('drain()')
> +        if self.pending_tasks:
> +            yield from asyncio.wait(self.pending_tasks, loop=self.loop)
> +
>      def is_idle(self):
>          '''Returns False if there are leftovers from a connection
>  
>          Those may happen if there are sematical problems while closing
>          a connection. For example, not deregistered events before .close().
>          '''
> -        return not self.callbacks
> +        return not self.callbacks and not self.pending_tasks
>  
>      def _add_handle(self, fd, event, cb, opaque):
>          '''Register a callback for monitoring file handle events
> @@ -403,10 +433,18 @@ class virEventAsyncIOImpl(object):
>          callback = self.callbacks.pop(timer)
>          callback.close()
>  
> +
> +_current_impl = None
>  def virEventRegisterAsyncIOImpl(loop=None):
>      '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
>  
>      The implementation object is returned, but in normal usage it can safely be
>      discarded.
>      '''
> -    return virEventAsyncIOImpl(loop=loop).register()
> +    global _current_impl
> +    impl = virEventAsyncIOImpl(loop=loop)
> +    impl.register()
> +    if _current_impl is not None:
> +        impl.takeover(_current_impl)
> +    _current_impl = impl
> +    return impl

If I understand correctly, you are trying to make it possible to register
multiple event loop impls.  This is *not* a supported usage of libvirt. You
must call 'virEventRegisterImpl' before opening any connection, and once
it has been called you are forbidden to call it again.

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|




More information about the libvir-list mailing list