extras-buildsys/server ArchJob.py, 1.21, 1.22 BuildMaster.py, 1.35, 1.36 Builder.py, 1.23, 1.24 BuilderManager.py, 1.18, 1.19 Config.py, 1.10, 1.11 PackageJob.py, 1.36, 1.37 Repo.py, 1.18, 1.19

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Sat Nov 26 06:10:25 UTC 2005


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv17265/server

Modified Files:
	ArchJob.py BuildMaster.py Builder.py BuilderManager.py 
	Config.py PackageJob.py Repo.py 
Log Message:
2005-11-26  Dan Williams  <dcbw@redhat.com>

    * First cut of depsolving code.  You need to add a
        'mock_configs_dir' option to the [Directories] section
        of your plague-server.cfg; it should usually point
        to /etc/mock.
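
    For reference, a minimal sketch of the resulting config section,
    assuming the usual ConfigParser "option = value" layout and the
    default paths from Config.py (adjust to your installation):

        [Directories]
        repo_dir = /repodir
        tmpdir = /tmp
        target_configs_dir = /etc/plague/targets
        mock_configs_dir = /etc/mock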




Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- ArchJob.py	18 Nov 2005 14:33:59 -0000	1.21
+++ ArchJob.py	26 Nov 2005 06:10:22 -0000	1.22
@@ -40,10 +40,11 @@
     def __init__(self, builder, cfg, server, par_job, jobid, target_dict):
         self.par_job = par_job
         self.builder = builder
+        self._repo = par_job.repo()
         self._server = server
         self._use_ssl = cfg.get_bool("Builders", "use_ssl")
         self.jobid = jobid
-        self._status = 'running'
+        self._status = 'starting'
         self._builder_status = ''
         self._failure_noticed = False
         self._download_failed = False
@@ -56,6 +57,7 @@
         self._die = False
         self._die_user_requested = False
         self._die_lock = threading.Lock()
+        self._prepping = False
 
         # SSL certificate and key filenames
         if self._use_ssl:
@@ -85,10 +87,8 @@
     def download_failed(self):
         return self._download_failed
 
-    def builder_prepping(self):
-        if self._builder_status == 'prepping':
-            return True
-        return False
+    def prepping(self):
+        return self._prepping
 
     def arch(self):
         return self._target_dict['arch']
@@ -159,13 +159,32 @@
             return True
         return False
 
-    def _status_running(self):
+    def _status_starting(self):
         # Builders pause before they enter the 'prep' state (which accesses
         # the repo for this target), and wait for the server to allow them
         # to proceed when the repo is unlocked.
         if self._builder_status == 'downloaded':
-            if not self.par_job.repo.locked():
-                self._send_repo_unlocked()
+            self._set_status('repo_wait')
+            self._repo.request_unlock(self)
+            
+    def _status_repo_wait(self):
+        pass
+
+    def repo_unlocked_callback(self):
+        if self._status == 'repo_wait':
+            self._set_status('repo_unlock')        
+
+    def _status_repo_unlock(self):
+        # Builder will be in 'downloaded' state until
+        # it notices that the repo has been unlocked 
+        self._send_repo_unlocked()
+        self._prepping = True
+        if self._builder_status != 'downloaded':
+            self._set_status('running')
+
+    def _status_running(self):
+        if self._builder_status != 'prepping':
+            self._prepping = False
 
         # if the builder is done, grab list of files to download
         if self._builder_finished():
@@ -276,6 +295,8 @@
         self._die_lock.release()
         if should_die:
             self._server.die(self.jobid)
+            if self._status == 'repo_wait':
+                self._repo.cancel_unlock_request(self)
             self._set_status('done')
             if user_requested:
                 print "%s (%s/%s): %s - killed." % (self.par_job.uid, self.par_job.package,
@@ -317,7 +338,7 @@
 
     def die(self, user_requested=False):
         # Can be called from other threads
-        if self._status == 'running':
+        if self._status in ('starting', 'repo_wait', 'repo_unlock', 'running'):
             self._die_lock.acquire()
             self._die = True
             self._die_user_requested = user_requested

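The archjob now steps through starting -> repo_wait -> repo_unlock ->
running instead of sitting in 'running' the whole time.  A minimal
sketch of how the new _status_* handlers would be driven, assuming
ArchJob dispatches on self._status from a periodic process() call the
same way PackageJob dispatches its _stage_* methods (the driver loop
itself is outside the hunks shown here):

    # Hypothetical driver -- not part of this diff.
    def process(self):
        if self._status == 'done':
            return      # terminal state, nothing left to do
        func = getattr(self, '_status_%s' % self._status)
        func()          # e.g. _status_repo_wait() is a deliberate no-op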

Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -r1.35 -r1.36
--- BuildMaster.py	9 Sep 2005 15:10:17 -0000	1.35
+++ BuildMaster.py	26 Nov 2005 06:10:22 -0000	1.36
@@ -214,8 +214,8 @@
             self._status_updates_lock.release()
 
             attrdict = {}
-            attrdict['status'] = job.get_cur_stage()
-            attrdict['result'] = job.get_result()
+            attrdict['status'] = job.cur_stage()
+            attrdict['result'] = job.result()
             self._write_job_status_to_db(uid, attrdict)
 
             # Update job end time


Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -r1.23 -r1.24
--- Builder.py	25 Nov 2005 04:45:12 -0000	1.23
+++ Builder.py	26 Nov 2005 06:10:22 -0000	1.24
@@ -57,13 +57,12 @@
         self._when_died = 0
         self._server_cfg = cfg
 
-        certs = {}
+        certs = None
         if self._server_cfg.get_bool("Builders", "use_ssl"):
+            certs = {}
             certs['key_and_cert'] = self._server_cfg.get_str("SSL", "server_key_and_cert")
             certs['ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
             certs['peer_ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
-        else:
-            certs = None
 
         self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, certs, timeout=20)
         self._server_lock = threading.Lock()
@@ -191,17 +190,14 @@
     def _update_building_jobs(self):
         jobs = self._building_jobs()
 
-        # Update the current job's status
+        # Update status for all jobs on this builder
         if self._unavail_count == 0:
-            self._prepping_jobs = False
             builder_jobs = []
             for jobid in jobs.keys():
                 try:
                     job = self._jobs[jobid]
                     status = jobs[jobid]
                     job.set_builder_job_status(status)
-                    if status == 'prepping':
-                        self._prepping_jobs = True
                     builder_jobs.append(jobid)
                 except KeyError:
                     pass
@@ -211,6 +207,7 @@
             # may have finished on the builder and was
             # removed from the building job list before we
             # were able to know that it was done.  HACK
+            self._prepping_jobs = False
             for jobid in self._jobs.keys():
                 # If the builder didn't report this job as building,
                # and it's not done, explicitly grab its status
@@ -220,6 +217,10 @@
                     if status:
                         job.set_builder_job_status(status)
 
+                # Check for prepping jobs
+                if job.prepping():
+                    self._prepping_jobs = True
+
     def _get_builder_job_status(self, jobid):
         """ Get the status of one job on the builder """
         status = None

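For context, PlgXMLRPCServerProxy now receives certs=None whenever SSL
is off; with SSL on it gets a dict shaped like the sketch below (paths
are illustrative -- the real values come from the [SSL] options read
above):

    # Illustrative values only; actual paths come from plague-server.cfg.
    certs = {
        'key_and_cert': '/etc/plague/server_key_and_cert.pem',
        'ca_cert':      '/etc/plague/ca_cert.pem',
        'peer_ca_cert': '/etc/plague/ca_cert.pem',   # same CA for peers
    }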

Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- BuilderManager.py	14 Sep 2005 11:53:42 -0000	1.18
+++ BuilderManager.py	26 Nov 2005 06:10:22 -0000	1.19
@@ -148,8 +148,8 @@
         new_jobs = {}
         for req in self._queue:
             parent = req['parent']
-            stage = parent.get_cur_stage()
-            if (stage != 'prep') and (stage != 'building') and (stage != 'waiting'):
+            stage = parent.cur_stage()
+            if stage != 'building' and stage != 'waiting':
                 self._queue.remove(req)
                 continue
 
@@ -170,13 +170,13 @@
                 try:
                     job = builder.start_job(parent, req['target_dict'], req['srpm_url'])
                 except RuntimeError:
-                    pass
-                else:
-                    if not new_jobs.has_key(parent):
-                        new_jobs[parent] = []
-                    new_jobs[parent].append(job)
-                    self._queue.remove(req)
-                    break
+                    continue
+
+                if not new_jobs.has_key(parent):
+                    new_jobs[parent] = []
+                new_jobs[parent].append(job)
+                self._queue.remove(req)
+                break
         self._queue_lock.release()
 
         # Notify the parent jobs of their new archjobs.  Have to do this outside _queue_lock

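For reference, each request handled by this queue is a dict whose shape
can be inferred from the accesses above (illustrative values; other
keys may exist outside these hunks):

    # Inferred from req['parent'], req['target_dict'], req['srpm_url'].
    req = {
        'parent':      package_job,       # the owning PackageJob
        'target_dict': {'distro': 'fedora', 'target': 'development',
                        'arch': 'i386', 'repo': 'extras'},
        'srpm_url':    'https://hostname:8886/path/to/foo.src.rpm',
    }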

Index: Config.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Config.py,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- Config.py	14 Nov 2005 04:56:02 -0000	1.10
+++ Config.py	26 Nov 2005 06:10:22 -0000	1.11
@@ -101,6 +101,7 @@
         self.set_option("Directories", "repo_dir", "/repodir")
         self.set_option("Directories", "tmpdir", "/tmp")
         self.set_option("Directories", "target_configs_dir", "/etc/plague/targets")
+        self.set_option("Directories", "mock_configs_dir", "/etc/mock")
 
         self.add_section("Email")
         self.set_option("Email", "email_from", "buildsys at foo.com")
@@ -144,10 +145,8 @@
         self.set_option("mysql Engine", "user", "plague")
         self.set_option("mysql Engine", "password", "")
 
-
         self.save()
 
-
 class TargetConfig(BaseConfig.BaseConfig):
     def __init__(self, cfg, filename):
         BaseConfig.BaseConfig.__init__(self, filename)
@@ -159,18 +158,37 @@
         self._parent_cfg = cfg
         self._distro = self.get_str("General", "distro")
         self._target = self.get_str("General", "target")
-        self._base_arches = self.get_str("Arches", "base_arches")
+        self._base_arches = self.get_list("Arches", "base_arches")
+        self._opt_arches = self.get_list("Arches", "optional_arches")
         self._repo = self.get_str("General", "repo")
         self._testing = self.get_bool("General", "testing")
 
+        self._mock_configs = self._find_mock_configs()
+
+    def _find_mock_configs(self):
+        mock_configs = {}
+        mock_config_dir = self._parent_cfg.get_str("Directories", "mock_configs_dir")
+        for arch in self._base_arches:
+            mock_config_file = "%s-%s-%s-%s.cfg" % (self._distro, self._target, arch, self._repo)
+            f = os.path.join(mock_config_dir, mock_config_file)
+            if not os.path.exists(f) or not os.access(f, os.R_OK):
+                print """%s: Could not find mock config file %s.  Each base_arches arch
+must have a matching mock config in mock_configs_dir for this target.""" % (self._filename, mock_config_file)
+                os._exit(1)
+            mock_configs[arch] = f
+        return mock_configs
+
     def parent_cfg(self):
         return self._parent_cfg
 
-    def target_dict(self):
+    def mock_config_for_arch(self, arch):
+        return self._mock_configs[arch]
+
+    def target_dict(self, arch=None):
         target_dict = {}
         target_dict['distro'] = self._distro
         target_dict['target'] = self._target
-        target_dict['arch'] = None  # meaningless for a server-side target
+        target_dict['arch'] = arch
         target_dict['repo'] = self._repo
         return target_dict
 
@@ -186,6 +204,9 @@
     def basearches(self):
         return self._base_arches
 
+    def optarches(self):
+        return self._opt_arches
+
     def repo(self):
         return self._repo
 

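To make the new lookup concrete: for each base arch, the target config
derives a mock config filename from its own distro/target/repo fields
and requires that file to exist under mock_configs_dir.  For example
(values are illustrative, not taken from this commit):

    # Illustrative values; a real target config supplies these fields.
    distro, target, arch, repo = 'fedora', 'development', 'i386', 'extras'
    mock_config_file = '%s-%s-%s-%s.cfg' % (distro, target, arch, repo)
    # -> 'fedora-development-i386-extras.cfg', looked up in /etc/mock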

Index: PackageJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/PackageJob.py,v
retrieving revision 1.36
retrieving revision 1.37
diff -u -r1.36 -r1.37
--- PackageJob.py	15 Nov 2005 05:20:54 -0000	1.36
+++ PackageJob.py	26 Nov 2005 06:10:22 -0000	1.37
@@ -27,13 +27,17 @@
 import copy
 import string
 import EmailUtils
-import SimpleXMLRPCServer
 import xmlrpclib
 import socket
 import BuilderManager
 import ArchJob
 from plague import ArchUtils
 
+import yum
+import repomd.mdErrors
+from yum.logger import Logger
+
+
 CVS_CMD = "/usr/bin/cvs"
 MAKE_CMD = "/usr/bin/make"
 
@@ -45,12 +49,9 @@
 def log(stuff=''):
     print stuff
 
-class PrepError(exceptions.Exception):
-    def __init__(self, args=None):
-        exceptions.Exception.__init__(self)
-        self.args = args
-    def __str__(self):
-        return self.args
+class PrepError(exceptions.Exception): pass
+
+class DepError(exceptions.Exception): pass
         
 class BuildError(exceptions.Exception):
     def __init__(self, msg, arch):
@@ -78,13 +79,28 @@
         threading.Thread.__init__(self)
         
     def run(self):
-        while not self._pkg_job.is_done() and not self._pkg_job.get_cur_stage() == self._end_stage:
+        while not self._pkg_job.is_done() and not self._pkg_job.cur_stage() == self._end_stage:
             self._pkg_job.process()
 
 
 def is_package_job_stage_valid(stage):
     """ Validate a job stage """
-    stages = ['initialize', 'checkout_wait', 'checkout_wait_done', 'checkout', 'make_srpm', 'prep', 'waiting', 'building', 'build_done', 'add_to_repo', 'repodone', 'needsign', 'failed', 'finished']
+    stages = ['initialize',
+              'checkout_wait',
+              'checkout_wait_done',
+              'checkout',
+              'make_srpm',
+              'prep',
+              'depsolve',
+              'depsolve_wait',
+              'waiting',
+              'building',
+              'build_done',
+              'add_to_repo',
+              'repodone',
+              'needsign',
+              'failed',
+              'finished']
     if stage in stages:
         return True
     return False
@@ -110,8 +126,8 @@
     """ Controller object for building 1 SRPM on multiple arches """
 
     def __init__(self, uid, username, package, source, repo, buildmaster):
-        self.curstage = ''
-        self.result = 'in-progress'
+        self._curstage = ''
+        self._result = 'in-progress'
         self.bm = buildmaster
         self.uid = uid
         self.package = package
@@ -123,7 +139,7 @@
         self._target_cfg = repo.target_cfg()
         self._server_cfg = self._target_cfg.parent_cfg()
 
-        self.repo = repo
+        self._repo = repo
         self._target_str = self._target_cfg.target_string()
         self._target_dict = self._target_cfg.target_dict()
 
@@ -132,9 +148,11 @@
         self.endtime = 0
         self.use_cvs = self._server_cfg.get_bool("CVS", "use_cvs")
         self._source = source
-        self.result_dir = None
-        self.srpm_path = None
-        self.srpm_http_path = None
+        self._result_dir = None
+        self._srpm_path = None
+        self._srpm_http_path = None
+        self._depsolve_dir = None
+        self._last_depsolve_error = None
         self.repofiles = {}
         self.archjobs = {}
         self._archjobs_lock = threading.Lock()
@@ -151,22 +169,25 @@
         pjc = PackageJobController(self, first_stage, 'waiting')
         pjc.start()
 
-    def get_cur_stage(self):
-        return self.curstage
+    def cur_stage(self):
+        return self._curstage
+
+    def result(self):
+        return self._result
 
-    def get_result(self):
-        return self.result
+    def repo(self):
+        return self._repo
 
     def _set_cur_stage(self, stage, result_msg=None):
         """ Update our internal job stage, and notify the BuildMaster that
             we've changed as well.
         """
-        oldstage = self.curstage
-        self.curstage = stage
+        oldstage = self._curstage
+        self._curstage = stage
         if oldstage != stage:
             attrdict = {}
             attrdict['status'] = copy.copy(stage)
-            attrdict['result'] = copy.copy(self.result)
+            attrdict['result'] = copy.copy(self._result)
             if self.name and self.epoch and self.ver and self.release:
                 attrdict['epoch'] = self.epoch
                 attrdict['version'] = self.ver
@@ -186,8 +207,8 @@
         except KeyError:
             pass
 
-        base_arches = self._target_cfg.get_list("Arches", "base_arches")
-        opt_arches = self._target_cfg.get_list("Arches", "optional_arches")
+        base_arches = self._target_cfg.basearches()
+        opt_arches = self._target_cfg.optarches()
 
         # Remove arches we don't support from addl_arches
         for arch in addl_arches:
@@ -314,7 +335,7 @@
         if len(cvs_alias) > 0:
             cvs_target = cvs_alias
 
-        self.srpm_path = None
+        self._srpm_path = None
         srpm_dir = os.path.join(self.checkout_tmpdir, self.package, cvs_target)
         if not os.path.exists(srpm_dir):
             msg = "Error: could not find path %s for %s." % (srpm_dir, self._source)
@@ -345,35 +366,24 @@
             msg = "Error: could not find srpm for %s - output was:\n\n%s" % (self._source, o)
             raise PrepError(msg)
 
-        self.srpm_path = srpmpath
+        self._srpm_path = srpmpath
 
         self._set_cur_stage('prep')
         return False
 
-    def _make_stage_dir(self, rootdir):
-        # The dir will look like this:
-        # <rootdir>/devel/95-foo-1.1.0-23
-        pkgsubdir = '%d-%s-%s-%s' % (self.uid, self.name, self.ver, self.release)
-        stage_dir = os.path.join(rootdir, self._target_str, pkgsubdir)
-        if os.path.exists(stage_dir):
-            shutil.rmtree(stage_dir, ignore_errors=True)
-        os.makedirs(stage_dir)
-        return stage_dir
-
     def _stage_prep(self):
-
         # In SRPM-only mode, cvs_tag is path to the SRPM to build
         if self.use_cvs == False:
-            self.srpm_path = self._source
+            self._srpm_path = self._source
 
         # fail the job if we can't access the SRPM.  Can happen during
         # requeue of jobs when restarting the server.
-        if not os.path.exists(self.srpm_path) or not os.access(self.srpm_path, os.R_OK):
-            msg = "Could not access SRPM located at %s during prep stage." % self.srpm_path
+        if not os.path.exists(self._srpm_path) or not os.access(self._srpm_path, os.R_OK):
+            msg = "Could not access SRPM located at %s during prep stage." % self._srpm_path
             raise PrepError(msg)
 
         ts = rpmUtils.transaction.initReadOnlyTransaction()
-        hdr = rpmUtils.miscutils.hdrFromPackage(ts, self.srpm_path)
+        hdr = rpmUtils.miscutils.hdrFromPackage(ts, self._srpm_path)
         self.name = hdr['name']
         self.epoch = hdr['epoch']
         if not self.epoch:
@@ -391,42 +401,196 @@
 """ % (self._source, pkg_arches, allowed_arches)
             raise PrepError(msg)
 
+        self._set_cur_stage('depsolve_wait')
+        log("%s (%s): Requesting depsolve..." % (self.uid, self.package))
+        self._repo.request_depsolve(self, first_try=True)
+        return True  # sleep until the Repo wakes us up for depsolve
+
+    def _write_yum_conf(self, arch):
+        # Figure out which mock config file applies, and write out its yum.conf
+        try:
+            mock_config = self._target_cfg.mock_config_for_arch(arch)
+        except KeyError:
+            return None
+
+        config_opts = {}
+        execfile(mock_config)
+        config_lines = []
+        job_yum_dir = os.path.join(self._depsolve_dir, arch)
+        for line in config_opts['yum.conf'].split('\n'):
+            if string.find(line, "cachedir=") >= 0:
+                line = "cachedir=%s" % os.path.join(job_yum_dir, "cache")
+            elif string.find(line, "logfile=") >= 0:
+                line = "logfile=%s" % os.path.join(job_yum_dir, "yum.log")
+            config_lines.append(line+'\n')
+        del config_opts
+
+        yum_config = os.path.join(job_yum_dir, "yum.conf")
+        if not os.path.exists(job_yum_dir):
+            os.makedirs(job_yum_dir)
+        f = open(yum_config, "w")
+        f.writelines(config_lines)
+        f.close()
+        
+        return yum_config
+
+    def _arch_deps_solved(self, arch):
+        # Returns: False if dep errors
+        #          True if all deps are solved
+
+        success = True
+        try:
+            base = yum.YumBase()
+            yum_config = self._write_yum_conf(arch)
+            base.doConfigSetup(fn=yum_config, root=os.path.dirname(yum_config))
+            threshold = 0
+            if DEBUG:
+                threshold = 5
+            base.log = Logger(threshold=threshold, file_object=sys.stdout)
+            try:
+                base.doRepoSetup()
+            except yum.Errors.RepoError, e:
+                raise DepError(str(e))
+
+            archlist = ['src', 'noarch']
+            if ArchUtils.supported_arches.has_key(arch):
+                archlist.extend(ArchUtils.supported_arches[arch])
+            else:
+                raise DepError("WARNING: arch %s was not in ArchUtils' supported_arches." % arch)
+
+            ts = rpmUtils.transaction.initReadOnlyTransaction()
+            try:
+                srpm = yum.packages.YumLocalPackage(ts, self._srpm_path)
+            except yum.Errors.MiscError, e:
+                del ts
+                raise DepError(str(e))
+            del ts
+
+            try:
+                base.doSackSetup(archlist)
+            except yum.Errors.RepoError, e:
+                raise DepError(str(e))
+
+            for dep in srpm.requiresList():
+                if dep.startswith("rpmlib("): continue
+                try:
+                    pkg = base.returnPackageByDep(dep)
+                except repomd.mdErrors.PackageSackError, e:
+                    raise DepError(str(e))
+                except yum.Errors.YumBaseError, e:
+                    raise DepError(str(e))
+        except DepError, e:
+            self._last_depsolve_error = str(e)
+            print "%s (%s/%s): Depsolve Error: %s" % (self.uid, self.package, arch, str(e))
+            success = False
+
+        del base  # note: 'srpm' may be unbound here if a DepError fired early
+        return success
+
+    def start_depsolve(self):
+        self._set_cur_stage('depsolve')
+        self.wake()
+
+    def _stage_depsolve(self):
+        """ Depsolve all arches, only if all pass do we proceed """
+        """ to queue up the actual archjobs for building """
+
+        # If the job's waited more than 8 hours for deps to be
+        # solved, kill the job and make some human figure it out
+        if time.time() > self.starttime + (60 * 60 * 8):
+            self._repo.notify_depsolve_done(self)
+            shutil.rmtree(self._depsolve_dir, ignore_errors=True)
+            raise DepError("The job's build dependencies couldn't be solved within 8 hours.  Last error: " % self._last_depsolve_error)
+
+        # Create the depsolve metadata cache dir
+        if not self._depsolve_dir:
+            server_work_dir = self._server_cfg.get_str("Directories", "server_work_dir")
+            self._depsolve_dir = os.path.join(server_work_dir, "depsolve", "%s-%s" % (self.uid, self.name))
+            if os.path.exists(self._depsolve_dir):
+                shutil.rmtree(self._depsolve_dir, ignore_errors=True)
+            os.makedirs(self._depsolve_dir)
+
+        self._archjobs_lock.acquire()
+        unsolved_deps = False
+        archlist = self.archjobs.keys()
+
+        # noarch jobs are a bit special here.  Since we don't know
+        # what arch they will actually build on, we have to make
+        # sure all arches the target supports will work
+        if len(self.archjobs.keys()) == 1 and self.archjobs.keys()[0] == 'noarch':
+            archlist = self._target_cfg.basearches()[:]  # copy; don't mutate the config's own list
+            for arch in self._target_cfg.optarches():
+                if arch not in archlist:
+                    archlist.append(arch)
+
+        log("%s (%s): Starting depsolve for arches: %s." % (self.uid, self.package, archlist))
+
+        for arch in archlist:
+            if self._arch_deps_solved(arch) == False:
+                unsolved_deps = True
+                break
+
+        self._archjobs_lock.release()
+
+        self._repo.notify_depsolve_done(self)
+
+        if unsolved_deps == True:
+            # Go to sleep until the repo changes
+            self._set_cur_stage('depsolve_wait')
+            self._repo.request_depsolve(self, first_try=False)
+            log("%s (%s): Finished depsolve (unsuccessful), trying again later." % (self.uid, self.package))
+            return True
+
+        log("%s (%s): Finished depsolve (successful), requesting archjobs." % (self.uid, self.package))
+
+        # Success, all deps are solved.  Kill the depsolve dir
+        shutil.rmtree(self._depsolve_dir, ignore_errors=True)
+        self._depsolve_dir = None
+
+        # Queue up the archjobs
         work_dir = self._server_cfg.get_str("Directories", "server_work_dir")
-        self.result_dir = self._make_stage_dir(work_dir)
+        self._result_dir = self._make_stage_dir(work_dir)
         for arch in self.archjobs.keys():
-            thisdir = os.path.join(self.result_dir, arch)
+            thisdir = os.path.join(self._result_dir, arch)
             if not os.path.exists(thisdir):
                 os.makedirs(thisdir)
 
         # Copy SRPM to where the builder can access it
         http_pkg_path = self._make_stage_dir(self.http_dir)
-        self.srpm_http_path = os.path.join(http_pkg_path, os.path.basename(self.srpm_path))
-        shutil.copy(self.srpm_path, self.srpm_http_path)
-        self.srpm_path = None
+        self._srpm_http_path = os.path.join(http_pkg_path, os.path.basename(self._srpm_path))
+        shutil.copy(self._srpm_path, self._srpm_http_path)
+        self._srpm_path = None
 
         # Remove CVS checkout and make_srpm dirs
         if self.use_cvs == True:
             shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
 
-        self._request_arch_jobs()
         self._set_cur_stage('waiting')
+        self._request_arch_jobs()
         return False
 
+    def _stage_depsolve_wait(self):
+        pass
+
+    def _make_stage_dir(self, rootdir):
+        # The dir will look like this:
+        # <rootdir>/devel/95-foo-1.1.0-23
+        pkgsubdir = '%d-%s-%s-%s' % (self.uid, self.name, self.ver, self.release)
+        stage_dir = os.path.join(rootdir, self._target_str, pkgsubdir)
+        if os.path.exists(stage_dir):
+            shutil.rmtree(stage_dir, ignore_errors=True)
+        os.makedirs(stage_dir)
+        return stage_dir
+
     def _request_one_arch_job(self, arch, orphaned):
         # Construct SRPM URL
-        srpm_http_base = self.srpm_http_path[len(self.http_dir):]
-        use_ssl = self._server_cfg.get_bool("Builders", "use_ssl")
-        if use_ssl == True:
+        srpm_http_base = self._srpm_http_path[len(self.http_dir):]
+        method = "http://"
+        if self._server_cfg.get_bool("Builders", "use_ssl") == True:
             method = "https://"
-        else:
-            method = "http://"
         hostname = self._server_cfg.get_str("General", "hostname")
         srpm_url = method + hostname + ":8886/" + srpm_http_base
-        target_dict = {}
-        target_dict['distro'] = self._target_cfg.get_str("General", "distro")
-        target_dict['target'] = self._target_cfg.get_str("General", "target")
-        target_dict['arch'] = arch
-        target_dict['repo'] = self._target_cfg.get_str("General", "repo")
+        target_dict = self._target_cfg.target_dict(arch)
         self.bm.builder_manager.request_arch_job(self, target_dict, srpm_url, orphaned)
 
     def _request_arch_jobs(self):
@@ -449,7 +613,7 @@
 
         # If this is the first archjob, that means we are now building.
         # So we start up the second PackageJobController thread.
-        if self.curstage == 'waiting':
+        if self._curstage == 'waiting':
             t = PackageJobController(self, 'building', None)
             t.start()
 
@@ -466,7 +630,7 @@
         self._archjobs_lock.release()
 
     def is_done(self):
-        if self.curstage == 'needsign' or self.curstage == 'failed' or self.curstage == 'finished':
+        if self._curstage == 'needsign' or self._curstage == 'failed' or self._curstage == 'finished':
             return True
         return False
 
@@ -476,7 +640,7 @@
 
         log("%s (%s): Job kill request from %s" % (self.uid, self.package, username))
         self._archjobs_lock.acquire()
-        if self.curstage == 'waiting':
+        if self._curstage == 'waiting':
             # In 'waiting' stage, we have no controller thread.  So to get
             # the job killed immediately, we have to start one
             t = PackageJobController(self, 'killed', None)
@@ -488,7 +652,7 @@
 
     def _handle_death(self):
         resultstring = "%s (%s): Build on target %s was killed by %s." % (self.uid, self.name, self._target_str, self._killer)
-        self.result = 'killed'
+        self._result = 'killed'
         self._set_cur_stage('finished', resultstring)
         self.email_result(self.username, resultstring)
         log(resultstring)
@@ -523,7 +687,7 @@
             return
 
         try:
-            func = getattr(self, "_stage_%s" % self.curstage)
+            func = getattr(self, "_stage_%s" % self._curstage)
             if func():
                 # Wait to be woken up when long-running operations complete
                 while not self._event.isSet():
@@ -532,9 +696,15 @@
         except PrepError, e:
             if self.use_cvs == True:
                 shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
+            msg = str(e)
             subj = 'Prep Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
-            self.email_result(self.username, resultstring=e.args, subject=subj)
-            self._stage_failed(e.args)
+            self.email_result(self.username, resultstring=msg, subject=subj)
+            self._stage_failed(msg)
+        except DepError, e:
+            msg = str(e)
+            subj = 'Dependencies Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
+            self.email_result(self.username, resultstring=msg, subject=subj)
+            self._stage_failed(msg)
         except BuildError, e:
             subj = 'Build Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
             base_url = self._server_cfg.get_str("UI", "log_url")
@@ -597,25 +767,25 @@
         return True
 
     def get_stage_dir(self):
-        return self.result_dir
+        return self._result_dir
 
     def _stage_failed(self, msg=None):
-        self.result = 'failed'
+        self._result = 'failed'
         self._set_cur_stage('failed', msg)
         self.endtime = time.time()
         self._cleanup_job_files()
         self.bm.notify_job_done(self)
 
     def _cleanup_job_files(self):
-        if not self.result_dir or not self.srpm_http_path:
+        if not self._result_dir or not self._srpm_http_path:
             return
 
         # If it's a testing target, we keep the RPMs around since they don't
         # get copied to the repository; they only live in the repodir
-        if self.result == 'success' and self._target_cfg.testing() == True:
+        if self._result == 'success' and self._target_cfg.testing() == True:
             return
 
-        srpm_file = os.path.join(self.result_dir, os.path.basename(self.srpm_http_path))
+        srpm_file = os.path.join(self._result_dir, os.path.basename(self._srpm_http_path))
 
         # Delete any RPMs in the arch dirs
         for job in self.archjobs.values():
@@ -624,7 +794,7 @@
             for f in job.get_files():
                 if not f.endswith(".rpm"):
                     continue
-                src_file = os.path.join(self.result_dir, job.arch(), f)
+                src_file = os.path.join(self._result_dir, job.arch(), f)
                 if src_file.endswith(".src.rpm"):
                     # Keep an SRPM.  We prefer built SRPMs from builders over
                     # the original SRPM.
@@ -634,10 +804,10 @@
 
         # If there were no builder-built SRPMs, keep the original around
         if not os.path.exists(srpm_file):
-            shutil.copy(self.srpm_http_path, srpm_file)
+            shutil.copy(self._srpm_http_path, srpm_file)
 
         # Delete the SRPM in the server's HTTP dir
-        shutil.rmtree(os.path.dirname(self.srpm_http_path), ignore_errors=True)
+        shutil.rmtree(os.path.dirname(self._srpm_http_path), ignore_errors=True)
 
     def _stage_add_to_repo(self):
         # Create a list of files that the repo should copy to
@@ -650,7 +820,7 @@
                 if not f.endswith(".rpm"):
                     continue
                 jobarch = job.arch()
-                src_file = os.path.join(self.result_dir, jobarch, f)
+                src_file = os.path.join(self._result_dir, jobarch, f)
                 verrel = "%s-%s" % (self.ver, self.release)
                 if f.endswith(".src.rpm"):
                     dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, "SRPM")
@@ -664,7 +834,7 @@
         # list from this object directly when the copy operation
         # happens
         if len(self.repofiles):
-            self.repo.request_copy(self)
+            self._repo.request_copy(self)
 
         self.endtime = time.time()
         return True
@@ -678,7 +848,7 @@
 
     def _stage_repodone(self):
         resultstring = " %s (%s): Build on target %s succeeded." % (self.uid, self.name, self._target_str)
-        self.result = 'success'
+        self._result = 'success'
         self._set_cur_stage('needsign', resultstring)
 
         self._cleanup_job_files()

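The depsolver piggybacks on mock's config format here: a mock .cfg file
is executable Python that fills in a config_opts dict, so
_write_yum_conf() can execfile() it and lift out config_opts['yum.conf']
with the cachedir= and logfile= lines rewritten to point into the job's
depsolve directory.  A sketch of the convention being relied on (the
sample contents are illustrative, not from a real mock config):

    # What _write_yum_conf() assumes a mock .cfg provides, roughly:
    #
    #   config_opts['yum.conf'] = """
    #   [main]
    #   cachedir=/var/lib/mock/cache
    #   logfile=/var/log/yum.log
    #   ...
    #   """
    config_opts = {}
    execfile('/etc/mock/fedora-development-i386-extras.cfg')  # illustrative
    yum_conf_text = config_opts['yum.conf']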

Index: Repo.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Repo.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- Repo.py	1 Nov 2005 14:46:18 -0000	1.18
+++ Repo.py	26 Nov 2005 06:10:22 -0000	1.19
@@ -28,6 +28,8 @@
 class Repo(threading.Thread):
     """ Represents an on-disk repository of RPMs and manages updates to the repo. """
 
+    MAX_DEPSOLVE_JOBS = 4
+
     def __init__(self, target_cfg, repodir, builder_manager):
         self._builder_manager = builder_manager
         self._target_cfg = target_cfg
@@ -36,15 +38,19 @@
             print "Error: Repository directory '%s' does not exist." % repodir
             os._exit(1)
 
+        # Ensure this repo's work directories are created
+        ### Base repo dir
         target_str = self._target_cfg.target_string()
         self._repodir = os.path.join(repodir, target_str)
         if not os.path.exists(self._repodir):
             os.makedirs(self._repodir)
 
+        ### createrepo "cache" dir
         self._repo_cache_dir = os.path.join(repodir, "cache", target_str)
         if not os.path.exists(self._repo_cache_dir):
             os.makedirs(self._repo_cache_dir)
 
+        ### SRPM HTTP upload dir
         parent_cfg = self._target_cfg.parent_cfg()
         upload_dir = os.path.join(parent_cfg.get_str("Directories", "server_work_dir"), "srpm_upload_dir", target_str)
         if not os.path.exists(upload_dir):
@@ -56,6 +62,24 @@
         self._lock_count = 0
         self._stop = False
 
+        # We want to execute a job's first depsolve right away, but
+        # if that one fails, subsequent depsolves should only happen
+        # when the repo gets updated, since that's when the deps might
+        # have changed.
+        #
+        # The queues are dicts mapping buildjob->boolean, where a boolean
+        # value of True means the job's depsolve has started, and False
+        # means it hasn't.
+        self._depsolve_immediate_lock = threading.Lock()
+        self._depsolve_immediate_queue = {}
+        self._depsolve_again_lock = threading.Lock()
+        self._depsolve_again_queue = {}
+
+        # Repo unlock queue
+        self._repo_unlock_lock = threading.Lock()
+        self._repo_unlock_queue = []
+
+        # Repo script stuff
         self._pobj = None
         self._repo_script_start = 0
         self._repo_script = None
@@ -74,19 +98,109 @@
         self._lock.acquire()
         self._repo_additions.append(buildjob)
         # We enter lock level 1 here, preventing builders from
-        # starting their 'prep' state
+        # starting their 'prep' state and jobs from depsolving
         if self._lock_count == 0:
             self._lock_count = 1
         self._lock.release()
 
-    def locked(self):
-        # We can get away without holding _lock here...
-        if self._lock_count > 0:
-            return True
-        return False
+    def request_depsolve(self, buildjob, first_try=False):
+        """ Registers a buildjob be notified to start depsolving when the repo is ready """
+        if first_try:
+            self._depsolve_immediate_lock.acquire()
+            self._depsolve_immediate_queue[buildjob] = False
+            self._depsolve_immediate_lock.release()
+        else:
+            self._depsolve_again_lock.acquire()
+            self._depsolve_again_queue[buildjob] = False
+            self._depsolve_again_lock.release()
+
+    def request_unlock(self, archjob):
+        self._repo_unlock_lock.acquire()
+        self._repo_unlock_queue.append(archjob)
+        self._repo_unlock_lock.release()
+
+    def cancel_unlock_request(self, archjob):
+        self._repo_unlock_lock.acquire()
+        if archjob in self._repo_unlock_queue:
+            self._repo_unlock_queue.remove(archjob)
+        self._repo_unlock_lock.release()
+
+    def _process_unlock_requests(self):
+        self._repo_unlock_lock.acquire()
+        for archjob in self._repo_unlock_queue:
+            archjob.repo_unlocked_callback()
+        self._repo_unlock_lock.release()
+
+    def _start_depsolves_for_queue(self, queue, max_jobs):
+        num = 0
+        for job in queue:
+            if queue[job]:
+                num = num + 1
+        available = max(max_jobs - num, 0)
+
+        if available > 0:
+            for job in queue.keys():
+                if available <= 0:
+                    break
+                if not queue[job]:
+                    queue[job] = True
+                    job.start_depsolve()
+                    available = available - 1
+
+    def _start_waiting_depsolves(self, repo_changed=False):
+        """ Start waiting depsolves, but only a certain number to avoid
+            nailing the build server too hard.
+        """
+        self._depsolve_immediate_lock.acquire()
+        self._depsolve_again_lock.acquire()
+
+        max_immediate_depsolves = self.MAX_DEPSOLVE_JOBS
+        if repo_changed:
+            max_again_depsolves = self.MAX_DEPSOLVE_JOBS / 2
+            max_immediate_depsolves = self.MAX_DEPSOLVE_JOBS / 2
+
+        self._start_depsolves_for_queue(self._depsolve_immediate_queue, max_immediate_depsolves)
+
+        # Only fire off non-first-try depsolves if the repo has changed
+        if repo_changed:
+            self._start_depsolves_for_queue(self._depsolve_again_queue, max_again_depsolves)
+
+        self._depsolve_again_lock.release()
+        self._depsolve_immediate_lock.release()
+
+    def notify_depsolve_done(self, buildjob):
+        """ Notifies the repo that a job is done depsolving """
+        self._depsolve_immediate_lock.acquire()
+        self._depsolve_again_lock.acquire()
+        if buildjob in self._depsolve_immediate_queue:
+            del self._depsolve_immediate_queue[buildjob]
+        elif buildjob in self._depsolve_again_queue:
+            del self._depsolve_again_queue[buildjob]
+        self._depsolve_again_lock.release()
+        self._depsolve_immediate_lock.release()
+
+    def _any_depsolving_jobs(self):
+        """ Determines if any jobs are currently depsolving """
+        any_depsolving = False
+
+        self._depsolve_immediate_lock.acquire()
+        self._depsolve_again_lock.acquire()
+        for job in self._depsolve_immediate_queue.keys():
+            if self._depsolve_immediate_queue[job]:
+                any_depsolving = True
+                break
+        if not any_depsolving:
+            for job in self._depsolve_again_queue.keys():
+                if self._depsolve_again_queue[job]:
+                    any_depsolving = True
+                    break
+        self._depsolve_again_lock.release()
+        self._depsolve_immediate_lock.release()
+
+        return any_depsolving
 
     def _update_repo(self):
-        """ Copy new RPMS to each repo, and update each repo at the end """
+        """ Copy new RPMS to the repo, and updates the repo at the end """
         for buildjob in self._repo_additions:
             # Ensure all the files are accessible
             success = True
@@ -118,12 +232,12 @@
 
         (s, o) = commands.getstatusoutput('/usr/bin/createrepo -q -c %s -x "*.src.rpm" -x "*.debuginfo.rpm" %s' % (self._repo_cache_dir, self._repodir))
         if s != 0:
-            print "Error: createrepo failed with exit status %d!  Output: '%s'" % (s, o)
+            print "Repo Error (%s): createrepo failed with exit status %d!  Output: '%s'" % (self._target_cfg.target_string(), s, o)
 
-    def _run_repo_script(self):
+    def _start_repo_script(self):
         target_str = self._target_cfg.target_string()
         cmd = "%s %s" % (self._repo_script, target_str)
-        print "Repo '%s': executing repository script %s" % (target_str, self._repo_script)
+        print "Repo Error (%s): executing repository script %s" % (target_str, self._repo_script)
         self._pobj = popen2.Popen4(cmd=cmd)
         fcntl.fcntl(self._pobj.fromchild.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
         self._repo_script_start = time.time()
@@ -178,8 +292,9 @@
             self._pobj = None
 
     def run(self):
+        repo_changed = False
         while self._stop == False:
-            # We have 3 lock levels:
+            # We have 3 locked operations, signified by self._lock_count:
             #
             # 0 - repo unlocked
             # 1 - entered when jobs request packages to be copied to the repo;
@@ -188,37 +303,44 @@
             #       packages copied to repo and createrepo is run
             # 3 - entered when createrepo is done; repo script run
 
-            prepping_builders = self._builder_manager.any_prepping_builders()
-
             self._lock.acquire()
 
-            # Level 2: update the repo
-            if self._lock_count == 2:
+            if self._lock_count == 0:
+                # Notify archjobs that the repo is unlocked
+                self._process_unlock_requests()
+
+                # Kick off depsolves
+                self._start_waiting_depsolves(repo_changed)
+                if repo_changed:
+                    repo_changed = False
+            elif self._lock_count == 1:
+                # Enter lock level 2 if there are no builders in the
+                # 'prep' state, no jobs are depsolving, and we are already at lock level 1
+                prepping_builders = self._builder_manager.any_prepping_builders()
+                depsolving_jobs = self._any_depsolving_jobs()
+                if not prepping_builders and not depsolving_jobs:
+                    self._lock_count = 2
+            elif self._lock_count == 2:
+                # Level 2: update the repo
                 target_str = self._target_cfg.target_string()
                 print "Repo '%s': updating repository metadata..." % target_str
                 self._update_repo()
+                repo_changed = True
                 print "Repo '%s': Done updating." % target_str
 
-                # If there's a repo script for this target, enter level 3
+                # Run the repo script, if any
+                self._pobj = None
                 if self._repo_script:
-                    self._run_repo_script()
-                    self._lock_count = 3
-                else:
-                    self._lock_count = 0
-
-            # Level 3: monitor the repo script
-            if self._lock_count == 3:
+                    self._start_repo_script()
+                self._lock_count = 3
+            elif self._lock_count == 3:
+                # Level 3: monitor the repo script
                 if self._pobj:
                     self._monitor_repo_script()
                 else:
                     # If for some reason self._pobj is None, unlock the repo
                     self._lock_count = 0
 
-            # Enter lock level 2 if there are no builders in the
-            # 'prep' state and we are already at lock level 1
-            if not prepping_builders and self._lock_count == 1:
-                self._lock_count = 2
-
             self._lock.release()
 
             time.sleep(5)

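To make the depsolve throttling concrete: with MAX_DEPSOLVE_JOBS = 4, a
pass where the repo has not changed may start up to 4 first-try
depsolves and no retries; once the repo changes, the budget is split
evenly so retries get a chance too.  A small standalone sketch of that
policy (not the class method itself):

    MAX_DEPSOLVE_JOBS = 4

    def depsolve_budget(repo_changed):
        # Retries only get slots when the repo changed, since that is
        # the only time their deps could newly resolve.
        if repo_changed:
            half = MAX_DEPSOLVE_JOBS / 2
            return half, half        # (first-try slots, retry slots)
        return MAX_DEPSOLVE_JOBS, 0

    # depsolve_budget(False) -> (4, 0)
    # depsolve_budget(True)  -> (2, 2)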


