[libvirt] [PATCH 2/3] libvirtaio: allow moving callbacks to another event loop

Wojtek Porczyk woju at invisiblethingslab.com
Thu Aug 31 19:40:23 UTC 2017


The virEvent implementation is tied to a particular event loop. When another
loop is spun up, the callbacks have to be moved to another implementation,
so that they still have a chance to be invoked should they be scheduled.
Otherwise, file descriptors will leak.

Signed-off-by: Wojtek Porczyk <woju at invisiblethingslab.com>
---
 libvirtaio.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 51 insertions(+), 13 deletions(-)

diff --git a/libvirtaio.py b/libvirtaio.py
index fc868bd..d161cd1 100644
--- a/libvirtaio.py
+++ b/libvirtaio.py
@@ -195,9 +195,10 @@ class FDCallback(Callback):
         return '<{} iden={} fd={} event={}>'.format(
             self.__class__.__name__, self.iden, self.descriptor.fd, self.event)
 
-    def update(self, event):
+    def update(self, event=None):
         '''Update the callback and fix descriptor's watchers'''
-        self.event = event
+        if event is not None:
+            self.event = event
         self.descriptor.update()
 
 #
@@ -238,20 +239,21 @@ class TimeoutCallback(Callback):
             self.cb(self.iden, self.opaque)
             self.impl.log.debug('timer %r callback ended', self.iden)
 
-    def update(self, timeout):
+    def update(self, timeout=None):
         '''Start or the timer, possibly updating timeout'''
-        self.timeout = timeout
-
-        if self.timeout >= 0 and self._task is None:
-            self.impl.log.debug('timer %r start', self.iden)
-            self._task = ensure_future(self._timer(),
-                loop=self.impl.loop)
+        if timeout is not None:
+            self.timeout = timeout
 
-        elif self.timeout < 0 and self._task is not None:
+        if self._task is not None:
             self.impl.log.debug('timer %r stop', self.iden)
             self._task.cancel()  # pylint: disable=no-member
             self._task = None
 
+        if self.timeout >= 0:
+            self.impl.log.debug('timer %r start', self.iden)
+            self._task = ensure_future(self._timer(),
+                loop=self.impl.loop)
+
     def close(self):
         '''Stop the timer and call ff callback'''
         super(TimeoutCallback, self).close()
@@ -274,6 +276,7 @@ class virEventAsyncIOImpl(object):
         self.callbacks = {}
         self.descriptors = DescriptorDict(self)
         self.log = logging.getLogger(self.__class__.__name__)
+        self.pending_tasks = set()
 
     def register(self):
         '''Register this instance as event loop implementation'''
@@ -284,9 +287,30 @@ class virEventAsyncIOImpl(object):
             self._add_timeout, self._update_timeout, self._remove_timeout)
         return self
 
+    def takeover(self, other):
+        '''Take over other implementation, probably registered on another loop
+
+        :param virEventAsyncIOImpl other: other implementation to be taken over
+        '''
+        self.log.warning('%r taking over %r', self, other)
+
+        while other.callbacks:
+            iden, callback = other.callbacks.popitem()
+            self.log.debug('  takeover %d %r', iden, callback)
+            assert callback.iden == iden
+            callback.impl = self
+            self.callbacks[iden] = callback
+
+            if isinstance(callback, FDCallback):
+                fd = callback.descriptor.fd
+                assert callback is other.descriptors[fd].remove_handle(iden)
+                self.descriptors[fd].add_handle(callback)
+
     def schedule_ff_callback(self, iden, opaque):
         '''Schedule a ff callback from one of the handles or timers'''
-        ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
+        fut = ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
+        self.pending_tasks.add(fut)
+        fut.add_done_callback(self.pending_tasks.remove)
 
     @asyncio.coroutine
     def _ff_callback(self, iden, opaque):
@@ -297,13 +321,19 @@ class virEventAsyncIOImpl(object):
         self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
         return libvirt.virEventInvokeFreeCallback(opaque)
 
+    @asyncio.coroutine
+    def drain(self):
+        self.log.debug('drain()')
+        if self.pending_tasks:
+            yield from asyncio.wait(self.pending_tasks, loop=self.loop)
+
     def is_idle(self):
         '''Returns False if there are leftovers from a connection
 
         Those may happen if there are sematical problems while closing
         a connection. For example, not deregistered events before .close().
         '''
-        return not self.callbacks
+        return not self.callbacks and not self.pending_tasks
 
     def _add_handle(self, fd, event, cb, opaque):
         '''Register a callback for monitoring file handle events
@@ -403,10 +433,18 @@ class virEventAsyncIOImpl(object):
         callback = self.callbacks.pop(timer)
         callback.close()
 
+
+_current_impl = None
 def virEventRegisterAsyncIOImpl(loop=None):
     '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
 
     The implementation object is returned, but in normal usage it can safely be
     discarded.
     '''
-    return virEventAsyncIOImpl(loop=loop).register()
+    global _current_impl
+    impl = virEventAsyncIOImpl(loop=loop)
+    impl.register()
+    if _current_impl is not None:
+        impl.takeover(_current_impl)
+    _current_impl = impl
+    return impl
-- 
2.9.4
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 819 bytes
Desc: Digital signature
URL: <http://listman.redhat.com/archives/libvir-list/attachments/20170831/b54356ab/attachment-0001.sig>


More information about the libvir-list mailing list