[libvirt] [libvirt-java] [PATCH 1/3] Implement interface ByteChannel for Stream class

Claudio Bley cbley at av-test.de
Wed Jan 8 16:02:47 UTC 2014


This makes the Stream class a native citizen of the Java API.

It can be used with the NIO Channel API, as well as (In,Out)putStream's
using the java.nio.channels.Channels convenience wrappers.
---
 src/main/java/org/libvirt/Stream.java      | 171 ++++++++++++++++++++++++++++-
 src/main/java/org/libvirt/jna/Libvirt.java |   6 +-
 2 files changed, 172 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/libvirt/Stream.java b/src/main/java/org/libvirt/Stream.java
index 404c9a0..975e1b6 100644
--- a/src/main/java/org/libvirt/Stream.java
+++ b/src/main/java/org/libvirt/Stream.java
@@ -1,12 +1,48 @@
 package org.libvirt;
 
+import java.io.IOException;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonReadableChannelException;
+import java.nio.channels.NonWritableChannelException;
+
 import org.libvirt.jna.Libvirt;
 import org.libvirt.jna.StreamPointer;
 import static org.libvirt.Library.libvirt;
 
 import com.sun.jna.NativeLong;
 
-public class Stream {
+/**
+ * The Stream class is used to transfer data between a libvirt daemon
+ * and a client.
+ * <p>
+ * It implements the ByteChannel interface.
+ * <p>
+ * Basic usage:
+ *
+ * <pre>
+ * {@code
+ * ByteBuffer buf = ByteBuffer.allocate(1024);
+ * Stream str = conn.streamNew(0);
+ *
+ * ... // open the stream e.g. calling Domain.screenshot
+ *
+ * while (str.read(buf) != -1) {
+ *     buf.flip();
+ *     ... // do something with the data
+ *     buf.compact();
+ * }
+ * }
+ * </pre>
+ * <p>
+ * If you want to use this class as an InputStream or OutputStream,
+ * convert it using the {@link java.nio.channels.Channels#newInputStream
+ *  Channels.newInputStream} and {@link java.nio.channels.Channels#newOutputStream
+ *  Channels.newOutputStream} respectively.
+ */
+public class Stream implements ByteChannel {
 
     public static int VIR_STREAM_NONBLOCK = (1 << 0);
 
@@ -20,6 +56,56 @@ public class Stream {
      */
     private Connect virConnect;
 
+    private final static int CLOSED   =  0;
+    private final static int READABLE =  1;
+    private final static int WRITABLE =  2;
+    private final static int OPEN     = READABLE | WRITABLE;
+    private final static int EOF      =  4;
+
+    /* The status of the stream. A stream starts its live in the
+     * "CLOSED" state.
+     *
+     * It will be opened for input / output by another libvirt
+     * operation (e.g. virStorageVolDownload), which means it will
+     * be in state "READABLE" or "WRITABLE", exclusively.
+     *
+     * It will reach state "EOF", if {@link finish()} is called.
+     *
+     * It will be in the "CLOSED" state again, after calling abort()
+     * or close().
+     */
+    private int state = CLOSED;
+
+    void markReadable() {
+        assert !isWritable()
+            : "A Stream cannot be readable and writable at the same time";
+
+        state |= READABLE;
+    }
+
+    void markWritable() {
+        assert !isReadable()
+            : "A Stream cannot be readable and writable at the same time";
+
+        state |= WRITABLE;
+    }
+
+    boolean isReadable() {
+        return (state & READABLE) != 0;
+    }
+
+    boolean isWritable() {
+        return (state & WRITABLE) != 0;
+    }
+
+    protected boolean isEOF() {
+        return (state & EOF) != 0;
+    }
+
+    private void markEOF() {
+        state |= EOF;
+    }
+
     Stream(Connect virConnect, StreamPointer VSP) {
         this.virConnect = virConnect;
         this.VSP = VSP;
@@ -32,6 +118,7 @@ public class Stream {
     public int abort() throws LibvirtException {
         int returnValue = libvirt.virStreamAbort(VSP);
         processError();
+        this.state = CLOSED;
         return returnValue;
     }
 
@@ -70,6 +157,7 @@ public class Stream {
     public int finish() throws LibvirtException {
         int returnValue = libvirt.virStreamFinish(VSP);
         processError();
+        markEOF();
         return returnValue;
     }
 
@@ -83,6 +171,7 @@ public class Stream {
     public int free() throws LibvirtException {
         int success = 0;
         if (VSP != null) {
+            closeStream();
             success = libvirt.virStreamFree(VSP);
             processError();
             VSP = null;
@@ -108,11 +197,82 @@ public class Stream {
      * @throws LibvirtException
      */
     public int receive(byte[] data) throws LibvirtException {
-        int returnValue = libvirt.virStreamRecv(VSP, data, new NativeLong(data.length));
+        return receive(ByteBuffer.wrap(data));
+    }
+
+    protected int receive(ByteBuffer buffer) throws LibvirtException {
+        int returnValue = libvirt.virStreamRecv(VSP, buffer, new NativeLong(buffer.remaining()));
         processError();
+        buffer.position(buffer.position() + returnValue);
         return returnValue;
     }
 
+    @Override
+    public int read(ByteBuffer buffer) throws IOException {
+        if (!isOpen()) throw new ClosedChannelException();
+        if (!isReadable()) throw new NonReadableChannelException();
+        if (isEOF()) return -1;
+
+        try {
+            int ret = receive(buffer);
+
+            switch (ret) {
+            case 0:
+                finish();
+                return -1;
+
+            case -2:
+                throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");
+
+            default:
+                return ret;
+            }
+        } catch (LibvirtException e) {
+            throw new IOException("could not read from stream", e);
+        }
+    }
+
+    @Override
+    public int write(ByteBuffer buffer) throws IOException {
+        if (!isOpen()) throw new ClosedChannelException();
+        if (!isWritable()) throw new NonWritableChannelException();
+
+        int pos = buffer.position();
+
+        try {
+            while (buffer.hasRemaining()) {
+                int ret = send(buffer);
+
+                if (ret == -2)
+                    throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");
+            }
+            return buffer.position() - pos;
+        } catch (LibvirtException e) {
+            throw new IOException("could not write to stream", e);
+        }
+    }
+
+    protected void closeStream() throws LibvirtException {
+        if (isOpen() && !isEOF()) {
+            abort();
+        }
+        this.state = CLOSED;
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            closeStream();
+        } catch (LibvirtException e) {
+            throw new IOException("error while closing Stream", e);
+        }
+    }
+
+    @Override
+    public boolean isOpen() {
+        return (this.state & OPEN) != 0;
+    }
+
     /**
      * Batch receive method
      *
@@ -174,8 +334,13 @@ public class Stream {
      * @since  1.5.2
      */
     public int send(byte[] data) throws LibvirtException {
-        int returnValue = libvirt.virStreamSend(VSP, data, new NativeLong(data.length));
+        return send(ByteBuffer.wrap(data));
+    }
+
+    protected int send(ByteBuffer buffer) throws LibvirtException {
+        int returnValue = libvirt.virStreamSend(VSP, buffer, new NativeLong(buffer.remaining()));
         processError();
+        buffer.position(buffer.position() + returnValue);
         return returnValue;
     }
 
diff --git a/src/main/java/org/libvirt/jna/Libvirt.java b/src/main/java/org/libvirt/jna/Libvirt.java
index 98f2125..fe74087 100644
--- a/src/main/java/org/libvirt/jna/Libvirt.java
+++ b/src/main/java/org/libvirt/jna/Libvirt.java
@@ -8,6 +8,8 @@ import com.sun.jna.Pointer;
 import com.sun.jna.ptr.IntByReference;
 import com.sun.jna.ptr.LongByReference;
 
+import java.nio.ByteBuffer;
+
 /**
  * The libvirt interface which is exposed via JNA. The complete API is
  * documented at http://www.libvirt.org/html/libvirt-libvirt.html.
@@ -368,9 +370,9 @@ public interface Libvirt extends Library {
     int virStreamFinish(StreamPointer virStreamPtr) ;
     int virStreamFree(StreamPointer virStreamPtr) ;
     StreamPointer virStreamNew(ConnectionPointer virConnectPtr, int flags) ;
-    int virStreamSend(StreamPointer virStreamPtr, byte[] data, NativeLong size);
+    int virStreamSend(StreamPointer virStreamPtr, ByteBuffer data, NativeLong size);
     int virStreamSendAll(StreamPointer virStreamPtr, Libvirt.VirStreamSourceFunc handler, Pointer opaque);
-    int virStreamRecv(StreamPointer virStreamPtr, byte[] data, NativeLong length);
+    int virStreamRecv(StreamPointer virStreamPtr, ByteBuffer data, NativeLong length);
     int virStreamRecvAll(StreamPointer virStreamPtr, Libvirt.VirStreamSinkFunc handler, Pointer opaque);
 
     //DomainSnapshot Methods
-- 
1.8.5.2.msysgit.0




More information about the libvir-list mailing list