[Ovirt-devel] [PATCH server] Add qpid-db-sync.rb (host-status replacement) to WUI

Ian Main imain at redhat.com
Thu Nov 20 04:14:51 UTC 2008


This patch adds a first cut at qpid-db-sync, a host-status replacement.
I have tested this a fair bit and it seems to do all that host-status does
and do it much quicker.  Note that it is not yet enabled.  I will send
another patch to enable it after some review.

There is a lot more synchronization yet to be done by this daemon.  Eventually
this will be responsible for updating all the state information to the database
including pools, volumes, some statistics etc.  This will be done in conjunction
with taskomatic changes.

Signed-off-by: Ian Main <imain at redhat.com>
---
 src/qpid-db-sync/qpid-db-sync.rb |  316 ++++++++++++++++++++++++++++++++++++++
 1 files changed, 316 insertions(+), 0 deletions(-)
 create mode 100755 src/qpid-db-sync/qpid-db-sync.rb

diff --git a/src/qpid-db-sync/qpid-db-sync.rb b/src/qpid-db-sync/qpid-db-sync.rb
new file mode 100755
index 0000000..0e1364e
--- /dev/null
+++ b/src/qpid-db-sync/qpid-db-sync.rb
@@ -0,0 +1,316 @@
+#!/usr/bin/ruby
+
+$: << File.join(File.dirname(__FILE__), "../dutils")
+
+require "rubygems"
+require "qpid"
+require 'monitor'
+require 'dutils'
+require 'daemons'
+require 'optparse'
+include Daemonize
+
+
+class QpidDbSync < Qpid::Qmf::Console
+
+    # Use monitor mixin for mutual exclusion around checks to heartbeats
+    # and updates to objects/heartbeats.
+
+    include MonitorMixin
+
+    def initialize()
+        super()
+        @cached_objects = {}
+        @heartbeats = {}
+
+        database_connect
+    end
+
+    def update_domain_state(domain, state_override = nil)
+        vm = Vm.find(:first, :conditions => [ "uuid = ?", domain['uuid'] ])
+        if vm == nil
+          puts "VM Not found in database, must be created by user.  giving up."
+          next
+        end
+
+        if state_override != nil
+            state = state_override
+        else
+            # FIXME: Some of these translations don't seem right.  Shouldn't we be using
+            # the libvirt states throughout ovirt?
+            case domain['state']
+                when "nostate"
+                    state = Vm::STATE_PENDING
+                when "running"
+                    state = Vm::STATE_RUNNING
+                when "blocked"
+                    state = Vm::STATE_SUSPENDED #?
+                when "paused"
+                    state = Vm::STATE_SUSPENDED
+                when "shutdown"
+                    state = Vm::STATE_STOPPED
+                when "shutoff"
+                    state = Vm::STATE_STOPPED
+                when "crashed"
+                    state = Vm::STATE_STOPPED
+                else
+                    state = Vm::STATE_PENDING
+            end
+        end
+
+        puts "Updating VM #{domain['name']} to state #{state}"
+        vm.state = state
+        vm.save
+    end
+
+    def update_host_state(host_info, state)
+        db_host = Host.find(:first, :conditions => [ "hostname = ?", host_info['hostname'] ])
+        if db_host
+            puts "Marking host #{host_info['hostname']} as state #{state}."
+            db_host.state = state
+            db_host.hypervisor_type = host_info['hypervisorType']
+            db_host.arch = host_info['model']
+            db_host.memory = host_info['memory']
+            # XXX: Could be added..
+            #db_host.mhz = host_info['mhz']
+            # XXX: Not even sure what this is.. :)
+            #db_host.lock_version = 2
+            # XXX: This would just be for init..
+            #db_host.is_disabled = 0
+            db_host.save
+        else
+            # FIXME: This would be a newly registered host.  We could put it in the database.
+            puts "Unknown host, probably not registered yet??"
+        end
+    end
+
+    def object_props(broker, obj)
+        target = obj.klass_key[0]
+        return if target != "com.redhat.libvirt"
+
+        type = obj.klass_key[1]
+
+        # I just sync this whole thing because there shouldn't be a lot of contention here..
+        synchronize do
+            values = @cached_objects[obj.object_id.to_s]
+
+            new_object = false
+
+            if values == nil
+                values = {}
+                @cached_objects[obj.object_id.to_s] = values
+
+                # Save the agent and broker bank so that we can tell what objects
+                # are expired when the heartbeat for them stops.
+                values[:broker_bank] = obj.object_id.broker_bank
+                values[:agent_bank] = obj.object_id.agent_bank
+                values[:class_type] = obj.klass_key[1]
+                values[:timed_out] = false
+                puts "New object type #{type}"
+
+                new_object = true
+            end
+
+            domain_state_change = false
+
+            obj.properties.each do |key, newval|
+                if values[key.to_s] != newval
+                    values[key.to_s] = newval
+                    #puts "new value for property #{key} : #{newval}"
+                    if type == "domain" and key.to_s == "state"
+                        domain_state_change = true
+                    end
+                end
+            end
+
+            if domain_state_change
+                update_domain_state(values)
+            end
+
+            if new_object
+                if type == "node"
+                    update_host_state(values, Host::STATE_AVAILABLE)
+                end
+            end
+        end
+    end
+
+    def object_stats(broker, obj)
+        target = obj.klass_key[0]
+        return if target != "com.redhat.libvirt"
+        type = obj.klass_key[1]
+
+        synchronize do
+            values = @cached_objects[obj.object_id.to_s]
+            if !values
+                values = {}
+                @cached_objects[obj.object_id.to_s] = values
+
+                values[:broker_bank] = obj.object_id.broker_bank
+                values[:agent_bank] = obj.object_id.agent_bank
+                values[:class_type] = obj.klass_key[1]
+                values[:timed_out] = false
+            end
+            obj.statistics.each do |key, newval|
+                if values[key.to_s] != newval
+                    values[key.to_s] = newval
+                    #puts "new value for statistic #{key} : #{newval}"
+                end
+            end
+        end
+    end
+
+    def heartbeat(agent, timestamp)
+        return if agent == nil
+        synchronize do
+            bank_key = "#{agent.agent_bank}.#{agent.broker.broker_bank}"
+            @heartbeats[bank_key] = [agent, timestamp]
+        end
+    end
+
+
+    def del_agent(agent)
+        agent_disconnected(agent)
+    end
+
+    # This method marks objects associated with the given agent as timed out/invalid.  Called either
+    # when the agent heartbeats out, or we get a del_agent callback.
+    def agent_disconnected(agent)
+        @cached_objects.keys.each do |objkey|
+            if @cached_objects[objkey][:broker_bank] == agent.broker.broker_bank and
+               @cached_objects[objkey][:agent_bank] == agent.agent_bank
+
+                values = @cached_objects[objkey]
+                puts "Marking object of type #{values[:class_type]} as timed out."
+                if values[:timed_out] == false
+                    if values[:class_type] == 'node'
+                        update_host_state(values, Host::STATE_UNAVAILABLE)
+                    elsif values[:class_type] == 'domain'
+                        update_domain_state(values, Vm::STATE_UNREACHABLE)
+                    end
+                end
+            values[:timed_out] = true
+            end
+        end
+    end
+
+    # The opposite of above, this is called when an agent is alive and well and makes sure
+    # all of the objects associated with it are marked as valid.
+    def agent_connected(agent)
+
+        @cached_objects.keys.each do |objkey|
+            if @cached_objects[objkey][:broker_bank] == agent.broker.broker_bank and
+               @cached_objects[objkey][:agent_bank] == agent.agent_bank
+
+                values = @cached_objects[objkey]
+                if values[:timed_out] == true
+                    puts "Marking object of type #{values[:class_type]} as in service."
+                    if values[:class_type] == 'node'
+                        update_host_state(values, Host::STATE_AVAILABLE)
+                    elsif values[:class_type] == 'domain'
+                        update_domain_state(values)
+                    end
+                    values[:timed_out] = false
+                end
+            end
+        end
+    end
+
+    # This cleans up the database on startup so that everything is marked unavailable etc.
+    # Once everything comes online it will all be marked as available/up again.
+    def db_init_cleanup()
+        db_host = Host.find(:all)
+        db_host.each do |host|
+            puts "Marking host #{host.hostname} unavailable"
+            host.state = Host::STATE_UNAVAILABLE
+            host.save
+        end
+
+        db_vm = Vm.find(:all)
+        db_vm.each do |vm|
+            puts "Marking vm #{vm.description} as stopped."
+            vm.state = Vm::STATE_STOPPED
+            vm.save
+        end
+    end
+
+
+    # This is the mainloop that is called into as a separate thread.  This just loops through
+    # and makes sure all the agents are still reporting.  If they aren't they get marked as
+    # down.
+    def check_heartbeats()
+        while true
+            sleep(5)
+
+            synchronize do
+                # Get seconds from the epoch
+                t = Time.new.to_i
+
+                @heartbeats.keys.each do | key |
+                    agent, timestamp = @heartbeats[key]
+
+                    # Heartbeats from qpid are in microseconds, we just need seconds..
+                    s = timestamp / 1000000000
+                    delta = t - s
+
+                    if delta > 30
+                        # No heartbeat for 30 seconds.. deal with dead/disconnected agent.
+                        agent_disconnected(agent)
+
+                        @heartbeats.delete(key)
+                    else
+                        agent_connected(agent)
+                    end
+                end
+            end
+        end
+    end
+end
+
+
+$logfile = '/var/log/ovirt-server/qpid-db-sync.log'
+
+def main()
+
+    do_daemon = true
+
+    opts = OptionParser.new do |opts|
+        opts.on("-h", "--help", "Print help message") do
+            puts opts
+            exit
+        end
+        opts.on("-n", "--nodaemon", "Run interactively (useful for debugging)") do |n|
+            do_daemon = false
+        end
+    end
+    begin
+        opts.parse!(ARGV)
+    rescue OptionParser::InvalidOption
+        puts opts
+        exit
+    end
+
+    if do_daemon
+        # XXX: This gets around a problem with paths for the database stuff.
+        # Normally daemonize would chdir to / but the paths for the database
+        # stuff are relative so it breaks it.. It's either this or rearrange
+        # things so the db stuff is included after daemonizing.
+        pwd = Dir.pwd
+        daemonize
+        Dir.chdir(pwd)
+        STDOUT.reopen $logfile, 'a'
+        STDERR.reopen STDOUT
+    end
+
+    dbsync = QpidDbSync.new()
+    s = Qpid::Qmf::Session.new(:console => dbsync, :rcv_events => false)
+    b = s.add_broker("amqp://localhost:5672")
+
+    dbsync.db_init_cleanup()
+
+    # Call into mainloop..
+    dbsync.check_heartbeats()
+end
+
+main()
+
-- 
1.5.6.5




More information about the ovirt-devel mailing list