[Ovirt-devel] [PATCH server] Taskomatic Refactoring and Qpidification Take 3

Chris Lalancette clalance at redhat.com
Tue Jan 6 09:39:05 UTC 2009


Ian Main wrote:
> This update adds a few small bugfixes, removes LVM storage pool/volume
> scanning and reindents to 2 spaces.
> 
> This patch reworks taskomatic quite a bit.  This mostly just shifts
> taskomatic to using the qpid interface in place of ruby-libvirt.  It
> also fixes a few bugs I discovered along the way and adds new ones
> I'm sure.  The only other thing added was round-robin host selection
> for VMs.
> 
> Wherever possible the hosts are queried directly using qpid rather than
> relying on states from the database.
> 
> This patch loses about 150 lines from the original taskomatic and moves
> most of the task implementation into a central class.  This was done to
> provide access to the qpid session as well as providing for locking/task
> ordering in future versions.
> 
> This requires the latest libvirt-qpid (0.2.8) as it fixes a number of
> bugs.  It's in the ovirt repository now.
> 
> Issues remaining:
> 
> - libvirt-qpid migrate is broken.  Since the migrate takes place on the
>   node instead of from the ovirt-appliance, the source node doesn't have
>   the ability to authenticate against the destination node.  For this
>   reason I'm still using ruby-libvirt migrate.  I talked to Chris about
>   this and we have a plan worked out. :)
> 
> - I wanted to get threading into this but that will have to wait.  I'll
>   post a thread about this to get the discussion started again.  I think
>   the refactoring allows this to be put in pretty easily.
> 
> Signed-off-by: Ian Main <imain at redhat.com>
> ---
>  src/task-omatic/task_host.rb    |   33 --
>  src/task-omatic/task_storage.rb |  424 +++++++++-----------
>  src/task-omatic/task_vm.rb      |  574 +--------------------------
>  src/task-omatic/taskomatic.rb   |  841 ++++++++++++++++++++++++++++++++++-----
>  src/task-omatic/utils.rb        |  221 ----------
>  5 files changed, 933 insertions(+), 1160 deletions(-)
>  delete mode 100644 src/task-omatic/task_host.rb
>  delete mode 100644 src/task-omatic/utils.rb
> 
> diff --git a/src/task-omatic/task_host.rb b/src/task-omatic/task_host.rb
> deleted file mode 100644
> index 3d039fb..0000000
> --- a/src/task-omatic/task_host.rb
> +++ /dev/null
> @@ -1,33 +0,0 @@
> -# Copyright (C) 2008 Red Hat, Inc.
> -# Written by Chris Lalancette <clalance at redhat.com>
> -#
> -# This program is free software; you can redistribute it and/or modify
> -# it under the terms of the GNU General Public License as published by
> -# the Free Software Foundation; version 2 of the License.
> -#
> -# This program is distributed in the hope that it will be useful,
> -# but WITHOUT ANY WARRANTY; without even the implied warranty of
> -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> -# GNU General Public License for more details.
> -#
> -# You should have received a copy of the GNU General Public License
> -# along with this program; if not, write to the Free Software
> -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
> -# MA  02110-1301, USA.  A copy of the GNU General Public License is
> -# also available at http://www.gnu.org/copyleft/gpl.html.
> -
> -require 'utils'
> -
> -# FIXME: a little ugly to be including all of task_vm here, but
> -# utils really isn't the right place for the migrate() method
> -require 'task_vm'
> -
> -def clear_vms_host(task)
> -  puts "clear_vms_host"
> -
> -  src_host = task.host
> -
> -  src_host.vms.each do |vm|
> -    migrate(vm)
> -  end
> -end
> diff --git a/src/task-omatic/task_storage.rb b/src/task-omatic/task_storage.rb
> index 19800fb..a5eb55d 100644
> --- a/src/task-omatic/task_storage.rb
> +++ b/src/task-omatic/task_storage.rb
> @@ -16,287 +16,231 @@
>  # MA  02110-1301, USA.  A copy of the GNU General Public License is
>  # also available at http://www.gnu.org/copyleft/gpl.html.
>  
> -require 'utils'
> -
>  require 'libvirt'
> +require 'rexml/document'
> +include REXML
> +
> +def String.random_alphanumeric(size=16)
> +  s = ""
> +  size.times { s << (i = Kernel.rand(62); i += ((i < 10) ? 48 : ((i < 36) ? 55 : 61 ))).chr }
> +  s
> +end
> +
> +def get_libvirt_lvm_pool_from_volume(db_volume)
> +  phys_volume = StorageVolume.find(:first, :conditions =>
> +                                   ["lvm_pool_id = ?", db_volume.storage_pool_id])
> +
> +  return LibvirtPool.factory(phys_volume.storage_pool)
> +end
> +
> +class LibvirtPool
> +
> +  attr_reader :remote_pool

I've tried to avoid this in the past by adding accessor methods to the class;
I think it is a little cleaner not to let external entities look inside the
class.  What are you using this for now?
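
A minimal sketch of the accessor-method approach, for illustration only.
FakeRemotePool, PoolWrapper, active? and remote_name are made-up names that do
not appear in the patch; the real class would wrap the qpid pool object.

```ruby
# Hypothetical stand-in for the qpid pool object; not part of the patch.
FakeRemotePool = Struct.new(:state, :name)

class PoolWrapper
  def initialize(remote_pool)
    # Kept private: there is deliberately no attr_reader for this.
    @remote_pool = remote_pool
  end

  # Narrow accessors expose only what callers actually need.
  def active?
    !@remote_pool.nil? && @remote_pool.state != "inactive"
  end

  def remote_name
    @remote_pool && @remote_pool.name
  end
end

pool = PoolWrapper.new(FakeRemotePool.new("running", "iscsi-abc"))
puts pool.active?       # true
puts pool.remote_name   # iscsi-abc
```

Callers can ask the questions they care about, but cannot reach into the
underlying pool object and call arbitrary methods on it.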

>  
> -def add_volumes_to_db(db_pool, libvirt_pool, owner = nil, group = nil, mode = nil)
> -  # FIXME: this is currently broken if you do something like:
> -  # 1.  Add an iscsi pool with 3 volumes (lun-1, lun-2, lun-3)
> -  # 2.  Scan it in
> -  # 3.  Remove lun-3 from the pool
> -  # 4.  Re-scan it
> -  # What will happen is that you will still have lun-3 available in the
> -  # database, even though it's not available in the pool anymore.  It's a
> -  # little tricky, though; we have to make sure that we don't pull the
> -  # database entry out from underneath a possibly running VM (or do we?)
> -  libvirt_pool.list_volumes.each do |volname|
> -    storage_volume = StorageVolume.factory(db_pool.get_type_label)
> -
> -    # NOTE: it is safe (and, in fact, necessary) to use
> -    # #{storage_volume.volume_name} here without sanitizing it.  This is
> -    # because this is *not* based on user modifiable data, but rather, on an
> -    # internal implementation detail
> -    existing_vol = StorageVolume.find(:first, :conditions =>
> -                                      [ "storage_pool_id = ? AND #{storage_volume.volume_name} = ?",
> -                                        db_pool.id, volname])
> -    if existing_vol != nil
> -      # in this case, this path already exists in the database; just skip
> -      next
> +  def initialize(type, name = nil)
> +    @remote_pool = nil
> +    @build_on_start = true
> +    @remote_pool_defined = false
> +    @remote_pool_started = false
> +
> +    if name == nil
> +      @name = type + "-" + String.random_alphanumeric
> +    else
> +      @name = name
>      end
>  
> -    volptr = libvirt_pool.lookup_vol_by_name(volname)
> +    @xml = Document.new
> +    @xml.add_element("pool", {"type" => type})
> +
> +    @xml.root.add_element("name").add_text(@name)
>  
> -    volinfo = volptr.info
> +    @xml.root.add_element("source")
>  
> -    storage_volume = StorageVolume.factory(db_pool.get_type_label)
> -    storage_volume.path = volptr.path
> -    storage_volume.size = volinfo.capacity / 1024
> -    storage_volume.storage_pool_id = db_pool.id
> -    storage_volume.write_attribute(storage_volume.volume_name, volname)
> -    storage_volume.lv_owner_perms = owner
> -    storage_volume.lv_group_perms = group
> -    storage_volume.lv_mode_perms = mode
> -    storage_volume.state = StorageVolume::STATE_AVAILABLE
> -    storage_volume.save!
> +    @xml.root.add_element("target")
> +    @xml.root.elements["target"].add_element("path")
>    end
> -end
>  
> -def storage_find_suitable_host(hardware_pool)
> -  conn = nil
> -  hardware_pool.hosts.each do |host|
> -    if not host.is_disabled.nil? and host.is_disabled == 0 \
> -      and host.state == Host::STATE_AVAILABLE
> -      begin
> -        # FIXME: this can hang up taskomatic for quite some time.  To see how,
> -        # make one of your remote servers do "iptables -I INPUT -j DROP"
> -        # and then try to run this; it will take TCP quite a while to give up.
> -        # Unfortunately the solution is probably to do some sort of threading
> -        conn = Libvirt::open("qemu+tcp://" + host.hostname + "/system")
> -
> -        # if we didn't raise an exception, we connected; get out of here
> +  def connect(session, node)
> +    pools = session.objects(:class => 'pool', 'node' => node.object_id)
> +    pools.each do |pool|
> +      result = pool.getXMLDesc
> +      raise "Error getting xml description of pool: #{result.text}" unless result.status == 0
> +
> +      xml_desc = result.description
> +      if self.xmlequal?(Document.new(xml_desc).root)
> +        @remote_pool = pool
>          break
> -      rescue Libvirt::ConnectionError
> -        # if we couldn't connect for whatever reason, just try the next host
> -        next
>        end
>      end
> -  end
>  
> -  if conn == nil
> -    # last ditch effort; if we didn't find any hosts, just use ourselves.
> -    # this may or may not work
> -    begin
> -      conn = Libvirt::open("qemu:///system")
> -    rescue
> +    #XXX: I'm not sure.. it seems like there could be other things going on
> +    #   with the storage pool state.  State can be inactive, building, running
> +    #   or degraded.  I think some more thought should go here to make sure
> +    #   we're doing things right in each state.

Yes, there are definitely other things that could be going on with the pool, and
they definitely need to be thought about.  Two things, though: this comment
doesn't belong here (it belongs near the 'if @remote_pool.state == "inactive"'
section below), and please use "FIXME" to be consistent with the rest of the
source code.
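
One possible shape for this, as a sketch only: the FIXME sits right next to the
state check, and the states that have not been thought through are called out
explicitly. The helper name start_action_for and the returned symbols are
hypothetical; the four state names are the ones listed in the comment above.

```ruby
# Hypothetical helper, not part of the patch.
def start_action_for(state)
  case state
  when "inactive"
    :start      # defined but not started, so it is safe to start it
  when "running"
    :nothing    # already usable
  when "building", "degraded"
    # FIXME: decide what the right behaviour is for pools in these states
    :unhandled
  else
    raise ArgumentError, "Unknown pool state: #{state}"
  end
end

puts start_action_for("inactive")   # start
puts start_action_for("degraded")   # unhandled
```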

> +    if @remote_pool == nil
> +      result = node.storagePoolDefineXML(@xml.to_s)
> +      raise "Error creating pool: #{result.text}" unless result.status == 0

"Error defining pool" is a little more accurate.
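
To keep the messages distinct per step, something like the following could
work. This is a sketch under assumptions: Result is a stand-in for the qpid
method result (the patch only shows that it has status and text), and check!
is a hypothetical helper.

```ruby
# Stand-in for the qpid method result; only status and text are assumed.
Result = Struct.new(:status, :text)

# Hypothetical helper: raise with the name of the step that failed.
def check!(step, result)
  raise "Error #{step} pool: #{result.text}" unless result.status == 0
  result
end

check!("defining", Result.new(0, "OK"))   # passes silently

begin
  check!("building", Result.new(1, "device busy"))
rescue RuntimeError => e
  puts e.message   # Error building pool: device busy
end
```

With the step name passed in, a failure in define, build or start can be told
apart from the message alone.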

> +      @remote_pool = session.object(:object_id => result.pool)
> +      raise "Error finding newly created remote pool." unless @remote_pool
> +
> +      # we need this because we don't want to "build" LVM pools, which would
> +      # destroy existing data
> +      if @build_on_start
> +        result = @remote_pool.build
> +        raise "Error building pool: #{result.text}" unless result.status == 0
> +      end
> +      @remote_pool_defined = true
> +    end
> +
> +    if @remote_pool.state == "inactive"
> +      # only try to start the pool if it is currently inactive; in all other
> +      # states, assume it is already running
> +      result = @remote_pool.create
> +      raise "Error creating pool: #{result.text}" unless result.status == 0
> +
> +      # Refresh qpid object with new properties.
> +      @remote_pool.update
> +
> +      @remote_pool_started = true
>      end
>    end
>  
> -  if conn == nil
> -    raise "Could not find a host to scan storage"
> +  def create_vol(type, name, size, owner, group, mode)
> +    @vol_xml = Document.new
> +    @vol_xml.add_element("volume", {"type" => type})
> +    @vol_xml.root.add_element("name").add_text(name)
> +    @vol_xml.root.add_element("capacity", {"unit" => "K"}).add_text(size.to_s)
> +    @vol_xml.root.add_element("target")
> +    @vol_xml.root.elements["target"].add_element("permissions")
> +    @vol_xml.root.elements["target"].elements["permissions"].add_element("owner").add_text(owner)
> +    @vol_xml.root.elements["target"].elements["permissions"].add_element("group").add_text(group)
> +    @vol_xml.root.elements["target"].elements["permissions"].add_element("mode").add_text(mode)
>    end
>  
> -  return conn
> -end
> +  def shutdown
> +    if @remote_pool_started
> +      result = @remote_pool.destroy
> +    end
> +    if @remote_pool_defined
> +      result = @remote_pool.undefine
> +    end
> +  end
>  
> -# The words "pool" and "volume" are ridiculously overloaded in our context.
> -# Therefore, the refresh_pool method adopts this convention:
> -# phys_db_pool: The underlying physical storage pool, as it is represented in
> -#               the database
> -# phys_libvirt_pool: The underlying physical storage, as it is represented in
> -#                    libvirt
> -# lvm_db_pool: The logical storage pool (if it exists), as it is represented
> -#              in the database
> -# lvm_libvirt_pool: The logical storage pool (if it exists), as it is
> -#                   represented in the database
> -
> -def refresh_pool(task)
> -  puts "refresh_pool"
> -
> -  phys_db_pool = task.storage_pool
> -  if phys_db_pool == nil
> -    raise "Could not find storage pool"
> +  def xmlequal?(docroot)
> +    return false
>    end
>  
> -  conn = storage_find_suitable_host(phys_db_pool.hardware_pool)
> -
> -  begin
> -    phys_libvirt_pool = LibvirtPool.factory(phys_db_pool)
> -    phys_libvirt_pool.connect(conn)
> -
> -    begin
> -      # OK, the pool is all set.  Add in all of the volumes
> -      add_volumes_to_db(phys_db_pool, phys_libvirt_pool)
> -
> -      phys_db_pool.state = StoragePool::STATE_AVAILABLE
> -      phys_db_pool.save!
> -
> -      # OK, now we've scanned the underlying hardware pool and added the
> -      # volumes.  Next we scan for pre-existing LVM volumes
> -      logical_xml = conn.discover_storage_pool_sources("logical")
> -
> -      Document.new(logical_xml).elements.each('sources/source') do |source|
> -        vgname = source.elements["name"].text
> -
> -        begin
> -          source.elements.each("device") do |device|
> -            byid_device = phys_libvirt_pool.lookup_vol_by_path(device.attributes["path"]).path
> -          end
> -        rescue
> -          # If matching any of the <device> sections in the LVM XML fails
> -          # against the storage pool, then it is likely that this is a storage
> -          # pool not associated with the one we connected above.  Go on
> -          # FIXME: it would be nicer to catch the right exception here, and
> -          # fail on other exceptions
> -          puts "One of the logical volumes in #{vgname} is not part of the pool of type #{phys_db_pool[:type]} that we are scanning; ignore the previous error!"
> -          next
> -        end
> -
> -        # if we make it here, then we were able to resolve all of the devices,
> -        # so we know we need to use a new pool
> -        lvm_db_pool = LvmStoragePool.find(:first, :conditions =>
> -                                          [ "vg_name = ?", vgname ])
> -        if lvm_db_pool == nil
> -          lvm_db_pool = LvmStoragePool.new
> -          lvm_db_pool[:type] = "LvmStoragePool"
> -          # set the LVM pool to the same hardware pool as the underlying storage
> -          lvm_db_pool.hardware_pool_id = phys_db_pool.hardware_pool_id
> -          lvm_db_pool.vg_name = vgname
> -          lvm_db_pool.save!
> -        end
> -
> -        source.elements.each("device") do |device|
> -          byid_device = phys_libvirt_pool.lookup_vol_by_path(device.attributes["path"]).path
> -          physical_vol = StorageVolume.find(:first, :conditions =>
> -                                            [ "path = ?",  byid_device])
> -          if physical_vol == nil
> -            # Hm. We didn't find the device in the storage volumes already.
> -            # something went wrong internally, and we have to bail
> -            raise "Storage internal physical volume error"
> -          end
> -
> -          # OK, put the right lvm_pool_id in place
> -          physical_vol.lvm_pool_id = lvm_db_pool.id
> -          physical_vol.save!
> -        end
> -
> -        lvm_libvirt_pool = LibvirtPool.factory(lvm_db_pool)
> -        lvm_libvirt_pool.connect(conn)
> -
> -        begin
> -          add_volumes_to_db(lvm_db_pool, lvm_libvirt_pool, "0744", "0744", "0744")
> -        ensure
> -          lvm_libvirt_pool.shutdown
> -        end
> -      end
> -    ensure
> -      phys_libvirt_pool.shutdown
> +  def self.factory(pool)
> +    if pool[:type] == "IscsiStoragePool"
> +      return IscsiLibvirtPool.new(pool.ip_addr, pool[:target])
> +    elsif pool[:type] == "NfsStoragePool"
> +      return NFSLibvirtPool.new(pool.ip_addr, pool.export_path)
> +    elsif pool[:type] == "LvmStoragePool"
> +      # OK, if this is LVM storage, there are two cases we need to care about:
> +      # 1) this is a LUN with LVM already on it.  In this case, all we need to
> +      #  do is to create a new LV (== libvirt volume), and be done with it
> +      # 2) this LUN is blank, so there is no LVM on it already.  In this
> +      #  case, we need to pvcreate, vgcreate first (== libvirt pool build),
> +      #  and *then* create the new LV (== libvirt volume) on top of that.
> +      #
> +      # We can tell the difference between an LVM Pool that exists and one
> +      # that needs to be created based on the value of the pool.state;
> +      # if it is PENDING_SETUP, we need to create it first
> +      phys_volume = StorageVolume.find(:first, :conditions =>
> +                                       [ "lvm_pool_id = ?", pool.id])
> +      return LVMLibvirtPool.new(pool.vg_name, phys_volume.path,
> +                                pool.state == StoragePool::STATE_PENDING_SETUP)
> +    else
> +      raise "Unknown storage pool type " + pool[:type].to_s
>      end
> -  ensure
> -    conn.close
>    end
>  end
>  
> -def create_volume(task)
> -  puts "create_volume"
> +class IscsiLibvirtPool < LibvirtPool
> +  def initialize(ip_addr, target)
> +    super('iscsi')
> +
> +    @type = 'iscsi'
> +    @ipaddr = ip_addr
> +    @target = target
> +
> +    @xml.root.elements["source"].add_element("host", {"name" => @ipaddr})
> +    @xml.root.elements["source"].add_element("device", {"path" => @target})
>  
> -  db_volume = task.storage_volume
> -  if db_volume == nil
> -    raise "Could not find storage volume to create"
> +    @xml.root.elements["target"].elements["path"].text = "/dev/disk/by-id"
>    end
>  
> -  db_pool = db_volume.storage_pool
> -  if db_pool == nil
> -    raise "Could not find storage pool"
> +  def xmlequal?(docroot)
> +    return (docroot.attributes['type'] == @type and
> +        docroot.elements['source'].elements['host'].attributes['name'] == @ipaddr and
> +        docroot.elements['source'].elements['device'].attributes['path'] == @target)
>    end
> +end
>  
> -  conn = storage_find_suitable_host(db_pool.hardware_pool)
> +class NFSLibvirtPool < LibvirtPool
> +  def initialize(ip_addr, export_path)
> +    super('netfs')
>  
> -  begin
> -    if db_volume[:type] == "LvmStorageVolume"
> -      phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
> -      phys_libvirt_pool.connect(conn)
> -    end
> +    @type = 'netfs'
> +    @host = ip_addr
> +    @remote_path = export_path
> +    @name = String.random_alphanumeric

I'm not sure if this is a bug from my code before, or a new one, but @name is
already set by super() (in a better way, in my opinion), so you don't need to
re-do it here.
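
A cut-down illustration of why the reassignment matters: the base class has
already used @name (for the <name> element in the XML), so changing it
afterwards makes the pool name and the "/mnt/" target path disagree. BasePool
and NfsPool are simplified, hypothetical stand-ins for the classes in the
patch, and SecureRandom replaces String.random_alphanumeric to keep the sketch
self-contained.

```ruby
require 'securerandom'

class BasePool
  # Readers are here only so the demo can print the values.
  attr_reader :name, :xml_name

  def initialize(type, name = nil)
    @name = name || "#{type}-#{SecureRandom.hex(8)}"
    @xml_name = @name   # stands in for the <name> element written by the base class
  end
end

class NfsPool < BasePool
  attr_reader :target_path

  def initialize
    super('netfs')
    # No reassignment of @name here: reuse the value chosen by super.
    @target_path = "/mnt/" + @name
  end
end

pool = NfsPool.new
puts pool.name          # netfs- followed by 16 random hex characters
puts pool.target_path   # /mnt/ followed by the same name
```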

>  
> -    begin
> -      libvirt_pool = LibvirtPool.factory(db_pool)
> +    @xml.root.elements["source"].add_element("host", {"name" => @host})
> +    @xml.root.elements["source"].add_element("dir", {"path" => @remote_path})
> +    @xml.root.elements["source"].add_element("format", {"type" => "nfs"})
>  
> -      begin
> -        libvirt_pool.connect(conn)
> +    @xml.root.elements["target"].elements["path"].text = "/mnt/" + @name
> +  end
>  
> -        libvirt_pool.create_vol(*db_volume.volume_create_params)
> -        db_volume.state = StorageVolume::STATE_AVAILABLE
> -        db_volume.save!
> +  def create_vol(name, size, owner, group, mode)
> +    # FIXME: this can actually take some time to complete (since we aren't
> +    # doing sparse allocations at the moment).  During that time, whichever
> +    # libvirtd we chose to use is completely hung up.  The solution is 3-fold:
> +    # 1.  Allow sparse allocations in the WUI front-end
> +    # 2.  Make libvirtd multi-threaded
> +    # 3.  Make taskomatic multi-threaded
> +    super("netfs", name, size, owner, group, mode)
> +
> +    # FIXME: we have to add the format as raw here because of a bug in libvirt;
> +    # if you specify a volume with no format, it will crash libvirtd
> +    @vol_xml.root.elements["target"].add_element("format", {"type" => "raw"})
> +    result = @remote_pool.createVolumeXML(@vol_xml.to_s)
> +    raise "Error creating remote pool: #{result.text}" unless result.status == 0
> +    return result.volume
> +  end
>  
> -        db_pool.state = StoragePool::STATE_AVAILABLE
> -        db_pool.save!
> -      ensure
> -        libvirt_pool.shutdown
> -      end
> -    ensure
> -      if db_volume[:type] == "LvmStorageVolume"
> -        phys_libvirt_pool.shutdown
> -      end
> -    end
> -  ensure
> -    conn.close
> +  def xmlequal?(docroot)
> +    return (docroot.attributes['type'] == @type and
> +        docroot.elements['source'].elements['host'].attributes['name'] == @host and
> +        docroot.elements['source'].elements['dir'].attributes['path'] == @remote_path)
>    end
>  end
>  
> -def delete_volume(task)
> -  puts "delete_volume"
> +class LVMLibvirtPool < LibvirtPool
> +  def initialize(vg_name, device, build_on_start)
> +    super('logical', vg_name)
>  
> -  db_volume = task.storage_volume
> -  if db_volume == nil
> -    raise "Could not find storage volume to create"
> -  end
> +    @type = 'logical'
> +    @build_on_start = build_on_start
>  
> -  db_pool = db_volume.storage_pool
> -  if db_pool == nil
> -    raise "Could not find storage pool"
> +    @xml.root.elements["source"].add_element("name").add_text(@name)
> +    @xml.root.elements["source"].add_element("device", {"path" => device})
> +    @xml.root.elements["target"].elements["path"].text = "/dev/" + @name
>    end
>  
> -  conn = storage_find_suitable_host(db_pool.hardware_pool)
> -
> -  begin
> -    if db_volume[:type] == "LvmStorageVolume"
> -      phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
> -      phys_libvirt_pool.connect(conn)
> -    end
> +  def create_vol(name, size, owner, group, mode)
> +    super("logical", name, size, owner, group, mode)
> +    result = @remote_pool.createVolumeXML(@vol_xml.to_s)
> +    raise "Error creating remote pool: #{result.text}" unless result.status == 0
> +    return result.volume
> +  end
>  
> -    begin
> -      libvirt_pool = LibvirtPool.factory(db_pool)
> -      libvirt_pool.connect(conn)
> -
> -      begin
> -        libvirt_volume = libvirt_pool.lookup_vol_by_name(db_volume.read_attribute(db_volume.volume_name))
> -        # FIXME: we actually probably want to zero out the whole volume here, so
> -        # we aren't potentially leaking data from one user to another.  There
> -        # are two problems, though:
> -        # 1)  I'm not sure how I would go about zero'ing the data on a remote
> -        # machine, since there is no "libvirt_write_data" call
> -        # 2)  This could potentially take quite a while, so we want to spawn
> -        # off another thread to do it
> -        libvirt_volume.delete
> -
> -        # Note: we have to nil out the task_target because when we delete the
> -        # volume object, that also deletes all dependent tasks (including this
> -        # one), which leads to accessing stale tasks.  Orphan the task, then
> -        # delete the object; we can clean up orphans later (or not, depending
> -        # on the audit policy)
> -        task.task_target = nil
> -        task.save!
> -
> -        db_volume.destroy
> -      ensure
> -        libvirt_pool.shutdown
> -      end
> -    ensure
> -      if db_volume[:type] == "LvmStorageVolume"
> -        phys_libvirt_pool.shutdown
> -      end
> -    end
> -  ensure
> -    conn.close
> +  def xmlequal?(docroot)
> +    return (docroot.attributes['type'] == @type and
> +        docroot.elements['name'].text == @name and
> +        docroot.elements['source'].elements['name'] and
> +        docroot.elements['source'].elements['name'].text == @name)
>    end
>  end
> +
> diff --git a/src/task-omatic/task_vm.rb b/src/task-omatic/task_vm.rb
> index c187287..46ef261 100644
> --- a/src/task-omatic/task_vm.rb
> +++ b/src/task-omatic/task_vm.rb
> @@ -19,35 +19,10 @@
>  require 'rexml/document'
>  include REXML
>  
> -require 'utils'
> -
>  gem 'cobbler'
>  require 'cobbler'
>  
> -def findHostSLA(vm)
> -  host = nil
> -
> -  vm.vm_resource_pool.get_hardware_pool.hosts.each do |curr|
> -    # FIXME: we probably need to add in some notion of "load" into this check
> -    if curr.num_cpus >= vm.num_vcpus_allocated \
> -      and curr.memory >= vm.memory_allocated \
> -      and not curr.is_disabled.nil? and curr.is_disabled == 0 \
> -      and curr.state == Host::STATE_AVAILABLE \
> -      and (vm.host_id.nil? or (not vm.host_id.nil? and vm.host_id != curr.id))
> -      host = curr
> -      break
> -    end
> -  end
> -
> -  if host == nil
> -    # we couldn't find a host that matches this criteria
> -    raise "No host matching VM parameters could be found"
> -  end
> -
> -  return host
> -end
> -
> -def findHost(host_id)
> +def find_host(host_id)
>    host = Host.find(:first, :conditions => [ "id = ?", host_id])
>  
>    if host == nil
> @@ -58,75 +33,6 @@ def findHost(host_id)
>    return host
>  end
>  
> -def connect_storage_pools(conn, storage_volumes)
> -  storagedevs = []
> -  storage_volumes.each do |volume|
> -    # here, we need to iterate through each volume and possibly attach it
> -    # to the host we are going to be using
> -    db_pool = volume.storage_pool
> -    if db_pool == nil
> -      # Hum.  Specified by the VM description, but not in the storage pool?
> -      # continue on and hope for the best
> -      puts "Couldn't find pool for volume #{volume.path}; skipping"
> -      next
> -    end
> -
> -    # we have to special case LVM pools.  In that case, we need to first
> -    # activate the underlying physical device, and then do the logical one
> -    if volume[:type] == "LvmStorageVolume"
> -      phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(volume)
> -      phys_libvirt_pool.connect(conn)
> -    end
> -
> -    libvirt_pool = LibvirtPool.factory(db_pool)
> -    libvirt_pool.connect(conn)
> -
> -    # OK, the pool should be all set.  The last thing we need to do is get
> -    # the path based on the volume name
> -    storagedevs << libvirt_pool.lookup_vol_by_name(volume.read_attribute(volume.volume_name)).path
> -  end
> -
> -  return storagedevs
> -end
> -
> -def remove_pools(conn, type = nil)
> -  all_storage_pools(conn).each do |remote_pool_name|
> -    pool = conn.lookup_storage_pool_by_name(remote_pool_name)
> -
> -    if type == nil or type == Document.new(pool.xml_desc).root.attributes['type']
> -      begin
> -        pool.destroy
> -      rescue
> -      end
> -
> -      begin
> -        # if the destroy failed, we still try to undefine; it may be a pool
> -        # that was previously destroyed but not undefined for whatever reason
> -        pool.undefine
> -      rescue
> -        # do nothing if any of this failed; the worst that happens is that
> -        # we leave a pool configured
> -        puts "Could not teardown pool " + remote_pool_name + "; skipping"
> -      end
> -    end
> -  end
> -end
> -
> -def teardown_storage_pools(conn)
> -  # FIXME: this needs to get a *lot* smarter.  In particular, we want to make
> -  # sure we can tear down unused pools even when there are other guests running
> -  if conn.list_domains.empty?
> -    # OK, there are no running guests on this host anymore.  We can teardown
> -    # any storage pools that are there without fear
> -
> -    # we first have to tear-down LVM pools, because they might depend on the
> -    # underlying physical pools
> -    remove_pools(conn, "logical")
> -
> -    # now tear down the rest of the pools
> -    remove_pools(conn)
> -  end
> -end
>  
>  def create_vm_xml(name, uuid, memAllocated, memUsed, vcpus, bootDevice,
>                    macAddr, bridge, diskDevices)
> @@ -195,13 +101,13 @@ def create_vm_xml(name, uuid, memAllocated, memUsed, vcpus, bootDevice,
>    return doc
>  end
>  
> -def setVmState(vm, state)
> +def set_vm_state(vm, state)
>    vm.state = state
>    vm.save!
>  end
>  
> -def setVmVncPort(vm, domain)
> -  doc = REXML::Document.new(domain.xml_desc)
> +def set_vm_vnc_port(vm, xml_desc)
> +  doc = REXML::Document.new(xml_desc)
>    attrib = REXML::XPath.match(doc, "//graphics/@port")
>    if not attrib.empty?:
>      vm.vnc_port = attrib.to_s.to_i
> @@ -209,32 +115,22 @@ def setVmVncPort(vm, domain)
>    vm.save!
>  end
>  
> -def findVM(task, fail_on_nil_host_id = true)
> +def find_vm(task, fail_on_nil_host_id = true)
>    # find the matching VM in the vms table
>    vm = task.vm
>  
>    if vm == nil
> -    raise "VM not found for task " + task.id
> +    raise "VM #{task.vm} not found for task #{task.id}"
>    end
>  
>    if vm.host_id == nil && fail_on_nil_host_id
> -    # in this case, we have no idea where the VM is.  How can we handle this
> -    # gracefully?  We don't necessarily want to just set the VM state to off;
> -    # if the machine does happen to be running somewhere and we set it to
> -    # disabled here, and then start it again, we could corrupt the disk
> -
> -    # FIXME: the right thing to do here is probably to contact all of the
> -    # hosts we know about and ensure that the domain isn't running; then we
> -    # can mark it either as off (if we didn't find it), or mark the correct
> -    # vm.host_id if we did.  However, if you have a large number of hosts
> -    # out there, this could take a while.
>      raise "No host_id for VM " + vm.id.to_s
>    end
>  
>    return vm
>  end
>  
> -def setVmShutdown(vm)
> +def set_vm_shut_down(vm)
>    vm.host_id = nil
>    vm.memory_used = nil
>    vm.num_vcpus_used = nil
> @@ -244,459 +140,3 @@ def setVmShutdown(vm)
>    vm.save!
>  end
>  
> -def create_vm(task)
> -  puts "create_vm"
> -
> -  vm = findVM(task, false)
> -
> -  if vm.state != Vm::STATE_PENDING
> -    raise "VM not pending"
> -  end
> -  setVmState(vm, Vm::STATE_CREATING)
> -
> -  # create cobbler system profile
> -  begin
> -    # FIXME: Presently the wui handles all cobbler system creation.
> -    # This should be moved out of the wui into Taskomatic.  Specifically
> -    # here, and in the edit_vm methods.
> -
> -    setVmState(vm, Vm::STATE_STOPPED)
> -  rescue Exception => error
> -    setVmState(vm, Vm::STATE_CREATE_FAILED)
> -    raise "Unable to create system: #{error.message}"
> -  end
> -end
> -
> -def shut_or_destroy_vm(task, which)
> -  # here, we are given an id for a VM to shutdown; we have to lookup which
> -  # physical host it is running on
> -
> -  vm = findVM(task)
> -
> -  if vm.state == Vm::STATE_STOPPED
> -    # the VM is already shutdown; just return success
> -    setVmShutdown(vm)
> -    return
> -  elsif vm.state == Vm::STATE_SUSPENDED
> -    raise "Cannot shutdown suspended domain"
> -  elsif vm.state == Vm::STATE_SAVED
> -    raise "Cannot shutdown saved domain"
> -  end
> -
> -  vm_orig_state = vm.state
> -  setVmState(vm, Vm::STATE_STOPPING)
> -
> -  begin
> -    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
> -    dom = conn.lookup_domain_by_uuid(vm.uuid)
> -    dom.send(which)
> -
> -    begin
> -      dom.undefine
> -    rescue
> -      # undefine can fail, for instance, if we live migrated from A -> B, and
> -      # then we are shutting down the VM on B (because it only has "transient"
> -      # XML).  Therefore, just ignore undefine errors so we do the rest
> -      # FIXME: we really should have a marker in the database somehow so that
> -      # we can tell if this domain was migrated; that way, we can tell the
> -      # difference between a real undefine failure and one because of migration
> -    end
> -
> -    teardown_storage_pools(conn)
> -
> -    conn.close
> -  rescue => ex
> -    setVmState(vm, vm_orig_state)
> -    raise ex
> -  end
> -
> -  setVmShutdown(vm)
> -end
> -
> -def shutdown_vm(task)
> -  puts "shutdown_vm"
> -  shut_or_destroy_vm(task, "shutdown")
> -end
> -
> -def poweroff_vm(task)
> -  puts "poweroff_vm"
> -  shut_or_destroy_vm(task, "destroy")
> -end
> -
> -def start_vm(task)
> -  puts "start_vm"
> -
> -  # here, we are given an id for a VM to start
> -
> -  vm = findVM(task, false)
> -
> -  if vm.state == Vm::STATE_RUNNING
> -    # the VM is already running; just return success
> -    return
> -  elsif vm.state == Vm::STATE_SUSPENDED
> -    raise "Cannot start suspended domain"
> -  elsif vm.state == Vm::STATE_SAVED
> -    raise "Cannot start saved domain"
> -  end
> -
> -  # FIXME: Validate that the VM is still within quota
> -
> -  vm_orig_state = vm.state
> -  setVmState(vm, Vm::STATE_STARTING)
> -
> -  begin
> -    if vm.host_id != nil
> -      # OK, marked in the database as already running on a host; for now, we
> -      # will just fail the operation
> -
> -      # FIXME: we probably want to go out to the host it is marked on and check
> -      # things out, just to make sure things are consistent
> -      raise "VM already running"
> -    end
> -
> -    # OK, now that we found the VM, go looking in the hardware_pool
> -    # hosts to see if there is a host that will fit these constraints
> -    host = findHostSLA(vm)
> -
> -    # if we're booting from a CDROM the VM is an image,
> -    # then we need to add the NFS mount as a storage volume for this
> -    # boot
> -    #
> -    if (vm.boot_device == Vm::BOOT_DEV_CDROM) && vm.uses_cobbler? && (vm.cobbler_type == Vm::IMAGE_PREFIX)
> -      details = Cobbler::Image.find_one(vm.cobbler_name)
> -
> -      raise "Image #{vm.cobbler_name} not found in Cobbler server" unless details
> -
> -      # extract the components of the image filename
> -      image_uri = details.file
> -      protocol = auth = ip_addr = export_path = filename = ""
> -
> -      protocol, image_uri = image_uri.split("://") if image_uri.include?("://")
> -      auth, image_uri = image_uri.split("@") if image_uri.include?("@")
> -      # it's ugly, but string.split returns an empty string as the first
> -      # result here, so we'll just ignore it
> -      ignored, ip_addr, image_uri =
> -	image_uri.split(/^([^\/]+)(\/.*)/) unless image_uri =~ /^\//
> -      ignored, export_path, filename =
> -	image_uri.split(/^(.*)\/(.+)/)
> -
> -      found = false
> -
> -      vm.storage_volumes.each do |volume|
> -        if volume.filename == filename
> -          if (volume.storage_pool.ip_addr == ip_addr) &&
> -          (volume.storage_pool.export_path == export_path)
> -            found = true
> -          end
> -        end
> -      end
> -
> -      unless found
> -        # Create a new transient NFS storage volume
> -        # This volume is *not* persisted.
> -        image_volume = StorageVolume.factory("NFS",
> -          :filename => filename
> -        )
> -
> -        image_volume.storage_pool
> -        image_pool = StoragePool.factory(StoragePool::NFS)
> -
> -        image_pool.ip_addr = ip_addr
> -        image_pool.export_path = export_path
> -        image_pool.storage_volumes << image_volume
> -        image_volume.storage_pool = image_pool
> -      end
> -    end
> -
> -    volumes = []
> -    volumes += vm.storage_volumes
> -    volumes << image_volume if image_volume
> -
> -    conn = Libvirt::open("qemu+tcp://" + host.hostname + "/system")
> -
> -    begin
> -      storagedevs = connect_storage_pools(conn, volumes)
> -
> -      dom = nil
> -      begin
> -        # FIXME: get rid of the hardcoded bridge
> -        xml = create_vm_xml(vm.description, vm.uuid, vm.memory_allocated,
> -                            vm.memory_used, vm.num_vcpus_allocated,
> -                            vm.boot_device, vm.vnic_mac_addr, "ovirtbr0",
> -                            storagedevs)
> -        dom = conn.define_domain_xml(xml.to_s)
> -        dom.create
> -
> -        setVmVncPort(vm, dom)
> -      rescue
> -        if dom != nil
> -          dom.undefine
> -        end
> -        teardown_storage_pools(conn)
> -        raise ex
> -      end
> -    ensure
> -      conn.close
> -    end
> -  rescue => ex
> -    setVmState(vm, vm_orig_state)
> -    raise ex
> -  end
> -
> -  vm.host_id = host.id
> -  vm.state = Vm::STATE_RUNNING
> -  vm.memory_used = vm.memory_allocated
> -  vm.num_vcpus_used = vm.num_vcpus_allocated
> -  vm.boot_device = Vm::BOOT_DEV_HD
> -  vm.save!
> -end
> -
> -def save_vm(task)
> -  puts "save_vm"
> -
> -  # here, we are given an id for a VM to suspend
> -
> -  vm = findVM(task)
> -
> -  if vm.state == Vm::STATE_SAVED
> -    # the VM is already saved; just return success
> -    return
> -  elsif vm.state == Vm::STATE_SUSPENDED
> -    raise "Cannot save suspended domain"
> -  elsif vm.state == Vm::STATE_STOPPED
> -    raise "Cannot save shutdown domain"
> -  end
> -
> -  vm_orig_state = vm.state
> -  setVmState(vm, Vm::STATE_SAVING)
> -
> -  begin
> -    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
> -    dom = conn.lookup_domain_by_uuid(vm.uuid)
> -    dom.save("/tmp/" + vm.uuid + ".save")
> -    conn.close
> -  rescue => ex
> -    setVmState(vm, vm_orig_state)
> -    raise ex
> -  end
> -
> -  # note that we do *not* reset the host_id here, since we stored the saved
> -  # vm state information locally.  restore_vm will pick it up from here
> -
> -  # FIXME: it would be much nicer to be able to save the VM and remove the
> -  # the host_id and undefine the XML; that way we could resume it on another
> -  # host later.  This can be done once we have the storage APIs, but it will
> -  # need more work
> -
> -  setVmState(vm, Vm::STATE_SAVED)
> -end
> -
> -def restore_vm(task)
> -  puts "restore_vm"
> -
> -  # here, we are given an id for a VM to start
> -
> -  vm = findVM(task)
> -
> -  if vm.state == Vm::STATE_RUNNING
> -    # the VM is already saved; just return success
> -    return
> -  elsif vm.state == Vm::STATE_SUSPENDED
> -    raise "Cannot restore suspended domain"
> -  elsif vm.state == Vm::STATE_STOPPED
> -    raise "Cannot restore shutdown domain"
> -  end
> -
> -  vm_orig_state = vm.state
> -  setVmState(vm, Vm::STATE_RESTORING)
> -
> -  begin
> -    # FIXME: we should probably go out to the host and check what it thinks
> -    # the state is
> -
> -    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
> -    dom = conn.lookup_domain_by_uuid(vm.uuid)
> -    dom.restore
> -
> -    setVmVncPort(vm, dom)
> -
> -    conn.close
> -  rescue => ex
> -    setVmState(vm, vm_orig_state)
> -    raise ex
> -  end
> -
> -  setVmState(vm, Vm::STATE_RUNNING)
> -end
> -
> -def suspend_vm(task)
> -  puts "suspend_vm"
> -
> -  # here, we are given an id for a VM to suspend; we have to lookup which
> -  # physical host it is running on
> -
> -  vm = findVM(task)
> -
> -  if vm.state == Vm::STATE_SUSPENDED
> -    # the VM is already suspended; just return success
> -    return
> -  elsif vm.state == Vm::STATE_STOPPED
> -    raise "Cannot suspend stopped domain"
> -  elsif vm.state == Vm::STATE_SAVED
> -    raise "Cannot suspend saved domain"
> -  end
> -
> -  vm_orig_state = vm.state
> -  setVmState(vm, Vm::STATE_SUSPENDING)
> -
> -  begin
> -    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
> -    dom = conn.lookup_domain_by_uuid(vm.uuid)
> -    dom.suspend
> -    conn.close
> -  rescue => ex
> -    setVmState(vm, vm_orig_state)
> -    raise ex
> -  end
> -
> -  # note that we do *not* reset the host_id here, since we just suspended the VM
> -  # resume_vm will pick it up from here
> -
> -  setVmState(vm, Vm::STATE_SUSPENDED)
> -end
> -
> -def resume_vm(task)
> -  puts "resume_vm"
> -
> -  # here, we are given an id for a VM to start
> -
> -  vm = findVM(task)
> -
> -  # OK, marked in the database as already running on a host; let's check it
> -
> -  if vm.state == Vm::STATE_RUNNING
> -    # the VM is already suspended; just return success
> -    return
> -  elsif vm.state == Vm::STATE_STOPPED
> -    raise "Cannot resume stopped domain"
> -  elsif vm.state == Vm::STATE_SAVED
> -    raise "Cannot resume suspended domain"
> -  end
> -
> -  vm_orig_state = vm.state
> -  setVmState(vm, Vm::STATE_RESUMING)
> -
> -  begin
> -    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
> -    dom = conn.lookup_domain_by_uuid(vm.uuid)
> -    dom.resume
> -    conn.close
> -  rescue => ex
> -    setVmState(vm, vm_orig_state)
> -    raise ex
> -  end
> -
> -  setVmState(vm, Vm::STATE_RUNNING)
> -end
> -
> -def update_state_vm(task)
> -  puts "update_state_vm"
> -
> -  # NOTE: findVM() will only return a vm if all the host information is filled
> -  # in.  So if a vm that we thought was stopped is running, this returns nil
> -  # and we don't update any information about it.  The tricky part
> -  # is that we're still not sure what to do in this case :).  - Ian
> -  #
> -  # Actually for migration it is necessary that it be able to update
> -  # the host and state of the VM once it is migrated.
> -  vm = findVM(task, false)
> -  new_vm_state, host_id_str = task.args.split(",")
> -  if (vm.host_id == nil) and host_id_str
> -    vm.host_id = host_id_str.to_i
> -  end
> -
> -
> -  vm_effective_state = Vm::EFFECTIVE_STATE[vm.state]
> -  task_effective_state = Vm::EFFECTIVE_STATE[new_vm_state]
> -
> -  if vm_effective_state != task_effective_state
> -    vm.state = new_vm_state
> -
> -    if task_effective_state == Vm::STATE_STOPPED
> -      setVmShutdown(vm)
> -    end
> -    vm.save!
> -    puts "Updated state to " + new_vm_state
> -  end
> -end
> -
> -def migrate(vm, dest = nil)
> -  if vm.state == Vm::STATE_STOPPED
> -    raise "Cannot migrate stopped domain"
> -  elsif vm.state == Vm::STATE_SUSPENDED
> -    raise "Cannot migrate suspended domain"
> -  elsif vm.state == Vm::STATE_SAVED
> -    raise "Cannot migrate saved domain"
> -  end
> -
> -  vm_orig_state = vm.state
> -  setVmState(vm, Vm::STATE_MIGRATING)
> -
> -  begin
> -    src_host = findHost(vm.host_id)
> -    unless dest.nil? or dest.empty?
> -      if dest.to_i == vm.host_id
> -        raise "Cannot migrate from host " + src_host.hostname + " to itself!"
> -      end
> -      dst_host = findHost(dest.to_i)
> -    else
> -      dst_host = findHostSLA(vm)
> -    end
> -
> -    src_conn = Libvirt::open("qemu+tcp://" + src_host.hostname + "/system")
> -    dst_conn = Libvirt::open("qemu+tcp://" + dst_host.hostname + "/system")
> -
> -    connect_storage_pools(dst_conn, vm)
> -
> -    dom = src_conn.lookup_domain_by_uuid(vm.uuid)
> -    dom.migrate(dst_conn, Libvirt::Domain::MIGRATE_LIVE)
> -
> -    # if we didn't raise an exception, then the migration was successful.  We
> -    # still have a pointer to the now-shutdown domain on the source side, so
> -    # undefine it
> -    begin
> -      dom.undefine
> -    rescue
> -      # undefine can fail, for instance, if we live migrated from A -> B, and
> -      # then we are shutting down the VM on B (because it only has "transient"
> -      # XML).  Therefore, just ignore undefine errors so we do the rest
> -      # FIXME: we really should have a marker in the database somehow so that
> -      # we can tell if this domain was migrated; that way, we can tell the
> -      # difference between a real undefine failure and one because of migration
> -    end
> -
> -    teardown_storage_pools(src_conn)
> -    dst_conn.close
> -    src_conn.close
> -  rescue => ex
> -    # FIXME: ug.  We may have open connections that we need to close; not
> -    # sure how to handle that
> -    setVmState(vm, vm_orig_state)
> -    raise ex
> -  end
> -
> -  setVmState(vm, Vm::STATE_RUNNING)
> -  vm.host_id = dst_host.id
> -  vm.save!
> -end
> -
> -def migrate_vm(task)
> -  puts "migrate_vm"
> -
> -  # here, we are given an id for a VM to migrate; we have to lookup which
> -  # physical host it is running on
> -
> -  vm = findVM(task)
> -
> -  migrate(vm, task.args)
> -end
> diff --git a/src/task-omatic/taskomatic.rb b/src/task-omatic/taskomatic.rb
> index ce37058..3eefc5a 100755
> --- a/src/task-omatic/taskomatic.rb
> +++ b/src/task-omatic/taskomatic.rb
> @@ -1,7 +1,7 @@
>  #!/usr/bin/ruby
> -# 
> +#
>  # Copyright (C) 2008 Red Hat, Inc.
> -# Written by Chris Lalancette <clalance at redhat.com>
> +# Written by Chris Lalancette <clalance at redhat.com> and Ian Main <imain at redhat.com>
>  #
>  # This program is free software; you can redistribute it and/or modify
>  # it under the terms of the GNU General Public License as published by
> @@ -22,122 +22,765 @@ $: << File.join(File.dirname(__FILE__), "../dutils")
>  $: << File.join(File.dirname(__FILE__), ".")
>  
>  require 'rubygems'
> +require "qpid"

Nit: use single quotes here, for consistency with the other requires.

> +require 'monitor'
> +require 'dutils'
>  require 'optparse'
>  require 'daemons'
>  include Daemonize
>  
> -$logfile = '/var/log/ovirt-server/taskomatic.log'
> +require 'task_vm'
> +require 'task_storage'
> +
> +class TaskOmatic
> +
> +  include MonitorMixin
> +
> +  $logfile = '/var/log/ovirt-server/taskomatic.log'
> +
> +  def initialize()
> +    super()
> +
> +    @sleeptime = 5
> +    @nth_host = 0
> +
> +    @session = Qpid::Qmf::Session.new()
> +    # FIXME: Should come from some kind of config or DNS SRV or what have you.
> +    @broker = @session.add_broker("amqp://localhost:5672")
> +
> +    do_daemon = true
> +
> +    opts = OptionParser.new do |opts|
> +      opts.on("-h", "--help", "Print help message") do
> +        puts opts
> +        exit
> +      end
> +      opts.on("-n", "--nodaemon", "Run interactively (useful for debugging)") do |n|
> +        do_daemon = false
> +      end
> +      opts.on("-s N", Integer, "--sleep", "Seconds to sleep between iterations (default is 5 seconds)") do |s|
> +        sleeptime = s
> +      end
> +    end
> +    begin
> +      opts.parse!(ARGV)
> +    rescue OptionParser::InvalidOption
> +      puts opts
> +      exit
> +    end
>  
> -do_daemon = true
> -sleeptime = 5
> -opts = OptionParser.new do |opts|
> -  opts.on("-h", "--help", "Print help message") do
> -    puts opts
> -    exit
> +    if do_daemon
> +      # XXX: This gets around a problem with paths for the database stuff.
> +      # Normally daemonize would chdir to / but the paths for the database
> +      # stuff are relative so it breaks it.. It's either this or rearrange
> +      # things so the db stuff is included after daemonizing.

Ugly, but fine.  Just remove the XXX and leave the rest of the comment in
place; the code needs to be like this.

> +      pwd = Dir.pwd
> +      daemonize
> +      Dir.chdir(pwd)
> +      lf = open($logfile, 'a')
> +      $stdout = lf
> +      $stderr = lf
> +    end
>    end
> -  opts.on("-n", "--nodaemon", "Run interactively (useful for debugging)") do |n|
> -    do_daemon = !n
> +
> +  def find_capable_host(db_vm)
> +    possible_hosts = []
> +
> +    vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
> +
> +    db_vm.vm_resource_pool.get_hardware_pool.hosts.each do |curr|
> +      # Now each of 'curr' is in the right hardware pool.. now we check them out.
> +
> +      node = @session.object(:class => "node", 'hostname' => curr.hostname)
> +      next unless node
> +
> +      # So now we expect if the node was found it's alive and well, then we check
> +      # to make sure there's enough real cores for the number of vcpus, the node
> +      # memory is adequate, the node is not disabled in the database, and if the
> +      # node id is nil or if it is already running (has a node id set) then it
> +      # is probably looking to migrate so we find a node that is not the current
> +      # node.
> +      #
> +      # In the future we could add load or similar checks here.
> +
> +      #puts "checking node, #{node.cores} >= #{db_vm.num_vcpus_allocated},"
> +      #puts "and #{node.memory} >= #{db_vm.memory_allocated}"
> +      #puts "and not #{curr.is_disabled.nil?} and #{curr.is_disabled == 0}"
> +      #puts "and #{vm ? vm : 'nil'} or #{vm ? vm.active : 'nil'}) or #{vm ? vm.node : 'nil'} != #{node.object_id}"
> +
> +      if node and node.cores >= db_vm.num_vcpus_allocated \
> +         and node.memory >= db_vm.memory_allocated \
> +         and not curr.is_disabled.nil? and curr.is_disabled == 0 \
> +         and ((!vm or vm.active == 'false') or vm.node != node.object_id)
> +        possible_hosts.push(curr)
> +      end
> +    end
> +
> +    #puts "possible_hosts.length = #{possible_hosts.length}"
> +    if possible_hosts.length == 0
> +      # we couldn't find a host that matches this criteria
> +      raise "No host matching VM parameters could be found"
> +    end
> +
> +    # XXX: Right now we're just picking the nth host, we could also look at
> +    # how many vms are already on it, or the load of the hosts etc.
> +    host = possible_hosts[@nth_host % possible_hosts.length]
> +    @nth_host += 1
> +
> +    return host
>    end
> -  opts.on("-s N", Integer, "--sleep", "Seconds to sleep between iterations (default is 5 seconds)") do |s|
> -    sleeptime = s
> +
> +  def connect_storage_pools(node, storage_volumes)
> +    storagedevs = []
> +    storage_volumes.each do |db_volume|
> +      # here, we need to iterate through each volume and possibly attach it
> +      # to the host we are going to be using
> +      db_pool = db_volume.storage_pool
> +      if db_pool == nil
> +        # Hum.  Specified by the VM description, but not in the storage pool?
> +        # continue on and hope for the best
> +        puts "Couldn't find pool for volume #{db_volume.path}; skipping"
> +        next
> +      end
> +
> +      # we have to special case LVM pools.  In that case, we need to first
> +      # activate the underlying physical device, and then do the logical one
> +      if db_volume[:type] == "LvmStorageVolume"
> +        phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
> +        phys_libvirt_pool.connect(@session, node)
> +      end
> +
> +      libvirt_pool = LibvirtPool.factory(db_pool)
> +      libvirt_pool.connect(@session, node)
> +
> +      # OK, the pool should be all set.  The last thing we need to do is get
> +      # the path based on the volume name
> +
> +      volume_name = db_volume.read_attribute(db_volume.volume_name)
> +      pool = libvirt_pool.remote_pool
> +      volume = @session.object(:class => 'volume',
> +                               'name' => volume_name,
> +                               'storagePool' => pool.object_id)
> +      raise "Unable to find volume #{volume_name} attached to pool #{pool.name}." unless volume
> +      storagedevs << volume.path
> +    end
> +
> +    return storagedevs
>    end
> -end
> -begin
> -  opts.parse!(ARGV)
> -rescue OptionParser::InvalidOption
> -  puts opts
> -  exit
> -end
>  
> -if do_daemon
> -  daemonize
> -  STDOUT.reopen $logfile, 'a'
> -  STDERR.reopen STDOUT
> -end
> +  def task_create_vm(task)
> +    # XXX: This is mostly just a place holder.
> +    vm = find_vm(task, false)
> +    if vm.state != Vm::STATE_PENDING
> +      raise "VM not pending"
> +    end
> +    vm.state = Vm::STATE_STOPPED
> +    vm.save!
> +  end
>  
> -begin
> -  require 'dutils'
> -rescue => ex
> -  puts "dutils require failed! #{ex.class}: #{ex.message}"
> -end
> +  def teardown_storage_pools(node)
>  
> -require 'task_vm'
> -require 'task_storage'
> -require 'task_host'
> -
> -loop do
> -  tasks = Array.new
> -  begin
> -    tasks = Task.find(:all, :conditions => [ "state = ?", Task::STATE_QUEUED ])
> -  rescue => ex
> -    puts "1 #{ex.class}: #{ex.message}"
> -    if Task.connected?
> -      begin
> -        ActiveRecord::Base.connection.reconnect!
> -      rescue => norecon
> -        puts "2 #{norecon.class}: #{norecon.message}"
> +    # This is rather silly because we only destroy pools if there are no
> +    # more vms on the node.  We should be reference counting the pools
> +    # somehow so we know when they are no longer in use.
> +    vms = @session.objects(:class => 'domain', 'node' => node.object_id)
> +    if vms.length > 0
> +      return
> +    end
> +    pools = @session.objects(:class => 'pool', 'node' => node.object_id)
> +
> +    # FIXME: I think we should be destroying/undefining logical volumes first.

Yes, 100% correct.  You must fix this for LVM pools to work properly;
otherwise it will fail utterly.
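
Roughly what I have in mind, as an untested sketch: order the pools so that the
logical (LVM) ones are torn down before the physical pools they sit on.  The
Pool struct and its "type" field are stand-ins I made up for illustration; I
haven't checked what the libvirt-qpid pool object actually exposes for this.

```ruby
# Hypothetical stand-in for a pool object; the real QMF pool object may
# expose its type under a different name.
Pool = Struct.new(:name, :type)

# Return the pools ordered so that logical (LVM) pools come first and every
# other pool (which a logical pool may depend on) comes afterwards.
def teardown_order(pools)
  logical, others = pools.partition { |pool| pool.type == "logical" }
  logical + others
end

pools = [Pool.new("iscsi0", "iscsi"),
         Pool.new("vg0", "logical"),
         Pool.new("nfs0", "netfs")]

teardown_order(pools).each do |pool|
  # In taskomatic this is where pool.destroy / pool.undefine would go.
  puts "would tear down #{pool.name} (#{pool.type})"
end
```

The loop in teardown_storage_pools would then iterate over the ordered list
instead of the raw query result.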

> +    pools.each do |pool|
> +      result = pool.destroy
> +      result = pool.undefine
> +    end
> +  end
> +
> +
> +  def task_shutdown_or_destroy_vm(task, action)
> +    db_vm = task.vm
> +    vm = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
> +    if !vm
> +      puts "VM already shut down?"
> +      return
> +    end
> +
> +    node = @session.object(:object_id => vm.node)
> +    raise "Unable to get node that vm is on??" unless node
> +
> +    if vm.state == "shutdown" or vm.state == "shutoff"
> +      set_vm_shut_down(db_vm)
> +      return
> +    elsif vm.state == "suspended"
> +      raise "Cannot shutdown suspended domain"
> +    elsif vm.state == "saved"
> +      raise "Cannot shutdown saved domain"
> +    end
> +
> +    if action == :shutdown
> +      result = vm.shutdown
> +      raise "Error shutting down VM: #{result.text}" unless result.status == 0
> +    elsif action == :destroy
> +      result = vm.destroy
> +      raise "Error destroying VM: #{result.text}" unless result.status == 0
> +    end
> +
> +    # undefine can fail, for instance, if we live migrated from A -> B, and
> +    # then we are shutting down the VM on B (because it only has "transient"
> +    # XML).  Therefore, just ignore undefine errors so we do the rest
> +    # FIXME: we really should have a marker in the database somehow so that
> +    # we can tell if this domain was migrated; that way, we can tell the
> +    # difference between a real undefine failure and one because of migration
> +    result = vm.undefine
> +    puts "Error undefining VM: #{result.text}" unless result.status == 0

How are you handling the above comment with this code?

> +
> +    teardown_storage_pools(node)
> +
> +    set_vm_shut_down(db_vm)
> +  end
> +
> +  def task_start_vm(task)
> +    db_vm = find_vm(task, false)
> +
> +    # XXX: Kinda silly?  I dunno about these intermediate states..
> +    set_vm_state(db_vm, Vm::STATE_STARTING)
> +
> +    vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
> +
> +    if vm
> +      case vm.state
> +        when "running"
> +          return
> +        when "blocked"
> +          raise "Virtual machine state is blocked, cannot start VM."
> +        when "paused"
> +          raise "Virtual machine is currently paused, cannot start, must resume."
>        end
> -    else
> -      begin
> -        database_connect
> -      rescue => ex
> -        puts "3 #{ex.class}: #{ex.message}"
> +    end
> +    # FIXME: There's a bug here in that a host that's already running the vm won't be
> +    # returned.  I think that's supposed to be for migration but it just breaks stuff.
> +    db_host = find_capable_host(db_vm)

I'm not quite sure what this FIXME means.  If we are here in task_start_vm, and
the vm is already running, then we have some sort of logical error.  Now, we
should probably be robust to that error, but I think overloading
find_capable_host() for that is the wrong way to go; we should just have another
set of checks before this.  Oh, and it doesn't have anything to do with
migration, as far as I can remember.
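
Something along these lines is what I mean by a separate set of checks; treat
it as a sketch, not tested code.  The method name and the returned symbols are
made up, and I'm assuming "blocked" should be treated like "running" and that
anything other than "no domain" or "shutoff" is not startable.

```ruby
# Decide what task_start_vm should do based on the state libvirt reports.
# state is nil when no domain object was found for the VM.
def start_action_for(state)
  case state
  when nil, "shutoff"
    :start              # nothing running; go on and pick a host
  when "running", "blocked"
    :already_running    # logical error upstream, but harmless; just return
  when "paused"
    raise "Virtual machine is currently paused, cannot start, must resume."
  else
    # Assumption: any other state (e.g. still shutting down) is an error.
    raise "Cannot start VM in state #{state}"
  end
end

puts start_action_for(nil)
puts start_action_for("blocked")
```

With that in place, find_capable_host() only ever gets called for VMs that
really need a host, and doesn't need to know about this case at all.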

> +
> +    node = @session.object(:class => "node", 'hostname' => db_host.hostname)
> +
> +    raise "Unable to find host #{db_host.hostname} to create VM on." unless node
> +
> +    if (db_vm.boot_device == Vm::BOOT_DEV_CDROM) && db_vm.uses_cobbler? && (db_vm.cobbler_type == Vm::IMAGE_PREFIX)

I know you didn't put this here, but since you are moving things around...
It would seem like a good idea to move the Cobbler stuff into a helper method.
I think that just makes it look cleaner, as the details of getting the Cobbler
stuff aren't really interesting when you are reading the start_vm task.
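
For example, the URI parsing could live in something like the following.  This
is only a sketch: the names parse_image_uri and ImageLocation are hypothetical,
and I've used split/rpartition instead of the regexes in the patch, which
should give the same results for the URIs I can think of but hasn't been run
against a real Cobbler server.

```ruby
# Hypothetical container for the pieces of a Cobbler image URI.
ImageLocation = Struct.new(:protocol, :auth, :ip_addr, :export_path, :filename)

# Split something like "nfs://user@host/export/path/file.iso" into its parts.
# Any part that is missing from the URI is returned as an empty string.
def parse_image_uri(uri)
  rest = uri.dup
  protocol = auth = ip_addr = ""

  protocol, rest = rest.split("://", 2) if rest.include?("://")
  auth, rest = rest.split("@", 2) if rest.include?("@")

  # If what is left does not start with "/", the first component is the host.
  unless rest.start_with?("/")
    ip_addr, path = rest.split("/", 2)
    rest = "/" + path.to_s
  end

  # Everything up to the last "/" is the export path; the rest is the file.
  export_path, _slash, filename = rest.rpartition("/")
  ImageLocation.new(protocol, auth, ip_addr, export_path, filename)
end

loc = parse_image_uri("nfs://192.168.50.2/exports/images/f10.iso")
puts "#{loc.ip_addr}:#{loc.export_path} -> #{loc.filename}"
```

task_start_vm would then only need to call the helper and compare the result
against the VM's existing storage volumes.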

> +
> +      details = Cobbler::Image.find_one(db_vm.cobbler_name)
> +      raise "Image #{vm.cobbler_name} not found in Cobbler server" unless details
> +
> +      # extract the components of the image filename
> +      image_uri = details.file
> +      protocol = auth = ip_addr = export_path = filename = ""
> +
> +      protocol, image_uri = image_uri.split("://") if image_uri.include?("://")
> +      auth, image_uri = image_uri.split("@") if image_uri.include?("@")
> +      # it's ugly, but string.split returns an empty string as the first
> +      # result here, so we'll just ignore it
> +      ignored, ip_addr, image_uri =
> +          image_uri.split(/^([^\/]+)(\/.*)/) unless image_uri =~ /^\//
> +      ignored, export_path, filename =
> +          image_uri.split(/^(.*)\/(.+)/)
> +
> +      found = false
> +
> +      db_vm.storage_volumes.each do |volume|
> +        if volume.filename == filename
> +          if (volume.storage_pool.ip_addr == ip_addr) &&
> +            (volume.storage_pool.export_path == export_path)
> +            found = true
> +          end
> +        end
> +      end
> +
> +      unless found
> +        # Create a new transient NFS storage volume
> +        # This volume is *not* persisted.
> +        image_volume = StorageVolume.factory("NFS", :filename => filename)
> +
> +        image_volume.storage_pool
> +        image_pool = StoragePool.factory(StoragePool::NFS)
> +
> +        image_pool.ip_addr = ip_addr
> +        image_pool.export_path = export_path
> +        image_pool.storage_volumes << image_volume
> +        image_volume.storage_pool = image_pool
>        end
>      end
> +
> +    # FIXME: I know this part is broken..
> +    #
> +    # hrrm, who wrote this comment and why is it broken?  - Ian

Good question, I have no idea, I didn't write it.  Darryl?

> +    volumes = []
> +    volumes += db_vm.storage_volumes
> +    volumes << image_volume if image_volume
> +    storagedevs = connect_storage_pools(node, volumes)
> +
> +    # FIXME: get rid of the hardcoded bridge
> +    xml = create_vm_xml(db_vm.description, db_vm.uuid, db_vm.memory_allocated,
> +              db_vm.memory_used, db_vm.num_vcpus_allocated, db_vm.boot_device,
> +              db_vm.vnic_mac_addr, "ovirtbr0", storagedevs)
> +
> +    result = node.domainDefineXML(xml.to_s)
> +    raise "Error defining virtual machine: #{result.text}" unless result.status == 0
> +
> +    domain = @session.object(:object_id => result.domain)
> +    raise "Cannot find domain on host #{db_host.hostname}, cannot start virtual machine." unless domain
> +
> +    result = domain.create
> +    if result.status != 0
> +      domain.undefine
> +      raise "Error creating virtual machine: #{result.text}"
> +    end
> +
> +    result = domain.getXMLDesc
> +
> +    # Reget the db record or you can get 'dirty' errors.

Out of curiosity, why?  I'm not opposed to it, but this makes me think that
there might be a problem lurking here.

> +    db_vm = find_vm(task, false)
> +    set_vm_vnc_port(db_vm, result.description) unless result.status != 0
> +
> +    # XXX: This information is not available via the libvirt interface.
> +    db_vm.memory_used = db_vm.memory_allocated
> +    db_vm.boot_device = Vm::BOOT_DEV_HD
> +    db_vm.host_id = db_host.id
> +
> +    # We write the new state here even though dbomatic will set it soon anyway.
> +    # This is just to let the UI know that it's good to go right away and really
> +    # dbomatic will just write the same thing over top of it soon enough.
> +    db_vm.state = Vm::STATE_RUNNING

Yep, good idea.

> +    db_vm.save!
> +  end
> +
> +  def task_suspend_vm(task)
> +    db_vm = task.vm
> +    dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
> +    raise "Unable to locate VM to suspend" unless dom
> +
> +    if dom.state == "shutdown" or dom.state == "shutoff"
> +      raise "Cannot suspend stopped domain"
> +    elsif dom.state == "paused"
> +      raise "Cannot suspend saved domain"
> +    end

Actually, the original way I did this is kind of dumb.  We should probably just do:

if dom.state != "running" and dom.state != "blocked"
     raise "Cannot suspend domain in state #{dom.state}"
end

> +
> +    result = dom.suspend
> +    raise "Error suspending VM: #{result.text}" unless result.status == 0
> +
> +    db_vm.state = Vm::STATE_SUSPENDED
> +    db_vm.save!
>    end
> -  tasks.each do |task|
> -    # make sure we get our credentials up-front
> -    get_credentials
>  
> -    task.time_started = Time.now
> -    task.state = Task::STATE_RUNNING
> -    task.save!
> +  def task_resume_vm(task)
> +    db_vm = task.vm
> +    dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
> +    raise "Unable to locate VM to resume" unless dom
> +
> +    if dom.state == "running"
> +      # the VM is already suspended; just return success
> +      return
> +    elsif dom.state == "shutoff" or dom.state == "shutdown"
> +      raise "Cannot resume stopped domain"
> +    elsif dom.state == "blocked"
> +      raise "Cannot resume suspended domain"
> +    end

This logic is actually wrong (blocked == running for most practical purposes),
but again we should just redo the logic as:

if dom.state != "paused"
     raise "Cannot resume domain in state #{dom.state}"
end
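
If you want to go one step further, this check and the suspend one could share
a small table-driven helper.  Again just a sketch; ALLOWED_STATES and
check_domain_state are names I made up, and the state strings are the ones
used elsewhere in this patch.

```ruby
# Which libvirt domain states each action may be performed from.
ALLOWED_STATES = {
  :suspend => ["running", "blocked"],
  :resume  => ["paused"],
}

# Raise unless the domain is in a state from which the action makes sense.
def check_domain_state(action, state)
  allowed = ALLOWED_STATES.fetch(action)
  unless allowed.include?(state)
    raise "Cannot #{action} domain in state #{state}"
  end
end

check_domain_state(:suspend, "blocked")   # fine, blocked is as good as running
check_domain_state(:resume, "paused")     # fine
```

Each task method would then start with a single call such as
check_domain_state(:resume, dom.state).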

> +
> +    result = dom.resume
> +    raise "Error resuming VM: #{result.text}" unless result.status == 0
> +
> +    db_vm.state = Vm::STATE_RUNNING
> +    db_vm.save!
> +  end
> +
> +  def task_save_vm(task)

Put a comment here saying that this functionality is completely broken; that way
ruby-lint (i.e. meyering) won't waste time cleaning this code up.  Once we have
a "save pool" defined, we can fix this whole thing up and remove the comment.

> +    db_vm = task.vm
> +    dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
> +    raise "Unable to locate VM to save" unless dom
> +
> +    #XXX: I'm not checking states here. I want to see if libvirt gives back
> +    #decent error messages for different states.
> +    filename = "/tmp/#{dom.uuid}.save"
> +    puts "saving vm #{dom.name} to #{filename}"
> +    result = dom.save(filename)
> +    raise "Error saving VM: #{result.text}" unless result.status == 0
> +
> +    set_vm_state(db_vm, Vm::STATE_SAVED)
> +  end
> +
> +  def task_restore_vm(task)

Ditto.

> +    db_vm = task.vm
> +    dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
> +    raise "Unable to locate VM to restore" unless dom
> +
> +    #XXX: I'm not checking states here. I want to see if libvirt gives back
> +    #decent error messages for different states.
> +
> +    filename = "/tmp/#{dom.uuid}.save"
> +    puts "restoring vm #{dom.name} from #{filename}"
> +    result = dom.restore("/tmp/" + dom.uuid + ".save")
> +    raise "Error restoring VM: #{result.text}" unless result.status == 0
> +
> +    set_vm_state(db_vm, Vm::STATE_RUNNING)
> +  end
> +
> +  def migrate(db_vm, dest = nil)
> +
> +    vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
> +    raise "Unable to find VM to migrate" unless vm
> +    src_node = @session.object(:object_id => vm.node)
> +    raise "Unable to find node that VM is on??" unless src_node
> +
> +    puts "Migrating domain lookup complete, domain is #{vm}"
> +
> +    case vm.state
> +      when "blocked"
> +        raise "Unable to migrate blocked VM."

Again, blocked is basically the same as running, so you don't want to raise an
error for it here.

> +      when "paused"
> +        raise "Unable to migrate suspended VM."
> +    end

And I'm actually not sure whether migrating a paused VM works or not.  Quite
possibly not, but I haven't tested it, so I can't say for certain.

> +
> +    vm_orig_state = db_vm.state
> +    set_vm_state(db_vm, Vm::STATE_MIGRATING)
>  
> -    state = Task::STATE_FINISHED
>      begin
> -      case task.action
> -      when VmTask::ACTION_CREATE_VM then create_vm(task)
> -      when VmTask::ACTION_SHUTDOWN_VM then shutdown_vm(task)
> -      when VmTask::ACTION_POWEROFF_VM then poweroff_vm(task)
> -      when VmTask::ACTION_START_VM then start_vm(task)
> -      when VmTask::ACTION_SUSPEND_VM then suspend_vm(task)
> -      when VmTask::ACTION_RESUME_VM then resume_vm(task)
> -      when VmTask::ACTION_SAVE_VM then save_vm(task)
> -      when VmTask::ACTION_RESTORE_VM then restore_vm(task)
> -      when VmTask::ACTION_UPDATE_STATE_VM then update_state_vm(task)
> -      when VmTask::ACTION_MIGRATE_VM then migrate_vm(task)
> -      when StorageTask::ACTION_REFRESH_POOL then refresh_pool(task)
> -      when StorageVolumeTask::ACTION_CREATE_VOLUME then create_volume(task)
> -      when StorageVolumeTask::ACTION_DELETE_VOLUME then delete_volume(task)
> -      when HostTask::ACTION_CLEAR_VMS then clear_vms_host(task)
> +      unless dest.nil? or dest.empty?
> +        if dest.to_i == db_vm.host_id
> +          raise "Cannot migrate from host " + src_node.hostname + " to itself!"
> +        end
> +        db_dst_host = find_host(dest.to_i)
>        else
> -        puts "unknown task " + task.action
> -        state = Task::STATE_FAILED
> -        task.message = "Unknown task type"
> +        db_dst_host = find_capable_host(db_vm)
>        end
> +
> +      dest_node = @session.object(:class => 'node', 'hostname' => db_dst_host.hostname)
> +      raise "Unable to find host #{db_dst_host.hostname} to migrate to." unless dest_node
> +
> +      volumes = []
> +      volumes += db_vm.storage_volumes
> +      connect_storage_pools(dest_node, volumes)
> +
> +      # Sadly migrate with qpid is broken because it requires a connection between
> +      # both nodes and currently that can't happen securely.  For now we do it
> +      # the old fashioned way..
> +      dst_uri = "qemu+tcp://#{dest_node.hostname}/system"
> +      src_uri = "qemu+tcp://#{src_node.hostname}/system"

You can remove the two lines above (dst_uri and src_uri); they aren't used
anywhere, and they might be confusing for future readers.
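
Alternatively, if you would rather keep the names, build the URI in one place
and pass the variables to the open calls.  A minimal sketch; qemu_tcp_uri is a
hypothetical helper and the hostnames are placeholders:

```ruby
# Hypothetical helper (not in the patch): build the libvirt URI for a node
# so the string is only assembled in one place.
def qemu_tcp_uri(hostname)
  "qemu+tcp://#{hostname}/system"
end

# Placeholder hostnames; in migrate() these would come from
# src_node.hostname and dest_node.hostname.
src_uri = qemu_tcp_uri("node1.example.com")
dst_uri = qemu_tcp_uri("node2.example.com")

# The variables would then be handed to Libvirt::open(src_uri) and
# Libvirt::open(dst_uri) instead of rebuilding the strings inline.
puts src_uri
puts dst_uri
```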

> +      src_conn = Libvirt::open("qemu+tcp://" + src_node.hostname + "/system")
> +      dst_conn = Libvirt::open("qemu+tcp://" + dest_node.hostname + "/system")
> +      dom = src_conn.lookup_domain_by_uuid(vm.uuid)
> +      dom.migrate(dst_conn, Libvirt::Domain::MIGRATE_LIVE)
> +      src_conn.close
> +      dst_conn.close
> +
> +      # undefine can fail, for instance, if we live migrated from A -> B, and
> +      # then we are shutting down the VM on B (because it only has "transient"
> +      # XML).  Therefore, just ignore undefine errors so we do the rest
> +      # FIXME: we really should have a marker in the database somehow so that
> +      # we can tell if this domain was migrated; that way, we can tell the
> +      # difference between a real undefine failure and one because of migration
> +      result = vm.undefine
> +      puts "Error undefining old vm after migrate: #{result.text}" unless result.status == 0

Again, how does this code handle the situation described in the comment above?

> +
> +      # See if we can take down storage pools on the src host.
> +      teardown_storage_pools(src_node)
>      rescue => ex
> -      puts "Task action processing failed: #{ex.class}: #{ex.message}"
> -      puts ex.backtrace
> -      state = Task::STATE_FAILED
> -      task.message = ex.message
> -    end
> -
> -    task.state = state
> -    task.time_ended = Time.now
> -    task.save!
> -    puts "done"
> -  end
> -
> -  # FIXME: here, we clean up "orphaned" tasks.  These are tasks that we had
> -  # to orphan (set task_target to nil) because we were deleting the object they
> -  # depended on.
> -  Task.find(:all, :conditions => [ "task_target_id IS NULL and task_target_type IS NULL" ]).each do |task|
> -    task.destroy
> -  end
> -  
> -  # we could destroy credentials, but another process might be using them (in
> -  # particular, host-browser).  Just leave them around, it shouldn't hurt
> -  
> -  STDOUT.flush
> -  sleep sleeptime
> +      puts "Error: #{ex}"
> +      set_vm_state(db_vm, vm_orig_state)
> +      raise ex
> +    end
> +
> +    db_vm.state = Vm::STATE_RUNNING
> +    db_vm.host_id = db_dst_host.id
> +    db_vm.save!
> +  end
> +
> +  def task_migrate_vm(task)
> +    puts "migrate_vm"
> +
> +    # here, we are given an id for a VM to migrate; we have to lookup which
> +    # physical host it is running on
> +    vm = find_vm(task)
> +    migrate(vm, task.args)
> +  end
> +
> +  def storage_find_suitable_host(hardware_pool)
> +    # find all of the hosts in the same pool as the storage
> +    hardware_pool.hosts.each do |host|
> +      puts "storage_find_suitable_host: host #{host.hostname} uuid #{host.uuid}"
> +      node = @session.object(:class => 'node', 'hostname' => host.hostname)

This isn't right; you need to use similar logic to what is in
"find_capable_host" to make sure that you don't pick a host that has been
disabled, etc.
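
Something along these lines, as a sketch under assumptions.  SketchHost stands
in for the Host model, and the is_disabled and state fields plus the
"available" value are my guesses at what the check would look at; use whatever
find_capable_host actually tests:

```ruby
# Stand-in for the Host model.  The is_disabled and state fields are
# assumptions made for this sketch only.
SketchHost = Struct.new(:hostname, :is_disabled, :state)

# Hypothetical predicate: a host is usable if it has not been disabled
# and reports itself as available.
def usable_host?(host)
  host.is_disabled.to_i == 0 && host.state == "available"
end

# Return the first usable host in the pool, or nil if there is none.
def first_usable_host(hosts)
  hosts.find { |h| usable_host?(h) }
end
```

storage_find_suitable_host would then only query qpid for hosts that pass the
check, and raise if none of them can be found.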

> +      return node if node
> +    end
> +
> +    raise "Could not find a host within this storage pool to scan the storage server."
> +  end
> +
> +  def add_volumes_to_db(db_pool, libvirt_pool, owner = nil, group = nil, mode = nil)
> +    # FIXME: this is currently broken if you do something like:
> +    # 1.  Add an iscsi pool with 3 volumes (lun-1, lun-2, lun-3)
> +    # 2.  Scan it in
> +    # 3.  Remove lun-3 from the pool
> +    # 4.  Re-scan it
> +    # What will happen is that you will still have lun-3 available in the
> +    # database, even though it's not available in the pool anymore.  It's a
> +    # little tricky, though; we have to make sure that we don't pull the
> +    # database entry out from underneath a possibly running VM (or do we?)
> +    volumes = @session.objects(:class => 'volume', 'storagePool' => libvirt_pool.remote_pool.object_id)
> +    volumes.each do |volume|
> +      storage_volume = StorageVolume.factory(db_pool.get_type_label)
> +
> +      # NOTE: it is safe (and, in fact, necessary) to use
> +      # #{storage_volume.volume_name} here without sanitizing it.  This is
> +      # because this is *not* based on user modifiable data, but rather, on an
> +      # internal implementation detail
> +      existing_vol = StorageVolume.find(:first, :conditions =>
> +                        ["storage_pool_id = ? AND #{storage_volume.volume_name} = ?",
> +                        db_pool.id, volume.name])
> +
> +      # in this case, this path already exists in the database; just skip
> +      next if existing_vol
> +
> +      storage_volume = StorageVolume.factory(db_pool.get_type_label)
> +      storage_volume.path = volume.path
> +      storage_volume.size = volume.capacity / 1024
> +      storage_volume.storage_pool_id = db_pool.id
> +      storage_volume.write_attribute(storage_volume.volume_name, volume.name)
> +      storage_volume.lv_owner_perms = owner
> +      storage_volume.lv_group_perms = group
> +      storage_volume.lv_mode_perms = mode
> +      storage_volume.state = StorageVolume::STATE_AVAILABLE
> +      puts "saving storage volume to db."
> +      storage_volume.save!
> +    end
> +  end
> +
> +  # The words "pool" and "volume" are ridiculously overloaded in our context.
> +  # Therefore, the refresh_pool method adopts this convention:
> +  # db_pool_phys: The underlying physical storage pool, as it is represented in
> +  #         the database
> +  # phys_libvirt_pool: The underlying physical storage, as it is represented in
> +  #          libvirt
> +  # db_lvm_pool: The logical storage pool (if it exists), as it is represented
> +  #        in the database
> +  # lvm_libvirt_pool: The logical storage pool (if it exists), as it is
> +  #           represented in the database
> +
> +  def task_refresh_pool(task)
> +    puts "refresh_pool"
> +
> +    db_pool_phys = task.storage_pool
> +    raise "Could not find storage pool" unless db_pool_phys
> +
> +    node = storage_find_suitable_host(db_pool_phys.hardware_pool)
> +
> +    # FIXME: We may want to scan through all the LVM volumes available
> +    # and just update the database with allocation information.
> +    # However afaict right now libvirt provides no way for us to know
> +    # where an LVM pool/volume sits in terms of its physical pool/volume
> +    # so we're kinda screwed for now for updating the database.

This is true, except that this has to work.  This gets into the whole LVM
scanning thing, which I'll cover in my other "LVM Fun" response to you.

> +    #
> +    #   Ian
> +    begin
> +      phys_libvirt_pool = LibvirtPool.factory(db_pool_phys)
> +      phys_libvirt_pool.connect(@session, node)
> +
> +      begin
> +        # OK, the pool is all set.  Add in all of the volumes
> +        add_volumes_to_db(db_pool_phys, phys_libvirt_pool)
> +
> +        db_pool_phys.state = StoragePool::STATE_AVAILABLE
> +        db_pool_phys.save!
> +      end
> +    ensure
> +      phys_libvirt_pool.shutdown
> +    end
> +  end
> +
> +  def task_create_volume(task)
> +    puts "create_volume"
> +
> +    db_volume = task.storage_volume
> +    raise "Could not find storage volume to create" unless db_volume
> +
> +    db_pool = db_volume.storage_pool
> +    raise "Could not find storage pool" unless db_pool
> +
> +    node = storage_find_suitable_host(db_pool.hardware_pool)
> +
> +    begin
> +      if db_volume[:type] == "LvmStorageVolume"
> +        phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
> +        phys_libvirt_pool.connect(@session, node)
> +      end
> +
> +      begin
> +        libvirt_pool = LibvirtPool.factory(db_pool)
> +
> +        begin
> +          libvirt_pool.connect(@session, node)
> +          volume_id = libvirt_pool.create_vol(*db_volume.volume_create_params)
> +          volume = @session.object(:object_id => volume_id)
> +          raise "Unable to find newly created volume" unless volume
> +
> +          puts "  volume:"
> +          for (key, val) in volume.properties
> +            puts "    property: #{key}, #{val}"
> +          end
> +
> +          # FIXME: Should have this too I think..
> +          #db_volume.key = volume.key
> +          db_volume.path = volume.path
> +          db_volume.state = StorageVolume::STATE_AVAILABLE
> +          db_volume.save!
> +
> +          db_pool.state = StoragePool::STATE_AVAILABLE
> +          db_pool.save!
> +        ensure
> +          libvirt_pool.shutdown
> +        end
> +      ensure
> +        if db_volume[:type] == "LvmStorageVolume"
> +          phys_libvirt_pool.shutdown
> +        end
> +      end
> +    end
> +  end
> +
> +  def task_delete_volume(task)
> +    puts "delete_volume"
> +
> +    db_volume = task.storage_volume
> +    raise "Could not find storage volume to create" unless db_volume
> +
> +    db_pool = db_volume.storage_pool
> +    raise "Could not find storage pool" unless db_pool
> +
> +    node = storage_find_suitable_host(db_pool.hardware_pool)
> +
> +    begin
> +      if db_volume[:type] == "LvmStorageVolume"
> +        phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
> +        phys_libvirt_pool.connect(@session, node)
> +        puts "connected to lvm pool.."
> +      end
> +
> +      begin
> +        libvirt_pool = LibvirtPool.factory(db_pool)
> +        libvirt_pool.connect(@session, node)
> +
> +        begin
> +          volume = @session.object(:class => 'volume',
> +                                   'storagePool' => libvirt_pool.remote_pool.object_id,
> +                                   'path' => db_volume.path)
> +          puts "Unable to find volume to delete" unless volume
> +
> +          # FIXME: we actually probably want to zero out the whole volume here, so
> +          # we aren't potentially leaking data from one user to another.  There
> +          # are two problems, though:
> +          # 1)  I'm not sure how I would go about zero'ing the data on a remote
> +          # machine, since there is no "libvirt_write_data" call
> +          # 2)  This could potentially take quite a while, so we want to spawn
> +          # off another thread to do it
> +          result = volume.delete
> +          raise "Error deleting volume: #{result.text}" unless result.status == 0
> +
> +          # Note: we have to nil out the task_target because when we delete the
> +          # volume object, that also deletes all dependent tasks (including this
> +          # one), which leads to accessing stale tasks.  Orphan the task, then
> +          # delete the object; we can clean up orphans later (or not, depending
> +          # on the audit policy)
> +          task.task_target = nil
> +          task.save!
> +
> +          db_volume.destroy
> +        ensure
> +          libvirt_pool.shutdown
> +        end
> +      ensure
> +        if db_volume[:type] == "LvmStorageVolume"
> +          phys_libvirt_pool.shutdown
> +        end
> +      end
> +    end
> +  end
> +
> +  def task_clear_vms_host(task)
> +    src_host = task.host
> +
> +    src_host.vms.each do |vm|
> +      migrate(vm)
> +    end
> +  end
> +
> +  def mainloop()
> +    loop do
> +      tasks = Array.new
> +      begin
> +        tasks = Task.find(:all, :conditions => [ "state = ?", Task::STATE_QUEUED ])
> +      rescue => ex
> +        puts "1 #{ex.class}: #{ex.message}"
> +        if Task.connected?
> +          begin
> +            ActiveRecord::Base.connection.reconnect!
> +          rescue => norecon
> +            puts "2 #{norecon.class}: #{norecon.message}"
> +          end
> +        else
> +          begin
> +            database_connect
> +          rescue => ex
> +            puts "3 #{ex.class}: #{ex.message}"
> +          end
> +        end
> +      end
> +
> +      tasks.each do |task|
> +        # make sure we get our credentials up-front
> +        get_credentials
> +
> +        task.time_started = Time.now
> +
> +        state = Task::STATE_FINISHED
> +        begin
> +          case task.action
> +            when VmTask::ACTION_CREATE_VM then task_create_vm(task)
> +            when VmTask::ACTION_SHUTDOWN_VM then task_shutdown_or_destroy_vm(task, :shutdown)
> +            when VmTask::ACTION_POWEROFF_VM then task_shutdown_or_destroy_vm(task, :destroy)
> +            when VmTask::ACTION_START_VM then task_start_vm(task)
> +            when VmTask::ACTION_SUSPEND_VM then task_suspend_vm(task)
> +            when VmTask::ACTION_RESUME_VM then task_resume_vm(task)
> +            when VmTask::ACTION_SAVE_VM then task_save_vm(task)
> +            when VmTask::ACTION_RESTORE_VM then task_restore_vm(task)
> +            when VmTask::ACTION_MIGRATE_VM then task_migrate_vm(task)
> +            when StorageTask::ACTION_REFRESH_POOL then task_refresh_pool(task)
> +            when StorageVolumeTask::ACTION_CREATE_VOLUME then task_create_volume(task)
> +            when StorageVolumeTask::ACTION_DELETE_VOLUME then task_delete_volume(task)
> +            when HostTask::ACTION_CLEAR_VMS then task_clear_vms_host(task)
> +          else
> +            puts "unknown task " + task.action
> +            state = Task::STATE_FAILED
> +            task.message = "Unknown task type"
> +          end
> +        rescue => ex
> +          puts "Task action processing failed: #{ex.class}: #{ex.message}"
> +          puts ex.backtrace
> +          state = Task::STATE_FAILED
> +          task.message = ex.message
> +        end
> +
> +        task.state = state
> +        task.time_ended = Time.now
> +        task.save!
> +        puts "done"
> +      end
> +      # FIXME: here, we clean up "orphaned" tasks.  These are tasks that we had
> +      # to orphan (set task_target to nil) because we were deleting the object they
> +      # depended on.
> +      Task.find(:all, :conditions => [ "task_target_id IS NULL and task_target_type IS NULL" ]).each do |task|
> +        task.destroy
> +      end
> +      sleep(1)

You are ignoring the "sleeptime" command-line option here.  It's fine to default
it to 1, but you should let the user override it on the command-line.
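
A minimal sketch of what I mean, using optparse from the standard library.
The -s/--sleep flag names and the parse_sleeptime helper are assumptions for
illustration; keep whatever the existing option is called:

```ruby
require 'optparse'

# Hypothetical helper: read the sleep interval from the command line,
# falling back to a default of 1 second when the option is absent.
def parse_sleeptime(argv, default = 1)
  sleeptime = default
  opts = OptionParser.new do |o|
    o.on("-s", "--sleep N", Integer,
         "Seconds to sleep between iterations") do |n|
      sleeptime = n
    end
  end
  # parse (without the bang) leaves argv itself untouched
  opts.parse(argv)
  sleeptime
end
```

mainloop would then call sleep with the parsed value rather than the
hard-coded 1.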

> +    end
> +  end
>  end
> +
> +taskomatic = TaskOmatic.new()
> +taskomatic.mainloop()
> +
> diff --git a/src/task-omatic/utils.rb b/src/task-omatic/utils.rb
> deleted file mode 100644
> index e3005ed..0000000
> --- a/src/task-omatic/utils.rb
> +++ /dev/null
> @@ -1,221 +0,0 @@
> -require 'rexml/document'
> -include REXML
> -
> -def String.random_alphanumeric(size=16)
> -  s = ""
> -  size.times { s << (i = Kernel.rand(62); i += ((i < 10) ? 48 : ((i < 36) ? 55 : 61 ))).chr }
> -  s
> -end
> -
> -def all_storage_pools(conn)
> -  all_pools = conn.list_defined_storage_pools
> -  all_pools.concat(conn.list_storage_pools)
> -  return all_pools
> -end
> -
> -def get_libvirt_lvm_pool_from_volume(db_volume)
> -  phys_volume = StorageVolume.find(:first, :conditions =>
> -                                   [ "lvm_pool_id = ?", db_volume.storage_pool_id])
> -
> -  return LibvirtPool.factory(phys_volume.storage_pool)
> -end
> -
> -class LibvirtPool
> -  def initialize(type, name = nil)
> -    @remote_pool = nil
> -    @build_on_start = true
> -    @remote_pool_defined = false
> -    @remote_pool_started = false
> -
> -    if name == nil
> -      @name = type + "-" + String.random_alphanumeric
> -    else
> -      @name = name
> -    end
> -
> -    @xml = Document.new
> -    @xml.add_element("pool", {"type" => type})
> -
> -    @xml.root.add_element("name").add_text(@name)
> -
> -    @xml.root.add_element("source")
> -
> -    @xml.root.add_element("target")
> -    @xml.root.elements["target"].add_element("path")
> -  end
> -
> -  def connect(conn)
> -    all_storage_pools(conn).each do |remote_pool_name|
> -      tmppool = conn.lookup_storage_pool_by_name(remote_pool_name)
> -
> -      if self.xmlequal?(Document.new(tmppool.xml_desc).root)
> -        @remote_pool = tmppool
> -        break
> -      end
> -    end
> -
> -    if @remote_pool == nil
> -      @remote_pool = conn.define_storage_pool_xml(@xml.to_s)
> -      # we need this because we don't necessarily want to "build" LVM pools,
> -      # which might destroy existing data
> -      if @build_on_start
> -        @remote_pool.build
> -      end
> -      @remote_pool_defined = true
> -    end
> -
> -    if @remote_pool.info.state == Libvirt::StoragePool::INACTIVE
> -      # only try to start the pool if it is currently inactive; in all other
> -      # states, assume it is already running
> -      @remote_pool.create
> -      @remote_pool_started = true
> -    end
> -  end
> -
> -  def list_volumes
> -    return @remote_pool.list_volumes
> -  end
> -
> -  def lookup_vol_by_path(dev)
> -    return @remote_pool.lookup_volume_by_path(dev)
> -  end
> -
> -  def lookup_vol_by_name(name)
> -    return @remote_pool.lookup_volume_by_name(name)
> -  end
> -
> -  def create_vol(type, name, size, owner, group, mode)
> -    @vol_xml = Document.new
> -    @vol_xml.add_element("volume", {"type" => type})
> -    @vol_xml.root.add_element("name").add_text(name)
> -    @vol_xml.root.add_element("capacity", {"unit" => "K"}).add_text(size.to_s)
> -    @vol_xml.root.add_element("target")
> -    @vol_xml.root.elements["target"].add_element("permissions")
> -    @vol_xml.root.elements["target"].elements["permissions"].add_element("owner").add_text(owner)
> -    @vol_xml.root.elements["target"].elements["permissions"].add_element("group").add_text(group)
> -    @vol_xml.root.elements["target"].elements["permissions"].add_element("mode").add_text(mode)
> -  end
> -
> -  def shutdown
> -    if @remote_pool_started
> -      @remote_pool.destroy
> -    end
> -    if @remote_pool_defined
> -      @remote_pool.undefine
> -    end
> -  end
> -
> -  def xmlequal?(docroot)
> -    return false
> -  end
> -
> -  def self.factory(pool)
> -    if pool[:type] == "IscsiStoragePool"
> -      return IscsiLibvirtPool.new(pool.ip_addr, pool[:target])
> -    elsif pool[:type] == "NfsStoragePool"
> -      return NFSLibvirtPool.new(pool.ip_addr, pool.export_path)
> -    elsif pool[:type] == "LvmStoragePool"
> -      # OK, if this is LVM storage, there are two cases we need to care about:
> -      # 1) this is a LUN with LVM already on it.  In this case, all we need to
> -      #    do is to create a new LV (== libvirt volume), and be done with it
> -      # 2) this LUN is blank, so there is no LVM on it already.  In this
> -      #    case, we need to pvcreate, vgcreate first (== libvirt pool build),
> -      #    and *then* create the new LV (== libvirt volume) on top of that.
> -      #
> -      # We can tell the difference between an LVM Pool that exists and one
> -      # that needs to be created based on the value of the pool.state;
> -      # if it is PENDING_SETUP, we need to create it first
> -      phys_volume = StorageVolume.find(:first, :conditions =>
> -                                       [ "lvm_pool_id = ?", pool.id])
> -
> -      return LVMLibvirtPool.new(pool.vg_name, phys_volume.path,
> -                                pool.state == StoragePool::STATE_PENDING_SETUP)
> -    else
> -      raise "Unknown storage pool type " + pool[:type].to_s
> -    end
> -  end
> -end
> -
> -class IscsiLibvirtPool < LibvirtPool
> -  def initialize(ip_addr, target)
> -    super('iscsi')
> -
> -    @type = 'iscsi'
> -    @ipaddr = ip_addr
> -    @target = target
> -
> -    @xml.root.elements["source"].add_element("host", {"name" => @ipaddr})
> -    @xml.root.elements["source"].add_element("device", {"path" => @target})
> -
> -    @xml.root.elements["target"].elements["path"].text = "/dev/disk/by-id"
> -  end
> -
> -  def xmlequal?(docroot)
> -    return (docroot.attributes['type'] == @type and
> -            docroot.elements['source'].elements['host'].attributes['name'] == @ipaddr and
> -            docroot.elements['source'].elements['device'].attributes['path'] == @target)
> -  end
> -end
> -
> -class NFSLibvirtPool < LibvirtPool
> -  def initialize(ip_addr, export_path)
> -    super('netfs')
> -
> -    @type = 'netfs'
> -    @host = ip_addr
> -    @remote_path = export_path
> -    @name = String.random_alphanumeric
> -
> -    @xml.root.elements["source"].add_element("host", {"name" => @host})
> -    @xml.root.elements["source"].add_element("dir", {"path" => @remote_path})
> -    @xml.root.elements["source"].add_element("format", {"type" => "nfs"})
> -
> -    @xml.root.elements["target"].elements["path"].text = "/mnt/" + @name
> -  end
> -
> -  def create_vol(name, size, owner, group, mode)
> -    # FIXME: this can actually take some time to complete (since we aren't
> -    # doing sparse allocations at the moment).  During that time, whichever
> -    # libvirtd we chose to use is completely hung up.  The solution is 3-fold:
> -    # 1.  Allow sparse allocations in the WUI front-end
> -    # 2.  Make libvirtd multi-threaded
> -    # 3.  Make taskomatic multi-threaded
> -    super("netfs", name, size, owner, group, mode)
> -
> -    # FIXME: we have to add the format as raw here because of a bug in libvirt;
> -    # if you specify a volume with no format, it will crash libvirtd
> -    @vol_xml.root.elements["target"].add_element("format", {"type" => "raw"})
> -    @remote_pool.create_vol_xml(@vol_xml.to_s)
> -  end
> -
> -  def xmlequal?(docroot)
> -    return (docroot.attributes['type'] == @type and
> -            docroot.elements['source'].elements['host'].attributes['name'] == @host and
> -            docroot.elements['source'].elements['dir'].attributes['path'] == @remote_path)
> -  end
> -end
> -
> -class LVMLibvirtPool < LibvirtPool
> -  def initialize(vg_name, device, build_on_start)
> -    super('logical', vg_name)
> -
> -    @type = 'logical'
> -    @build_on_start = build_on_start
> -
> -    @xml.root.elements["source"].add_element("name").add_text(@name)
> -    @xml.root.elements["source"].add_element("device", {"path" => device})
> -
> -    @xml.root.elements["target"].elements["path"].text = "/dev/" + @name
> -  end
> -
> -  def create_vol(name, size, owner, group, mode)
> -    super("logical", name, size, owner, group, mode)
> -    @remote_pool.create_vol_xml(@vol_xml.to_s)
> -  end
> -
> -  def xmlequal?(docroot)
> -    return (docroot.attributes['type'] == @type and
> -            docroot.elements['name'].text == @name and
> -            docroot.elements['source'].elements['name'] == @name)
> -  end
> -end

General comments:
1.  Probably replace all of the XXX with FIXME, or vice-versa, so the whole
thing is consistent.  That way people looking for things to fix can just look
for one string (I prefer FIXME, but it's up to you).

2.  Try to make everything fit 80 columns again, just for ease of reading.

3.  Despite what I said about it when we talked earlier, I find this kind of
hard to read being all in one file.  The problem is that it is not clear at a
quick glance which methods are top-level methods (i.e. task_start_vm) and which
ones are helper methods.  I think I would much prefer to have the helper
methods in a separate file and/or separate class, but I'm not sure how difficult
that would be.  Thoughts?
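
For point 3, one possible shape, purely as a sketch.  The module and class
names are made up, and the helper body is a stand-in that writes to a hash
instead of the database:

```ruby
# Hypothetical split: helpers live in a module (which could sit in its own
# file), and the class itself only defines the task_* entry points.
module SketchTaskHelpers
  # Stand-in helper; the real one would update and save the database row.
  def set_vm_state(vm, state)
    vm[:state] = state
  end
end

class SketchTaskOmatic
  include SketchTaskHelpers

  # Top-level task entry point, the kind of method mainloop dispatches to.
  def task_suspend_vm(vm)
    set_vm_state(vm, "suspended")
    vm
  end
end
```

With that layout the methods defined directly on the class are exactly the
task entry points, which makes them easy to pick out.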

-- 
Chris Lalancette
