[libvirt-users] Trouble using virStream with callbacks

Jonathan Lebon jlebon at redhat.com
Wed Sep 18 21:14:31 UTC 2013


I am trying to write a simple app which connects a channel obtained
from virDomainOpenChannel() to stdin/stdout (based in part on the
snippet at [1]). However, it seems like the data received back from
the stream is delayed by one iteration. It would be hard to explain
this by simply showing the output, so here's a timeline instead:

1. start the program on the host
2. write "msg from host<Enter>"
3. socat on the guest sees this right away
4. from the guest's socat, write "msg from guest<Enter>"
5. the reply does not show up on the host until AFTER you press
   <Enter> (which of course also sends a newline to the guest)
6. this 'one line delay' occurs throughout the conversation,
   receiving guest replies to things sent two <Enter>s ago

Doing socat on both sides work as expected. Also, using a blocked
stream and a thread for sending and another for receiving works
as well. It would be nice however if I could get rid of the threads
in favour of callbacks. Not sure if I'm missing something obvious.

Thanks,

Jonathan

[1] https://www.redhat.com/archives/libvir-list/2012-December/msg01084.html

---

#include <ctype.h>
#include <err.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <libvirt/libvirt.h>
#include <fcntl.h>

#define CONNECT_URI "qemu:///system"
#define DOMAIN "TestVM"
#define CHANNEL "channel.0"
#define BUF_SIZE 80

int stream_active = 1;

void
stdin_to_stream(int watch, int fd, int events, void *opaque)
{
   virStreamPtr stream = *((virStreamPtr*)(opaque));
   if (events & VIR_EVENT_HANDLE_READABLE) {
      char buf[1024];
      int bytes_read = read(fd, buf, sizeof(buf));
      if (bytes_read > 0)
         virStreamSend(stream, buf, bytes_read);
   }
   if (events & (VIR_EVENT_HANDLE_ERROR|VIR_EVENT_HANDLE_HANGUP)) {
      stream_active = 0;
   }
   return;
}

void
stream_to_stdout(virStreamPtr stream, int events, void *opaque)
{
   if (events & VIR_EVENT_HANDLE_READABLE) {
      char buf[1024];
      int bytes_read = virStreamRecv(stream, buf, sizeof(buf));
      if (bytes_read > 0) {
         fwrite(buf, bytes_read, 1, stdout);
         fflush(stdout);
      }
   }
   if (events & (VIR_EVENT_HANDLE_ERROR|VIR_EVENT_HANDLE_HANGUP)) {
      stream_active = 0;
   }
   return;
}

int main(int argc, char *argv[])
{
   virConnectPtr conn;
   virDomainPtr dom;
   virStreamPtr st;
   int bytes_read;
   char buf[BUF_SIZE];

   if ((conn = virConnectOpen(CONNECT_URI)) == NULL)
      errx(1, "virConnectOpen");

   if ((dom = virDomainLookupByName(conn, DOMAIN)) == NULL)
      errx(1, "virDomainLookupByName");

   if ((st = virStreamNew(conn, VIR_STREAM_NONBLOCK)) == NULL)
      errx(1, "virStreamNew");

   if (virDomainOpenChannel(dom, CHANNEL, st, 0) == -1)
      errx(1, "virDomainOpenChannel");

   if (virEventRegisterDefaultImpl() != 0)
      errx(1, "virEventRegisterDefaultImpl");

   if (virStreamEventAddCallback(st, 1|4|8, stream_to_stdout, NULL, NULL) != 0)
      errx(1, "virStreamEventAddCallback");

   int flags = fcntl(fileno(stdin), F_GETFL) | O_NONBLOCK;
   fcntl(fileno(stdin), F_SETFL, flags);

   if (virEventAddHandle(fileno(stdin), 1|4|8, stdin_to_stream, &st, NULL) < 0)
      errx(1, "virEventAddHandle");

   while (stream_active) {
      if (virEventRunDefaultImpl() != 0) {
         errx(1, "virEventRunDefaultImpl");
         break;
      }
   }

   if (virStreamFinish(st) < 0)
      errx(1, "virStreamFinish");
   if (virStreamFree(st) < 0)
      errx(1, "virStreamFree");
   return 0;
}




More information about the libvirt-users mailing list