extras-buildsys/server BuildJob.py, 1.8, 1.9 BuildMaster.py, 1.5, 1.6 UserInterface.py, 1.7, 1.8 buildserver.py, 1.8, 1.9 client_manager.py, 1.22, 1.23

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Fri Jun 24 17:24:22 UTC 2005


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv10323/server

Modified Files:
	BuildJob.py BuildMaster.py UserInterface.py buildserver.py 
	client_manager.py 
Log Message:
2005-06-24  Dan Williams <dcbw at redhat.com>

    * Rework job database handling.  Only 1 thread now has access to
        the job database, the BuildMaster thread.  This should fix issues
        with database locking, timeouts, etc.  sqlite doesn't really like
        multiple threads, plus it doesn't have row locking, only table
        locking.

        So, when things want the BuildMaster to do something, they queue
        up a request and the BuildMaster gets around to it.  One consequence
        of this change is that job UIDs are not known until the job gets
        added to the database, so we can no longer return the job's UID
        to the client enqueueing the job.

    * BuildClients are also in their own thread now so they don't block the
        BuildMaster.  There are interesting cases where clients can block
        the server while the server is reading data from the client, or if
        the client tracebacks in the middle of an SSL connection.  This should
        help keep the server more robust.  Operational latencies are also
        reduced by this change and the one to the BuildMaster.




Index: BuildJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildJob.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- BuildJob.py	24 Jun 2005 12:51:22 -0000	1.8
+++ BuildJob.py	24 Jun 2005 17:24:20 -0000	1.9
@@ -94,13 +94,22 @@
         self.srpm_http_path = None
         # Deal with straight SRPM builds
         if self.no_cvs and self.curstage is 'initialize':
-            self.curstage = 'make_srpm'
+            self._set_cur_stage('make_srpm')
         self.repofiles = {}
         threading.Thread.__init__(self)
 
     def get_cur_stage(self):
         return self.curstage
-        
+
+    def _set_cur_stage(self, stage):
+        """ Update our internal job stage, and notify the BuildMaster that
+            we've changed as well.
+        """
+        oldstage = self.curstage
+        self.curstage = stage
+        if oldstage != stage:
+            self.bm.queue_job_status_update(self.uid, stage)
+
     def get_uid(self):
         return self.uid
         
@@ -215,7 +224,7 @@
 
         
     def _checkout(self):
-        self.curstage = 'checkout'
+        self._set_cur_stage('checkout')
         dir_prefix = self.cvs_tag + "-"
         self.checkout_tmpdir = tempfile.mkdtemp(prefix=dir_prefix, dir=CONFIG.get('tmpdir'))
         os.chdir(self.checkout_tmpdir)
@@ -228,7 +237,7 @@
             subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
             msg = "could not check out %s from %s - output was:\n %s" % (self.cvs_tag, self.target, o)
             self.email_result(resultstring=msg, subject=subj)
-            self.curstage = 'finished'
+            self._set_cur_stage('finished')
             self.failed = True
             shutil.rmtree(self.checkout_tmpdir, True)
             return
@@ -246,20 +255,20 @@
                 subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
                 msg = "could not check out common directory - output was:\n %s" % (self.cvs_tag, self.target, o)
                 self.email_result(resultstring=msg, subject=subj)
-                self.curstage = 'finished'
+                self._set_cur_stage('finished')
                 self.failed = True
                 shutil.rmtree(self.checkout_tmpdir, True)
                 return
 
     def _make_srpm(self):
-        self.curstage = 'make_srpm'
+        self._set_cur_stage('make_srpm')
         self.srpm_path = None
         srpm_dir = os.path.join(self.checkout_tmpdir, self.package, self.target)
         if not os.path.exists(srpm_dir):
             subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
             msg = "could not find path %s for %s." % (srpm_dir, self.cvs_tag)
             self.email_result(resultstring=msg, subject=subj)
-            self.curstage = 'finished'
+            self._set_cur_stage('finished')
             self.failed = True
             shutil.rmtree(self.checkout_tmpdir, True)
             return
@@ -273,7 +282,7 @@
             subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
             msg = "could not make srpm for %s - output was:\n %s" % (self.cvs_tag, o)
             self.email_result(resultstring=msg, subject=subj)
-            self.curstage = 'finished'
+            self._set_cur_stage('finished')
             self.failed = True
             shutil.rmtree(self.checkout_tmpdir, True)
             return
@@ -289,14 +298,14 @@
             subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
             msg = "could not find srpm for %s - output was:\n %s" % (self.cvs_tag, o)
             self.email_result(resultstring=msg, subject=subj)
-            self.curstage = 'finished'
+            self._set_cur_stage('finished')
             self.failed = True
             shutil.rmtree(self.checkout_tmpdir, True)
             return
         self.srpm_path = srpmpath
 
     def _prep(self):
-        self.curstage = 'prep'
+        self._set_cur_stage('prep')
 
         # In SRPM-only mode, cvs_tag is path to the SRPM to build
         if self.no_cvs:
@@ -313,7 +322,7 @@
 
         if len(self.buildarches) == 0:
             self.failed = True
-            self.curstage = 'finished'
+            self._set_cur_stage('finished')
             return
 
         self.stage_dir = self._make_stage_dir(CONFIG.get('server_work_dir'))
@@ -364,7 +373,6 @@
                 do_sleep = True
             elif oldstage == 'repodone':
                 self._succeeded()
-            print "%s/%s" % (oldstage, self.curstage)
 
             # Only some stages need to sleep because we stay in
             # them for a while.
@@ -383,7 +391,7 @@
                     log("%s (%s/%s): Builder UID is %s" % (self.uid, self.package, arch, job.jobid))
 
     def _monitor(self):
-        self.curstage = 'building'
+        self._set_cur_stage('building')
 
         # Deal with jobs whose build client is no longer responding
         for job in self.sub_jobs.values():
@@ -407,10 +415,10 @@
                     self.failed = True
 
         if self.failed or (have_jobs == True and jobs_running == False):
-            self.curstage = 'finished'
+            self._set_cur_stage('finished')
 
     def _cleanup(self):
-        self.curstage = 'cleanup'
+        self._set_cur_stage('cleanup')
         if self.failed:
             # Kill remaining jobs on other arches
             for job in self.sub_jobs.values():
@@ -422,8 +430,7 @@
         return self.stage_dir
 
     def _failed(self):
-        old_stage = self.curstage
-        self.curstage = 'failed'
+        self._set_cur_stage('failed')
 
         resultstring = """
    %s (%s): %s on %s failed to complete on one or more archs.
@@ -437,9 +444,11 @@
 
         resultstring = resultstring + "\n"
         self.email_result(resultstring)
+
+        self.bm.notify_job_done(self)
         
     def _add_to_repo(self):
-        self.curstage = 'addtorepo'
+        self._set_cur_stage('addtorepo')
 
         # Create a list of files that the repo should copy to
         # the repo dir
@@ -460,12 +469,13 @@
             self.repo.request_copy(self)
 
     def repo_add_callback(self):
-        self.curstage = 'repodone'
+        self._set_cur_stage('repodone')
 
     def _succeeded(self):
-        self.curstage = 'needsign'
+        self._set_cur_stage('needsign')
         resultstring = "%s (%s): Build on target %s succeeded." % (self.uid, self.name, self.target)
         self.email_result(resultstring)
+        self.bm.notify_job_done(self)
 
     def email_result(self, resultstring, subject=None):
         """send 'resultstring' to self.email from self.email_from"""


Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- BuildMaster.py	24 Jun 2005 12:51:22 -0000	1.5
+++ BuildMaster.py	24 Jun 2005 17:24:20 -0000	1.6
@@ -25,7 +25,7 @@
 import Repo
 
 
-def ensure_build_db_tables(dbcx):
+def ensure_job_db_tables(dbcx):
     """ Central routine to create the database table structure """
 
     curs = dbcx.cursor()
@@ -58,63 +58,174 @@
         self.hostname = hostname
         self.building_jobs = []
         self.should_stop = False
-        self.dbcx = sqlite.connect("jobdb", encoding="utf-8", timeout=4)
-        self.curs = self.dbcx.cursor()
         self.repos = {}
         for target in CONFIG.get('targets').keys():
             repo = Repo.Repo(target, client_manager)
             self.repos[target] = repo
             repo.start()
-            print "Created repo %s" % target
-        ensure_build_db_tables(self.dbcx)
+
+        self._done_queue = []
+        self._done_queue_lock = threading.Lock()
+        self._new_queue = []
+        self._new_queue_lock = threading.Lock()
+        self._status_updates = {}
+        self._status_updates_lock = threading.Lock()
+
+        self.dbcx = sqlite.connect("jobdb", encoding="utf-8", timeout=3)
+        self.curs = self.dbcx.cursor()
+        ensure_job_db_tables(self.dbcx)
+
         threading.Thread.__init__(self)
 
     def __del__(self):
         self.dbcx.close()
+        del self.bcm
 
     def stop(self):
         self.should_stop = True
         for repo in self.repos.values():
             repo.stop()
 
-    def set_job_status(self, job):
-        status = job.get_cur_stage()
-        job_uid = job.get_uid()
-        self.curs.execute('UPDATE jobs SET status="%s" WHERE uid=%d' \
-                % (status, job_uid))
+    def _job_desc_template(self, email, package, target, buildreq, time):
+        """ Fill in fields common to both SRPM and CVS jobs """
+
+        job_desc = {}
+        job_desc['email'] = email
+        job_desc['package'] = package
+        job_desc['target'] = target
+        job_desc['buildreq'] = buildreq
+        job_desc['time'] = time
+        return job_desc
+
+    def enqueue(self, email, package, cvs_tag, target, buildreq, time):
+        job_desc = self._job_desc_template(email, package, target, buildreq, time)
+        job_desc['cvs_tag'] = cvs_tag
+
+        self._new_queue_lock.acquire()
+        self._new_queue.append(job_desc)
+        self._new_queue_lock.release()
+
+    def enqueue_srpm(self, email, package, srpm_path, target, buildreq, time):
+        job_desc = self._job_desc_template(email, package, target, buildreq, time)
+        job_desc['srpm_path'] = srpm_path
+
+        self._new_queue_lock.acquire()
+        self._new_queue.append(job_desc)
+        self._new_queue_lock.release()
+
+    def queue_job_status_update(self, uid, status):
+        self._status_updates_lock.acquire()
+        self._status_updates[uid] = status
+        self._status_updates_lock.release()
+
+    def notify_job_done(self, job):
+        self._done_queue_lock.acquire()
+        self._done_queue.append(job)
+        self._done_queue_lock.release()
+
+    def _process_finished_jobs(self):
+        self._done_queue_lock.acquire()
+
+        for job in self._done_queue:
+            curstage = job.get_cur_stage()
+            if curstage == 'failed' or curstage == 'needsign':
+                self._write_status_to_db(job.get_uid(), curstage)
+                print "%s (%s): Job finished." % (job.get_uid(), job.package)
+                self.building_jobs.remove(job)
+
+        self._done_queue = []
+        self._done_queue_lock.release()
 
     def getClientManager(self):
         return self.bcm
 
+    def _write_status_to_db(self, uid, status):
+        try:
+            self.curs.execute('UPDATE jobs SET status="%s" WHERE uid=%d' \
+                % (status, uid))
+        except sqlite.OperationalError, e:
+            print "DB Error: could not access jobs database. Reason: '%s'" % e
+
+        self.dbcx.commit()
+
+    def _save_job_status(self):
+        # Write new job status to the database
+        self._status_updates_lock.acquire()
+        
+        for uid in self._status_updates.keys():
+            self._write_status_to_db(uid, self._status_updates[uid])
+
+        self._status_updates = {}
+        self._status_updates_lock.release()
+
+    def _start_new_jobs(self):
+        self._new_queue_lock.acquire()
+
+        for item in self._new_queue:
+            if item.has_key('cvs_tag'):
+                locator = item['cvs_tag']
+            elif item.has_key('srpm_path'):
+                locator = item['srpm_path']
+            else:
+                print "Error: '%s' incorrect job type, needs to either be cvs_tag or srpm_path" % item['package']
+                continue
+
+            self.curs.execute('INSERT INTO jobs (uid, username, package,' \
+                    ' cvs_tag, target, buildreq, time_submitted, status)' \
+                    ' VALUES (NULL, "%s", "%s", "%s", "%s", "%s", %d, "%s")' \
+                    % (item['email'], item['package'], locator, item['target'], \
+                    item['buildreq'], item['time'], 'waiting'))
+            self.dbcx.commit()
+
+            # Find the UID
+            self.curs.execute('SELECT uid FROM jobs WHERE username="%s" AND' \
+                    ' package="%s" AND cvs_tag="%s" AND target="%s" AND' \
+                    ' buildreq="%s" AND time_submitted=%d AND status="waiting"' \
+                    % (item['email'], item['package'], locator, item['target'], \
+                    item['buildreq'], item['time']))
+            self.dbcx.commit()
+
+            data = self.curs.fetchall()
+            # If two of the same job are submitted close together, we need
+            # to make sure we pick the last result to get the correct one
+            row = data[len(data) - 1]
+            repo = self.repos[item['target']]
+            job = BuildJob.BuildJob(row['uid'], item['email'], item['package'],
+                    locator, repo, self, self.hostname)
+
+            print "%s (%s): Starting tag '%s' on target '%s'" % (row['uid'], \
+                    item['package'], locator, item['target'])
+
+            self.building_jobs.append(job)
+            job.start()
+
+        self._new_queue = []
+
+        self._new_queue_lock.release()
+
     def run(self):
         while self.should_stop == False:
+
+            # Write update status for jobs to the database
+            self._save_job_status()
+
             # Update all build clients and known jobs
             self.bcm.process()
 
-            # Allow each job some processing time
-            for job in self.building_jobs:
-                self.set_job_status(job)
-                if job.get_cur_stage() == 'failed' or job.get_cur_stage() == 'needsign':
-                    print "%s (%s): Job finished." % (job.get_uid(), job.package)
-                    self.building_jobs.remove(job)
-
-            # Grab one waiting job from database and start it
-            try:
-                self.curs.execute('SELECT uid, username, package, cvs_tag, target' \
-                        ' FROM jobs WHERE status="waiting"')
-            except sqlite.OperationalError, e:
-                pass
-            else:
-                self.dbcx.commit()
-                item = self.curs.fetchone()
-                if item:
-                    print "%s (%s): Starting tag '%s' on target '%s'" % (item['uid'], \
-                            item['package'], item['cvs_tag'], item['target'])
-                    repo = self.repos[item['target']]
-                    job = BuildJob.BuildJob(item['uid'], item['username'], item['package'],
-                            item['cvs_tag'], repo, self, self.hostname)
-                    self.building_jobs.append(job)
-                    job.start()
+            # Clean up jobs that have finished
+            self._process_finished_jobs()
+
+            # Start any new jobs
+            self._start_new_jobs()
 
-            time.sleep(5)
+            last_time = time.time()
+            have_work = False
+            while not have_work and time.time() <= last_time + 5:
+                time.sleep(0.25)
+
+                # Break out early if there's work to do
+                self._new_queue_lock.acquire()
+                if len(self._new_queue) > 0:
+                    have_work = True
+                self._new_queue_lock.release()
 


Index: UserInterface.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/UserInterface.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- UserInterface.py	24 Jun 2005 10:49:26 -0000	1.7
+++ UserInterface.py	24 Jun 2005 17:24:20 -0000	1.8
@@ -46,36 +46,9 @@
     Base UserInterface class. NO AUTHENTICATION.  Subclass this to provide some.
     """
 
-    def __init__(self, client_manager):
-        self.bcm = client_manager
-        self.dbcx = sqlite.connect("jobdb", encoding="utf-8", timeout=2)
-        self.curs = self.dbcx.cursor()
-        BuildMaster.ensure_build_db_tables(self.dbcx)
-
-
-    def __del__(self):
-        self.dbcx.close()
-
-
-    def _insert_job(self, email, package, cvs_tag, target, buildreq, time):
-        # Insert request into the database
-        self.curs.execute('INSERT INTO jobs (uid, username, package,' \
-                ' cvs_tag, target, buildreq, time_submitted, status)' \
-                ' VALUES (NULL, "%s", "%s", "%s", "%s", "%s", %d, "%s")' \
-                % (email, package, cvs_tag, target, buildreq, time, 'waiting'))
-        self.dbcx.commit()
-
-        # Find the UID
-        self.curs.execute('SELECT uid FROM jobs WHERE username="%s" AND' \
-                ' package="%s" AND cvs_tag="%s" AND target="%s" AND' \
-                ' buildreq="%s" AND time_submitted=%d AND status="waiting"' \
-                % (email, package, cvs_tag, target, buildreq, time))
-        self.dbcx.commit()
-        data = self.curs.fetchall()
-        # If two of the same job are submitted close together, we need
-        # to make sure we pick the last result to get the correct one
-        item = data[len(data) - 1]
-        return item['uid']
+    def __init__(self, client_manager, build_master):
+        self._cm = client_manager
+        self._bm = build_master
 
 
     def enqueue(self, email, package, cvs_tag, target, buildreq=None):
@@ -97,8 +70,8 @@
                     "%s: target does not exist." % (cvs_tag, target))
             return (-1, "This build server does not support the target %s." % target)
         else:
-            jobid = self._insert_job(email, package, cvs_tag, target, buildreq, time.time())
-        return (0, "Success: package has been queued.", jobid)
+            self._bm.enqueue(email, package, cvs_tag, target, buildreq, time.time())
+        return (0, "Success: package has been queued.")
 
 
     def enqueue_srpm(self, email, package, srpm_file, target, buildreq=None):
@@ -126,8 +99,8 @@
                     "%s: target does not exist." % (srpm_file, target))
             return (-1, "This build server does not support the target %s." % target)
         else:
-            jobid = self._insert_job(email, package, srpm_file, target, buildreq, time.time())
-        return (0, "Success: package has been queued.", jobid)
+            self._bm.enqueue_srpm(email, package, srpm_file, target, buildreq, time.time())
+        return (0, "Success: package has been queued.")
 
 
     def list_jobs(self, args_dict):
@@ -145,9 +118,9 @@
 
         if len(search):
             sql = sql + search
-        self.curs.execute(sql)
-        self.dbcx.commit()
-        data = self.curs.fetchall()
+        self._curs.execute(sql)
+        self._dbcx.commit()
+        data = self._curs.fetchall()
         job_list = []
         for row in data:
             tempX = [ item for item in row]
@@ -157,13 +130,13 @@
 
     def update_clients(self):
         reload(CONFIG)
-        client_list = self.bcm.update_clients()
+        client_list = self._cm.update_clients()
         return (0, "Success.", client_list)
 
 
     def list_clients(self):
         reload(CONFIG)
-        client_list = self.bcm.list_clients()
+        client_list = self._cm.list_clients()
         return (0, "Success.", client_list)
 
 


Index: buildserver.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/buildserver.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- buildserver.py	17 Jun 2005 03:10:29 -0000	1.8
+++ buildserver.py	24 Jun 2005 17:24:20 -0000	1.9
@@ -101,10 +101,10 @@
     # Create the BuildMaster XMLRPC server
     ui = None
     if CONFIG.get('ssl_frontend') == True:
-        ui = UserInterfaceSSLAuth(bcm)
+        ui = UserInterfaceSSLAuth(bcm, bm)
         bm_server = AuthenticatedSSLXMLRPCServer(ui_certs, (hostname, 8887))
     else:
-        ui = UserInterfaceNoAuth(bcm)
+        ui = UserInterfaceNoAuth(bcm, bm)
         bm_server = MyXMLRPCServer((hostname, 8887))
     bm_server.register_instance(ui)
 
@@ -120,6 +120,7 @@
         # Make sure the BuildMaster thread shuts down
         print "Shutting down..."
         bm.stop()
+        del bm
 
     print "Done."
     os._exit(0)


Index: client_manager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/client_manager.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -r1.22 -r1.23
--- client_manager.py	24 Jun 2005 12:51:22 -0000	1.22
+++ client_manager.py	24 Jun 2005 17:24:20 -0000	1.23
@@ -208,11 +208,11 @@
         self.status = 'done'
 
 
-class BuildClient:
+class BuildClient(threading.Thread):
     """ Tracks all jobs on a build client """
 
     def __init__(self, manager, address):
-        self._cur_job_lock = threading.Lock()
+        self._cur_job = None
         self._manager = manager
         self._jobs = []
         self._address = address
@@ -223,11 +223,14 @@
             self._arches = self._server.supported_arches()
         except socket.error, e:
             del self._server
-            self._valid = False
+            self._alive = False
             return
-        self._valid = True
+        self._alive = True
         self._arches.append('noarch')
-        self._update_cur_job()
+        self._server_lock = threading.Lock()
+        self._stop = False
+        self._prepping_jobs = False
+        threading.Thread.__init__(self)
 
     def arches(self):
         return self._arches
@@ -235,28 +238,30 @@
     def address(self):
         return self._address
 
-    def valid(self):
-        return self._valid
+    def alive(self):
+        return self._alive
     
     def start_arch_job(self, parent_job, target, arch, srpm_url):
+        self._server_lock.acquire()
+
         # Can be called from other threads (notably from BuildJob threads)
+        job = None
         if arch in self._arches and self.available():
             job = BuildClientJob(self, parent_job, self._server, target, arch, srpm_url)
             if job.valid():
                 self._jobs.append(job)
-                self._update_cur_job()
-                return job
             else:
                 del job
+                job = None
+        self._server_lock.release()
 
-        return None
+        self._update_cur_job()
+        return job
 
     def _update_cur_job(self):
-        # Need to do some locking here since BuildJobs (which are their own thread)
-        # can call through to our start_arch_job() method at any time
-        self._cur_job_lock.acquire()
-
         cur_job = None
+
+        self._server_lock.acquire()
         try:
             cur_job = self._server.get_cur_job()
         except socket.error, e:
@@ -276,46 +281,47 @@
             if cur_job == 0:
                 cur_job = None
 
+        self._server_lock.release()
+
         self._cur_job = cur_job
-        self._cur_job_lock.release()
 
-    def process(self):
-        self._update_cur_job()
+    def stop(self):
+        self._stop = True
+
+    def run(self):
+        while not self._stop:
+            self._update_cur_job()
+
+            # If we haven't been able to contact the BuildClient for a bit, kill build
+            # jobs on this BuildClient
+            if self._unavail_count > 2:
+                for job in self._jobs:
+                    job.builder_gone()
+                    self._jobs.remove(job)
+                self._alive = False
+                self._stop = True
+                continue
 
-        # If we haven't been able to contact the BuildClient for a bit, kill build
-        # jobs on this BuildClient
-        if self._unavail_count > 2:
-            for job in self._jobs:
-                job.builder_gone()
-                self._jobs.remove(job)
-            # Return -1 to indicate we should be killed
-            return -1
-
-        # Update status of all jobs
-        for j in self._jobs:
-            j.process()
+            # Update status of all jobs
+            prep_jobs = False
+            for j in self._jobs:
+                j.process()
+                if j.valid() and j.get_status() == 'running':
+                    if j.get_builder_result() == 'prepping':
+                        prep_jobs = True
+            self._prepping_jobs = prep_jobs
 
-        return 0
+            time.sleep(3)
     
     def available(self):
-        if self._unavail_count > 2:
+        if self._unavail_count > 2 or not self._alive:
             return False
-
-        self._cur_job_lock.acquire()
-        cur_job = self._cur_job
-        self._cur_job_lock.release()
-
-        if cur_job:
+        if self._cur_job:
             return False
-        else:
-            return True
+        return True
 
     def any_prepping_jobs(self):
-        for job in self._jobs:
-            if job.valid() and job.get_status() == 'running':
-                if job.get_builder_result() == 'prepping':
-                    return True
-        return False
+        return self._prepping_jobs
 
     def to_dict(self):
         client_dict = {}
@@ -327,6 +333,7 @@
             client_dict['status'] = 'idle'
         return client_dict
 
+
 class BuildClientManager:
     def __init__(self):
         # List of addresses of possible builders
@@ -345,6 +352,12 @@
             print string
         print ""
 
+    def __del__(self):
+        for client in self.running_clients:
+            client.stop()
+        time.sleep(2)
+        for client in self.running_clients:
+            del client
 
     def update_clients(self):
         client_list = []
@@ -360,8 +373,9 @@
             # Try to connect to client and add it to our client
             # list if we can
             client = BuildClient(self, address)
-            if client.valid():
+            if client.alive():
                 client_list.append(client.to_dict())
+                client.start()
                 self.running_clients.append(client)
             else:
                 del client
@@ -377,13 +391,11 @@
     def process(self):
         """ Allow each BuildClient to update its status and do some processing """
         for client in self.running_clients:
-            if client.process() == -1:
+            if not client.alive():
                 print "Removing BuildClient '%s' because it timed out." % client.address()
+                client.stop()
                 self.running_clients.remove(client)
-
-    def track_job(self, job):
-        if job:
-            job.bci.track_job(job)
+                del client
 
     def start_arch_job(self, parent_job, target, arch, srpm_url):
         """ Create a job on a free builder for this arch """
@@ -400,7 +412,7 @@
     def any_prepping_clients(self):
         # query each build client for any jobs that are in the 'prepping' state
         for client in self.running_clients:
-            if client.any_prepping_jobs():
+            if client.alive() and client.any_prepping_jobs():
                 return True
         return False
 




More information about the fedora-extras-commits mailing list