extras-buildsys/server ArchJob.py, 1.31, 1.32 BuildMaster.py, 1.40, 1.41 Builder.py, 1.40, 1.41 BuilderManager.py, 1.24, 1.25 PackageJob.py, 1.49, 1.50 main.py, 1.21, 1.22

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Sun May 14 05:43:09 UTC 2006


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv2964/server

Modified Files:
	ArchJob.py BuildMaster.py Builder.py BuilderManager.py 
	PackageJob.py main.py 
Log Message:
2006-05-14  Dan Williams  <dcbw at redhat.com>

    * Rework archjob handling.  They are now processed from the owning
    PackageJob object, and only one is spawned for the lifetime of the
    PackageJob for each architecture (previously one was spawned for each
    individual build job on the builder).  Archjob UIDs are generated on
    the server now rather than the builder.

    * Correctly handle builders going away before they've had a chance to
    notify the server that they have started an archjob.  Previously, these
    jobs would just be lost.  This is the real reason for the rework of the
    archjob handling above.

    * On the builder, don't use die() to end jobs that have failed; instead,
    end them properly and upload their files to the server, unless the job was killed

    * Deal with active builders whose hostnames can't be resolved when
    the server starts

    * Kill any existing jobs on builders when the server starts up

    * Consolidate and simplify logging functions in PackageJob

    * More pylint-induced cleanups




Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.31
retrieving revision 1.32
diff -u -r1.31 -r1.32
--- ArchJob.py	9 May 2006 19:10:57 -0000	1.31
+++ ArchJob.py	14 May 2006 05:43:07 -0000	1.32
@@ -15,36 +15,80 @@
 # Copyright 2005 Dan Williams <dcbw at redhat.com> and Red Hat, Inc.
 
 import time
-import socket
 import os
 import threading
 import urllib
-import OpenSSL
 from plague import FileTransfer
+import sha
 
 
+def _generate_uniqid(parent_jobid, start_time, target_dict, srpm_url):
+    distro = target_dict['distro']
+    repo =  target_dict['repo']
+    target = target_dict['target']
+    arch = target_dict['arch']
+    hash_string = "%d%d%s%s%s%s%s" % (parent_jobid, start_time, distro,
+            target, arch, repo, srpm_url)
+    sha_hash = sha.new()
+    sha_hash.update(hash_string)
+    return sha_hash.hexdigest()
+
+
+AJ_STATUS_QUEUED = 'queued'
+AJ_STATUS_WAITING = 'waiting'
+AJ_STATUS_REPO_WAIT = 'repo_wait'
+AJ_STATUS_REPO_UNLOCK = 'repo_unlock'
+AJ_STATUS_RUNNING = 'running'
+AJ_STATUS_DOWNLOADING = 'downloading'
+AJ_STATUS_DONE = 'done'
+
 class ArchJob:
     """ Tracks a single build instance for a single arch on a builder """
 
-    def __init__(self, builder, par_job, jobid, target_dict):
-        self.par_job = par_job
-        self._builder = builder
-        self._repo = par_job.repo()
-        self.jobid = jobid
-        self._status = 'starting'
+    def __init__(self, parent, target_dict, srpm_url):
+        self._parent = parent
+        self._builder = None
+        self._repo = parent.repo()
+        self._starttime = time.time()
+        self._endtime = 0
+        self._id = _generate_uniqid(parent.uid, self._starttime, target_dict,
+                srpm_url)
+        self._status = AJ_STATUS_QUEUED
         self._builder_status = ''
         self._failure_noticed = False
         self._download_failed = False
         self._internal_failure = False
         self._target_dict = target_dict
-        self._builder_gone = False
+        self._srpm_url = srpm_url
         self._result_files = {}
-        self._starttime = time.time()
-        self._endtime = 0
         self._die = False
         self._die_user_requested = False
         self._die_lock = threading.Lock()
         self._prepping = False
+        self._orphaned = False
+
+    def builder_suspend_cb(self, builder, reason, msg):
+        self.unclaim(builder)
+        if not self._is_done_status():
+            self._parent.log(self.arch(), "Builder disappeared.  Requeuing arch...")
+
+    def set_builder_status(self, builder, builder_status):
+        if builder != self._builder:
+            return
+
+        # The job has just started on the builder
+        if self._builder_status == '':
+            addr = builder.address()
+            self._parent.log(self.arch(), "%s - UID is %s" % (addr, self._id))
+            # Notify our parent PackageJob that we've started
+            self._parent.archjob_started_cb(self)
+
+        oldstatus = self._builder_status
+        self._builder_status = builder_status
+        if oldstatus != self._builder_status:
+            attrdict = self._to_dict()
+            self._parent.bm.queue_archjob_status_update(self._id, attrdict)
+            del attrdict
 
     def failure_noticed(self):
         return self._failure_noticed
@@ -74,86 +118,115 @@
     def arch(self):
         return self._target_dict['arch']
 
+    def target_dict(self):
+        return self._target_dict
+
+    def srpm_url(self):
+        return self._srpm_url
+
+    def archjob_id(self):
+        return self._id
+
     def builder(self):
         return self._builder
 
+    def orphaned(self):
+        return self._orphaned
+
+    def claim(self, builder):
+        """Called by the Builder via the BuilderManager when the builder
+        agrees to build this archjob."""
+        if self._status != AJ_STATUS_QUEUED:
+            return
+        self._set_status(AJ_STATUS_WAITING)
+        self._builder = builder
+        builder.add_suspend_listener(self)
+
+    def unclaim(self, builder):
+        builder.remove_suspend_listener(self)
+        self._builder = None
+        if not self._is_done_status():
+            self._orphaned = True
+            self._builder_status = ''
+            # Mark ourselves as available for building again
+            self._set_status(AJ_STATUS_QUEUED)
+
     def _to_dict(self):
         attrdict = {}
-        attrdict['jobid'] = self.jobid
-        attrdict['parent_uid'] = self.par_job.uid
+        attrdict['jobid'] = self._id
+        attrdict['parent_uid'] = self._parent.uid
         attrdict['arch'] = self._target_dict['arch']
-        (ip, addr) = self._builder.address()
-        # for some reason, splithost doesn't like the protocol
-        # method, you have to give it a string starting with "//"
-        if addr.startswith("http"):
-            idx = addr.find('//')
-            addr = addr[idx:]
-        host_port, path = urllib.splithost(addr)
-        host, port = urllib.splitport(host_port)
-        attrdict['builder_addr'] = host
+        if self._builder:
+            name = self._builder.address()
+            # for some reason, splithost doesn't like the protocol
+            # method, you have to give it a string starting with "//"
+            if name.startswith("http"):
+                idx = name.find('//')
+                name = name[idx:]
+            host_port, path = urllib.splithost(name)
+            host, port = urllib.splitport(host_port)
+            attrdict['builder_addr'] = host
+        else:
+            attrdict['builder_addr'] = ""
         attrdict['status'] = self._status
         attrdict['builder_status'] = self._builder_status
         attrdict['starttime'] = self._starttime
         attrdict['endtime'] = self._endtime
         return attrdict
 
-    def set_builder_job_status(self, builder_status):
-        oldstatus = self._builder_status
-        self._builder_status = builder_status
-        if oldstatus != self._builder_status:
-            attrdict = self._to_dict()
-            self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
-            del attrdict
-
-        if builder_status == 'killed' or builder_status == 'failed':
-            self.par_job.wake()
-
     def _set_status(self, status):
         oldstatus = self._status
         self._status = status
         if oldstatus != self._status:
             attrdict = self._to_dict()
-            self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
+            self._parent.bm.queue_archjob_status_update(self._id, attrdict)
             del attrdict
 
     def _is_done_status(self):
-        if self._status == 'done':
+        if self._status == AJ_STATUS_DONE:
             return True
         return False
 
-    def _status_starting(self):
+    def _set_done(self):
+        self._builder.remove_suspend_listener(self)
+        self._set_status(AJ_STATUS_DONE)
+
+    def _status_queued(self):
+        pass
+
+    def _status_waiting(self):
         # Builders pause before they enter the 'prep' state (which accesses
         # the repo for this target), and wait for the server to allow them
         # to proceed when the repo is unlocked.
         if self._builder_status == 'downloaded':
-            self._set_status('repo_wait')
+            self._set_status(AJ_STATUS_REPO_WAIT)
             self._repo.request_unlock(self)
             
     def _status_repo_wait(self):
         pass
 
     def repo_unlocked_callback(self):
-        if self._status == 'repo_wait':
-            self._set_status('repo_unlock')        
+        if self._status == AJ_STATUS_REPO_WAIT:
+            self._set_status(AJ_STATUS_REPO_UNLOCK)
 
     def _status_repo_unlock(self):
         # Builder will be in 'downloaded' state until
         # it notices that the repo has been unlocked 
-        self._builder.unlock_repo_for_job(self.jobid)
         self._prepping = True
+        self._builder.unlock_repo_for_job(self._id)
         if self._builder_status != 'downloaded':
-            self._set_status('running')
+            self._set_status(AJ_STATUS_RUNNING)
 
     def _status_running(self):
         if self._builder_status != 'prepping':
             self._prepping = False
 
         if self._builder_finished():
-            self._set_status('downloading')
-            self._builder.request_job_files(self.jobid)
+            self._set_status(AJ_STATUS_DOWNLOADING)
+            self._builder.request_job_files(self._id)
 
     def get_result_files_dir(self):
-        result_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
+        result_dir = os.path.join(self._parent.get_stage_dir(), self._target_dict['arch'])
         if not os.path.exists(result_dir):
             os.makedirs(result_dir)
         return result_dir
@@ -169,13 +242,16 @@
             if fname != files.keys()[nresults - 1]:
                 file_string = file_string + ", "
 
-        print "%s (%s/%s): Build result files - [ %s ]" % (self.par_job.uid,
-                    self.par_job.package, self._target_dict['arch'], file_string)
+        print "%s (%s/%s): Build result files - [ %s ]" % (self._parent.uid,
+                    self._parent.package, self._target_dict['arch'], file_string)
 
     def download_cb(self, files):
         """Called by the Builder to notify us that our job's files are available.
         The files argument should be a list of _filenames_, not paths.  All files
         are assumed to be in the directory returned by get_result_files_dir."""
+        if self._is_done_status():
+            return
+
         if len(files.keys()) == 0:
             self._download_failed = True
         else:
@@ -186,13 +262,21 @@
         self._print_downloaded_files(self._result_files)
 
         self._endtime = time.time()
-        self._set_status('done')
-        self.par_job.wake()
+        self._set_done()
 
     def _status_downloading(self):
         # Wait to be notified that our files are downloaded
         pass
 
+    def _handle_death(self, user_requested):
+        self._builder.request_kill_for_job(self._id)
+        if self._status == AJ_STATUS_REPO_WAIT:
+            self._repo.cancel_unlock_request(self)
+        self._set_done()
+        if user_requested:
+            print "%s (%s/%s): %s - killed." % (self._parent.uid, self._parent.package,
+                    self._target_dict['arch'], self._id)
+
     def process(self):
         if self._is_done_status():
             return
@@ -203,32 +287,23 @@
         user_requested = self._die_user_requested
         self._die_lock.release()
         if should_die:
-            try:
-                self._server.die(self.jobid)
-            except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error):
-                pass
-            if self._status == 'repo_wait':
-                self._repo.cancel_unlock_request(self)
-            self._set_status('done')
-            if user_requested:
-                print "%s (%s/%s): %s - killed." % (self.par_job.uid, self.par_job.package,
-                        self._target_dict['arch'], self.jobid)
+            self._handle_death(user_requested)
             return
 
         status_func = None
         try:
             status_func = getattr(self, "_status_%s" % self._status)
         except AttributeError:
-            print "%s (%s/%s): %s - internal archjob inconsistency.  Unknown status '%s'." % (self.par_job.uid, self.par_job.package,
-                        self._target_dict['arch'], self.jobid, self._status)
-            self._set_status('done')
+            print "%s (%s/%s): %s - internal archjob inconsistency.  Unknown status '%s'." % (self._parent.uid,
+                    self._parent.package, self._target_dict['arch'], self._id, self._status)
+            self._set_done()
             self._internal_failure = True
             return
             
         # Do the actual work for this status
         status_func()
 
-    def get_status(self):
+    def status(self):
         return self._status
 
     def get_files(self):
@@ -239,21 +314,17 @@
                 files.append(fname)
         return files
 
-    def builder_gone(self):
-        if self._status != 'done':
-            self._builder_status = 'orphaned'
-            self._set_status('done')
-            self.par_job.remove_arch_job(self)
-
     def die(self, user_requested=False):
         # Can be called from other threads
-        if self._status == 'running' or self._status == 'repo_wait' or self._status == 'starting' or self._status == 'repo_unlock':
-            self._die_lock.acquire()
-            self._die = True
-            self._die_user_requested = user_requested
-            self._die_lock.release()
-            if user_requested:
-                print "%s (%s/%s): %s - kill requested by parent job" % (self.par_job.uid,
-                        self.par_job.package, self._target_dict['arch'], self.jobid)
+        if self._is_done_status():
+            return
+
+        self._die_lock.acquire()
+        self._die = True
+        self._die_user_requested = user_requested
+        self._die_lock.release()
+        if user_requested:
+            print "%s (%s/%s): %s - kill requested by parent job" % (self._parent.uid,
+                    self._parent.package, self._target_dict['arch'], self._id)
 
 


Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -r1.40 -r1.41
--- BuildMaster.py	28 Apr 2006 03:17:41 -0000	1.40
+++ BuildMaster.py	14 May 2006 05:43:07 -0000	1.41
@@ -18,9 +18,7 @@
 
 import time
 import PackageJob
-import DBManager
 import threading
-import os
 import Repo
 import copy
 import Config
@@ -144,7 +142,7 @@
         for repo in self._repos.values():
             repo.stop()
 
-    def create_job_request(self, email, package, source, target_dict, buildreq, time):
+    def create_job_request(self, email, package, source, target_dict, buildreq, ctime):
         req = {}
         req['email'] = email
         req['package'] = package
@@ -152,7 +150,7 @@
         req['target_target'] = target_dict['target']
         req['target_repo'] = target_dict['repo']
         req['buildreq'] = buildreq
-        req['time'] = time
+        req['time'] = ctime
         req['source'] = source
         req['uid_avail'] = False
         req['uid'] = -1
@@ -226,8 +224,8 @@
             # Update job end time
             try:
                 self._cursor.execute('UPDATE jobs SET endtime=%d WHERE uid=%d' % (job.endtime, uid))
-            except StandardError, e:
-                print "DB Error: could not access jobs database. Reason: '%s'" % e
+            except StandardError, exc:
+                print "DB Error: could not access jobs database. Reason: '%s'" % exc
             self._dbcx.commit()
 
             print "%s (%s): Job finished." % (uid, job.package)
@@ -253,8 +251,8 @@
         sql = 'UPDATE jobs SET ' + sql + ' WHERE uid=%d' % uid
         try:
             self._cursor.execute(sql)
-        except StandardError, e:
-            print "DB Error: could not access jobs database. Reason: '%s'" % e
+        except StandardError, exc:
+            print "DB Error: could not access jobs database. Reason: '%s'" % exc
         self._dbcx.commit()
 
     def _write_archjob_status_to_db(self, uid, attrdict):
@@ -266,15 +264,15 @@
                     "VALUES ('%s', %d, %d, %d, '%s', '%s', '%s', '%s')" % (uid, attrdict['parent_uid'], \
                     attrdict['starttime'], attrdict['endtime'], attrdict['arch'],                       \
                     attrdict['builder_addr'], attrdict['status'], attrdict['builder_status']))
-            except StandardError, e:
-                print "DB Error: could not access jobs database. Reason: '%s'" % e
+            except StandardError, exc:
+                print "DB Error: could not access jobs database. Reason: '%s'" % exc
         else:            
             try:
                 self._cursor.execute("UPDATE archjobs SET status='%s', builder_status='%s', endtime=%d " \
                     "WHERE jobid='%s' AND parent_uid=%d" % (attrdict['status'], 
                     attrdict['builder_status'], attrdict['endtime'], uid, attrdict['parent_uid']))
-            except StandardError, e:
-                print "DB Error: could not access jobs database. Reason: '%s'" % e
+            except StandardError, exc:
+                print "DB Error: could not access jobs database. Reason: '%s'" % exc
         self._dbcx.commit()
 
     def _save_job_status(self):


Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -r1.40 -r1.41
--- Builder.py	9 May 2006 19:10:57 -0000	1.40
+++ Builder.py	14 May 2006 05:43:07 -0000	1.41
@@ -15,7 +15,6 @@
 # Copyright 2005 Dan Williams <dcbw at redhat.com> and Red Hat, Inc.
 
 import time
-import string
 import xmlrpclib
 import socket
 import os
@@ -60,19 +59,27 @@
         self._seq_gen = Commands.SequenceGenerator()
         self._lock = threading.Lock()
         self._cmd_queue = []
+        self._suspend_listeners = []
+        self._status_listeners = []
+        self._ip = None
+
+        uri, rest = urllib.splittype(address)
+        host, ignore = urllib.splithost(rest)
+        self._host, port = urllib.splitport(host)
 
-        try:
-            type, rest = urllib.splittype(address)
-            host, ignore = urllib.splithost(rest)
-            host, port = urllib.splitport(host)
-            self._ip = socket.gethostbyname(host)
-        except Exception, e:
-            print "Builder Error(%s): couldn't lookup builder's IP address." % address
-            raise Exception(e)
+        self._get_ip()
 
         threading.Thread.__init__(self)
         self.setName("Builder: %s" % address)
 
+    def _get_ip(self):
+        try:
+            self._ip = socket.gethostbyname(self._host)
+            return True
+        except Exception:
+            pass
+        return False
+
     def _match_target_dict(self, td1, td2):
         if td1['distro'] == td2['distro']:
             if td1['target'] == td2['target']:
@@ -81,24 +88,28 @@
         return False
 
     def arches(self, target_dict):
-        for td in self._target_list:
-            if self._match_target_dict(td, target_dict):
+        for tdict in self._target_list:
+            if self._match_target_dict(tdict, target_dict):
                 arches = []
-                for arch in td['supported_arches']:
+                for arch in tdict['supported_arches']:
                     if not arch in arches:
                         arches.append(arch)
                 return arches
         return None
 
-    def can_build_for_target(self, target_dict):
+    def can_build_arch_job(self, archjob):
+        target_dict = archjob.target_dict()
         for td in self._target_list:
             if self._match_target_dict(td, target_dict):
-                if target_dict['arch'] in td['supported_arches']:
+                if archjob.arch() in td['supported_arches']:
                     return True
         return False
 
     def address(self):
-        return (self._ip, self._address)
+        return self._address
+
+    def ip(self):
+        return self._ip
 
     def available(self):
         """ Is the builder responding to requests? """
@@ -123,17 +134,32 @@
             pass
         return None
 
+    def add_suspend_listener(self, listener):
+        listeners = self._suspend_listeners[:]
+        if listener not in listeners:
+            self._suspend_listeners.append(listener)
+
+    def remove_suspend_listener(self, listener):
+        if listener in self._suspend_listeners:
+            self._suspend_listeners.remove(listener)
+
+    def _notify_suspend_listeners(self, reason, msg):
+        # Must copy the list since it can be modified from
+        # the listener during our iteration of it
+        listeners = self._suspend_listeners[:]
+        for listener in listeners:
+            listener.builder_suspend_cb(self, reason, msg)
+        del listeners
+
     def _handle_builder_suspend(self, reason, msg):
-        for jobid in self._jobs.keys():
-            job = self._jobs[jobid]
-            job.builder_gone()
-            del self._jobs[jobid]
-        self._jobs = {}
         self._available = False
         self._suspend_reason = reason
         self._unavail_count = 0
         self._prepping_jobs = False
         self._when_died = time.time()
+        self._jobs = {}
+
+        self._notify_suspend_listeners(reason, msg)
 
         # Notify admins
         print "Suspending builder '%s'.  Reason: %s - %s." % (self._address, reason, msg)
@@ -178,8 +204,8 @@
         builder_dict['address'] = host
 
         arches = []
-        for td in self._target_list:
-            for arch in td['supported_arches']:
+        for tdict in self._target_list:
+            for arch in tdict['supported_arches']:
                 if not arch in arches:
                     arches.append(arch)
         builder_dict['arches'] = arches
@@ -197,7 +223,7 @@
             (uniqid, status) = cmd.get_job(item)
             try:
                 job = self._jobs[uniqid]
-                job.set_builder_job_status(status)
+                job.set_builder_status(self, status)
                 reported_uniqids.append(uniqid)
             except KeyError:
                 pass
@@ -208,11 +234,10 @@
         # removed from the building job list before we
         # were able to know that it was done.  HACK
         self._prepping_jobs = False
-        for jobid in self._jobs.keys():
+        for jobid, job in self._jobs.items():
             # If the builder didn't report this job as building,
             # and its not done, explicitly get its status
-            job = self._jobs[jobid]
-            if jobid not in reported_uniqids and job.get_status() != 'done':
+            if jobid not in reported_uniqids and job.status() != 'done':
                 new_cmds.append(Commands.PlgCommandJobStatus(jobid, self._seq_gen.next()))                    
 
             # Check for prepping jobs
@@ -243,11 +268,15 @@
 
         old_cmd = self._find_and_remove_cmd_for_ack(ack, Commands.PlgCommandNewJobReq)
         if old_cmd:
-            parent = old_cmd.parent_job()
-            archjob_id = ack.archjob_id()
-            archjob = ArchJob.ArchJob(self, parent, archjob_id, old_cmd.target_dict())
-            self._jobs[archjob_id] = archjob
-            parent.add_arch_job(archjob)
+            archjob = old_cmd.archjob()
+            archjob_id = archjob.archjob_id()
+            ack_archjob_id = ack.archjob_id()
+            if archjob_id != ack_archjob_id:
+                print "Builder Error (%s): returned archjob_id (%s) " \
+                        "doesn't match expected (%s)." % (self._address, ack_archjob_id, archjob_id)
+                archjob.unclaim(self)
+            else:
+                self._jobs[archjob_id] = archjob
 
     def _handle_job_status_ack(self, ack):
         """Handle a job status ack by setting telling the job object
@@ -258,7 +287,7 @@
             archjob_id = ack.archjob_id()
             status = ack.status()
             job = self._jobs[archjob_id]
-            job.set_builder_job_status(status)
+            job.set_builder_status(self, status)
 
     def _decompose_job_files_ack(self, ack):
         """Handle a job files ack by finding the archjob it's for, then
@@ -305,6 +334,28 @@
         self._cmd_queue.append(cmd)
         self._lock.release()
 
+    def request_kill_for_job(self, uniqid):
+        cmd = Commands.PlgCommandKillJob(uniqid, self._seq_gen.next())
+        self._lock.acquire()
+        self._cmd_queue.append(cmd)
+        self._lock.release()
+
+    def unlock_repo_for_job(self, uniqid):
+        """Called by an archjob to request the sending of a RepoUnlocked
+        command to the builder for a particular archjob."""
+
+        self._lock.acquire()
+        found = False
+        for cmd in self._cmd_queue:
+            if isinstance(cmd, Commands.PlgCommandUnlockRepo):
+                if cmd.archjob_id() == uniqid:
+                    found = True
+                    break
+        if not found:
+            cmd = Commands.PlgCommandUnlockRepo(uniqid, self._seq_gen.next())
+            self._cmd_queue.append(cmd)
+        self._lock.release()
+
 
 # HACK: This class is a hack to work around SSL hanging issues,
 # which cause the whole server to grind to a halt
@@ -326,8 +377,8 @@
             (jobs, free_slots) = self._server.building_jobs()
         except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
             self.failed = True
-        except xmlrpclib.Fault, e:
-            print "Builder Error (%s) in _building_jobs(): builder replied '%s'" % (self._address, e)
+        except xmlrpclib.Fault, exc:
+            print "Builder Error (%s) in _building_jobs(): builder replied '%s'" % (self._address, exc)
             self.failed = True
         self.jobs = jobs
         self.free_slots = free_slots
@@ -374,14 +425,13 @@
             alive = True
         except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
             alive = False
-        except xmlrpclib.Fault, e:
-            print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, e)
+        except xmlrpclib.Fault, exc:
+            print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, exc)
             alive = False
         return (alive, target_list)
 
     def _init_builder(self, target_list):
         self._target_list = target_list
-
         # Kill any jobs currently running on the builder
         jobs = self._building_jobs()
         for jobid in jobs.keys():
@@ -398,8 +448,8 @@
             num_slots = self._server.num_slots()
         except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
             pass
-        except xmlrpclib.Fault, e:
-            print "Builder Error (%s) in _get_num_slots(): builder replied '%s'" % (self._address, e)
+        except xmlrpclib.Fault, exc:
+            print "Builder Error (%s) in _get_num_slots(): builder replied '%s'" % (self._address, exc)
         return num_slots
 
     def _building_jobs(self):
@@ -444,26 +494,29 @@
         self._ping_timeout = 0
         self._init_builder(target_list)
 
-    def start_job(self, par_job, target_dict, srpm_url):
+    def start_job(self, req):
         if not self.available():
             raise RuntimeError
-        if not self.can_build_for_target(target_dict):
+        if not self.can_build_request(req):
             raise RuntimeError
 
         self._server_lock.acquire()
         try:
             # Builder will return jobid of 0 if it can't start the job for some reason
-            jobid = self._server.start_new_job(target_dict, srpm_url)
+            srpm_url = req.srpm_url()
+            target_dict = req.target_dict()
+            jobid = self._server.start_new_job(target_dict, req.srpm_url())
         except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, \
-                OpenSSL.SSL.Error, xmlrpclib.ProtocolError, xmlrpclib.Fault), e:
-            error_class = str(e.__class__)
-            error_string = str(e)
+                OpenSSL.SSL.Error, xmlrpclib.ProtocolError, xmlrpclib.Fault), exc:
+            error_class = str(exc.__class__)
+            error_string = str(exc)
             jobarch = target_dict['arch']
-            print "%s (%s/%s): %s exception '%s' starting job on %s" % (par_job.uid, \
-                par_job.package, jobarch, error_class, error_string, \
+            parent = req.parent()
+            print "%s (%s/%s): %s exception '%s' starting job on %s" % (parent.uid, \
+                parent.package, jobarch, error_class, error_string, \
                 self._address)
             # Check for hard errors, for which we suspend the builder
-            if string.find(error_string, "OSError") >= 0 and string.find(error_string, "Errno") >= 0:
+            if error_string.find("OSError") >= 0 and error_string.find("Errno") >= 0:
                 self._handle_builder_suspend(SUSPEND_HARD_ERROR, error_string)
             time.sleep(0.5)
             jobid = 0
@@ -472,7 +525,7 @@
             self._server_lock.release()
             raise RuntimeError
 
-        job = ArchJob.ArchJob(self, self._server_cfg, self._server, par_job, jobid, target_dict)
+        job = ArchJob.ArchJob(self, self._server_cfg, self._server, parent, jobid, target_dict)
         self._jobs[jobid] = job
         self._update_building_jobs()
         self._server_lock.release()
@@ -488,7 +541,7 @@
                 try:
                     job = self._jobs[jobid]
                     status = jobs[jobid]
-                    job.set_builder_job_status(status)
+                    job.set_builder_status(self, status)
                     builder_jobs.append(jobid)
                 except KeyError:
                     pass
@@ -503,10 +556,10 @@
                 # If the builder didn't report this job as building,
                 # and its not done, explicitly grab its status
                 job = self._jobs[jobid]
-                if jobid not in builder_jobs and job.get_status() != 'done':
+                if jobid not in builder_jobs and job.status() != 'done':
                     status = self._get_builder_job_status(jobid)
                     if status:
-                        job.set_builder_job_status(status)
+                        job.set_builder_status(self, status)
 
                 # Check for prepping jobs
                 if job.prepping():
@@ -573,22 +626,6 @@
     def _init_builder(self, target_list):
         self._target_list = target_list
 
-    def unlock_repo_for_job(self, uniqid):
-        """Called by an archjob to request the sending of a RepoUnlocked
-        command to the builder for a particular archjob."""
-
-        self._lock.acquire()
-        found = False
-        for cmd in self._cmd_queue:
-            if isinstance(cmd, Commands.PlgCommandUnlockRepo):
-                if cmd.archjob_id() == uniqid:
-                    found = True
-                    break
-        if not found:
-            cmd = Commands.PlgCommandUnlockRepo(uniqid, self._seq_gen.next())
-            self._cmd_queue.append(cmd)
-        self._lock.release()
-
     def _handle_job_files_ack(self, cmd):
         (archjob, urls) = self._decompose_job_files_ack(cmd)
         if not archjob:
@@ -644,10 +681,10 @@
         # Grab some work for the builder if any is available
         new_cmds = []
         if self._free_slots > 0:
-            req = self._manager.claim_arch_job(self)
-            if req:
+            archjob = self._manager.claim_arch_job(self)
+            if archjob:
                 next_seq = self._seq_gen.next()
-                cmd = Commands.PlgCommandNewJobReq(req['parent'], req['target_dict'], req['srpm_url'], next_seq)
+                cmd = Commands.PlgCommandNewJobReq(archjob, seq=next_seq)
                 new_cmds.append(cmd)
 
         self._lock.acquire()
@@ -667,6 +704,11 @@
         self._lock.release()
         return cmd_list
 
+    def ip(self):
+        if not self._ip:
+            self._get_ip()
+        return Builder.ip(self)
+
     _SLEEP_INTERVAL = 10
     def run(self):
         """Main builder loop.  Since the builder contacts us,
@@ -675,28 +717,24 @@
 
         DebugUtils.registerThreadName(self)
         while not self._stop:
-            if not self._available:
-                time.sleep(self._SLEEP_INTERVAL)
-                continue
-
-            process_jobs = True
-            self._lock.acquire()
-            if self._unavail_count > 2:
-                self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
-                process_jobs = False
-            elif self._last_contact + self._REQUIRED_CONTACT_INTERVAL < time.time():
-                self._unavail_count = self._unavail_count + 1
-            self._lock.release()
-
-            if process_jobs:
-                for j in self._jobs.values():
-                    j.process()
+            if self._available:
+                self._lock.acquire()
+                if self._unavail_count > 2:
+                    self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
+                elif self._last_contact + self._REQUIRED_CONTACT_INTERVAL < time.time():
+                    self._unavail_count = self._unavail_count + 1
+                self._lock.release()
 
             time.sleep(self._SLEEP_INTERVAL)
 
     def _handle_builder_suspend(self, reason, msg):
         Builder._handle_builder_suspend(self, reason, msg)
         self._last_contact = 0
+        self._ip = None
+
+        # Clear out the command queue; we start clean when the
+        # builder contacts us again
+        self._cmd_queue = []
 
     def _handle_builder_reactivate(self, cmd_list):
         # Grab an updated target list from the command stream when
@@ -705,6 +743,14 @@
         for cmd in cmd_list:
             if isinstance(cmd, Commands.PlgCommandTargets):
                 target_list = cmd.targets()
+            elif isinstance(cmd, Commands.PlgCommandBuildingJobs):
+                # Tell the builder to kill all jobs it might be building
+                # right now.  Think server restart here.
+                for item in cmd.jobs():
+                    (uniqid, status) = cmd.get_job(item)
+                    cmd = Commands.PlgCommandKillJob(uniqid, self._seq_gen.next())
+                    self._cmd_queue.append(cmd)
+                    
         if not target_list:
             target_list = self._target_list
 


Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -r1.24 -r1.25
--- BuilderManager.py	9 May 2006 19:10:57 -0000	1.24
+++ BuilderManager.py	14 May 2006 05:43:07 -0000	1.25
@@ -15,34 +15,31 @@
 # Copyright 2005 Dan Williams <dcbw at redhat.com> and Red Hat, Inc.
 
 import time
-import string
-import xmlrpclib
-import sys
 import socket
 import os
 import threading
 import Builder
-import EmailUtils
-import Config
-import time
 from plague import DebugUtils
 from plague import AuthedXMLRPCServer
 from plague import HTTPServer
 from plague import Commands
+import ArchJob
 
 
 class AddrCache(object):
+    _ENTRY_EXPIRE_TIME = 3600
+
     def __init__(self):
         self._cache = {}
 
     def get(self, name):
         # Expire cache entry if one exists and is old
-        time = ip = None
+        ip = None
         try:
-            (time, ip) = self._cache[name]
-            if time < time.time() - (60 * 60):
+            (itime, ip) = self._cache[name]
+            if itime < time.time() - self._ENTRY_EXPIRE_TIME:
                 del self._cache[name]
-                time = ip = None
+                itime = ip = None
         except KeyError:
             pass
 
@@ -127,14 +124,18 @@
 
         hostname = cfg.get_str("General", "hostname")
         port = cfg.get_int("Active Builders", "xmlrpc_server_port")
-        if cfg.get_bool("Builders", "use_ssl") == True:
-            certs = {}
-            certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
-            certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
-            certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
-            self._server = AuthedSSLBuilderServer((hostname, port), certs, self._bm)
-        else:
-            self._server = AuthedBuilderServer((hostname, port), self._bm)
+        try:
+            if cfg.get_bool("Builders", "use_ssl") == True:
+                certs = {}
+                certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
+                certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
+                certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+                self._server = AuthedSSLBuilderServer((hostname, port), certs, self._bm)
+            else:
+                self._server = AuthedBuilderServer((hostname, port), self._bm)
+        except socket.gaierror, exc:
+            raise socket.gaierror(exc[0], "Couldn't bind to address %s:%d (%s)" % (hostname,
+                    port, exc[1]))
         self._dispatcher = BuilderDispatcher()
         self._server.register_instance(self._dispatcher)
 
@@ -145,10 +146,10 @@
 
     def stop(self):
         self._server.stop()
-        t = time.time()
+        tm = time.time()
         while not self._stopped:
             try:
-                if time.time() > t + 2:
+                if time.time() > tm + 2:
                     break
             except KeyboardInterrupt:
                 pass
@@ -163,7 +164,6 @@
 
         self._builders = []
         any_active = self._load_builders()
-        self._print_builders()
 
         self._queue_lock = threading.Lock()
         self._queue = []
@@ -190,10 +190,12 @@
             self._fileserver.set_POST_handler('/upload', self.upload_callback)
         self._fileserver.start()
 
+        self._print_builders()
+
     def upload_callback(self, request_handler, fs):
         # Ensure we know this builder
-        addr = request_handler.client_address[0]
-        builder = self.get_builder(addr, addr)
+        ip = request_handler.client_address[0]
+        builder = self.get_builder(ip, ip)
         if not builder:
             request_handler.send_error(403, "Unauthorized")
             return
@@ -236,7 +238,7 @@
         print "\nAuthorized Builders:"
         print "-" * 90
         for builder in self._builders:
-            (ip, addr) = builder.address()
+            addr = builder.address()
             string = "  " + addr
             string = string + " " * (40 - len(addr))
             builder_dict = builder.to_dict()
@@ -269,7 +271,7 @@
             # If the address is already in our _builders list, skip it
             skip = False
             for builder in self._builders:
-                (ip, addr) = builder.address()
+                addr = builder.address()
                 if address == addr:
                     skip = True
                     break
@@ -301,21 +303,23 @@
         return builder_list
 
     def get_builder(self, cert_ip, con_ip):
+        """Return the Builder object, if any, that matches the specified
+        IP address.  Performs basic checking on the address too."""
         self._builders_lock.acquire()
-        builder = None
+        ret_builder = None
 
         # Ensure builder's certificate (if SSL) and
         # the remote address of its connection are the same
         if cert_ip == con_ip:
             # Find matching builder in our authorized builders list
-            for b in self._builders:
-                (ip, addr) = b.address()
-                if cert_ip == ip:
-                    builder = b
+            for builder in self._builders:
+                ip = builder.ip()
+                if ip and cert_ip == ip:
+                    ret_builder = builder
                     break
 
         self._builders_lock.release()
-        return builder
+        return ret_builder
 
     def _builder_cmp_func(self, builder1, builder2):
         # If both builders have at least one free slot, sort on
@@ -341,29 +345,36 @@
         return 1
 
     def claim_arch_job(self, builder):
+        """Called by a Builder instance to find a job for the builder to build."""
         archjob = None
         self._queue_lock.acquire()
-        for req in self._queue:
-            if builder.can_build_for_target(req['target_dict']):
-                self._queue.remove(req)
-                archjob = req
+
+        # First pass: look for orphaned jobs
+        for job in self._queue:
+            if job.status() != ArchJob.AJ_STATUS_QUEUED:
+                continue
+            if job.orphaned() and builder.can_build_arch_job(job):
+                job.claim(builder)
+                archjob = job
                 break
+
+        # Second pass: just pick any job
+        if not archjob:
+            for job in self._queue:
+                if job.status() != ArchJob.AJ_STATUS_QUEUED:
+                    continue
+                if builder.can_build_arch_job(job):
+                    job.claim(builder)
+                    archjob = job
+                    break
+
         self._queue_lock.release()
         return archjob
 
-    def request_arch_job(self, par_job, target_dict, srpm_url, orphaned):
-        req = {}
-        req['parent'] = par_job
-        req['target_dict'] = target_dict
-        req['srpm_url'] = srpm_url
-        req['time_queued'] = time.time()
-
+    def request_arch_job(self, archjob):
+        """Called by the PackageJob instance to queue new arch-specific build jobs."""
         self._queue_lock.acquire()
-        if orphaned:
-            # insert orphaned requests at the front of the queue
-            self._queue.insert(0, req)
-        else:
-            self._queue.append(req)
+        self._queue.append(archjob)
         self._queue_lock.release()
 
     def any_prepping_builders(self):
@@ -373,4 +384,3 @@
                 return True
         return False
 
-


Index: PackageJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/PackageJob.py,v
retrieving revision 1.49
retrieving revision 1.50
diff -u -r1.49 -r1.50
--- PackageJob.py	11 May 2006 14:46:37 -0000	1.49
+++ PackageJob.py	14 May 2006 05:43:07 -0000	1.50
@@ -27,9 +27,6 @@
 import copy
 import string
 import EmailUtils
-import xmlrpclib
-import socket
-import BuilderManager
 import ArchJob
 from plague import ArchUtils
 from plague import DebugUtils
@@ -47,9 +44,6 @@
     if DEBUG:
         print stuff
 
-def log(stuff=''):
-    print stuff
-
 class PrepError(exceptions.Exception): pass
 
 class DepError(exceptions.Exception): pass
@@ -72,6 +66,7 @@
     "build" stage.  This class provides the actual running thread.
     """
     def __init__(self, pkg_job, start_stage, end_stage):
+        self._stop = False
         self._pkg_job = pkg_job
         if not end_stage:
             end_stage = 'aaaaa'
@@ -82,9 +77,11 @@
         
     def run(self):
         DebugUtils.registerThreadName(self)
-        while not self._pkg_job.is_done() and not self._pkg_job.cur_stage() == self._end_stage:
+        while not self._stop and not self._pkg_job.is_done() and not self._pkg_job.cur_stage() == self._end_stage:
             self._pkg_job.process()
 
+    def stop(self):
+        self._stop = True
 
 def is_package_job_stage_valid(stage):
     """ Validate a job stage """
@@ -118,9 +115,9 @@
 def make_job_log_url(base_url, target_str, uid, name, ver, release):
     if target_str and uid and name and ver and release:
         if base_url.endswith('/'):
-            slash=''
+            slash = ''
         else:
-            slash='/'
+            slash = '/'
         return "%s%s%s/%s-%s-%s-%s/" % (base_url, slash, target_str, uid, name, ver, release)
     return None
 
@@ -158,9 +155,10 @@
         self._depsolve_dir = None
         self._last_depsolve_error = None
         self._depsolve_first_try = False
+        self._checkout_tmpdir = None
 
         self.repofiles = {}
-        self.archjobs = {}
+        self._archjobs = {}
         self._archjobs_lock = threading.Lock()
         self._event = threading.Event()
         self._killer = None
@@ -175,6 +173,12 @@
         pjc = PackageJobController(self, first_stage, 'waiting')
         pjc.start()
 
+    def log(self, arch=None, msg=None):
+        archstring = ""
+        if arch:
+            archstring = "/%s" % arch
+        print "%s (%s%s): %s" % (self.uid, self.package, archstring, msg)
+
     def cur_stage(self):
         return self._curstage
 
@@ -374,7 +378,7 @@
             for line in lines:
                 if line.find('..........') == -1 and len(line) > 0:
                     output_lines.append(line)
-            o = string.join(output_lines, '\n')
+            o = string.join(output_lines, ('\n'))
             msg = "Error: could not make srpm for %s - output was:\n\n%s" % (self._source, o)
             raise PrepError(msg)
         
@@ -413,11 +417,11 @@
             self.epoch = '0'
         self.ver = hdr['version']
         self.release = hdr['release']
-        (self.archjobs, pkg_arches, allowed_arches) = self.arch_handling(hdr)
+        (self._archjobs, pkg_arches, allowed_arches) = self.arch_handling(hdr)
         del hdr
         del ts
 
-        if len(self.archjobs) == 0:
+        if len(self._archjobs) == 0:
             msg = """Package %s does not build on any architectures this build system supports.
     Package: %s
     Build System: %s
@@ -426,7 +430,7 @@
 
         if self._server_cfg.get_bool("General", "depsolve_jobs"):
             self._set_cur_stage('depsolve_wait')
-            log("%s (%s): Requesting depsolve..." % (self.uid, self.package))
+            self.log(msg="Requesting depsolve...")
             self._repo.request_depsolve(self, first_try=True)
             return True  # sleep until the Repo wakes us up for depsolve
         else:
@@ -445,9 +449,9 @@
         config_lines = []
         job_yum_dir = os.path.join(self._depsolve_dir, arch)
         for line in config_opts['yum.conf'].split('\n'):
-            if string.find(line, "cachedir=") >= 0:
+            if line.find("cachedir=") >= 0:
                 line = "cachedir=cache"
-            elif string.find(line, "logfile=") >= 0:
+            elif line.find("logfile=") >= 0:
                 line = "logfile=yum.log"
             config_lines.append(line+'\n')
         del config_opts
@@ -483,8 +487,8 @@
             base.log = Logger(threshold=threshold, file_object=sys.stdout)
             try:
                 base.doRepoSetup()
-            except yum.Errors.RepoError, e:
-                raise DepError(str(e))
+            except yum.Errors.RepoError, exc:
+                raise DepError(str(exc))
 
             archlist = ['src', 'noarch']
             if ArchUtils.supported_arches.has_key(arch):
@@ -495,27 +499,27 @@
             ts = rpmUtils.transaction.initReadOnlyTransaction()
             try:
                 srpm = yum.packages.YumLocalPackage(ts, self._srpm_path)
-            except yum.Errors.MiscError, e:
+            except yum.Errors.MiscError, exc:
                 del ts
-                raise DepError(str(e))
+                raise DepError(str(exc))
             del ts
 
             try:
                 base.doSackSetup(archlist)
-            except yum.Errors.RepoError, e:
-                raise DepError(str(e))
+            except yum.Errors.RepoError, exc:
+                raise DepError(str(exc))
 
             for dep in srpm.requiresList():
                 if dep.startswith("rpmlib("): continue
                 try:
                     pkg = base.returnPackageByDep(dep)
-                except repomd.mdErrors.PackageSackError, e:
-                    raise DepError(str(e))
-                except yum.Errors.YumBaseError, e:
-                    raise DepError(str(e))
-        except DepError, e:
-            self._last_depsolve_error = str(e)
-            print "%s (%s/%s): Depsolve Error: %s" % (self.uid, self.package, arch, str(e))
+                except repomd.mdErrors.PackageSackError, exc:
+                    raise DepError(str(exc))
+                except yum.Errors.YumBaseError, exc:
+                    raise DepError(str(exc))
+        except DepError, exc:
+            self._last_depsolve_error = str(exc)
+            print "%s (%s/%s): Depsolve Error: %s" % (self.uid, self.package, arch, str(exc))
             success = False
 
         if base:
@@ -530,8 +534,8 @@
         self.wake()
 
     def _stage_depsolve(self):
-        """ Depsolve all arches, only if all pass do we proceed """
-        """ to queue up the actual archjobs for building """
+        """Depsolve all arches, only if all pass do we proceed
+        to queue up the actual archjobs for building."""
 
         # If the job's waited more than 8 hours for deps to be
         # solved, kill the job and make some human figure it out
@@ -551,18 +555,18 @@
 
         self._archjobs_lock.acquire()
         unsolved_deps = False
-        archlist = self.archjobs.keys()
+        archlist = self._archjobs.keys()
 
         # noarch jobs are a bit special here.  Since we don't know
         # what arch they will actually build on, we have to make
         # sure all arches the target supports will work
-        if len(self.archjobs.keys()) == 1 and self.archjobs.keys()[0] == 'noarch':
+        if len(self._archjobs.keys()) == 1 and self._archjobs.keys()[0] == 'noarch':
             archlist = self._target_cfg.basearches()
             for arch in self._target_cfg.optarches():
                 if arch not in archlist:
                     archlist.append(arch)
 
-        log("%s (%s): Starting depsolve for arches: %s." % (self.uid, self.package, archlist))
+        self.log(msg="Starting depsolve for arches: %s." % archlist)
 
         failed_arch = None
         for arch in archlist:
@@ -588,10 +592,10 @@
             # Go to sleep until the repo changes
             self._set_cur_stage('depsolve_wait')
             self._repo.request_depsolve(self, first_try=False)
-            log("%s (%s): Finished depsolve (unsuccessful), trying again later." % (self.uid, self.package))
+            self.log(msg="Finished depsolve (unsuccessful), trying again later.")
             return True
 
-        log("%s (%s): Finished depsolve (successful), requesting archjobs." % (self.uid, self.package))
+        self.log(msg="Finished depsolve (successful), requesting archjobs.")
 
         # Success, all deps are solved.  Kill the depsolve dir
         shutil.rmtree(self._depsolve_dir, ignore_errors=True)
@@ -603,7 +607,7 @@
         # Make some directories we need
         work_dir = self._server_cfg.get_str("Directories", "server_work_dir")
         self._result_dir = self._make_stage_dir(work_dir)
-        for arch in self.archjobs.keys():
+        for arch in self._archjobs.keys():
             thisdir = os.path.join(self._result_dir, arch)
             if not os.path.exists(thisdir):
                 os.makedirs(thisdir)
@@ -620,7 +624,7 @@
 
         # Queue up archjobs
         self._set_cur_stage('waiting')
-        self._request_arch_jobs()
+        self._create_arch_jobs()
         return False
 
     def _stage_depsolve_wait(self):
@@ -636,58 +640,34 @@
         os.makedirs(stage_dir)
         return stage_dir
 
-    def _request_one_arch_job(self, arch, orphaned):
-        # Construct SPRM URL
-        srpm_http_base = self._srpm_http_path[len(self.http_dir):]
-        method = "http"
-        if self._server_cfg.get_bool("Builders", "use_ssl") == True:
-            method = "https"
-        hostname = self._server_cfg.get_str("General", "hostname")
-        port = self._server_cfg.get_int("Active Builders", "file_server_port")
-        srpm_url = "%s://%s:%d/%s" % (method, hostname, port, srpm_http_base)
-        target_dict = self._target_cfg.target_dict(arch)
-        self.bm.builder_manager.request_arch_job(self, target_dict, srpm_url, orphaned)
-
-    def _request_arch_jobs(self):
-        # Queue requests for build jobs
+    def _create_arch_jobs(self):
         self._archjobs_lock.acquire()
-        for arch in self.archjobs.keys():
-            if self.archjobs[arch]:
-                continue
-            self._request_one_arch_job(arch, False)
+        for arch in self._archjobs.keys():
+            # Construct the SRPM URL
+            srpm_http_base = self._srpm_http_path[len(self.http_dir):]
+            method = "http"
+            if self._server_cfg.get_bool("Builders", "use_ssl") == True:
+                method = "https"
+            hostname = self._server_cfg.get_str("General", "hostname")
+            port = self._server_cfg.get_int("Active Builders", "file_server_port")
+            srpm_url = "%s://%s:%d/%s" % (method, hostname, port, srpm_http_base)
+
+            # Create and queue the archjob
+            target_dict = self._target_cfg.target_dict(arch)
+            archjob = ArchJob.ArchJob(self, target_dict, srpm_url)
+            self._archjobs[arch] = archjob
+            self.bm.builder_manager.request_arch_job(archjob)
         self._archjobs_lock.release()
 
-    def add_arch_job(self, job):
-        """ Called by the BuilderManager when it's started a new arch job for us """
-        self._archjobs_lock.acquire()
-        jobarch = job.arch()
-        if self.archjobs[jobarch] != None:
-            log("%s (%s/%s): Already have archjob for this arch (%s).  New job UID is %s." % (self.uid, \
-                    self.package, jobarch, self.archjobs[jobarch].jobid, job.jobid))
-        self.archjobs[jobarch] = job
-
+    def archjob_started_cb(self, archjob):
         # If this is the first archjob, that means we are now building.
         # So we start up the second PackageJobController thread.
         if self._curstage == 'waiting':
             t = PackageJobController(self, 'building', None)
             t.start()
 
-        self._archjobs_lock.release()
-        # Only want hostname, not both IP and hostname
-        addr = job.builder().address()[1]
-        log("%s (%s/%s): %s - UID is %s" % (self.uid, self.package, jobarch, addr, job.jobid))
-
-    def remove_arch_job(self, job):
-        """ Removes an arch job when its builder is no longer responding """
-        self._archjobs_lock.acquire()
-        jobarch = job.arch()
-        log("%s (%s/%s): Builder disappeared.  Requeuing arch..." % (self.uid, self.package, jobarch))
-        self.archjobs[jobarch] = None
-        self._request_one_arch_job(jobarch, True)
-        self._archjobs_lock.release()
-
     def is_done(self):
-        if self._curstage == 'needsign' or self._curstage == 'failed' or self._curstage == 'finished':
+        if self._curstage in ['needsign', 'failed', 'finished']:
             return True
         return False
 
@@ -695,8 +675,7 @@
         self._killer = username
         self._die = True
 
-        log("%s (%s): Job kill request from %s" % (self.uid, self.package, username))
-        self._archjobs_lock.acquire()
+        self.log(msg="Job kill request from %s" % username)
         if self._curstage == 'waiting':
             # In 'waiting' stage, we have no controller thread.  So to get
             # the job killed immediately, we have to start one
@@ -705,14 +684,14 @@
         else:
             # Otherwise, wake up the existing controller thread
             self.wake()
-        self._archjobs_lock.release()
 
     def _handle_death(self):
-        resultstring = "%s (%s): Build on target %s was killed by %s." % (self.uid, self.name, self._target_str, self._killer)
+        result_start = "%s (%s): " % (self.uid, self.name)
+        resultstring = "Build on target %s was killed by %s." % (self._target_str, self._killer)
         self._result = 'killed'
         self._set_cur_stage('finished', resultstring)
-        self.email_result(self.username, resultstring)
-        log(resultstring)
+        self.email_result(self.username, result_start + resultstring)
+        self.log(msg=resultstring)
 
         # Kill any building jobs
         self._kill_all_archjobs(True)
@@ -726,19 +705,14 @@
 
     def _kill_all_archjobs(self, user_requested=False):
         self._archjobs_lock.acquire()
-        for job in self.archjobs.values():
-            if job:
-                job.die(user_requested)
-        self.archjobs = {}
+        for job in self._archjobs.values():
+            job.die(user_requested)
         self._archjobs_lock.release()
 
     def wake(self):
         self._event.set()
 
     def process(self):
-        if self.is_done():
-            return
-
         if self._die:
             self._handle_death()
             return
@@ -750,24 +724,24 @@
                 while not self._event.isSet():
                     self._event.wait()
                 self._event.clear()
-        except PrepError, e:
+        except PrepError, exc:
             if self.use_cvs == True:
                 shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
-            msg = str(e)
+            msg = str(exc)
             subj = 'Prep Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
             self.email_result(self.username, resultstring=msg, subject=subj)
             self._stage_failed(msg)
-        except DepError, e:
-            msg = str(e)
+        except DepError, exc:
+            msg = str(exc)
             subj = 'Dependencies Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
             self.email_result(self.username, resultstring=msg, subject=subj)
             self._stage_failed(msg)
-        except BuildError, e:
+        except BuildError, exc:
             subj = 'Build Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
             base_url = self._server_cfg.get_str("UI", "log_url")
             log_url = make_job_log_url(base_url, self._target_str, self.uid, self.name, self.ver, self.release)
-            msg = "%s\n\n         Build logs may be found at %s\n\n" % (e.msg, log_url)
-            logtail = self._get_log_tail(e.arch)
+            msg = "%s\n\n         Build logs may be found at %s\n\n" % (exc.msg, log_url)
+            logtail = self._get_log_tail(exc.arch)
             msg = "%s\n-------------------------------------------------\n\n%s\n" % (msg, logtail)
             self.email_result(self.username, resultstring=msg, subject=subj)
 
@@ -776,44 +750,46 @@
             if self._target_cfg.testing() == False:
                 # Kill remaining jobs on other arches
                 self._kill_all_archjobs(False)
-                self._stage_failed(e.msg)
+                self._stage_failed(exc.msg)
 
     def _stage_building(self):
-        # Count failed and completed jobs
         completed_jobs = 0
         failed_jobs = 0
         self._archjobs_lock.acquire()
-        for job in self.archjobs.values():
-            if not job:
-                continue
-            if job.get_status() is 'done':
-                completed_jobs = completed_jobs + 1
-
-                if job.builder_failed() or job.download_failed() or job.internal_failure():
-                    failed_jobs = failed_jobs + 1
+        for job in self._archjobs.values():
+            # If the archjob is running, give it some time
+            if job.status() is not ArchJob.AJ_STATUS_DONE:
+                job.process()
 
-                    # Normal jobs will just stop when a single archjob fails, but
-                    # testing targets don't kill the build when one fails.  However,
-                    # even for testing targets, we still want to notify the user if
-                    # a particular arch fails.
-                    if not job.failure_noticed():
-                        job.set_failure_noticed()
-                        jobarch = job.arch()
-                        msg = "Job failed."
-                        if job.builder_failed():
-                            msg = "Job failed on arch %s\n" % jobarch
-                        elif job.download_failed():
-                            msg = "Job failed on arch %s: couldn't download result files from builder '%s'.\n " \
-                            "Please contact the build system administrator." % (jobarch, job.builder().address())
-                        elif job.internal_failure():
-                            msg = "Job failed on arch %s: there was an internal build system failure.\n " \
-                            "Please contact the build system administrator." % jobarch
-                        self._archjobs_lock.release()
-                        raise BuildError(msg, jobarch)
+            # If the archjob is still running, go to next archjob
+            if job.status() is not ArchJob.AJ_STATUS_DONE:
+                continue
 
+            completed_jobs = completed_jobs + 1
+            if job.builder_failed() or job.download_failed() or job.internal_failure():
+                failed_jobs = failed_jobs + 1
+
+                # Normal jobs will just stop when a single archjob fails, but
+                # testing targets don't kill the build when one fails.  However,
+                # even for testing targets, we still want to notify the user if
+                # a particular arch fails.
+                if not job.failure_noticed():
+                    job.set_failure_noticed()
+                    jobarch = job.arch()
+                    msg = "Job failed."
+                    if job.builder_failed():
+                        msg = "Job failed on arch %s\n" % jobarch
+                    elif job.download_failed():
+                        msg = "Job failed on arch %s: couldn't download result files from builder '%s'.\n " \
+                        "Please contact the build system administrator." % (jobarch, job.builder().address())
+                    elif job.internal_failure():
+                        msg = "Job failed on arch %s: there was an internal build system failure.\n " \
+                        "Please contact the build system administrator." % jobarch
+                    self._archjobs_lock.release()
+                    raise BuildError(msg, jobarch)
         self._archjobs_lock.release()
 
-        if completed_jobs == len(self.archjobs):
+        if completed_jobs == len(self._archjobs):
             # Testing targets don't contribute packages to the repo
             if self._target_cfg.testing() == True:
                 if failed_jobs > 0:
@@ -824,7 +800,8 @@
                 self._set_cur_stage('add_to_repo')
             return False  # Don't want to wait
 
-        return True
+        time.sleep(5)
+        return False
 
     def get_stage_dir(self):
         return self._result_dir
@@ -848,9 +825,7 @@
         srpm_file = os.path.join(self._result_dir, os.path.basename(self._srpm_http_path))
 
         # Delete any RPMs in the arch dirs
-        for job in self.archjobs.values():
-            if not job:
-                continue
+        for job in self._archjobs.values():
             job_result_dir = job.get_result_files_dir()
             for f in job.get_files():
                 if not f.endswith(".rpm"):
@@ -874,9 +849,7 @@
         # Create a list of files that the repo should copy to
         # the repo dir
         repo_dir = self._server_cfg.get_str("Directories", "repo_dir")
-        for job in self.archjobs.values():
-            if not job:
-                continue
+        for job in self._archjobs.values():
             job_result_dir = job.get_result_files_dir()
             for f in job.get_files():
                 if not f.endswith(".rpm"):


Index: main.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/main.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- main.py	28 Apr 2006 03:17:41 -0000	1.21
+++ main.py	14 May 2006 05:43:07 -0000	1.22
@@ -91,7 +91,7 @@
         ret=daemonize.createDaemon()
         if ret:
             print "Daemonizing failed!"
-            sys.exit(2)
+            os._exit(2)
 
     if opts.pidfile:
         open(opts.pidfile, 'w').write('%d\n' % os.getpid())
@@ -107,7 +107,7 @@
     cfg.load_target_configs()
     if len(cfg.targets()) == 0:
         print "You need at least one target to do anything useful."
-        sys.exit(3)
+        os._exit(3)
 
     hostname = cfg.get_str("General", "hostname")
 
@@ -125,7 +125,11 @@
     dbm = DBManager.DBManager(cfg)
 
     # Create the BuildMaster thread
-    builder_manager = BuilderManager.BuilderManager(cfg)
+    try:
+        builder_manager = BuilderManager.BuilderManager(cfg)
+    except Exception, exc:
+        print "Couldn't create BuilderManager: %s" % exc
+        os._exit(4)
     bm = BuildMaster.BuildMaster(builder_manager, dbm, cfg)
     bm.start()
 
@@ -143,10 +147,10 @@
         else:
             ui = UserInterfaceNoAuth(builder_manager, bm, dbm, cfg)
             bm_server = AuthedXMLRPCServer.AuthedXMLRPCServer((hostname, port))
-    except socket.error, e:
-        if e[0] == 98:      # Address already in use
+    except socket.error, exc:
+        if exc[0] == 98:      # Address already in use
             print "Error: couldn't bind to address '%s:%s'.  Is the server already running?" % (hostname, port)
-            os._exit(1)
+            os._exit(4)
     bm_server.register_instance(ui)
 
     # Create dummy thread just to register main thread's name




More information about the fedora-extras-commits mailing list