extras-buildsys/builder Builder.py,1.3,1.4 BuilderMock.py,1.2,1.3

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Tue May 9 02:57:14 UTC 2006


Author: dcbw

Update of /cvs/fedora/extras-buildsys/builder
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv359/builder

Modified Files:
	Builder.py BuilderMock.py 
Log Message:
2006-05-08  Dan Williams  <dcbw at redhat.com>

    * builder/Builder.py
        - pylint cleanups
        - make SRPM downloads and uploads work

    * builder/BuilderMock.py
        - pyling cleanups
        - remove log()
        - make download and upload work

    * server/Builder.py
        - Don't send multiple UnlockRepo commands
        - Actually let archjobs have some processing time




Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/builder/Builder.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- Builder.py	5 May 2006 02:10:38 -0000	1.3
+++ Builder.py	9 May 2006 02:57:11 -0000	1.4
@@ -20,20 +20,22 @@
 import socket
 import time
 import threading
-import sha
-import exceptions
+import urllib
 import xmlrpclib
 import OpenSSL
 from plague import Commands
 from plague import AuthedXMLRPCServer
 from plague import HTTPServer
 from plague import XMLRPCServerProxy
+from plague import FileDownloader
+from plague import FileUploader
 
 import Config
 import BuilderMock
 
 
-class BuilderInitException(Exception): pass
+class BuilderInitException(Exception):
+    pass
 
 def get_hostname(cfg, bind_all):
     cfg_hostname = cfg.get_str("Passive", "hostname")
@@ -49,10 +51,10 @@
     import commands
     max_jobs = 1
     cmd = "/usr/bin/getconf _NPROCESSORS_ONLN"
-    (s, o) = commands.getstatusoutput(cmd)
-    if s == 0:
+    (status, output) = commands.getstatusoutput(cmd)
+    if status == 0:
         try:
-            max_jobs = int(o)
+            max_jobs = int(output)
         except ValueError:
             pass
     return max_jobs
@@ -62,75 +64,6 @@
     sys.stdout.flush()
 
 
-class PassiveBuilderRequestHandler:
-    def __init__(self, cfg, builder):
-        self._builder = builder
-        self._all_jobs = {} # unique id => awclass instance
-        self._building_jobs_lock = threading.Lock()
-        self._building_jobs = []
-        self._cfg = cfg
-
-    def _log(self, string):
-        if self._cfg.get_bool("General", "debug"):
-            print string
-
-    def notify_job_done(self, archjob):
-        self._building_jobs_lock.acquire()
-        if archjob in self._building_jobs:
-            self._building_jobs.remove(archjob)
-        self._building_jobs_lock.release()
-
-    def die(self, uniqid):
-        try:
-            job = self._all_jobs[uniqid]
-            job.die()
-        except KeyError:
-            pass
-        return 0
-
-    def files(self, uniqid):
-        try:
-            job = self._all_jobs[uniqid]
-            return job.files()
-        except KeyError:
-            pass
-        return []
-    
-    def repo_unlocked(self, uniqid):
-        try:
-            job = self._all_jobs[uniqid]
-            job.unlock_repo()
-        except KeyError:
-            pass
-        return 0
-
-    def building_jobs(self):
-        jobs = {}
-        self._building_jobs_lock.acquire()
-        building = 0
-        for job in self._building_jobs:
-            jobs[job.uniqid()] = job.status()
-            building = building + 1
-        free = self._max_jobs - building
-        self._building_jobs_lock.release()
-        return (jobs, free)
-
-    def num_slots(self):
-        (free_slots, max_slots) = self._builder.slots()
-        return max_slots
-
-    def job_status(self, uniqid):
-        try:
-            job = self._all_jobs[uniqid]
-            return job.status()
-        except KeyError:
-            pass
-        return ''
-
-    def supported_targets(self):
-        return self._builder.supported_targets()
-            
-
 class Builder(object):
     """ Abstract builder base object """
     def __init__(self, cfg):
@@ -174,13 +107,13 @@
             bcs.die(jobid)
 
         # wait for the jobs to clean up before quitting
-        log("Waiting for running jobs to stop...")
+        self._log("Waiting for running jobs to stop...")
         while True:
             (building_jobs, free) = bcs.building_jobs()
             if len(building_jobs.keys()) == 0:
                 break
             try:
-                log(".")
+                self._log(".")
                 time.sleep(0.5)
             except KeyboardInterrupt:
                 break
@@ -193,12 +126,18 @@
 
     def supported_targets(self):
         targets = []
-        for t in self._cfg.targets():
-            td = t.target_dict()
-            td['supported_arches'] = t.arches()
-            targets.append(td)
+        for target in self._cfg.targets():
+            target_dict = target.target_dict()
+            target_dict['supported_arches'] = target.arches()
+            targets.append(target_dict)
         return targets
 
+    def notify_job_done(self, archjob):
+        self._building_jobs_lock.acquire()
+        if archjob in self._building_jobs:
+            self._building_jobs.remove(archjob)
+        self._building_jobs_lock.release()
+
     def _get_target_cfg(self, target_dict):
         target_cfg = None
 
@@ -239,10 +178,10 @@
 
         uniqid = -1
         msg = "Success"
-        (free, max) = self.slots()
-        if free <= 0:
+        (free_slots, max_slots) = self.slots()
+        if free_slots <= 0:
             msg = "Error: Tried to build '%s' on target %s when already building" \
-                        " maximum (%d) jobs" % (srpm_url, target_str, max)
+                        " maximum (%d) jobs" % (srpm_url, target_str, max_slots)
             self._log(msg)
             return (uniqid, msg)
 
@@ -263,16 +202,15 @@
             filename = os.path.basename(srpm_url)
             msg = "%s: started %s on %s arch %s at time %d" % (uniqid, filename,
                         target_str, target_dict['arch'], archjob.starttime())
-#            job.start()
-        except (OSError, TypeError), err:
+            archjob.start()
+        except (OSError, TypeError), exc:
             msg = "Failed request for %s on %s: '%s'" % (srpm_url,
-                    target_str, err)
+                    target_str, exc)
 
         self._log(msg)
         return (uniqid, msg)
 
     def _get_building_jobs_cmd(self):
-        jobs_list = []
         cmd = Commands.PlgCommandBuildingJobs(self._seq_gen.next())
         self._building_jobs_lock.acquire()
         for job in self._building_jobs:
@@ -283,7 +221,7 @@
     def _handle_unlock_repo_request(self, cmd):
         uniqid = cmd.archjob_id()
         self._building_jobs_lock.acquire()
-        for item in self._building_jobs:
+        for job in self._building_jobs:
             if job.uniqid() == uniqid:
                 job.unlock_repo()
         self._building_jobs_lock.release()
@@ -294,10 +232,74 @@
             uniqid = cmd.archjob_id()
             job = self._all_jobs[uniqid]
             reply = Commands.PlgCommandJobStatusAck(uniqid, job.status(), cmd.seq(), self._seq_gen.next())
-        except KeyError, e:
+        except KeyError:
             pass
         return reply
 
+
+class PassiveBuilderRequestHandler:
+    def __init__(self, cfg, builder):
+        self._builder = builder
+        self._all_jobs = {} # unique id => awclass instance
+        self._building_jobs_lock = threading.Lock()
+        self._building_jobs = []
+        self._cfg = cfg
+
+    def _log(self, string):
+        if self._cfg.get_bool("General", "debug"):
+            print string
+
+    def die(self, uniqid):
+        try:
+            job = self._all_jobs[uniqid]
+            job.die()
+        except KeyError:
+            pass
+        return 0
+
+    def files(self, uniqid):
+        try:
+            job = self._all_jobs[uniqid]
+            return job.files()
+        except KeyError:
+            pass
+        return []
+    
+    def repo_unlocked(self, uniqid):
+        try:
+            job = self._all_jobs[uniqid]
+            job.unlock_repo()
+        except KeyError:
+            pass
+        return 0
+
+    def building_jobs(self):
+        jobs = {}
+        self._building_jobs_lock.acquire()
+        building = 0
+        for job in self._building_jobs:
+            jobs[job.uniqid()] = job.status()
+            building = building + 1
+        free = self._max_jobs - building
+        self._building_jobs_lock.release()
+        return (jobs, free)
+
+    def num_slots(self):
+        (free_slots, max_slots) = self._builder.slots()
+        return max_slots
+
+    def job_status(self, uniqid):
+        try:
+            job = self._all_jobs[uniqid]
+            return job.status()
+        except KeyError:
+            pass
+        return ''
+
+    def supported_targets(self):
+        return self._builder.supported_targets()
+            
+
 class PassiveBuilder(Builder):
     """
     Passive builders initiate no communication of their own.  They wait
@@ -317,15 +319,15 @@
         self._http_server = HTTPServer.PlgHTTPServerManager((hostname, port), work_dir, self._certs)
         self._http_server.start()
 
-        log("Binding to address '%s' with arches: [%s]\n" % (hostname, string.join(build_arches, ",")))
+        self._log("Binding to address '%s' with arches: [%s]\n" % (hostname, string.join(build_arches, ",")))
         xmlrpc_port = cfg.get_int("Passive", "xmlrpc_port")
         try:
             if cfg.get_bool("SSL", "use_ssl") == True:
                 self._xmlrpc_server = AuthedXMLRPCServer.AuthedSSLXMLRPCServer((hostname, xmlrpc_port), None, self._certs)
             else:
                 self._xmlrpc_server = AuthedXMLRPCServer.AuthedXMLRPCServer((hostname, xmlrpc_port), None)
-        except socket.error, e:
-            if e[0] == 98:
+        except socket.error, exc:
+            if exc[0] == 98:
                 raise BuilderInitException("Error: couldn't bind to address '%s:%s'.  "  \
                            "Is the builder already running?\n" % (hostname, xmlrpc_port))
 
@@ -339,6 +341,12 @@
         except KeyboardInterrupt:
             pass
 
+    def download_srpm(self, url, target_dir, dl_callback, cb_data):
+        """For passive builders, the server uploads the RPM to the builder.
+        Therefore, we already have it.  Move it from the HTTP server's upload
+        directory to the requested target_dir, if the SRPM exists."""
+        pass
+
     def _stop_servers(self):
         self._http_server.stop()
         self._xmlrpc_server.stop()
@@ -373,8 +381,8 @@
             self.response = self._server.request(cmd_stream)
         except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
             self.failed = True
-        except xmlrpclib.Fault, e:
-            print "Builder Error (%s) in request(): server replied '%s'" % (self._address, e)
+        except xmlrpclib.Fault, exc:
+            print "Builder Error (%s) in request(): server replied '%s'" % (self._address, exc)
             self.failed = True
         self.done = True
 
@@ -414,6 +422,30 @@
         except KeyboardInterrupt:
             pass
 
+    def download_srpm(self, url, target_dir, dl_callback, cb_data=None):
+        """Download an SRPM from the build server.  Only used by BuilderMock
+        objects."""
+        downloader = FileDownloader.FileDownloader(url, target_dir, ['.src.rpm'],
+                self._certs)
+        downloader.set_callback(dl_callback, url)
+        downloader.start()
+        return downloader
+
+    def upload_files(self, files, ul_callback, cb_data=None):
+        server = self._cfg.get_str("Active", "server")
+        (urltype, urlrest) = urllib.splittype(server)
+        (server, urlrest) = urllib.splithost(urlrest)
+        if self._cfg.get_bool("SSL", "use_ssl"):
+            url = "https://" + server
+        else:
+            url = "http://" + server
+        url = url + ":%d/upload" % self._cfg.get_int("Active", "fileserver_port")
+        uploader = FileUploader.FileUploader(url, files, 'filedata', None,
+                self._certs)
+        uploader.set_callback(ul_callback, cb_data)
+        uploader.start()
+        return uploader
+
     def _get_default_commands(self):
         """Return a python list of serialized commands that the builder
         sends to the server every time it contacts the server."""
@@ -427,8 +459,8 @@
 
         # always send free & max slots
         next_seq = self._seq_gen.next()
-        (free, max) = self.slots()
-        cmd = Commands.PlgCommandSlots(free, max, next_seq)
+        (free_slots, max_slots) = self.slots()
+        cmd = Commands.PlgCommandSlots(free_slots, max_slots, next_seq)
         defcmds.append(cmd)
 
         defcmds.append(self._get_building_jobs_cmd())
@@ -455,7 +487,7 @@
             time.sleep(0.5)
 
         if req.done and not req.failed:
-            self.queued_cmds = []
+            self._queued_cmds = []
             return req.response
         return None
 
@@ -474,7 +506,7 @@
                 self._queued_cmds.append(reply)
 
     def _process_server_response(self, response):
-        """Process the server's response."""
+        """Process the server's response command stream."""
 
         if not response:
             # Something went wrong...


Index: BuilderMock.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/builder/BuilderMock.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- BuilderMock.py	3 May 2006 04:04:27 -0000	1.2
+++ BuilderMock.py	9 May 2006 02:57:11 -0000	1.3
@@ -20,25 +20,20 @@
 import os
 
 
-import socket
 import shutil
 import string
 import fcntl
 import urllib
 import errno
 import exceptions
-import threading
 import sha
 import time
 
 from plague import ExecUtils
 from plague import FileDownloader
+from plague import FileTransfer
 
 
-def log(string):
-    sys.stdout.write(string)
-    sys.stdout.flush()
-
 def get_url_for_file(cfg, file_path):
     """ Return a URL pointing to a particular file in our work dir """
 
@@ -57,9 +52,9 @@
     return urllib.quote(full_url)
 
 def _generate_uniqid(target_str, srpm_url):
-    sum = sha.new()
-    sum.update('%d %s %s' % (time.time(), target_str, srpm_url))
-    return sum.hexdigest()
+    sha_hash = sha.new()
+    sha_hash.update('%d %s %s' % (time.time(), target_str, srpm_url))
+    return sha_hash.hexdigest()
 
 
 class BuilderMock(threading.Thread):
@@ -68,7 +63,7 @@
 
     def __init__(self, controller, target_cfg, buildarch, srpm_url):
         self._controller = controller
-        self.buildarch = buildarch
+        self._buildarch = buildarch
         self._starttime = time.time()
         self._endtime = 0
         self._mockstarttime = 0
@@ -77,7 +72,7 @@
         self._die = False
         self._repo_locked = True
         self._repo_wait_start = 0
-        self._files = []
+        self._files = {}
         self._childpid = 0
         self._target_cfg = target_cfg
         self._builder_cfg = target_cfg.parent_cfg()
@@ -87,7 +82,10 @@
         self._mock_config = None
         self._done_status = ''
         self._mock_log = None
-        self.buildroot = self._target_cfg.mock_config()
+        self._buildroot = self._target_cfg.mock_config()
+        self._downloader = None
+        self._uploader = None
+        self.arch_command = ""
 
         self._work_dir = self._builder_cfg.get_str("Directories", "builder_work_dir")
         self._result_dir = os.path.join(self._work_dir, self._uniqid, "result")
@@ -120,6 +118,11 @@
         self._done_status = 'killed'
         self._log("Killing build process...\n")
 
+        if self._downloader:
+            self._downloader.cancel()
+        if self._uploader:
+            self._uploader.cancel()
+
         # Don't try to kill a running cleanup process
         if self._status != 'cleanup':
             # Kill a running non-cleanup mock process, if any
@@ -145,36 +148,16 @@
 
         self._log("Killed.\n");
 
-    def _log(self, string):
-        if string and self._log_fd:
-            self._log_fd.write(string)
+    def _log(self, msg):
+        if msg and self._log_fd:
+            self._log_fd.write(msg)
             self._log_fd.flush()
             os.fsync(self._log_fd.fileno())
             if self._builder_cfg.get_bool("General", "debug"):
                 s = "%s: " % self._uniqid
-                sys.stdout.write(s + string)
+                sys.stdout.write(s + msg)
                 sys.stdout.flush()
 
-    def dl_callback(self, dl_status, cb_data, err_msg):
-        url = cb_data
-        if dl_status == 'done':
-            self._status = 'downloaded'
-            self._log("Retrieved %s.\n" % url)
-        elif dl_status == 'failed':
-            # If job was cancelled, just return
-            if self.is_done_status():
-                return
-
-            # Retry up to 5 times
-            self._srpm_tries = self._srpm_tries + 1
-            if self._srpm_tries >= 5:
-                self._status = 'failed'
-                self._log("ERROR: Failed to retrieve %s.\n" % url)
-            else:
-                # retry the download
-                self._status = 'init'
-                self._log("ERROR: Failed to retrieve %s on attempt %d (%s).  Trying again...\n" % (url, self._srpm_tries, err_msg))
-
     def _copy_mock_output_to_log(self):
         if self._mock_log and os.path.exists(self._mock_log):
             ml = open(self._mock_log, "r")
@@ -205,9 +188,9 @@
             cmd = os.path.abspath(arg_list[0])
         args.append(builder_cmd)
         args.append("-r")
-        args.append(self.buildroot)
+        args.append(self._buildroot)
         args.append("--arch")
-        args.append(self.buildarch)
+        args.append(self._buildarch)
         args.append("--resultdir=%s" % self._result_dir)
         args.append("--statedir=%s" % self._state_dir)
         args.append("--uniqueext=%s" % self._uniqid)
@@ -234,7 +217,7 @@
         args.append("clean")
         args.append("--uniqueext=%s" % self._uniqid)
         args.append("-r")
-        args.append(self.buildroot)
+        args.append(self._buildroot)
 
         self._log("   %s\n" % string.join(args))
         self._childpid = ExecUtils.exec_with_redirect(cmd, args, None, None, None)
@@ -279,7 +262,7 @@
         while True:
             try:
                 f.seek(0, 0)
-                string = f.read(4)
+                mockstat = f.read(4)
             except OSError, e:
                 if e.errno == errno.EAGAIN:
                     try:
@@ -288,12 +271,11 @@
                         pass
                     continue
             else:
-                if len(string) < 4:
+                if len(mockstat) < 4:
                     continue
                 break
         f.close()
-        string = string.lower()
-        return string
+        return mockstat.lower()
 
     def _read_mock_config(self):
         mockconfigfile = os.path.join(self._result_dir, 'mockconfig.log')
@@ -310,17 +292,32 @@
         f.close()
         return contents
 
+    def dl_callback(self, dl_status, cb_data, err_msg=None):
+        url = cb_data
+        if dl_status == FileTransfer.FT_RESULT_SUCCESS:
+            self._status = 'downloaded'
+            self._log("Retrieved %s.\n" % url)
+        elif dl_status == FileTransfer.FT_RESULT_FAILED:
+            # If job was cancelled, just return
+            if self.is_done_status():
+                return
+            self._status = 'failed'
+            self._log("ERROR: Failed to retrieve %s.\n" % url)
+        elif dl_status == FileTransfer.FT_RESULT_CANCELED:
+            # Ignore cancelation
+            pass
+        self._downloader = None
+
     def _status_init(self):
         self._log("Starting download of %s.\n" % self._srpm_url)
         self._status = 'downloading'
         target_dir = os.path.dirname(self._srpm_path)
         try:
-            dl_thread = FileDownloader.FileDownloader(self.dl_callback, self._srpm_url, self._srpm_url,
-                        target_dir, ['.src.rpm'], certs)
-            dl_thread.start()
+            self._downloader = self._controller.download_srpm(self._srpm_url,
+                    target_dir, self.dl_callback, self._srpm_url)
         except FileDownloader.FileNameException, e:
             self._status = 'failed'
-            self._log("ERROR: Failed to begin SRPM download.  Error: '%s'   URL: %s\n" % (e, self._srpm_url))
+            self._log("ERROR: Failed to begin SRPM download.  Error: '%s'  URL: %s\n" % (e, self._srpm_url))
             
     def _status_downloading(self):
         pass
@@ -360,7 +357,7 @@
         # something is wrong
         self._watch_mock('failed', 'failed')
         if self._status != 'prepping':
-               return
+            return
 
         # We need to make sure that mock has dumped the status file withing a certain
         # amount of time, otherwise we can't tell what it's doing
@@ -400,7 +397,7 @@
                 if source_dir.endswith(os.path.join(self._uniqid, "source")):
                     shutil.rmtree(source_dir, ignore_errors=True)
 
-        # Ensure child process is reaped, if any
+        # Ensure child process is reaped if it's still around
         if self._childpid:
             try:
                 self._log("Waiting for child process %d to exit.\n" % self._childpid)
@@ -410,9 +407,21 @@
                 pass
 
         self._copy_mock_output_to_log()
-
         self._files = self._find_files()
+        self._uploader = self._controller.upload_files(self._files, self.ul_callback, None)
+        self._status = "uploading"
+
+    def _ul_callback(self, status, cb_data, msg):
+        if status == FileTransfer.FT_RESULT_SUCCESS:
+            pass
+        elif status == FileTransfer.FT_RESULT_FAILED:
+            self._done_status = 'failed'
+            self._log("Job failed because files could not be uploaded: %s" % msg)
         self._status = self._done_status
+        self._uploader = None
+
+    def _status_uploading(self):
+        pass
 
     def _job_done(self):
         self._log("-----------------------\n")
@@ -437,7 +446,7 @@
    Target: %s
    UID: %s
    Architecture: %s
-   SRPM: %s\n\n""" % (time.asctime(time.localtime(self._starttime)), target_str, self._uniqid, self.buildarch, self._srpm_url))
+   SRPM: %s\n\n""" % (time.asctime(time.localtime(self._starttime)), target_str, self._uniqid, self._buildarch, self._srpm_url))
 
         try:
             srpm_filename = FileDownloader.get_base_filename_from_url(self._srpm_url, ['.src.rpm'])
@@ -474,13 +483,24 @@
         self._log("\n")
         self._log("Output File List:\n")
         self._log("-----------------\n")
-        for f in files_in_dir:
-            file_url = get_url_for_file(self._builder_cfg, os.path.join(self._result_dir, f))
-            if file_url:
-                file_list.append(file_url)
-                self._log("  Output File: %s\n" % urllib.unquote(file_url))
+        log_files = []
+        rpms = []
+        # sort into logs first, rpms later
+        for fname in files_in_dir:
+            fpath = os.path.join(self._result_dir, fname)
+            if fpath.endswith(".log"):
+                log_files.append(fpath)
             else:
-                self._log("  Error: Couldn't get file URL for file %s\n" % f)
+                rpms.append(fpath)
+
+        # Dump file list to log
+        file_list = log_files + rpms
+        i = 1
+        num_files = len(file_list)
+        for fpath in file_list:
+            self._log("  File (%d of %d): %s\n" % (i, num_files,
+                    os.path.basename(fpath)))
+            i = i + 1
         self._log("-----------------\n")
         return file_list
 




More information about the fedora-extras-commits mailing list