[Ovirt-devel] [PATCH server] Update daemons to use new QMF.
Ian Main
imain at redhat.com
Wed Nov 4 19:59:37 UTC 2009
This patch updates dbomatic, taskomatic and host-register to use the
new C++ wrapped ruby QMF bindings. It also fixes a couple of bugs
along the way including the 0 cpu bug for host-register. This is a
compilation of work done by myself and Arjun Roy.
Signed-off-by: Ian Main <imain at redhat.com>
---
src/db-omatic/db_omatic.rb | 111 ++++++-------
src/host-browser/host-register.rb | 337 ++++++++++++++++++++-----------------
src/libvirt-list.rb | 31 +++--
src/matahari-list.rb | 33 +++--
src/task-omatic/task_storage.rb | 10 +-
src/task-omatic/taskomatic.rb | 81 +++++----
6 files changed, 323 insertions(+), 280 deletions(-)
diff --git a/src/db-omatic/db_omatic.rb b/src/db-omatic/db_omatic.rb
index c400097..686ad71 100755
--- a/src/db-omatic/db_omatic.rb
+++ b/src/db-omatic/db_omatic.rb
@@ -3,18 +3,18 @@
$: << File.join(File.dirname(__FILE__), "../dutils")
$: << File.join(File.dirname(__FILE__), ".")
-require "rubygems"
-require "qpid"
+require 'rubygems'
require 'monitor'
require 'dutils'
require 'daemons'
require 'optparse'
require 'logger'
require 'vnc'
+require 'qmf'
+require 'socket'
include Daemonize
-
# This sad and pathetic readjustment to ruby logger class is
# required to fix the formatting because rails does the same
# thing but overrides it to just the message.
@@ -29,12 +29,9 @@ end
$logfile = '/var/log/ovirt-server/db-omatic.log'
-
-class DbOmatic < Qpid::Qmf::Console
-
+class DbOmatic < Qmf::ConsoleHandler
# Use monitor mixin for mutual exclusion around checks to heartbeats
# and updates to objects/heartbeats.
-
include MonitorMixin
def initialize()
@@ -77,7 +74,6 @@ class DbOmatic < Qpid::Qmf::Console
begin
ensure_credentials
-
database_connect
server, port = nil
@@ -91,8 +87,17 @@ class DbOmatic < Qpid::Qmf::Console
end
@logger.info "Connecting to amqp://#{server}:#{port}"
- @session = Qpid::Qmf::Session.new(:console => self, :manage_connections => true)
- @broker = @session.add_broker("amqp://#{server}:#{port}", :mechanism => 'GSSAPI')
+ @settings = Qmf::ConnectionSettings.new
+ @settings.host = server
+ @settings.port = port
+# @settings.mechanism = 'GSSAPI'
+# @settings.service = 'qpidd'
+ @settings.sendUserId = false
+
+ @connection = Qmf::Connection.new(@settings)
+ @qmfc = Qmf::Console.new(self)
+ @broker = @qmfc.add_connection(@connection)
+ @broker.wait_for_stable
db_init_cleanup
rescue Exception => ex
@@ -101,10 +106,8 @@ class DbOmatic < Qpid::Qmf::Console
end
end
-
def ensure_credentials()
get_credentials('qpidd')
-
Thread.new do
while true do
sleep(3600)
@@ -195,7 +198,7 @@ class DbOmatic < Qpid::Qmf::Console
if state == Vm::STATE_STOPPED
@logger.info "VM has moved to stopped, clearing VM attributes."
- qmf_vm = @session.object(:class => "domain", 'uuid' => vm.uuid)
+ qmf_vm = @qmfc.object(Qmf::Query.new(:class => "domain", 'uuid' => vm.uuid))
if qmf_vm
@logger.info "Deleting VM #{vm.description}."
result = qmf_vm.undefine
@@ -207,9 +210,9 @@ class DbOmatic < Qpid::Qmf::Console
# If we are running, update the node that the domain is running on
elsif state == Vm::STATE_RUNNING
@logger.info "VM is running, determine the node it is running on"
- qmf_vm = @session.object(:class => "domain", 'uuid' => vm.uuid)
+ qmf_vm = @qmfc.object(Qmf::Query.new(:class => "domain", 'uuid' => vm.uuid))
if qmf_vm
- qmf_host = @session.object(:class => "node", :object_id => qmf_vm.node)
+ qmf_host = @qmfc.object(Qmf::Query.new(:class => "node", :object_id => qmf_vm.node))
db_host = Host.find(:first, :conditions => ['hostname = ?', qmf_host.hostname])
@logger.info "VM #{vm.description} is running on node #{db_host.hostname}"
vm.host_id = db_host.id
@@ -273,7 +276,7 @@ class DbOmatic < Qpid::Qmf::Console
# Double check to make sure this host is still up.
begin
- qmf_host = @session.object(:class => 'node', 'hostname' => host_info['hostname'])
+ qmf_host = @qmfc.objects(Qmf::Query.new(:class => "node", 'hostname' => host_info['hostname']))
if !qmf_host
@logger.info "Host #{host_info['hostname']} is not up after waiting 20 seconds, skipping dead VM check."
else
@@ -301,16 +304,23 @@ class DbOmatic < Qpid::Qmf::Console
end
end
- def object_props(broker, obj)
- target = obj.schema.klass_key.package
+ def object_update(obj, hasProps, hasStats)
+ target = obj.object_class.package_name
+ type = obj.object_class.class_name
return if target != "com.redhat.libvirt"
- type = obj.schema.klass_key.klass_name
+ if hasProps
+ update_props(obj, type)
+ end
+ if hasStats
+ update_stats(obj, type)
+ end
+ end
+ def update_props(obj, type)
# I just sync this whole thing because there shouldn't be a lot of contention here..
synchronize do
values = @cached_objects[obj.object_id.to_s]
-
new_object = false
if values == nil
@@ -318,8 +328,7 @@ class DbOmatic < Qpid::Qmf::Console
# Save the agent and broker bank so that we can tell what objects
# are expired when the heartbeat for them stops.
- values[:broker_bank] = obj.object_id.broker_bank
- values[:agent_bank] = obj.object_id.agent_bank
+ values[:agent_key] = obj.object_id.agent_key
values[:obj_key] = obj.object_id.to_s
values[:class_type] = type
values[:timed_out] = false
@@ -370,53 +379,48 @@ class DbOmatic < Qpid::Qmf::Console
end
end
- def object_stats(broker, obj)
- target = obj.schema.klass_key.package
- return if target != "com.redhat.libvirt"
- type = obj.schema.klass_key.klass_name
-
+ def update_stats(obj, type)
synchronize do
values = @cached_objects[obj.object_id.to_s]
- if !values
+ if values == nil
values = {}
@cached_objects[obj.object_id.to_s] = values
-
- values[:broker_bank] = obj.object_id.broker_bank
- values[:agent_bank] = obj.object_id.agent_bank
+ values[:agent_key] = obj.object_id.agent_key
values[:class_type] = type
values[:timed_out] = false
values[:synced] = false
end
+
obj.statistics.each do |key, newval|
if values[key.to_s] != newval
values[key.to_s] = newval
- #puts "new value for statistic #{key} : #{newval}"
end
end
end
end
- def heartbeat(agent, timestamp)
- puts "heartbeat from agent #{agent}"
+ def agent_heartbeat(agent, timestamp)
+ puts "heartbeat from agent #{agent.key}"
return if agent == nil
synchronize do
- bank_key = "#{agent.agent_bank}.#{agent.broker.broker_bank}"
- @heartbeats[bank_key] = [agent, timestamp]
+ @heartbeats[agent.key] = [agent, timestamp]
end
end
+ def agent_added(agent)
+ @logger.info("Agent connected: #{agent.key}")
+ end
- def del_agent(agent)
+ def agent_deleted(agent)
agent_disconnected(agent)
end
# This method marks objects associated with the given agent as timed out/invalid. Called either
# when the agent heartbeats out, or we get a del_agent callback.
def agent_disconnected(agent)
+ puts "agent_disconnected: #{agent.key}"
@cached_objects.keys.each do |objkey|
- if @cached_objects[objkey][:broker_bank] == agent.broker.broker_bank and
- @cached_objects[objkey][:agent_bank] == agent.agent_bank
-
+ if @cached_objects[objkey][:agent_key] == agent.key
values = @cached_objects[objkey]
if values[:timed_out] == false
@logger.info "Marking object of type #{values[:class_type]} with key #{objkey} as timed out."
@@ -430,8 +434,7 @@ class DbOmatic < Qpid::Qmf::Console
values[:timed_out] = true
end
end
- bank_key = "#{agent.agent_bank}.#{agent.broker.broker_bank}"
- @heartbeats.delete(bank_key)
+ @heartbeats.delete(agent.key)
end
# The opposite of above, this is called when an agent is alive and well and makes sure
@@ -439,9 +442,7 @@ class DbOmatic < Qpid::Qmf::Console
def agent_connected(agent)
@cached_objects.keys.each do |objkey|
- if @cached_objects[objkey][:broker_bank] == agent.broker.broker_bank and
- @cached_objects[objkey][:agent_bank] == agent.agent_bank
-
+ if @cached_objects[objkey][:agent_key] == agent.key
values = @cached_objects[objkey]
if values[:timed_out] == true or values[:synced] == false
if values[:class_type] == 'node'
@@ -482,7 +483,7 @@ class DbOmatic < Qpid::Qmf::Console
# them to stopped. VMs that exist as QMF objects will get set appropriately when the objects
# appear on the bus.
begin
- qmf_vm = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+ qmf_vm = @qmfc.object(Qmf::Query.new(:class => "domain", 'uuid' => db_vm.uuid))
if qmf_vm == nil
set_stopped = true
end
@@ -498,15 +499,6 @@ class DbOmatic < Qpid::Qmf::Console
end
end
- def broker_connected(broker)
- @logger.info "Connected to broker."
- end
-
- def broker_disconnected(broker)
- @logger.error "Broker disconnected."
- end
-
-
# This is the mainloop that is called into as a separate thread. This just loops through
# and makes sure all the agents are still reporting. If they aren't they get marked as
# down.
@@ -527,7 +519,7 @@ class DbOmatic < Qpid::Qmf::Console
s = timestamp / 1000000000
delta = t - s
- puts "Checking time delta for agent #{agent} - #{delta}"
+ puts "Checking time delta for agent #{agent.key} - #{delta}"
if delta > 30
# No heartbeat for 30 seconds.. deal with dead/disconnected agent.
@@ -545,15 +537,10 @@ class DbOmatic < Qpid::Qmf::Console
end
end
-
def main()
-
+ Thread.abort_on_exception = true
dbsync = DbOmatic.new()
-
- # Call into mainloop..
dbsync.check_heartbeats()
-
end
main()
-
diff --git a/src/host-browser/host-register.rb b/src/host-browser/host-register.rb
index 06d8553..e57b077 100755
--- a/src/host-browser/host-register.rb
+++ b/src/host-browser/host-register.rb
@@ -4,12 +4,13 @@ $: << File.join(File.dirname(__FILE__), "../dutils")
$: << File.join(File.dirname(__FILE__), ".")
require 'rubygems'
-require 'qpid'
require 'monitor'
require 'dutils'
require 'daemons'
require 'optparse'
require 'logger'
+require 'qmf'
+require 'socket'
include Daemonize
@@ -27,13 +28,17 @@ end
$logfile = '/var/log/ovirt-server/host-register.log'
-class HostRegister < Qpid::Qmf::Console
+class HostRegister < Qmf::ConsoleHandler
# Use monitor mixin for mutual exclusion around checks to heartbeats
# and updates to objects/heartbeats.
include MonitorMixin
+ # def initialize: Takes no parameters
+ # On initialize, we get a connection to the database.
+ # We then query the name and address of the qpidd server
+ # using dnsmasq records, and connect to qpidd.
def initialize()
super()
@cached_hosts = {}
@@ -78,7 +83,6 @@ class HostRegister < Qpid::Qmf::Console
begin
ensure_credentials
-
database_connect
server, port = nil
@@ -92,8 +96,17 @@ class HostRegister < Qpid::Qmf::Console
end
@logger.info "Connecting to amqp://#{server}:#{port}"
- @session = Qpid::Qmf::Session.new(:console => self, :manage_connections => true)
- @broker = @session.add_broker("amqp://#{server}:#{port}", :mechanism => 'GSSAPI')
+ @settings = Qmf::ConnectionSettings.new
+ @settings.host = server
+ @settings.port = port
+ # @settings.mechanism = 'GSSAPI'
+ # @settings.service = 'qpidd'
+ @settings.sendUserId = false
+
+ @connection = Qmf::Connection.new(@settings)
+ @qmfc = Qmf::Console.new(self)
+ @broker = @qmfc.add_connection(@connection)
+ @broker.wait_for_stable
rescue Exception => ex
@logger.error "Error in hostregister: #{ex}"
@@ -101,6 +114,7 @@ class HostRegister < Qpid::Qmf::Console
end
end
+ ###### Utility Methods ######
def debugputs(msg)
puts msg if @debug == true and @do_daemon == false
end
@@ -116,20 +130,66 @@ class HostRegister < Qpid::Qmf::Console
end
end
- def broker_connected(broker)
- @logger.info 'Connected to broker.'
+ ###### QMF Callbacks ######
+ def agent_heartbeat(agent, timestamp)
+ return if agent == nil
+ synchronize do
+ bank_key = "#{agent.agent_bank}.#{agent.broker_bank}"
+ @heartbeats[bank_key] = [agent, timestamp]
+ end
end
- def broker_disconnected(broker)
- @logger.error 'Broker disconnected.'
+ def agent_added(agent)
+ agent_bank = agent.agent_bank
+ broker_bank = agent.broker_bank
+ key = "#{agent_bank}.#{broker_bank}"
+
+ puts "AGENT ADDED: #{key}"
+ debugputs "Agent #{agent_bank}.#{broker_bank} connected!"
+ agent_connected(agent_bank, broker_bank)
+
+ host_list = @qmfc.objects(:package => 'com.redhat.matahari', :class => 'host')
+ puts "host_list length is #{host_list.length}"
+ host_list.each do |host|
+ if host.object_id.agent_bank == agent_bank
+ # Grab the cpus and nics associated before we take any locks
+ cpu_info = @qmfc.objects(:package => 'com.redhat.matahari', :class => 'cpu', 'host' => host.object_id)
+ nic_info = @qmfc.objects(:package => 'com.redhat.matahari', :class => 'nic', 'host' => host.object_id)
+
+ # And pass it on to the real handler
+ update_host(host, cpu_info, nic_info)
+ end
+ end
end
- def agent_disconnected(agent)
+ def agent_deleted(agent)
+ agent_bank = agent.agent_bank
+ broker_bank = agent.broker_bank
+ key = "#{agent_bank}.#{broker_bank}"
+
+ debugputs "Agent #{key} disconnected!"
+ @heartbeats.delete(key)
+ agent_disconnected(agent_bank, broker_bank)
+ end
+
+ def object_update(obj, hasProps, hasStats)
+ target = obj.object_class.package_name
+ type = obj.object_class.class_name
+ return if target != 'com.redhat.matahari' or type != 'host' or hasProps == false
+
+ # Fix a race where the properties of an object are published by a reconnecting
+ # host (thus marking it active) right before the heartbeat timer considers it dead
+ # (and marks it inactive)
+ @heartbeats.delete("#{obj.object_id.agent_bank}.#{obj.object_id.broker_bank}")
+ end # def object_props
+
+ ###### Handlers for QMF Callbacks ######
+ def agent_disconnected(agent_bank, broker_bank)
synchronize do
- debugputs "Marking objects for agent #{agent.broker.broker_bank}.#{agent.agent_bank} inactive"
+ debugputs "Marking objects for agent #{broker_bank}.#{agent_bank} inactive"
@cached_hosts.keys.each do |objkey|
- if @cached_hosts[objkey][:broker_bank] == agent.broker.broker_bank and
- @cached_hosts[objkey][:agent_bank] == agent.agent_bank
+ if @cached_hosts[objkey][:broker_bank] == broker_bank and
+ @cached_hosts[objkey][:agent_bank] == agent_bank
cached_host = @cached_hosts[objkey]
cached_host[:active] = false
@@ -139,12 +199,12 @@ class HostRegister < Qpid::Qmf::Console
end # synchronize do
end
- def agent_connected(agent)
+ def agent_connected(agent_bank, broker_bank)
synchronize do
- debugputs "Marking objects for agent #{agent.broker.broker_bank}.#{agent.agent_bank} active"
+ debugputs "Marking objects for agent #{broker_bank}.#{agent_bank} active"
@cached_hosts.keys.each do |objkey|
- if @cached_hosts[objkey][:broker_bank] == agent.broker.broker_bank and
- @cached_hosts[objkey][:agent_bank] == agent.agent_bank
+ if @cached_hosts[objkey][:broker_bank] == broker_bank and
+ @cached_hosts[objkey][:agent_bank] == agent_bank
cached_host = @cached_hosts[objkey]
cached_host[:active] = true
@@ -154,123 +214,10 @@ class HostRegister < Qpid::Qmf::Console
end # synchronize do
end
- def update_cpus(host_qmf, host_db, cpu_info)
-
- @logger.info "Updating CPU info for host #{host_qmf.hostname}"
- debugputs "Broker reports #{cpu_info.length} cpus for host #{host_qmf.hostname}"
-
- # delete an existing CPUs and create new ones based on the data
- @logger.info "Deleting any existing CPUs for host #{host_qmf.hostname}"
- Cpu.delete_all(['host_id = ?', host_db.id])
-
- @logger.info "Saving new CPU records for host #{host_qmf.hostname}"
- cpu_info.each do |cpu|
- flags = (cpu.flags.length > 255) ? "#{cpu.flags[0..251]}..." : cpu.flags
- detail = Cpu.new(
- 'cpu_number' => cpu.cpunum,
- 'core_number' => cpu.corenum,
- 'number_of_cores' => cpu.numcores,
- 'vendor' => cpu.vendor,
- 'model' => cpu.model.to_s,
- 'family' => cpu.family.to_s,
- 'cpuid_level' => cpu.cpuid_lvl,
- 'speed' => cpu.speed.to_s,
- 'cache' => cpu.cache.to_s,
- 'flags' => flags)
-
- host_db.cpus << detail
-
- debugputs "Added new CPU for #{host_qmf.hostname}: "
- debugputs "CPU # : #{cpu.cpunum}"
- debugputs "Core # : #{cpu.corenum}"
- debugputs "Total Cores : #{cpu.numcores}"
- debugputs "Vendor : #{cpu.vendor}"
- debugputs "Model : #{cpu.model}"
- debugputs "Family : #{cpu.family}"
- debugputs "Cpuid_lvl : #{cpu.cpuid_lvl}"
- debugputs "Speed : #{cpu.speed}"
- debugputs "Cache : #{cpu.cache}"
- debugputs "Flags : #{flags}"
- end
-
- @logger.info "Saved #{cpu_info.length} cpus for #{host_qmf.hostname}"
- end
-
- def update_nics(host_qmf, host_db, nic_info)
-
- # Update the NIC details for this host:
- # -if the NIC exists, then update the IP address
- # -if the NIC does not exist, create it
- # -any nic not in this list is deleted
-
- @logger.info "Updating NIC records for host #{host_qmf.hostname}"
- debugputs "Broker reports #{nic_info.length} NICs for host"
-
- nics = Array.new
- nics_to_delete = Array.new
-
- host_db.nics.each do |nic|
- found = false
-
- nic_info.each do |detail|
- # if we have a match, then update the database and remove
- # the received data to avoid creating a dupe later
- @logger.info "Searching for existing record for: #{detail.macaddr.upcase} in host #{host_qmf.hostname}"
- if detail.macaddr.upcase == nic.mac
- @logger.info "Updating details for: #{detail.interface} [#{nic.mac}]}"
- nic.bandwidth = detail.bandwidth
- nic.interface_name = detail.interface
- nic.save!
- found = true
- nic_info.delete(detail)
- end
- end
-
- # if the record wasn't found, then remove it from the database
- unless found
- @logger.info "Marking NIC for removal: #{nic.interface_name} [#{nic.mac}]"
- nics_to_delete << nic
- end
- end
-
- debugputs "Deleting #{nics_to_delete.length} NICs that are no longer part of host #{host_qmf.hostname}"
- nics_to_delete.each do |nic|
- @logger.info "Removing NIC: #{nic.interface_name} [#{nic.mac}]"
- host_db.nics.delete(nic)
- end
-
- # iterate over any nics left and create new records for them.
- debugputs "Adding new records for #{nic_info.length} NICs to host #{host_qmf.hostname}"
- nic_info.each do |nic|
- detail = Nic.new(
- 'mac' => nic.macaddr.upcase,
- 'bandwidth' => nic.bandwidth,
- 'interface_name' => nic.interface,
- 'usage_type' => 1)
-
- host_db.nics << detail
-
- @logger.info "Added NIC #{nic.interface} with MAC #{nic.macaddr} to host #{host_qmf.hostname}"
- end
- end
-
- def object_props(broker, obj)
- target = obj.schema.klass_key.package
- type = obj.schema.klass_key.klass_name
- return if target != 'com.redhat.matahari' or type != 'host'
-
- # Fix a race where the properties of an object are published by a reconnecting
- # host (thus marking it active) right before the heartbeat timer considers it dead
- # (and marks it inactive)
- @heartbeats.delete("#{obj.object_id.agent_bank}.#{obj.object_id.broker_bank}")
-
+ def update_host(obj, cpu_info, nic_info)
already_cache = false
already_in_db = false
- # Grab the cpus and nics associated before we take any locks
- cpu_info = @session.objects(:class => 'cpu', 'host' => obj.object_id)
- nic_info = @session.objects(:class => 'nic', 'host' => obj.object_id)
-
synchronize do
cached_host = @cached_hosts[obj.object_id.to_s]
host = Host.find(:first, :conditions => ['hostname = ?', obj.hostname])
@@ -318,7 +265,7 @@ class HostRegister < Qpid::Qmf::Console
'memory' => obj.memory,
'is_disabled' => 0,
'hardware_pool' => HardwarePool.get_default_pool,
- # Let host-status mark it available when it
+ # Let db-omatic mark it available when it
# successfully connects to it via libvirt.
'state' => Host::STATE_UNAVAILABLE)
@@ -330,10 +277,11 @@ class HostRegister < Qpid::Qmf::Console
debugputs "memory: #{obj.memory}"
rescue Exception => error
- @logger.error "Error while creating record: #{error.message}"
- # We haven't added the host to the db, and it isn't cached, so we just
- # return without having done anything. To retry, the host will have to
- # restart its agent.
+ @logger.error "Error when creating record: #{error.message}"
+ @logger.error "Restart matahari on host #{obj.hostname}"
+ # We haven't added the host to the db, and it isn't cached,
+ # so we just return without having done anything. To retry,
+ # the host will have to restart its agent.
return
end
else
@@ -394,27 +342,106 @@ class HostRegister < Qpid::Qmf::Console
cached_host['hypervisor'] = obj.hypervisor
cached_host['arch'] = obj.arch
end # synchronize do
- end # def object_props
+ end # end update_host
- def heartbeat(agent, timestamp)
- return if agent == nil
- synchronize do
- bank_key = "#{agent.agent_bank}.#{agent.broker.broker_bank}"
- @heartbeats[bank_key] = [agent, timestamp]
+ def update_cpus(host_qmf, host_db, cpu_info)
+
+ @logger.info "Updating CPU info for host #{host_qmf.hostname}"
+ debugputs "Broker reports #{cpu_info.length} cpus for host #{host_qmf.hostname}"
+
+ # delete an existing CPUs and create new ones based on the data
+ @logger.info "Deleting any existing CPUs for host #{host_qmf.hostname}"
+ Cpu.delete_all(['host_id = ?', host_db.id])
+
+ @logger.info "Saving new CPU records for host #{host_qmf.hostname}"
+ cpu_info.each do |cpu|
+ flags = (cpu.flags.length > 255) ? "#{cpu.flags[0..251]}..." : cpu.flags
+ detail = Cpu.new(
+ 'cpu_number' => cpu.cpunum,
+ 'core_number' => cpu.corenum,
+ 'number_of_cores' => cpu.numcores,
+ 'vendor' => cpu.vendor,
+ 'model' => cpu.model.to_s,
+ 'family' => cpu.family.to_s,
+ 'cpuid_level' => cpu.cpuid_lvl,
+ 'speed' => cpu.speed.to_s,
+ 'cache' => cpu.cache.to_s,
+ 'flags' => flags)
+
+ host_db.cpus << detail
+
+ debugputs "Added new CPU for #{host_qmf.hostname}: "
+ debugputs "CPU # : #{cpu.cpunum}"
+ debugputs "Core # : #{cpu.corenum}"
+ debugputs "Total Cores : #{cpu.numcores}"
+ debugputs "Vendor : #{cpu.vendor}"
+ debugputs "Model : #{cpu.model}"
+ debugputs "Family : #{cpu.family}"
+ debugputs "Cpuid_lvl : #{cpu.cpuid_lvl}"
+ debugputs "Speed : #{cpu.speed}"
+ debugputs "Cache : #{cpu.cache}"
+ debugputs "Flags : #{flags}"
end
- end
- def new_agent(agent)
- key = "#{agent.agent_bank}.#{agent.broker.broker_bank}"
- debugputs "Agent #{key} connected!"
- agent_connected(agent)
+ @logger.info "Saved #{cpu_info.length} cpus for #{host_qmf.hostname}"
end
- def del_agent(agent)
- key = "#{agent.agent_bank}.#{agent.broker.broker_bank}"
- debugputs "Agent #{key} disconnected!"
- @heartbeats.delete(key)
- agent_disconnected(agent)
+ def update_nics(host_qmf, host_db, nic_info)
+
+ # Update the NIC details for this host:
+ # -if the NIC exists, then update the IP address
+ # -if the NIC does not exist, create it
+ # -any nic not in this list is deleted
+
+ @logger.info "Updating NIC records for host #{host_qmf.hostname}"
+ debugputs "Broker reports #{nic_info.length} NICs for host"
+
+ nics = Array.new
+ nics_to_delete = Array.new
+
+ host_db.nics.each do |nic|
+ found = false
+
+ nic_info.each do |detail|
+ # if we have a match, then update the database and remove
+ # the received data to avoid creating a dupe later
+ @logger.info "Searching for existing record for: #{detail.macaddr.upcase} in host #{host_qmf.hostname}"
+ if detail.macaddr.upcase == nic.mac
+ @logger.info "Updating details for: #{detail.interface} [#{nic.mac}]}"
+ nic.bandwidth = detail.bandwidth
+ nic.interface_name = detail.interface
+ nic.save!
+ found = true
+ nic_info.delete(detail)
+ end
+ end
+
+ # if the record wasn't found, then remove it from the database
+ unless found
+ @logger.info "Marking NIC for removal: #{nic.interface_name} [#{nic.mac}]"
+ nics_to_delete << nic
+ end
+ end
+
+ debugputs "Deleting #{nics_to_delete.length} NICs that are no longer part of host #{host_qmf.hostname}"
+ nics_to_delete.each do |nic|
+ @logger.info "Removing NIC: #{nic.interface_name} [#{nic.mac}]"
+ host_db.nics.delete(nic)
+ end
+
+ # iterate over any nics left and create new records for them.
+ debugputs "Adding new records for #{nic_info.length} NICs to host #{host_qmf.hostname}"
+ nic_info.each do |nic|
+ detail = Nic.new(
+ 'mac' => nic.macaddr.upcase,
+ 'bandwidth' => nic.bandwidth,
+ 'interface_name' => nic.interface,
+ 'usage_type' => 1)
+
+ host_db.nics << detail
+
+ @logger.info "Added NIC #{nic.interface} with MAC #{nic.macaddr} to host #{host_qmf.hostname}"
+ end
end
def check_heartbeats()
@@ -436,7 +463,10 @@ class HostRegister < Qpid::Qmf::Console
# No heartbeat for 30 seconds.. deal with dead/disconnected agent.
debugputs "Agent #{key} timed out!"
@heartbeats.delete(key)
- agent_disconnected(agent)
+
+ agent_bank = agent.agent_bank
+ broker_bank = agent.broker_bank
+ agent_disconnected(agent_bank, broker_bank)
end
end
@@ -461,6 +491,7 @@ class HostRegister < Qpid::Qmf::Console
end # Class HostRegister
def main()
+ Thread.abort_on_exception = true
hostreg = HostRegister.new()
hostreg.check_heartbeats()
end
diff --git a/src/libvirt-list.rb b/src/libvirt-list.rb
index 54e8b7e..c81926a 100755
--- a/src/libvirt-list.rb
+++ b/src/libvirt-list.rb
@@ -2,21 +2,30 @@
$: << File.join(File.dirname(__FILE__), "./dutils")
-require "rubygems"
-require "qpid"
-require "dutils"
+require 'rubygems'
+require 'dutils'
+require 'qmf'
+require 'socket'
get_credentials('qpidd')
server, port = get_srv('qpidd', 'tcp')
raise "Unable to determine qpid server from DNS SRV record" if not server
-srv = "amqp://#{server}:#{port}"
-puts "Connecting to #{srv}.."
-s = Qpid::Qmf::Session.new()
-b = s.add_broker(srv, :mechanism => 'GSSAPI')
+puts "Connecting to #{server}, #{port}"
-nodes = s.objects(:class => "node")
+settings = Qmf::ConnectionSettings.new
+settings.host = server
+settings.port = port
+# settings.mechanism = 'GSSAPI'
+# settings.service = 'qpidd'
+
+connection = Qmf::Connection.new(settings)
+qmfc = Qmf::Console.new
+broker = qmfc.add_connection(connection)
+broker.wait_for_stable
+
+nodes = qmfc.objects(Qmf::Query.new(:class => "node"))
nodes.each do |node|
puts "node: #{node.hostname}"
for (key, val) in node.properties
@@ -24,7 +33,7 @@ nodes.each do |node|
end
# Find any domains that on the current node.
- domains = s.objects(:class => "domain", 'node' => node.object_id)
+ domains = qmfc.objects(Qmf::Query.new(:class => "domain", 'node' => node.object_id))
domains.each do |domain|
r = domain.getXMLDesc()
puts "getXMLDesc() status: #{r.status}"
@@ -39,7 +48,7 @@ nodes.each do |node|
end
end
- pools = s.objects(:class => "pool", 'node' => node.object_id)
+ pools = qmfc.objects(Qmf::Query.new(:class => "pool", 'node' => node.object_id))
pools.each do |pool|
puts " pool: #{pool.name}"
for (key, val) in pool.properties
@@ -54,7 +63,7 @@ nodes.each do |node|
end
# Find volumes that are part of the pool.
- volumes = s.objects(:class => "volume", 'pool' => pool.object_id)
+ volumes = qmfc.objects(Qmf::Query.new(:class => "volume", 'pool' => pool.object_id))
volumes.each do |volume|
puts " volume: #{volume.name}"
for (key, val) in volume.properties
diff --git a/src/matahari-list.rb b/src/matahari-list.rb
index ff714c5..8795019 100755
--- a/src/matahari-list.rb
+++ b/src/matahari-list.rb
@@ -2,21 +2,30 @@
$: << File.join(File.dirname(__FILE__), "./dutils")
-require "rubygems"
-require "qpid"
-require "dutils"
+require 'rubygems'
+require 'dutils'
+require 'qmf'
+require 'socket'
get_credentials('qpidd')
server, port = get_srv('qpidd', 'tcp')
raise "Unable to determine qpid server from DNS SRV record" if not server
-srv = "amqp://#{server}:#{port}"
-puts "Connecting to #{srv}.."
-s = Qpid::Qmf::Session.new()
-b = s.add_broker(srv, :mechanism => 'GSSAPI')
+puts "Connecting to #{server}, #{port}"
-hosts = s.objects(:class => "host")
+settings = Qmf::ConnectionSettings.new
+settings.host = server
+settings.port = port
+# settings.mechanism = 'GSSAPI'
+# settings.service = 'qpidd'
+
+connection = Qmf::Connection.new(settings)
+qmfc = Qmf::Console.new
+broker = qmfc.add_connection(connection)
+broker.wait_for_stable
+
+hosts = qmfc.objects(Qmf::Query.new(:class => 'host'))
hosts.each do |host|
puts "HOST: #{host.hostname}"
for (key, val) in host.properties
@@ -24,18 +33,18 @@ hosts.each do |host|
end
# List cpus for current host
- cpus = s.objects(:class => "cpu", 'host' => host.object_id)
+ cpus = qmfc.objects(Qmf::Query.new(:class => 'cpu', 'host' => host.object_id))
cpus.each do |cpu|
- puts " CPU:"
+ puts ' CPU:'
for (key, val) in cpu.properties
puts " property: #{key}, #{val}"
end
end # cpus.each
# List nics for current host
- nics = s.objects(:class => "nic", 'host' => host.object_id)
+ nics = qmfc.objects(Qmf::Query.new(:class => 'nic', 'host' => host.object_id))
nics.each do |nic|
- puts " NIC: "
+ puts ' NIC: '
for (key, val) in nic.properties
puts " property: #{key}, #{val}"
end
diff --git a/src/task-omatic/task_storage.rb b/src/task-omatic/task_storage.rb
index 77b0166..d698777 100644
--- a/src/task-omatic/task_storage.rb
+++ b/src/task-omatic/task_storage.rb
@@ -73,7 +73,7 @@ def task_storage_cobbler_setup(db_vm)
unless found
# Create a new transient NFS storage volume
# This volume is *not* persisted.
- image_volume = StorageVolume.factory("NFS", :filename => filename)
+ image_volume = StorageVolume.factory("NFS", :filename => filename, :key => filename)
image_volume.storage_pool
image_pool = StoragePool.factory(StoragePool::NFS)
@@ -116,13 +116,14 @@ class LibvirtPool
@xml.root.elements["target"].add_element("path")
end
- def connect(session, node)
- pools = session.objects(:class => 'pool', 'node' => node.object_id)
+ def connect(qmfc, node)
+ pools = qmfc.objects(:class => 'pool', 'node' => node.object_id)
pools.each do |pool|
result = pool.getXMLDesc
raise "Error getting xml description of pool: #{result.text}" unless result.status == 0
xml_desc = result.description
+
if self.xmlequal?(Document.new(xml_desc).root)
@remote_pool = pool
@logger.debug("Found existing storage pool #{pool.name} on host: #{node.hostname}")
@@ -134,7 +135,8 @@ class LibvirtPool
@logger.debug("Defining new storage pool: #{@xml.to_s} on host: #{node.hostname}")
result = node.storagePoolDefineXML(@xml.to_s, :timeout => 60 * 10)
raise "Error creating pool: #{result.text}" unless result.status == 0
- @remote_pool = session.object(:object_id => result.pool)
+ @remote_pool = qmfc.object(:object_id => result.pool)
+ obj_list = qmfc.objects(:object_id => result.pool)
raise "Error finding newly created remote pool." unless @remote_pool
# we need this because we don't want to "build" LVM pools, which would
diff --git a/src/task-omatic/taskomatic.rb b/src/task-omatic/taskomatic.rb
index ece60dc..13cf5af 100755
--- a/src/task-omatic/taskomatic.rb
+++ b/src/task-omatic/taskomatic.rb
@@ -23,7 +23,7 @@ $: << File.join(File.dirname(__FILE__), "../dutils")
$: << File.join(File.dirname(__FILE__), ".")
require 'rubygems'
-require 'qpid'
+require 'qmf'
require 'monitor'
require 'dutils'
require 'optparse'
@@ -115,10 +115,15 @@ class TaskOmatic
sleepy *= 2 if sleepy < 120
end
- @session = Qpid::Qmf::Session.new(:manage_connections => true)
- @logger.info "Connecting to amqp://#{server}:#{port}"
- @broker = @session.add_broker("amqp://#{server}:#{port}", :mechanism => 'GSSAPI')
+ settings = Qmf::ConnectionSettings.new
+ settings.host = server
+ settings.port = port
+ settings.sendUserId = false
+ @connection = Qmf::Connection.new(settings)
+ @qmfc = Qmf::Console.new
+ @broker = @qmfc.add_connection(@connection)
+ @broker.wait_for_stable
end
def ensure_credentials()
@@ -141,13 +146,13 @@ class TaskOmatic
# vm won't be returned. I think that's supposed to be for migration
# but it could break creation of VMs in certain conditions..
- vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
+ vm = @qmfc.object(:class => "domain", 'uuid' => db_vm.uuid)
db_vm.vm_resource_pool.get_hardware_pool.hosts.each do |curr|
# Now each of 'curr' is in the right hardware pool..
# now we check them out.
- node = @session.object(:class => "node", 'hostname' => curr.hostname)
+ node = @qmfc.object(:class => "node", 'hostname' => curr.hostname)
next unless node
# So now we expect if the node was found it's alive and well, then
@@ -205,12 +210,12 @@ class TaskOmatic
# activate the underlying physical device, and then do the logical one
if db_volume[:type] == "LvmStorageVolume"
phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume, @logger)
- phys_libvirt_pool.connect(@session, node)
+ phys_libvirt_pool.connect(@qmfc, node)
end
@logger.debug "Verifying mount of pool #{db_pool.ip_addr}:#{db_pool.type}:#{db_pool.target}:#{db_pool.export_path}"
libvirt_pool = LibvirtPool.factory(db_pool, @logger)
- libvirt_pool.connect(@session, node)
+ libvirt_pool.connect(@qmfc, node)
# OK, the pool should be all set. The last thing we need to do is get
# the path based on the volume key
@@ -220,12 +225,12 @@ class TaskOmatic
@logger.debug "Pool mounted: #{pool.name}; state: #{pool.state}"
- volume = @session.object(:class => 'volume',
+ volume = @qmfc.object(:class => 'volume',
'key' => volume_key,
'storagePool' => pool.object_id)
if volume == nil
@logger.info "Unable to find volume by key #{volume_key} attached to pool #{pool.name}, trying by filename..."
- volume = @session.object(:class => 'volume',
+ volume = @qmfc.object(:class => 'volume',
'name' => db_volume.filename,
'storagePool' => pool.object_id)
raise "Unable to find volume by key (#{volume_key}) or filename (#{db_volume.filename}), giving up." unless volume
@@ -254,11 +259,11 @@ class TaskOmatic
# This is rather silly because we only destroy pools if there are no
# more vms on the node. We should be reference counting the pools
# somehow so we know when they are no longer in use.
- vms = @session.objects(:class => 'domain', 'node' => node.object_id)
+ vms = @qmfc.objects(:class => 'domain', 'node' => node.object_id)
if vms.length > 0
return
end
- pools = @session.objects(:class => 'pool', 'node' => node.object_id)
+ pools = @qmfc.objects(:class => 'pool', 'node' => node.object_id)
# We do this in two passes, first undefine/destroys LVM pools, then
# we do physical pools.
@@ -281,13 +286,13 @@ class TaskOmatic
def task_shutdown_or_destroy_vm(task, action)
@logger.info "starting task_shutdown_or_destroy_vm"
db_vm = task.vm
- vm = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+ vm = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid)
if !vm
@logger.error "VM already shut down?"
return
end
- node = @session.object(:object_id => vm.node)
+ node = @qmfc.object(:object_id => vm.node)
raise "Unable to get node that vm is on??" unless node
if vm.state == "shutdown" or vm.state == "shutoff"
@@ -337,7 +342,7 @@ class TaskOmatic
@logger.info "starting task_start_vm"
db_vm = find_vm(task, false)
- vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
+ vm = @qmfc.object(:class => "domain", 'uuid' => db_vm.uuid)
if vm
case vm.state
@@ -351,7 +356,7 @@ class TaskOmatic
end
db_host = find_capable_host(db_vm)
- node = @session.object(:class => "node", 'hostname' => db_host.hostname)
+ node = @qmfc.object(:class => "node", 'hostname' => db_host.hostname)
raise "Unable to find host #{db_host.hostname} to create VM on." unless node
@logger.info("VM will be started on node #{node.hostname}")
@@ -400,7 +405,7 @@ class TaskOmatic
result = node.domainDefineXML(xml.to_s)
raise "Error defining virtual machine: #{result.text}" unless result.status == 0
- domain = @session.object(:object_id => result.domain)
+ domain = @qmfc.object(:object_id => result.domain)
raise "Cannot find domain on host #{db_host.hostname}, cannot start virtual machine." unless domain
result = domain.create
@@ -432,7 +437,7 @@ class TaskOmatic
def task_suspend_vm(task)
@logger.info "starting task_suspend_vm"
db_vm = task.vm
- dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+ dom = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid)
raise "Unable to locate VM to suspend" unless dom
if dom.state != "running" and dom.state != "blocked"
@@ -450,7 +455,7 @@ class TaskOmatic
def task_resume_vm(task)
@logger.info "starting task_resume_vm"
db_vm = task.vm
- dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+ dom = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid)
raise "Unable to locate VM to resume" unless dom
if dom.state == "running"
@@ -478,7 +483,7 @@ class TaskOmatic
# need to put it on the storage server and mark it in the database
# where the image is stored.
db_vm = task.vm
- dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+ dom = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid)
raise "Unable to locate VM to save" unless dom
filename = "/tmp/#{dom.uuid}.save"
@@ -495,7 +500,7 @@ class TaskOmatic
# FIXME: This is also broken, see task_save_vm FIXME.
db_vm = task.vm
- dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+ dom = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid)
raise "Unable to locate VM to restore" unless dom
filename = "/tmp/#{dom.uuid}.save"
@@ -508,9 +513,9 @@ class TaskOmatic
def migrate(db_vm, dest = nil)
- vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
+ vm = @qmfc.object(:class => "domain", 'uuid' => db_vm.uuid)
raise "Unable to find VM to migrate" unless vm
- src_node = @session.object(:object_id => vm.node)
+ src_node = @qmfc.object(:object_id => vm.node)
raise "Unable to find node that VM is on??" unless src_node
@logger.info "Migrating domain lookup complete, domain is #{vm}"
@@ -528,7 +533,7 @@ class TaskOmatic
db_dst_host = find_capable_host(db_vm)
end
- dest_node = @session.object(:class => 'node', 'hostname' => db_dst_host.hostname)
+ dest_node = @qmfc.object(:class => 'node', 'hostname' => db_dst_host.hostname)
raise "Unable to find host #{db_dst_host.hostname} to migrate to." unless dest_node
volumes = []
@@ -589,7 +594,7 @@ class TaskOmatic
next
end
puts "searching for node with hostname #{host.hostname}"
- node = @session.object(:class => 'node', 'hostname' => host.hostname)
+ node = @qmfc.object(:class => 'node', 'hostname' => host.hostname)
puts "node returned is #{node}"
return node if node
end
@@ -643,13 +648,13 @@ class TaskOmatic
@logger.info("refresh being done on node #{node.hostname}")
phys_libvirt_pool = LibvirtPool.factory(db_pool_phys, @logger)
- phys_libvirt_pool.connect(@session, node)
+ phys_libvirt_pool.connect(@qmfc, node)
db_pool_phys.state = StoragePool::STATE_AVAILABLE
db_pool_phys.save!
begin
# First we do the physical volumes.
- volumes = @session.objects(:class => 'volume',
+ volumes = @qmfc.objects(:class => 'volume',
'storagePool' => phys_libvirt_pool.remote_pool.object_id)
volumes.each do |volume|
storage_volume = StorageVolume.factory(db_pool_phys.get_type_label)
@@ -696,9 +701,9 @@ class TaskOmatic
physical_vol.save!
lvm_libvirt_pool = LibvirtPool.factory(lvm_db_pool, @logger)
- lvm_libvirt_pool.connect(@session, node)
+ lvm_libvirt_pool.connect(@qmfc, node)
- lvm_volumes = @session.objects(:class => 'volume',
+ lvm_volumes = @qmfc.objects(:class => 'volume',
'storagePool' => lvm_libvirt_pool.remote_pool.object_id)
lvm_volumes.each do |lvm_volume|
@@ -733,16 +738,16 @@ class TaskOmatic
begin
if db_volume[:type] == "LvmStorageVolume"
phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume, @logger)
- phys_libvirt_pool.connect(@session, node)
+ phys_libvirt_pool.connect(@qmfc, node)
end
begin
libvirt_pool = LibvirtPool.factory(db_pool, @logger)
begin
- libvirt_pool.connect(@session, node)
+ libvirt_pool.connect(@qmfc, node)
volume_id = libvirt_pool.create_vol(*db_volume.volume_create_params)
- volume = @session.object(:object_id => volume_id)
+ volume = @qmfc.object(:object_id => volume_id)
raise "Unable to find newly created volume" unless volume
@logger.debug " volume:"
@@ -776,7 +781,7 @@ class TaskOmatic
# I currently refresh ALL storage pools at this time as it
# shouldn't be a long operation and it doesn't hurt to refresh
# them once in a while.
- pools = @session.objects(:class => 'pool')
+ pools = @qmfc.objects(:class => 'pool')
pools.each do |pool|
result = pool.refresh
@logger.info "Problem refreshing pool (you can probably ignore this): #{result.text}" unless result.status == 0
@@ -798,16 +803,16 @@ class TaskOmatic
begin
if db_volume[:type] == "LvmStorageVolume"
phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume, @logger)
- phys_libvirt_pool.connect(@session, node)
+ phys_libvirt_pool.connect(@qmfc, node)
@logger.info "connected to lvm pool.."
end
begin
libvirt_pool = LibvirtPool.factory(db_pool, @logger)
- libvirt_pool.connect(@session, node)
+ libvirt_pool.connect(@qmfc, node)
begin
- volume = @session.object(:class => 'volume',
+ volume = @qmfc.object(:class => 'volume',
'storagePool' => libvirt_pool.remote_pool.object_id,
'key' => db_volume.key)
@logger.error "Unable to find volume to delete" unless volume
@@ -861,7 +866,7 @@ class TaskOmatic
was_disconnected = false
loop do
- if not @broker.connected?
+ if not @connection.connected?
@logger.info("Cannot implement tasks, not connected to broker. Sleeping.")
sleep(@sleeptime * 3)
was_disconnected = true
@@ -870,7 +875,7 @@ class TaskOmatic
@logger.info("Reconnected, resuming task checking..") if was_disconnected
was_disconnected = false
- @session.object(:class => 'agent')
+ @qmfc.object(:class => 'agent')
tasks = Array.new
begin
--
1.6.2.5
More information about the ovirt-devel
mailing list