[libvirt] [PATCH 2/3] libvirtaio: add allow for moving callbacks to other event loop
Daniel P. Berrange
berrange at redhat.com
Fri Sep 1 09:08:18 UTC 2017
On Thu, Aug 31, 2017 at 09:40:23PM +0200, Wojtek Porczyk wrote:
> The virEvent implementation is tied to a particular loop. When spinning
> another loop, the callbacks have to be moved to another implementation,
> so they will have a chance to be invoked, should they be scheduled. If
> not, file descriptors will be leaking.
>
> Signed-off-by: Wojtek Porczyk <woju at invisiblethingslab.com>
> ---
> libvirtaio.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++------------
> 1 file changed, 51 insertions(+), 13 deletions(-)
>
> diff --git a/libvirtaio.py b/libvirtaio.py
> index fc868bd..d161cd1 100644
> --- a/libvirtaio.py
> +++ b/libvirtaio.py
> @@ -195,9 +195,10 @@ class FDCallback(Callback):
> return '<{} iden={} fd={} event={}>'.format(
> self.__class__.__name__, self.iden, self.descriptor.fd, self.event)
>
> - def update(self, event):
> + def update(self, event=None):
> '''Update the callback and fix descriptor's watchers'''
> - self.event = event
> + if event is not None:
> + self.event = event
> self.descriptor.update()
>
> #
> @@ -238,20 +239,21 @@ class TimeoutCallback(Callback):
> self.cb(self.iden, self.opaque)
> self.impl.log.debug('timer %r callback ended', self.iden)
>
> - def update(self, timeout):
> + def update(self, timeout=None):
> '''Start or the timer, possibly updating timeout'''
> - self.timeout = timeout
> -
> - if self.timeout >= 0 and self._task is None:
> - self.impl.log.debug('timer %r start', self.iden)
> - self._task = ensure_future(self._timer(),
> - loop=self.impl.loop)
> + if timeout is not None:
> + self.timeout = timeout
>
> - elif self.timeout < 0 and self._task is not None:
> + if self._task is not None:
> self.impl.log.debug('timer %r stop', self.iden)
> self._task.cancel() # pylint: disable=no-member
> self._task = None
>
> + if self.timeout >= 0:
> + self.impl.log.debug('timer %r start', self.iden)
> + self._task = ensure_future(self._timer(),
> + loop=self.impl.loop)
> +
> def close(self):
> '''Stop the timer and call ff callback'''
> super(TimeoutCallback, self).close()
> @@ -274,6 +276,7 @@ class virEventAsyncIOImpl(object):
> self.callbacks = {}
> self.descriptors = DescriptorDict(self)
> self.log = logging.getLogger(self.__class__.__name__)
> + self.pending_tasks = set()
>
> def register(self):
> '''Register this instance as event loop implementation'''
> @@ -284,9 +287,30 @@ class virEventAsyncIOImpl(object):
> self._add_timeout, self._update_timeout, self._remove_timeout)
> return self
>
> + def takeover(self, other):
> + '''Take over other implementation, probably registered on another loop
> +
> + :param virEventAsyncIOImpl other: other implementation to be taken over
> + '''
> + self.log.warning('%r taking over %r', self, other)
> +
> + while other.callbacks:
> + iden, callback = other.callbacks.popitem()
> + self.log.debug(' takeover %d %r', iden, callback)
> + assert callback.iden == iden
> + callback.impl = self
> + self.callbacks[iden] = callback
> +
> + if isinstance(callback, FDCallback):
> + fd = callback.descriptor.fd
> + assert callback is other.descriptors[fd].remove_handle(iden)
> + self.descriptors[fd].add_handle(callback)
> +
> def schedule_ff_callback(self, iden, opaque):
> '''Schedule a ff callback from one of the handles or timers'''
> - ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
> + fut = ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
> + self.pending_tasks.add(fut)
> + fut.add_done_callback(self.pending_tasks.remove)
>
> @asyncio.coroutine
> def _ff_callback(self, iden, opaque):
> @@ -297,13 +321,19 @@ class virEventAsyncIOImpl(object):
> self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
> return libvirt.virEventInvokeFreeCallback(opaque)
>
> + @asyncio.coroutine
> + def drain(self):
> + self.log.debug('drain()')
> + if self.pending_tasks:
> + yield from asyncio.wait(self.pending_tasks, loop=self.loop)
> +
> def is_idle(self):
> '''Returns False if there are leftovers from a connection
>
> Those may happen if there are sematical problems while closing
> a connection. For example, not deregistered events before .close().
> '''
> - return not self.callbacks
> + return not self.callbacks and not self.pending_tasks
>
> def _add_handle(self, fd, event, cb, opaque):
> '''Register a callback for monitoring file handle events
> @@ -403,10 +433,18 @@ class virEventAsyncIOImpl(object):
> callback = self.callbacks.pop(timer)
> callback.close()
>
> +
> +_current_impl = None
> def virEventRegisterAsyncIOImpl(loop=None):
> '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
>
> The implementation object is returned, but in normal usage it can safely be
> discarded.
> '''
> - return virEventAsyncIOImpl(loop=loop).register()
> + global _current_impl
> + impl = virEventAsyncIOImpl(loop=loop)
> + impl.register()
> + if _current_impl is not None:
> + impl.takeover(_current_impl)
> + _current_impl = impl
> + return impl
If I understand correctly, you are trying to make it possible to register
multiple event loop implementations. This is *not* a supported usage of
libvirt. You must call 'virEventRegisterImpl' before opening any connection,
and once it has been called you are forbidden to call it again.
Regards,
Daniel
--
|: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org -o- https://fstop138.berrange.com :|
|: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
More information about the libvir-list mailing list