[Ovirt-devel] [PATCH server] Replace host-status with db-omatic.

Ian Main imain at redhat.com
Fri Nov 21 22:16:50 UTC 2008

This patch replaces host-status with db-omatic (formerly qpid-db-sync).
This uses the qpid console to receive events from libvirt-qpid on various
nodes and update state in the database directly.  It also uses heartbeating
from the libvirt-qpid agents so it knows when a host or agent dies.

Signed-off-by: Ian Main <imain at redhat.com>
 conf/ovirt-db-omatic           |   53 +++++++
 conf/ovirt-host-status         |   53 -------
 ovirt-server.spec.in           |   12 +-
 scripts/ovirt-server-install   |    2 +-
 src/db-omatic/db_omatic.rb     |  316 ++++++++++++++++++++++++++++++++++++++++
 src/host-status/host-status.rb |  240 ------------------------------
 6 files changed, 376 insertions(+), 300 deletions(-)
 create mode 100755 conf/ovirt-db-omatic
 delete mode 100755 conf/ovirt-host-status
 create mode 100755 src/db-omatic/db_omatic.rb
 delete mode 100755 src/host-status/host-status.rb

diff --git a/conf/ovirt-db-omatic b/conf/ovirt-db-omatic
new file mode 100755
index 0000000..f8337e0
--- /dev/null
+++ b/conf/ovirt-db-omatic
@@ -0,0 +1,53 @@
+# ovirt-db-omatic       startup script for ovirt-db-omatic
+# chkconfig: - 97 03
+# description: ovirt-db-omatic is an essential component of the \
+#    ovirt VM manager.
+[ -r /etc/sysconfig/ovirt-rails ] && . /etc/sysconfig/ovirt-rails
+export RAILS_ENV="${RAILS_ENV:-production}"
+. /etc/init.d/functions
+start() {
+    echo -n "Starting ovirt-db-omatic: "
+    daemon $DAEMON
+    RETVAL=$?
+    echo
+stop() {
+    echo -n "Shutting down ovirt-db-omatic: "
+    killproc db_omatic.rb
+    RETVAL=$?
+    echo
+case "$1" in
+    start)
+	start
+	;;
+    stop)
+	stop
+	;;
+    restart)
+	stop
+	start
+	;;
+    status)
+	status $DAEMON
+	;;
+    *)
+      echo "Usage: ovirt-db-omatic {start|stop|restart|status}"
+      exit 1
+  ;;
+exit $RETVAL
diff --git a/conf/ovirt-host-status b/conf/ovirt-host-status
deleted file mode 100755
index 0b43552..0000000
--- a/conf/ovirt-host-status
+++ /dev/null
@@ -1,53 +0,0 @@
-# ovirt-host-status       startup script for ovirt-host-status
-# chkconfig: - 97 03
-# description: ovirt-host-status is an essential component of the \
-#    ovirt VM manager.
-[ -r /etc/sysconfig/ovirt-rails ] && . /etc/sysconfig/ovirt-rails
-export RAILS_ENV="${RAILS_ENV:-production}"
-. /etc/init.d/functions
-start() {
-    echo -n "Starting ovirt-host-status: "
-    daemon $DAEMON
-    RETVAL=$?
-    echo
-stop() {
-    echo -n "Shutting down ovirt-host-status: "
-    killproc host-status.rb
-    RETVAL=$?
-    echo
-case "$1" in
-    start)
-	start
-	;;
-    stop)
-	stop
-	;;
-    restart)
-	stop
-	start
-	;;
-    status)
-	status $DAEMON
-	;;
-    *)
-      echo "Usage: ovirt-host-status {start|stop|restart|status}"
-      exit 1
-  ;;
-exit $RETVAL
diff --git a/ovirt-server.spec.in b/ovirt-server.spec.in
index d9e3f78..be24fbf 100644
--- a/ovirt-server.spec.in
+++ b/ovirt-server.spec.in
@@ -73,11 +73,11 @@ mkdir %{buildroot}
 touch %{buildroot}%{_localstatedir}/log/%{name}/mongrel.log
 touch %{buildroot}%{_localstatedir}/log/%{name}/rails.log
 touch %{buildroot}%{_localstatedir}/log/%{name}/taskomatic.log
-touch %{buildroot}%{_localstatedir}/log/%{name}/host-status.log
+touch %{buildroot}%{_localstatedir}/log/%{name}/db-omatic.log
 %{__install} -p -m0644 %{pbuild}/conf/%{name}.conf %{buildroot}%{_sysconfdir}/httpd/conf.d
 %{__install} -Dp -m0755 %{pbuild}/conf/ovirt-host-browser %{buildroot}%{_initrddir}
-%{__install} -Dp -m0755 %{pbuild}/conf/ovirt-host-status %{buildroot}%{_initrddir}
+%{__install} -Dp -m0755 %{pbuild}/conf/ovirt-db-omatic %{buildroot}%{_initrddir}
 %{__install} -Dp -m0755 %{pbuild}/conf/ovirt-host-collect %{buildroot}%{_initrddir}
 %{__install} -Dp -m0755 %{pbuild}/conf/ovirt-mongrel-rails %{buildroot}%{_initrddir}
 %{__install} -Dp -m0755 %{pbuild}/conf/ovirt-mongrel-rails.sysconf %{buildroot}%{_sysconfdir}/sysconfig/ovirt-mongrel-rails
@@ -147,7 +147,7 @@ fi
 # on; otherwise, we respect the choices the administrator already has made.
 # check this by seeing if each daemon is already installed
 %daemon_chkconfig_post -d ovirt-host-browser
-%daemon_chkconfig_post -d ovirt-host-status
+%daemon_chkconfig_post -d ovirt-db-omatic
 %daemon_chkconfig_post -d ovirt-host-collect
 %daemon_chkconfig_post -d ovirt-mongrel-rails
 %daemon_chkconfig_post -d ovirt-taskomatic
@@ -160,12 +160,12 @@ docs_dir=$(rpm -q --queryformat '/usr/share/%{NAME}' ovirt-docs)
 if [ "$1" = 0 ] ; then
   /sbin/service ovirt-host-browser stop > /dev/null 2>&1
-  /sbin/service ovirt-host-status stop > /dev/null 2>&1
+  /sbin/service ovirt-db-omatic stop > /dev/null 2>&1
   /sbin/service ovirt-host-collect stop > /dev/null 2>&1
   /sbin/service ovirt-mongrel-rails stop > /dev/null 2>&1
   /sbin/service ovirt-taskomatic stop > /dev/null 2>&1
   /sbin/chkconfig --del ovirt-host-browser
-  /sbin/chkconfig --del ovirt-host-status
+  /sbin/chkconfig --del ovirt-db-omatic
   /sbin/chkconfig --del ovirt-host-collect
   /sbin/chkconfig --del ovirt-mongrel-rails
   /sbin/chkconfig --del ovirt-taskomatic
@@ -178,7 +178,7 @@ fi
diff --git a/scripts/ovirt-server-install b/scripts/ovirt-server-install
index afa2a64..5eb1afe 100755
--- a/scripts/ovirt-server-install
+++ b/scripts/ovirt-server-install
@@ -20,7 +20,7 @@ STEP_FILE=/etc/ntp/step-tickers
-OVIRT_SVCS="ovirt-host-browser ovirt-host-keyadd ovirt-host-status \
+OVIRT_SVCS="ovirt-host-browser ovirt-host-keyadd ovirt-db-omatic \
             ovirt-host-collect ovirt-mongrel-rails ovirt-taskomatic"
 ENABLE_SVCS="ntpdate ntpd httpd postgresql libvirtd collectd"
diff --git a/src/db-omatic/db_omatic.rb b/src/db-omatic/db_omatic.rb
new file mode 100755
index 0000000..d93e935
--- /dev/null
+++ b/src/db-omatic/db_omatic.rb
@@ -0,0 +1,316 @@
+$: << File.join(File.dirname(__FILE__), "../dutils")
+require "rubygems"
+require "qpid"
+require 'monitor'
+require 'dutils'
+require 'daemons'
+require 'optparse'
+include Daemonize
+class DbOmatic < Qpid::Qmf::Console
+    # Use monitor mixin for mutual exclusion around checks to heartbeats
+    # and updates to objects/heartbeats.
+    include MonitorMixin
+    def initialize()
+        super()
+        @cached_objects = {}
+        @heartbeats = {}
+        database_connect
+    end
+    def update_domain_state(domain, state_override = nil)
+        vm = Vm.find(:first, :conditions => [ "uuid = ?", domain['uuid'] ])
+        if vm == nil
+          puts "VM Not found in database, must be created by user.  giving up."
+          next
+        end
+        if state_override != nil
+            state = state_override
+        else
+            # FIXME: Some of these translations don't seem right.  Shouldn't we be using
+            # the libvirt states throughout ovirt?
+            case domain['state']
+                when "nostate"
+                    state = Vm::STATE_PENDING
+                when "running"
+                    state = Vm::STATE_RUNNING
+                when "blocked"
+                    state = Vm::STATE_SUSPENDED #?
+                when "paused"
+                    state = Vm::STATE_SUSPENDED
+                when "shutdown"
+                    state = Vm::STATE_STOPPED
+                when "shutoff"
+                    state = Vm::STATE_STOPPED
+                when "crashed"
+                    state = Vm::STATE_STOPPED
+                else
+                    state = Vm::STATE_PENDING
+            end
+        end
+        puts "Updating VM #{domain['name']} to state #{state}"
+        vm.state = state
+        vm.save
+    end
+    def update_host_state(host_info, state)
+        db_host = Host.find(:first, :conditions => [ "hostname = ?", host_info['hostname'] ])
+        if db_host
+            puts "Marking host #{host_info['hostname']} as state #{state}."
+            db_host.state = state
+            db_host.hypervisor_type = host_info['hypervisorType']
+            db_host.arch = host_info['model']
+            db_host.memory = host_info['memory']
+            # XXX: Could be added..
+            #db_host.mhz = host_info['mhz']
+            # XXX: Not even sure what this is.. :)
+            #db_host.lock_version = 2
+            # XXX: This would just be for init..
+            #db_host.is_disabled = 0
+            db_host.save
+        else
+            # FIXME: This would be a newly registered host.  We could put it in the database.
+            puts "Unknown host, probably not registered yet??"
+        end
+    end
+    def object_props(broker, obj)
+        target = obj.klass_key[0]
+        return if target != "com.redhat.libvirt"
+        type = obj.klass_key[1]
+        # I just sync this whole thing because there shouldn't be a lot of contention here..
+        synchronize do
+            values = @cached_objects[obj.object_id.to_s]
+            new_object = false
+            if values == nil
+                values = {}
+                @cached_objects[obj.object_id.to_s] = values
+                # Save the agent and broker bank so that we can tell what objects
+                # are expired when the heartbeat for them stops.
+                values[:broker_bank] = obj.object_id.broker_bank
+                values[:agent_bank] = obj.object_id.agent_bank
+                values[:class_type] = obj.klass_key[1]
+                values[:timed_out] = false
+                puts "New object type #{type}"
+                new_object = true
+            end
+            domain_state_change = false
+            obj.properties.each do |key, newval|
+                if values[key.to_s] != newval
+                    values[key.to_s] = newval
+                    #puts "new value for property #{key} : #{newval}"
+                    if type == "domain" and key.to_s == "state"
+                        domain_state_change = true
+                    end
+                end
+            end
+            if domain_state_change
+                update_domain_state(values)
+            end
+            if new_object
+                if type == "node"
+                    update_host_state(values, Host::STATE_AVAILABLE)
+                end
+            end
+        end
+    end
+    def object_stats(broker, obj)
+        target = obj.klass_key[0]
+        return if target != "com.redhat.libvirt"
+        type = obj.klass_key[1]
+        synchronize do
+            values = @cached_objects[obj.object_id.to_s]
+            if !values
+                values = {}
+                @cached_objects[obj.object_id.to_s] = values
+                values[:broker_bank] = obj.object_id.broker_bank
+                values[:agent_bank] = obj.object_id.agent_bank
+                values[:class_type] = obj.klass_key[1]
+                values[:timed_out] = false
+            end
+            obj.statistics.each do |key, newval|
+                if values[key.to_s] != newval
+                    values[key.to_s] = newval
+                    #puts "new value for statistic #{key} : #{newval}"
+                end
+            end
+        end
+    end
+    def heartbeat(agent, timestamp)
+        return if agent == nil
+        synchronize do
+            bank_key = "#{agent.agent_bank}.#{agent.broker.broker_bank}"
+            @heartbeats[bank_key] = [agent, timestamp]
+        end
+    end
+    def del_agent(agent)
+        agent_disconnected(agent)
+    end
+    # This method marks objects associated with the given agent as timed out/invalid.  Called either
+    # when the agent heartbeats out, or we get a del_agent callback.
+    def agent_disconnected(agent)
+        @cached_objects.keys.each do |objkey|
+            if @cached_objects[objkey][:broker_bank] == agent.broker.broker_bank and
+               @cached_objects[objkey][:agent_bank] == agent.agent_bank
+                values = @cached_objects[objkey]
+                puts "Marking object of type #{values[:class_type]} as timed out."
+                if values[:timed_out] == false
+                    if values[:class_type] == 'node'
+                        update_host_state(values, Host::STATE_UNAVAILABLE)
+                    elsif values[:class_type] == 'domain'
+                        update_domain_state(values, Vm::STATE_UNREACHABLE)
+                    end
+                end
+            values[:timed_out] = true
+            end
+        end
+    end
+    # The opposite of above, this is called when an agent is alive and well and makes sure
+    # all of the objects associated with it are marked as valid.
+    def agent_connected(agent)
+        @cached_objects.keys.each do |objkey|
+            if @cached_objects[objkey][:broker_bank] == agent.broker.broker_bank and
+               @cached_objects[objkey][:agent_bank] == agent.agent_bank
+                values = @cached_objects[objkey]
+                if values[:timed_out] == true
+                    puts "Marking object of type #{values[:class_type]} as in service."
+                    if values[:class_type] == 'node'
+                        update_host_state(values, Host::STATE_AVAILABLE)
+                    elsif values[:class_type] == 'domain'
+                        update_domain_state(values)
+                    end
+                    values[:timed_out] = false
+                end
+            end
+        end
+    end
+    # This cleans up the database on startup so that everything is marked unavailable etc.
+    # Once everything comes online it will all be marked as available/up again.
+    def db_init_cleanup()
+        db_host = Host.find(:all)
+        db_host.each do |host|
+            puts "Marking host #{host.hostname} unavailable"
+            host.state = Host::STATE_UNAVAILABLE
+            host.save
+        end
+        db_vm = Vm.find(:all)
+        db_vm.each do |vm|
+            puts "Marking vm #{vm.description} as stopped."
+            vm.state = Vm::STATE_STOPPED
+            vm.save
+        end
+    end
+    # This is the mainloop that is called into as a separate thread.  This just loops through
+    # and makes sure all the agents are still reporting.  If they aren't they get marked as
+    # down.
+    def check_heartbeats()
+        while true
+            sleep(5)
+            synchronize do
+                # Get seconds from the epoch
+                t = Time.new.to_i
+                @heartbeats.keys.each do | key |
+                    agent, timestamp = @heartbeats[key]
+                    # Heartbeats from qpid are in nanoseconds, we just need seconds..
+                    s = timestamp / 1000000000
+                    delta = t - s
+                    if delta > 30
+                        # No heartbeat for 30 seconds.. deal with dead/disconnected agent.
+                        agent_disconnected(agent)
+                        @heartbeats.delete(key)
+                    else
+                        agent_connected(agent)
+                    end
+                end
+            end
+        end
+    end
+$logfile = '/var/log/ovirt-server/qpid-db-sync.log'
+def main()
+    do_daemon = true
+    opts = OptionParser.new do |opts|
+        opts.on("-h", "--help", "Print help message") do
+            puts opts
+            exit
+        end
+        opts.on("-n", "--nodaemon", "Run interactively (useful for debugging)") do |n|
+            do_daemon = false
+        end
+    end
+    begin
+        opts.parse!(ARGV)
+    rescue OptionParser::InvalidOption
+        puts opts
+        exit
+    end
+    if do_daemon
+        # XXX: This gets around a problem with paths for the database stuff.
+        # Normally daemonize would chdir to / but the paths for the database
+        # stuff are relative so it breaks it.. It's either this or rearrange
+        # things so the db stuff is included after daemonizing.
+        pwd = Dir.pwd
+        daemonize
+        Dir.chdir(pwd)
+        STDOUT.reopen $logfile, 'a'
+        STDERR.reopen STDOUT
+    end
+    dbsync = DbOmatic.new()
+    s = Qpid::Qmf::Session.new(:console => dbsync, :rcv_events => false)
+    b = s.add_broker("amqp://localhost:5672")
+    dbsync.db_init_cleanup()
+    # Call into mainloop..
+    dbsync.check_heartbeats()
diff --git a/src/host-status/host-status.rb b/src/host-status/host-status.rb
deleted file mode 100755
index cd9e30f..0000000
--- a/src/host-status/host-status.rb
+++ /dev/null
@@ -1,240 +0,0 @@
-# Copyright (C) 2008 Red Hat, Inc.
-# Written by Chris Lalancette <clalance at redhat.com>
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; version 2 of the License.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# GNU General Public License for more details.
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
-# MA  02110-1301, USA.  A copy of the GNU General Public License is
-# also available at http://www.gnu.org/copyleft/gpl.html.
-$: << File.join(File.dirname(__FILE__), "../dutils")
-require 'rubygems'
-require 'libvirt'
-require 'optparse'
-require 'daemons'
-include Daemonize
-$logfile = '/var/log/ovirt-server/host-status.log'
-do_daemon = true
-sleeptime = 20
-opts = OptionParser.new do |opts|
-  opts.on("-h", "--help", "Print help message") do
-    puts opts
-    exit
-  end
-  opts.on("-n", "--nodaemon", "Run interactively (useful for debugging)") do |n|
-    do_daemon = !n
-  end
-  opts.on("-s N", Integer, "--sleep", "Seconds to sleep between iterations (default is 5 seconds)") do |s|
-    sleeptime = s
-  end
-  opts.parse!(ARGV)
-rescue OptionParser::InvalidOption
-  puts opts
-  exit
-if do_daemon
-  daemonize
-  STDOUT.reopen $logfile, 'a'
-# connects to the db in here
-require 'dutils'
-def check_state(vm, dom_info, host)
-  puts 'checking state of vm ' + vm.description
-  case dom_info.state
-  when Libvirt::Domain::NOSTATE, Libvirt::Domain::SHUTDOWN,
-    Libvirt::Domain::SHUTOFF, Libvirt::Domain::CRASHED then
-    if Vm::RUNNING_STATES.include?(vm.state)
-      # OK, the host thinks this VM is off, while the database thinks it
-      # is running; we have to kick taskomatic
-      kick_taskomatic(Vm::STATE_STOPPED, vm, host.id)
-    end
-  when Libvirt::Domain::RUNNING, Libvirt::Domain::BLOCKED then
-    if not Vm::RUNNING_STATES.include?(vm.state)
-      # OK, the host thinks this VM is running, but it's not marked as running
-      # in the database; kick taskomatic
-      kick_taskomatic(Vm::STATE_RUNNING, vm, host.id)
-    end
-  when Libvirt::Domain::PAUSED then
-    if vm.state != Vm::STATE_SUSPENDING and vm.state != Vm::STATE_SUSPENDED
-      kick_taskomatic(Vm::STATE_SUSPENDED, vm, host.id)
-    end
-  else
-    puts "Unknown vm state...skipping"
-  end
-def kick_taskomatic(msg, vm, host_id = nil)
-  print "Kicking taskomatic, state is %s\n" % msg
-  task = VmTask.new
-  task.user = "host-status"
-  task.action = VmTask::ACTION_UPDATE_STATE_VM
-  task.state = Task::STATE_QUEUED
-  task.args = host_id ? [msg,host_id].join(',') : msg
-  task.created_at = Time.now
-  task.time_started = Time.now
-  task.task_target = vm
-  task.save
-def check_status(host)
-  # This is in a new process, we need a new database connection.
-  database_connect
-  begin
-    puts "Connecting to host " + host.hostname
-    conn = Libvirt::open("qemu+tcp://" + host.hostname + "/system")
-  rescue
-    # we couldn't contact the host for whatever reason.  Since we can't get
-    # to this host, we have to mark all vms on it as disconnected or stopped
-    # or such.
-    if host.state != Host::STATE_UNAVAILABLE
-      puts "Updating host state to unavailable: " + host.hostname
-      host.state = Host::STATE_UNAVAILABLE
-      host.save
-    end
-    Vm.find(:all, :conditions => [ "host_id = ?", host.id ]).each do |vm|
-      # Since we can't reach the host on which the vms reside, we mark these
-      # as STATE_UNREACHABLE.  If they come back up we can mark them as
-      # running again, else they'll be stopped.  At least for now the user
-      # will know what's going on.
-      #
-      # If this causes too much trouble in the UI, this can be changed to
-      # STATE_STOPPED for now until it is resolved of another solution is
-      # brought forward.
-      if vm.state != Vm::STATE_UNREACHABLE:
-        kick_taskomatic(Vm::STATE_UNREACHABLE, vm)
-      end
-    end
-    return
-  end
-  if host.state != Host::STATE_AVAILABLE
-    puts "Updating host state to available: " + host.hostname
-    host.state = Host::STATE_AVAILABLE
-    host.save
-  end
-  begin
-    vm_ids = conn.list_domains
-  rescue
-    puts "Failed to request domain list on host " + host.hostname
-    conn.close
-    return
-  end
-  puts "** Host alive, checking vms by id **"
-  # Here we're going through every vm listed through libvirt.  This
-  # really only lets us find ones that are started that shouldn't be.
-  vm_ids.each do |vm_id|
-    puts "VM ID: %d" % [vm_id]
-    begin
-      dom = conn.lookup_domain_by_id(vm_id)
-    rescue
-      puts "Failed to find domain " + vm.description + " with vm_id ", vm_id
-      next
-    end
-    vm_uuid = dom.uuid
-    info = dom.info
-    puts "VM UUID: %s" % [vm_uuid]
-    info = dom.info
-    vm = Vm.find(:first, :conditions => [ "uuid = ?", vm_uuid ])
-    if vm == nil
-      puts "VM Not found in database, must be created by user.  giving up."
-      next
-    end
-    check_state(vm, info, host)
-  end
-  puts "** Checking all vms as appear in the database **"
-  # Now we get a list of all vms that should be on this system and see if
-  # they are all running.
-  Vm.find(:all, :conditions => [ "host_id = ?", host.id ]).each do |vm|
-    begin
-      puts "Looking up domain by uuid #{vm.uuid}"
-      dom = conn.lookup_domain_by_uuid(vm.uuid)
-    rescue
-      # OK.  We couldn't find the UUID that we thought was there.  The only
-      # explanation is that the domain is dead.
-      puts "Failed to find domain " + vm.description + ", marking as dead"
-      kick_taskomatic(Vm::STATE_STOPPED, vm)
-      next
-    end
-    info = dom.info
-    check_state(vm, info, host)
-  end
-  conn.close
-loop do
-  # fork() seems to really mess with our db connection.  Need to have this
-  # in the main connection as well.  I verified it's not leaking connections/fds.
-  database_connect
-  hosts = Host.find(:all)
-  p_count = 0
-  hosts.each do |host|
-    p_count += 1
-    fork do
-      check_status(host)
-      exit 0
-    end
-    # Only allow up to n_hosts / 5 processes running at a time.  If we go above this
-    # Then we wait for one to exit before continuing.  This guarantees it will take
-    # at most 5 timeouts to check all hosts.
-    if p_count > hosts.length / 5
-      Process.wait
-      p_count -= 1
-    end
-  end
-  while p_count > 0
-    Process.wait
-    p_count -= 1
-  end
-  STDOUT.flush
-  sleep sleeptime

More information about the ovirt-devel mailing list