extras-buildsys/server BuildMaster.py, 1.12, 1.13 BuilderManager.py, 1.1, 1.2 PackageJob.py, 1.4, 1.5

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Sun Jul 10 19:59:35 UTC 2005


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv11822/server

Modified Files:
	BuildMaster.py BuilderManager.py PackageJob.py 
Log Message:
2005-07-10  Dan Williams <dcbw at redhat.com>

    * server/PackageJob.py
      server/BuildMaster.py
        - Reduce total thread count by breaking PackageJob runs into
            two threads, so that no thread executes while the package
            is waiting for a builder




Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- BuildMaster.py	8 Jul 2005 18:12:31 -0000	1.12
+++ BuildMaster.py	10 Jul 2005 19:59:32 -0000	1.13
@@ -282,7 +282,6 @@
             self._building_jobs_lock.acquire()
             self._building_jobs.append(job)
             self._building_jobs_lock.release()
-            job.start()
 
         self._new_queue = []
 


Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- BuilderManager.py	5 Jul 2005 21:08:03 -0000	1.1
+++ BuilderManager.py	10 Jul 2005 19:59:32 -0000	1.2
@@ -131,7 +131,7 @@
         for req in self._queue:
             parent = req['parent']
             stage = parent.get_cur_stage()
-            if (stage != 'prep') and (stage != 'building'):
+            if (stage != 'prep') and (stage != 'building') and (stage != 'waiting'):
                 self._queue.remove(req)
                 continue
             # Find a free builder for this request


Index: PackageJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/PackageJob.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- PackageJob.py	7 Jul 2005 18:17:51 -0000	1.4
+++ PackageJob.py	10 Jul 2005 19:59:32 -0000	1.5
@@ -64,10 +64,26 @@
     def __str__(self):
         return self.args
 
-        
-
-http_dir = os.path.join(config_opts['server_work_dir'], "srpm_http_dir")
 
+class PackageJobController(threading.Thread):
+    """
+    A class that controls PackageJob objects through specific phases.
+    Python seems to have issues with threads, so to reduce the total
+    running threadcount, we split the PackageJob runs up into two threads,
+    one for everything up to the "build" stage, and then the actual
+    "build" stage.  This class provides the actual running thread.
+    """
+    def __init__(self, pkg_job, start_stage, end_stage):
+        self._pkg_job = pkg_job
+        if not end_stage:
+            end_stage = 'aaaaa'
+        self._end_stage = end_stage
+        self._pkg_job._set_cur_stage(start_stage)
+        threading.Thread.__init__(self)
+        
+    def run(self):
+        while not self._pkg_job.is_done() and not self._pkg_job.get_cur_stage() == self._end_stage:
+            self._pkg_job.process()
 
 
 def is_build_job_stage_valid(stage):
@@ -75,21 +91,24 @@
     Validate a job stage.
     """
 
-    stages = ['initialize', 'checkout', 'make_srpm', 'prep', 'building', 'failed', 'addtorepo', 'repodone', 'needsign', 'finished', 'killed']
+    stages = ['initialize', 'checkout', 'make_srpm', 'prep', 'waiting', 'building', 'failed', 'addtorepo', 'repodone', 'needsign', 'finished', 'killed']
     if stage in stages:
         return True
     return False
 
 
-
-class PackageJob(threading.Thread):
+class PackageJob:
     """ Controller object for building 1 SRPM on multiple arches """
 
+    http_dir = os.path.join(config_opts['server_work_dir'], "srpm_http_dir")
+
     def __init__(self, uid, username, package, cvs_tag, repo, buildmaster, hostname):
-        self.curstage = 'initialize'
+        self.curstage = ''
         self.bm = buildmaster
-        self.hostname = hostname
         self.uid = uid
+        pjc = PackageJobController(self, 'initialize', 'waiting')
+
+        self.hostname = hostname
         self.username = username
         self.starttime = time.time()
         self.endtime = None
@@ -109,7 +128,7 @@
         self.archjobs = {}
         self._archjobs_lock = threading.Lock()
         self._event = threading.Event()
-        threading.Thread.__init__(self)
+        pjc.start()
 
     def get_cur_stage(self):
         return self.curstage
@@ -332,7 +351,7 @@
         shutil.copy(self.srpm_path, self.stage_dir)
 
         # Must also copy SRPM to where the build client can get it over HTTP
-        http_pkg_path = self._make_stage_dir(http_dir)
+        http_pkg_path = self._make_stage_dir(self.http_dir)
         self.srpm_http_path = os.path.join(http_pkg_path, srpm)
         shutil.copy(self.srpm_path, self.srpm_http_path)
         self.srpm_path = srpm_in_dir
@@ -345,7 +364,7 @@
 
     def _request_one_arch_job(self, arch):
         # Construct SPRM URL
-        srpm_http_base = self.srpm_http_path[len(http_dir):]
+        srpm_http_base = self.srpm_http_path[len(self.http_dir):]
         if config_opts['ssl_builders'] == True:
             method = "https://"
         else:
@@ -368,6 +387,13 @@
         if self.archjobs[job.arch] != None:
             log("%s (%s/%s): Already have archjob for this arch (%s).  New job UID is %s." % (self.uid, self.package, job.arch, self.archjobs[job.arch].jobid, job.jobid))
         self.archjobs[job.arch] = job
+
+        # If this is the first archjob, that means we are now building.
+        # So we start up the second PackageJobController thread.
+        if self.curstage == 'waiting':
+            t = PackageJobController(self, 'building', None)
+            t.start()
+
         self._archjobs_lock.release()
         log("%s (%s/%s): Builder UID is %s" % (self.uid, self.package, job.arch, job.jobid))
 
@@ -401,48 +427,48 @@
     def wake(self):
         self._event.set()
 
-    def run(self):
-        while not self.is_done():
-            # Advance to next stage based on current stage
+    def process(self):
+        # Advance to next stage based on current stage
+        try:
             wait = False
-
-            try:
-                if self.curstage == 'initialize':
-                    self._checkout()
-                elif self.curstage == 'checkout':
-                    self._make_srpm()
-                elif self.curstage == 'make_srpm':
-                    self._prep()
-                elif self.curstage == 'prep' or self.curstage == 'building':
-                    wait = self._monitor()
-                elif self.curstage == 'finished':
-                    self._add_to_repo()
-                elif self.curstage == 'addtorepo':
-                    wait = True
-                elif self.curstage == 'repodone':
-                    self._succeeded()
-            except PrepError, e:
-                if not self.no_cvs:
-                    shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
-                subj = 'Prep Error (Job %s): %s on %s' % (self.uid, self.cvs_tag, self.target)
-                self.email_result(resultstring=e.args, subject=subj)
-                self._failed()
-            except BuildError, e:
-                subj = 'Build Error (Job %s): %s on %s' % (self.uid, self.cvs_tag, self.target)
-                self.email_result(resultstring=e.args, subject=subj)
-                # Kill remaining jobs on other arches
-                self._archjobs_lock.acquire()
-                for job in self.archjobs.values():
-                    if job:
-                        job.die()
-                self._archjobs_lock.release()
-                self._failed()
-            else:
-                # Wait to be woken up when long-running operations complete
-                if wait:
-                    while not self._event.isSet():
-                        self._event.wait()
-                    self._event.clear()
+            if self.curstage == 'initialize':
+                self._checkout()
+            elif self.curstage == 'checkout':
+                self._make_srpm()
+            elif self.curstage == 'make_srpm':
+                self._prep()
+                self._set_cur_stage('waiting')
+            # When prep is done, the first thread ends
+            elif self.curstage == 'building':
+                wait = self._monitor()
+            elif self.curstage == 'finished':
+                self._add_to_repo()
+            elif self.curstage == 'addtorepo':
+                wait = True
+            elif self.curstage == 'repodone':
+                self._succeeded()
+        except PrepError, e:
+            if not self.no_cvs:
+                shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
+            subj = 'Prep Error (Job %s): %s on %s' % (self.uid, self.cvs_tag, self.target)
+            self.email_result(resultstring=e.args, subject=subj)
+            self._failed()
+        except BuildError, e:
+            subj = 'Build Error (Job %s): %s on %s' % (self.uid, self.cvs_tag, self.target)
+            self.email_result(resultstring=e.args, subject=subj)
+            # Kill remaining jobs on other arches
+            self._archjobs_lock.acquire()
+            for job in self.archjobs.values():
+                if job:
+                    job.die()
+            self._archjobs_lock.release()
+            self._failed()
+        else:
+            # Wait to be woken up when long-running operations complete
+            if wait:
+                while not self._event.isSet():
+                    self._event.wait()
+                self._event.clear()
 
     def _monitor(self):
         self._set_cur_stage('building')




More information about the fedora-extras-commits mailing list