extras-buildsys/server Repo.py, NONE, 1.1 BuildJob.py, 1.7, 1.8 BuildMaster.py, 1.4, 1.5 client_manager.py, 1.21, 1.22

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Fri Jun 24 12:51:24 UTC 2005


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv3560/server

Modified Files:
	BuildJob.py BuildMaster.py client_manager.py 
Added Files:
	Repo.py 
Log Message:
2005-06-24  Dan Williams <dcbw at redhat.com>

    * server/Repo.py
        - Deal with repo locking during createrepo stages

    * Ensure that createrepo can run against a repository without clients
        accessing it during the operation.  Since createrepo runs can take
        a long time, clients must not try to install their buildroots or
        run any other operations against the repository while it is in
        progress, or they may fail randomly with yum errors.  Access to the
        repository is therefore guarded by a 2-level lock: when a build job
        is done, it asks the repo to copy its finished RPMs, and the repo
        enters lock level 1.  Level 1 prevents new build jobs from entering
        their 'prep' state.  When all currently running jobs have finished
        their 'prep' state and the repo is at lock level 1, the repo
        promotes itself to lock level 2, copies any new RPMs into the repo,
        and runs createrepo.  When this is done, all waiting clients are
        released into their 'prep' state.

        This requires the absolute latest mock from CVS.
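
A condensed, illustrative sketch of the locking protocol described above;
the authoritative implementation is server/Repo.py below, and the MiniRepo
class and run_createrepo() stub here are invented for this sketch only:

    # Lock levels: 0 = unlocked, 1 = hold new 'prep' requests,
    # 2 = safe to copy RPMs and run createrepo
    UNLOCKED, HOLD_PREP, UPDATING = 0, 1, 2

    class MiniRepo:
        def __init__(self):
            self.lock_level = UNLOCKED
            self.additions = []   # finished jobs waiting to be copied in

        def request_copy(self, job):
            # A finished build job hands over its RPMs; enter lock level 1,
            # which keeps new jobs out of their 'prep' state
            self.additions.append(job)
            if self.lock_level == UNLOCKED:
                self.lock_level = HOLD_PREP

        def poll(self, any_prepping_clients):
            # Called periodically: copy/createrepo at level 2, then unlock;
            # promote 1 -> 2 only once no client is still in 'prep'
            if self.lock_level == UPDATING:
                self.run_createrepo()
                self.additions = []
                self.lock_level = UNLOCKED
            if self.lock_level == HOLD_PREP and not any_prepping_clients:
                self.lock_level = UPDATING

        def run_createrepo(self):
            pass   # stand-in for copying the RPMs and running createrepo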




--- NEW FILE Repo.py ---
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# Copyright 2005 Dan Williams <dcbw at redhat.com> and Red Hat, Inc.

import os
import threading
import shutil
import CONFIG
import time
import commands


class Repo(threading.Thread):
    """ Represents an on-disk repository of RPMs and manages updates to the repo. """

    def __init__(self, target, client_manager):
        self._bcm = client_manager
        self._target = target
        self._repodir = os.path.join(CONFIG.get('repo_dir'), target)
        if not os.path.exists(self._repodir):
            os.makedirs(self._repodir)
        self._lock = threading.Lock()
        self._repo_additions = []
        self._lock_count = 0
        self._stop = False
        threading.Thread.__init__(self)

    def target(self):
        return self._target

    def request_copy(self, buildjob):
        """ Registers a BuildJob object that has files to copy to the repo """

        self._lock.acquire()
        print "Added job uid %s to additions" % buildjob.uid
        self._repo_additions.append(buildjob)
        # We enter lock level 1 here, preventing build clients from
        # starting their 'prep' state
        if self._lock_count == 0:
            self._lock_count = 1
        self._lock.release()

    def locked(self):
        # We can get away without holding _lock here...
        return self._lock_count > 0

    def _update_repo(self):
        """ Copy new RPMs into the repo and run createrepo over it """
        for buildjob in self._repo_additions:
            for src in buildjob.repofiles.keys():
                dst = buildjob.repofiles[src]
                # dst is the destination directory for this RPM
                # (see BuildJob._add_to_repo), so create it, not its parent
                if not os.path.exists(dst):
                    os.makedirs(dst)
                shutil.copy(src, dst)

            # Notify the build job that we've copied its files to the repo
            buildjob.repo_add_callback()

        # Don't copy (or notify) these jobs again on the next lock cycle
        self._repo_additions = []

        s, o = commands.getstatusoutput('/usr/bin/createrepo -q %s' % self._repodir)
        if s != 0:
            print "createrepo failed with exit status %d!" % s


    def run(self):
        while self._stop == False:
            # We have 2 lock levels.  When the repo is in either, clients are prevented
            # from starting their 'prep' state.  Clients may already be in the 'prep'
            # state when we lock the repo, therefore we don't actually enter lock level
            # 2 until all clients have finished their 'prep' state.  Only then do we
            # copy RPMs to the repo and run createrepo on it.

            prepping_clients = self._bcm.any_prepping_clients()

            self._lock.acquire()

            # If the lock level is 2, update the repo
            if self._lock_count == 2:
                print "Repo '%s': Lock level 2, updating repository..." % self._target
                self._update_repo()
                print "Repo '%s': Done updating..." % self._target
                self._lock_count = 0

            # Enter lock level 2 if there are no build clients in the
            # 'prep' state and we are already at lock level 1
            if not prepping_clients and self._lock_count == 1:
                print "Repo '%s': Promoting to lock level 2" % self._target
                self._lock_count = 2

            self._lock.release()

            time.sleep(5)

    def stop(self):
        self._stop = True



Index: BuildJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildJob.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- BuildJob.py	19 Jun 2005 02:47:53 -0000	1.7
+++ BuildJob.py	24 Jun 2005 12:51:22 -0000	1.8
@@ -72,7 +72,7 @@
 class BuildJob(threading.Thread):
     """ Controller object for building 1 SRPM on multiple arches """
 
-    def __init__(self, uid, username, package, cvs_tag, target, buildmaster, hostname):
+    def __init__(self, uid, username, package, cvs_tag, repo, buildmaster, hostname):
         self.curstage = 'initialize'
         self.bm = buildmaster
         self.hostname = hostname
@@ -82,7 +82,8 @@
         self.endtime = None
         self.package = package
         self.name = None
-        self.target = target
+        self.target = repo.target()
+        self.repo = repo
         self.buildarches = []
         self.sub_jobs = {}
         self.failed = False
@@ -94,6 +95,7 @@
         # Deal with straight SRPM builds
         if self.no_cvs and self.curstage is 'initialize':
             self.curstage = 'make_srpm'
+        self.repofiles = {}
         threading.Thread.__init__(self)
 
     def get_cur_stage(self):
@@ -145,7 +147,9 @@
         buildable_arches = targets[self.target]
 
         target_opt_arches = CONFIG.get('target_optional_arches')
-        opt_arches = target_opt_arches[self.target]
+        opt_arches = []
+        if target_opt_arches.has_key(self.target):
+            opt_arches = target_opt_arches[self.target]
 
         # Remove arches we don't support from addl_arches
         for arch in addl_arches:
@@ -312,9 +316,6 @@
             self.curstage = 'finished'
             return
 
-        # Make sure build clients see latest packages
-        self._createrepo()
-
         self.stage_dir = self._make_stage_dir(CONFIG.get('server_work_dir'))
         for arch in self.buildarches:
             thisdir = os.path.join(self.stage_dir, arch)
@@ -341,6 +342,7 @@
     def run(self):
         while self.curstage != 'needsign' and self.curstage != 'failed':
             # Advance to next stage based on current stage
+            do_sleep = False
             oldstage = self.curstage
             if oldstage == 'initialize':
                 self._checkout()
@@ -350,13 +352,24 @@
                 self._prep()
             elif oldstage == 'prep' or oldstage == 'building':
                 self._monitor()
+                do_sleep = True
             elif oldstage == 'finished':
                 self._cleanup()
             elif oldstage == 'cleanup':
                 if self.failed:
                     self._failed()
                 else:
-                    self._succeeded()
+                    self._add_to_repo()
+            elif oldstage == 'addtorepo':
+                do_sleep = True
+            elif oldstage == 'repodone':
+                self._succeeded()
+            print "%s/%s" % (oldstage, self.curstage)
+
+            # Only some stages need to sleep because we stay in
+            # them for a while.
+            if do_sleep:
+                time.sleep(3)
 
     def _start_unspawned_builds(self):
         for arch in self.buildarches:
@@ -396,8 +409,6 @@
         if self.failed or (have_jobs == True and jobs_running == False):
             self.curstage = 'finished'
 
-        time.sleep(3)
-
     def _cleanup(self):
         self.curstage = 'cleanup'
         if self.failed:
@@ -427,12 +438,11 @@
         resultstring = resultstring + "\n"
         self.email_result(resultstring)
         
-    def _succeeded(self):
-        self.curstage = 'needsign'
+    def _add_to_repo(self):
+        self.curstage = 'addtorepo'
 
-        # Copy completed RPMs to repo dir
-        # FIXME: possible concurrency issue, what if createrepo
-        # is being run when we are copying RPMs to the repo dir?
+        # Create a list of files that the repo should copy to
+        # the repo dir
         for job in self.sub_jobs.values():
             file_list = job.get_files()
             for f in file_list:
@@ -441,17 +451,22 @@
                 src_file = os.path.join(self.stage_dir, job.arch, f)
                 verrel = "%s-%s" % (self.ver, self.release)
                 dst_path = os.path.join(CONFIG.get('repo_dir'), self.target, self.name, verrel, job.arch)
-                if not os.path.exists(dst_path):
-                    os.makedirs(dst_path)
-                shutil.copy(src_file, dst_path)
-        self.bm.invalidate_repo()
+                self.repofiles[src_file] = dst_path
+
+        # Request the repo copy our files.  It will get the file
+        # list from this object directly when the copy operation
+        # happens
+        if len(self.repofiles):
+            self.repo.request_copy(self)
+
+    def repo_add_callback(self):
+        self.curstage = 'repodone'
 
+    def _succeeded(self):
+        self.curstage = 'needsign'
         resultstring = "%s (%s): Build on target %s succeeded." % (self.uid, self.name, self.target)
         self.email_result(resultstring)
 
-        # Udpate the repo with new packages
-        self._createrepo()
-
     def email_result(self, resultstring, subject=None):
         """send 'resultstring' to self.email from self.email_from"""
         
@@ -469,10 +484,3 @@
         s.sendmail(CONFIG.get('email_from'), [self.username], msg.as_string())
         s.close()
 
-    def _createrepo(self):
-        # createrepo on the needsign tree for new changes
-        repodir = os.path.join(CONFIG.get('repo_dir'), self.target)
-        self.bm.createrepo(repodir)
-        debugprint("%d: updated repodir %s" % (self.uid, repodir))
-
-
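
Taken together, the hunks above change the tail of the job state machine:
instead of copying RPMs and running createrepo itself, the job now hands its
files to the Repo thread and idles in 'addtorepo' until it is called back.
A condensed, illustrative restatement (JobSketch and run_fragment() are
invented names; request_copy() and repo_add_callback() are the real methods
from this commit):

    import time

    class JobSketch:
        def __init__(self, repo, repofiles):
            self.curstage = 'cleanup'
            self.repofiles = repofiles   # src RPM path -> destination repo dir
            self.repo = repo

        def _add_to_repo(self):
            self.curstage = 'addtorepo'
            if len(self.repofiles):
                # The Repo thread copies the files later and calls back
                self.repo.request_copy(self)

        def repo_add_callback(self):
            # Invoked from Repo._update_repo() once the copy is done
            self.curstage = 'repodone'

        def run_fragment(self):
            # The job sleeps in 'addtorepo' until the callback fires, then
            # _succeeded() moves it on to 'needsign'
            self._add_to_repo()
            while self.curstage == 'addtorepo':
                time.sleep(3)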


Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- BuildMaster.py	24 Jun 2005 10:55:15 -0000	1.4
+++ BuildMaster.py	24 Jun 2005 12:51:22 -0000	1.5
@@ -60,19 +60,22 @@
         self.should_stop = False
         self.dbcx = sqlite.connect("jobdb", encoding="utf-8", timeout=4)
         self.curs = self.dbcx.cursor()
-        self.createrepo_lock = threading.Lock()
-        self._repo_invalid = True
+        self.repos = {}
+        for target in CONFIG.get('targets').keys():
+            repo = Repo.Repo(target, client_manager)
+            self.repos[target] = repo
+            repo.start()
+            print "Created repo %s" % target
         ensure_build_db_tables(self.dbcx)
         threading.Thread.__init__(self)
 
     def __del__(self):
         self.dbcx.close()
 
-    def invalidate_repo(self):
-        self._repo_invalid = True
-
     def stop(self):
         self.should_stop = True
+        for repo in self.repos.values():
+            repo.stop()
 
     def set_job_status(self, job):
         status = job.get_cur_stage()
@@ -83,22 +86,6 @@
     def getClientManager(self):
         return self.bcm
 
-    def createrepo(self, repodir):
-        """ We need to lock calls to createrepo so they don't get run at the same time """
-        # FIXME: possibly concurrency issue here, what if clients are
-        # trying to pull repodata while we are recreating it?
-        self.createrepo_lock.acquire()
-
-        if not os.path.exists(repodir):
-            os.makedirs(repodir)
-        if self._repo_invalid:
-	        s, o = commands.getstatusoutput('/usr/bin/createrepo -q %s' % repodir)
-	        if s != 0:
-	            print "createrepo failed with exit status %d!" % s
-        self._repo_invalid = False
-
-        self.createrepo_lock.release()
-
     def run(self):
         while self.should_stop == False:
             # Update all build clients and known jobs


Index: client_manager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/client_manager.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- client_manager.py	24 Jun 2005 10:56:43 -0000	1.21
+++ client_manager.py	24 Jun 2005 12:51:22 -0000	1.22
@@ -100,6 +100,18 @@
 
             self.client_result = result
 
+            # Clients pause before they enter the 'prep' state (which accesses
+            # the repo for this target), and wait for the server to allow them
+            # to proceed when the repo is unlocked.
+            if result == 'downloaded':
+                if not self.parent_job.repo.locked():
+                    try:
+                        self._server.repo_unlocked(self.jobid)
+                    except socket.error, e:
+                        if not CommonErrors.canIgnoreSocketError(e):
+                            print "%s (%s/%s): [ %s ] Unknown error when signalling repo unlocked: '%s'" % (self.parent_job.uid,
+                                        self.parent_job.package, self.arch, self.bci.address(), e)
+
             # if the builder is done, grab list of files to download
             if result == 'done' or result == 'killed' or result == 'failed':
                 self.status = 'downloading'
@@ -298,6 +310,13 @@
         else:
             return True
 
+    def any_prepping_jobs(self):
+        for job in self._jobs:
+            if job.valid() and job.get_status() == 'running':
+                if job.get_builder_result() == 'prepping':
+                    return True
+        return False
+
     def to_dict(self):
         client_dict = {}
         client_dict['address'] = self._address
@@ -378,3 +397,11 @@
 
         return None
 
+    def any_prepping_clients(self):
+        # query each build client for any jobs that are in the 'prepping' state
+        for client in self.running_clients:
+            if client.any_prepping_jobs():
+                return True
+        return False
+
+
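
The builder-side half of this handshake lives outside this server/ diff.  A
speculative sketch of how a builder might gate its 'prep' stage on the
repo_unlocked() call the server makes above; only the repo_unlocked() name
comes from this commit, every other name here is invented for illustration:

    import threading

    class BuilderJobSketch:
        def __init__(self, jobid):
            self.jobid = jobid
            self._repo_ok = threading.Event()

        def repo_unlocked(self, jobid):
            # XML-RPC entry point the server's client_manager calls once
            # the target repo is no longer locked
            if jobid == self.jobid:
                self._repo_ok.set()
            return 0

        def report_result(self, result):
            pass   # stand-in: report job status ('downloaded', etc.) upstream

        def do_prep(self):
            pass   # stand-in: install the buildroot against the repo

        def run(self):
            self.report_result('downloaded')
            # Hold off the 'prep' stage until the server says the repo is
            # unlocked, so we never read half-written repodata
            self._repo_ok.wait()
            self.do_prep()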



