[libvirt] [PATCH v2 6/6] libvirtaio: add .drain() coroutine

Wojtek Porczyk woju at invisiblethingslab.com
Thu Sep 14 00:41:12 UTC 2017


The intended use is to wait until the implementation is empty, i.e. until
every handle and timeout that was added has had its free callback invoked.
This is one way to ensure that all connections were properly closed and
their file descriptors reclaimed.

Signed-off-by: Wojtek Porczyk <woju at invisiblethingslab.com>
---
 libvirtaio.py | 36 ++++++++++++++++++++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)

diff --git a/libvirtaio.py b/libvirtaio.py
index 97a7f6c..1c432dd 100644
--- a/libvirtaio.py
+++ b/libvirtaio.py
@@ -269,10 +269,27 @@ class virEventAsyncIOImpl(object):
         self.descriptors = DescriptorDict(self)
         self.log = logging.getLogger(self.__class__.__name__)
 
+        # NOTE invariant: _finished.is_set() iff _pending == 0
+        self._pending = 0
+        self._finished = asyncio.Event(loop=loop)
+        self._finished.set()
+
     def __repr__(self):
         return '<{} callbacks={} descriptors={}>'.format(
             type(self).__name__, self.callbacks, self.descriptors)
 
+    def _pending_inc(self):
+        '''Increase the count of pending affairs. Do not use directly.'''
+        self._pending += 1
+        self._finished.clear()
+
+    def _pending_dec(self):
+        '''Decrease the count of pending affairs. Do not use directly.'''
+        assert self._pending > 0
+        self._pending -= 1
+        if self._pending == 0:
+            self._finished.set()
+
     def register(self):
         '''Register this instance as event loop implementation'''
         # pylint: disable=bad-whitespace
@@ -293,7 +310,20 @@ class virEventAsyncIOImpl(object):
         This is a coroutine.
         '''
         self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
-        return libvirt.virEventInvokeFreeCallback(opaque)
+        ret = libvirt.virEventInvokeFreeCallback(opaque)
+        self._pending_dec()
+        return ret
+
+    @asyncio.coroutine
+    def drain(self):
+        '''Wait for the implementation to become idle.
+
+        This is a coroutine.
+        '''
+        self.log.debug('drain()')
+        if self._pending:
+            yield from self._finished.wait()
+        self.log.debug('drain ended')
 
     def is_idle(self):
         '''Returns False if there are leftovers from a connection
@@ -301,7 +331,7 @@ class virEventAsyncIOImpl(object):
         Those may happen if there are sematical problems while closing
         a connection. For example, not deregistered events before .close().
         '''
-        return not self.callbacks
+        return not self.callbacks and not self._pending
 
     def _add_handle(self, fd, event, cb, opaque):
         '''Register a callback for monitoring file handle events
@@ -324,6 +354,7 @@ class virEventAsyncIOImpl(object):
                 fd, event, callback.iden)
         self.callbacks[callback.iden] = callback
         self.descriptors[fd].add_handle(callback)
+        self._pending_inc()
         return callback.iden
 
     def _update_handle(self, watch, event):
@@ -378,6 +409,7 @@ class virEventAsyncIOImpl(object):
                 timeout, callback.iden)
         self.callbacks[callback.iden] = callback
         callback.update(timeout=timeout)
+        self._pending_inc()
         return callback.iden
 
     def _update_timeout(self, timer, timeout):
-- 
2.9.4




More information about the libvir-list mailing list