extras-buildsys/server ArchJob.py, 1.30, 1.31 Builder.py, 1.39, 1.40 BuilderManager.py, 1.23, 1.24 PackageJob.py, 1.47, 1.48

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Tue May 9 19:10:59 UTC 2006


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv29423/server

Modified Files:
	ArchJob.py Builder.py BuilderManager.py PackageJob.py 
Log Message:
2006-05-09  Dan Williams  <dcbw at redhat.com>

    * builder/Builder.py
        - Handle the JobFiles request

    * common/Commands.py
        - Check arguments on PlgCommandJobFiles

    * server/ArchJob.py
        - pylint cleanups
        - Rework result files download code; archjob now requests result files
            from the builder, which knows how to handle builder-type specific
            result files operations
        - get_upload_dir() renamed to get_result_files_dir()

    * server/Builder.py
        - pylint cleanups
        - Move dispatching of common commands into the base Builder class
        - (_decompose_job_files_ack): new function; extract and return info
            from a JobFilesAck command
        - (_handle_job_files_ack): new function; handle a JobFilesAck in the
            builder-type specific manner.  For Active builders, we don't have
            to do much since the file was uploaded to us by the builder itself

    * server/BuilderManager.py
        - (any_prepping_builders): fix usage of builder.alive() -> builder.available()

    * server/PackageJob.py
        - Small cleanup of result files bits




Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.30
retrieving revision 1.31
diff -u -r1.30 -r1.31
--- ArchJob.py	9 May 2006 06:04:54 -0000	1.30
+++ ArchJob.py	9 May 2006 19:10:57 -0000	1.31
@@ -15,25 +15,13 @@
 # Copyright 2005 Dan Williams <dcbw at redhat.com> and Red Hat, Inc.
 
 import time
-import string
-import xmlrpclib
-import sys
 import socket
 import os
 import threading
 import urllib
 import OpenSSL
-from plague import FileDownloader
-from plague import CommonErrors
+from plague import FileTransfer
 
-DL_RETRIES = 'retries'
-DL_STATUS = 'status'
-DL_WAIT_TIME = 'wait_time'
-
-STATUS_WAITING = 'waiting'
-STATUS_ERROR = 'error'
-STATUS_INPROGRESS = 'in-progress'
-STATUS_DONE = 'done'
 
 class ArchJob:
     """ Tracks a single build instance for a single arch on a builder """
@@ -50,8 +38,7 @@
         self._internal_failure = False
         self._target_dict = target_dict
         self._builder_gone = False
-        self._download_lock = threading.Lock()
-        self._downloads = {}
+        self._result_files = {}
         self._starttime = time.time()
         self._endtime = 0
         self._die = False
@@ -66,12 +53,12 @@
         self._failure_noticed = True
 
     def _builder_finished(self):
-        if self._builder_status == 'done' or self._builder_status == 'killed' or self._builder_status == 'failed' or self._builder_status == 'orphaned':
+        if self._builder_status in ['done', 'killed', 'failed', 'orphaned']:
             return True
         return False
 
     def builder_failed(self):
-        if self._builder_status == 'killed' or self._builder_status == 'failed':
+        if self._builder_status in ['killed', 'failed']:
             return True
         return False
 
@@ -129,23 +116,6 @@
             self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
             del attrdict
 
-    def _dl_files(self):
-        files = []
-        success = False
-        try:
-            files = self._server.files(self.jobid)
-            success = True
-        except socket.error, e:
-            if not CommonErrors.canIgnoreSocketError(e):
-                print "%s (%s/%s): [ %s ] Unknown error getting file list: '%s'" % (self.par_job.uid,
-                            self.par_job.package, self._target_dict['arch'], self._builder.address(), e)
-        except socket.timeout, e:
-            print "%s (%s/%s): [ %s ] Timeout getting file list: '%s'" % (self.par_job.uid,
-                        self.par_job.package, self._target_dict['arch'], self._builder.address(), e)
-        except xmlrpclib.ProtocolError, e:
-            pass
-        return (success, files)
-
     def _is_done_status(self):
         if self._status == 'done':
             return True
@@ -178,111 +148,50 @@
         if self._builder_status != 'prepping':
             self._prepping = False
 
-        # if the builder is done, grab list of files to download
         if self._builder_finished():
-            (success, files) = self._dl_files()
-            if success:
-                self._set_status('downloading')
-                for f in files:
-                    uf = urllib.unquote(f)
-                    dl_dict = {}
-                    dl_dict[DL_STATUS] = STATUS_WAITING
-                    dl_dict[DL_RETRIES] = 0
-                    dl_dict[DL_WAIT_TIME] = 0
-                    self._downloads[uf] = dl_dict
-
-    def dl_callback(self, status, cb_data, err_msg):
-        url = cb_data
-        self._download_lock.acquire()
-        dl_dict = self._downloads[url]
-        if status == 'done':
-            dl_dict[DL_STATUS] = STATUS_DONE
-        elif status == 'failed':
-            # Retry the download up to 10 times, then fail it
-            if dl_dict[DL_RETRIES] >= 10:
-                dl_dict[DL_STATUS] = STATUS_ERROR
-            else:
-                print "%s (%s/%s): Failed to retrieve %s (attempt %d) (Error %s), trying again..." % (self.par_job.uid,
-                        self.par_job.package, self._target_dict['arch'], url, dl_dict[DL_RETRIES], err_msg)
-                dl_dict[DL_STATUS] = STATUS_WAITING
-                dl_dict[DL_WAIT_TIME] = 5  # Wait a bit before trying again
-                dl_dict[DL_RETRIES] = dl_dict[DL_RETRIES] + 1
-        self._download_lock.release()
+            self._set_status('downloading')
+            self._builder.request_job_files(self.jobid)
 
-    def _print_downloaded_files(self):
+    def get_result_files_dir(self):
+        result_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
+        if not os.path.exists(result_dir):
+            os.makedirs(result_dir)
+        return result_dir
+
+    def _print_downloaded_files(self, files):
         file_string = ""
-        ndownloads = len(self._downloads.keys())
-        for url in self._downloads.keys():
-            filename = os.path.basename(url)
-            string = "'" + filename + "'"
-            dl_dict = self._downloads[url]
-            if dl_dict[DL_STATUS] == STATUS_ERROR:
+        nresults = len(files.keys())
+        for fname in files.keys():
+            string = "'" + fname + "'"
+            if files[fname] == FileTransfer.FT_RESULT_FAILED:
                 string = string + " (failed)"
             file_string = file_string + string
-            if url != self._downloads.keys()[ndownloads - 1]:
+            if fname != files.keys()[nresults - 1]:
                 file_string = file_string + ", "
 
         print "%s (%s/%s): Build result files - [ %s ]" % (self.par_job.uid,
                     self.par_job.package, self._target_dict['arch'], file_string)
 
-    def get_upload_dir(self):
-        upload_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
-        if not os.path.exists(upload_dir):
-            os.makedirs(upload_dir)
-        return upload_dir
+    def download_cb(self, files):
+        """Called by the Builder to notify us that our job's files are available.
+        The files argument should be a dict mapping _filenames_ (not paths) to their
+        FileTransfer result; files are assumed to be in the get_result_files_dir() directory."""
+        if len(files.keys()) == 0:
+            self._download_failed = True
+        else:
+            for fname in files.keys():
+                if files[fname] == FileTransfer.FT_RESULT_FAILED:
+                    self._download_failed = True
+        self._result_files = files
+        self._print_downloaded_files(self._result_files)
+
+        self._endtime = time.time()
+        self._set_status('done')
+        self.par_job.wake()
 
     def _status_downloading(self):
-        # Start grabbing the next undownloaded file, but only
-        # if we aren't already pulling one down
-        undownloaded = False
-        failed = False
-        self._download_lock.acquire()
-        for url in self._downloads.keys():
-            dl_dict = self._downloads[url]
-            dl_status = dl_dict[DL_STATUS]
-            if dl_status == STATUS_WAITING:
-                # If the download got retried due to a previous
-                # download error, we may have to wait a bit
-                if dl_dict[DL_WAIT_TIME] > 0:
-                    dl_dict[DL_WAIT_TIME] = dl_dict[DL_WAIT_TIME] - 1
-                    undownloaded = True
-                    continue
-
-                # Otherwise, spawn the download thread to grab the file
-                target_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
-                if not os.path.exists(target_dir):
-                    os.makedirs(target_dir)
-
-                try:
-                    dl_thread = FileDownloader.FileDownloader(self.dl_callback, url, url,
-                                    target_dir, ['.rpm', '.log'], self._certs)
-                    dl_thread.start()
-                    undownloaded = True
-                    dl_dict[DL_STATUS] = STATUS_INPROGRESS
-                except FileDownloader.FileNameException, e:
-                    print "%s (%s/%s): [ %s ] Bad file name error getting %s: '%s'" % (self.par_job.uid,
-                                self.par_job.package, self._target_dict['arch'], self._builder.address(), url, e)
-                    # Hard error, we don't retry this one
-                    dl_dict[DL_STATUS] = STATUS_ERROR
-                break
-            elif dl_status == STATUS_INPROGRESS:
-                undownloaded = True
-                break
-            elif dl_status == STATUS_ERROR:
-                failed = True
-                continue
-            elif dl_status == STATUS_DONE:
-                continue
-        self._download_lock.release()
-
-        # All done downloading?
-        if not undownloaded:
-            self._print_downloaded_files()
-            self._endtime = time.time()
-            if failed:
-                self._download_failed = True
-            self._set_status('done')
-            self.par_job.wake()
+        # Wait to be notified that our files are downloaded
+        pass
 
     def process(self):
         if self._is_done_status():
@@ -323,20 +232,11 @@
         return self._status
 
     def get_files(self):
-        """ Return a list of base filenames we got from the builder """
+        """Return a list of result files that we got from the builder"""
         files = []
-        for url in self._downloads.keys():
-            try:
-                fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log'])
-                dl_dict = self._downloads[url]
-                dl_status = dl_dict[DL_STATUS]
-                if dl_status == STATUS_DONE:
-                    files.append(fname)
-            except FileDownloader.FileNameException, e:
-                # Just ignore the file then
-                print "%s (%s/%s): Illegal file name.  Error: '%s', URL: %s" % (self.par_job.uid,
-                        self.par_job.package, self._target_dict['arch'], e, url)
-                pass
+        for fname in self._result_files.keys():
+            if self._result_files[fname] == FileTransfer.FT_RESULT_SUCCESS:
+                files.append(fname)
         return files
 
     def builder_gone(self):


Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -r1.39 -r1.40
--- Builder.py	9 May 2006 06:09:53 -0000	1.39
+++ Builder.py	9 May 2006 19:10:57 -0000	1.40
@@ -17,18 +17,17 @@
 import time
 import string
 import xmlrpclib
-import sys
 import socket
 import os
 import urllib
 import threading
 from plague import Commands
 from plague import XMLRPCServerProxy
-from plague import CommonErrors
+from plague import FileDownloader
+from plague import FileTransfer
 import OpenSSL
 import ArchJob
 import EmailUtils
-import Config
 from plague import DebugUtils
 
 SUSPEND_NONE = 'none'
@@ -214,7 +213,6 @@
             # and its not done, explicitly get its status
             job = self._jobs[jobid]
             if jobid not in reported_uniqids and job.get_status() != 'done':
-                print "Requesting job status for %s" % jobid
                 new_cmds.append(Commands.PlgCommandJobStatus(jobid, self._seq_gen.next()))                    
 
             # Check for prepping jobs
@@ -252,6 +250,9 @@
             parent.add_arch_job(archjob)
 
     def _handle_job_status_ack(self, ack):
+        """Handle a job status ack by telling the job object
+        what the builder said its status was."""
+
         old_cmd = self._find_and_remove_cmd_for_ack(ack, Commands.PlgCommandJobStatus)
         if old_cmd:
             archjob_id = ack.archjob_id()
@@ -259,6 +260,51 @@
             job = self._jobs[archjob_id]
             job.set_builder_job_status(status)
 
+    def _decompose_job_files_ack(self, ack):
+        """Decompose a job files ack: find the archjob it's for and return
+        (archjob, files), or (None, None) if no matching JobFiles command is found."""
+        old_cmd = self._find_and_remove_cmd_for_ack(ack, Commands.PlgCommandJobFiles)
+        if old_cmd:
+            archjob_id = ack.archjob_id()
+            files = ack.files()
+            job = self._jobs[archjob_id]
+            return (job, files)
+        return (None, None)
+
+    def _dispatch_common_command(self, cmd):
+        """Handle commands that are common to all builder types."""
+        handled = True
+        if isinstance(cmd, Commands.PlgCommandSlots):
+            self._lock.acquire()
+            self._free_slots = cmd.free_slots()
+            self._num_slots = cmd.max_slots()
+            self._lock.release()
+        elif isinstance(cmd, Commands.PlgCommandTargets):
+            self._lock.acquire()
+            self._target_list = cmd.targets()
+            self._lock.release()
+        elif isinstance(cmd, Commands.PlgCommandNewJobAck):
+            self._handle_new_job_ack(cmd)
+        elif isinstance(cmd, Commands.PlgCommandBuildingJobs):
+            status_reqs = self._handle_building_jobs(cmd)
+            # Add any additional status requests onto our pending command queue
+            if len(status_reqs) > 0:
+                self._lock.acquire()
+                self._cmd_queue = self._cmd_queue + status_reqs
+                self._lock.release()
+        elif isinstance(cmd, Commands.PlgCommandJobStatusAck):
+            self._handle_job_status_ack(cmd)
+        else:
+            handled = False
+        return handled
+
+    def request_job_files(self, archjob_id):
+        """Construct and send a request for a job's files."""
+        cmd = Commands.PlgCommandJobFiles(archjob_id, self._seq_gen.next())
+        self._lock.acquire()
+        self._cmd_queue.append(cmd)
+        self._lock.release()
+
 
 # HACK: This class is a hack to work around SSL hanging issues,
 # which cause the whole server to grind to a halt
@@ -543,38 +589,49 @@
             self._cmd_queue.append(cmd)
         self._lock.release()
 
-    def request_job_files(self, archjob):
-        pass
+    def _handle_job_files_ack(self, cmd):
+        (archjob, urls) = self._decompose_job_files_ack(cmd)
+        if not archjob:
+            return
+
+        if not urls or not len(urls):
+            archjob.download_cb({})
+            return
+
+        # Basic sanity checks; whether the files exist, etc
+        result_files_dir = archjob.get_result_files_dir()
+        files = {}
+        for url in urls:
+            try:
+                fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log'])
+            except FileDownloader.FileNameException, exc:
+                print "Error in JobFilesAck for %s: %s" % (url, exc)
+                continue
+            fpath = os.path.join(result_files_dir, fname)
+            if os.path.exists(fpath):
+                files[fname] = FileTransfer.FT_RESULT_SUCCESS
+            else:
+                files[fname] = FileTransfer.FT_RESULT_FAILED
+        archjob.download_cb(files)
 
     def _dispatch_command(self, cmd):
-        name = cmd.name()
-        if isinstance(cmd, Commands.PlgCommandSlots):
-            self._lock.acquire()
-            self._free_slots = cmd.free_slots()
-            self._num_slots = cmd.max_slots()
-            self._lock.release()
-        elif isinstance(cmd, Commands.PlgCommandTargets):
-            self._lock.acquire()
-            self._target_list = cmd.targets()
-            self._lock.release()
-        elif isinstance(cmd, Commands.PlgCommandNewJobAck):
-            self._handle_new_job_ack(cmd)
-        elif isinstance(cmd, Commands.PlgCommandBuildingJobs):
-            status_reqs = self._handle_building_jobs(cmd)
-            # Add any additional status requests onto our pending command queue
-            if len(status_reqs) > 0:
-                self._lock.acquire()
-                self._cmd_queue = self._cmd_queue + status_reqs
-                self._lock.release()
-        elif isinstance(cmd, Commands.PlgCommandJobStatusAck):
-            self._handle_job_status_ack(cmd)
-        else:
+        """Dispatch one command.  We let the superclass dispatch the common
+        commands, and handle only those that need action specific to the
+        Active builder type."""
+        handled = self._dispatch_common_command(cmd)
+        if not handled:
+            if isinstance(cmd, Commands.PlgCommandJobFilesAck):
+                self._handle_job_files_ack(cmd)
+                handled = True
+
+        if not handled:
             print "Builder Error (%s): unhandled command '%s'" % (self._address, cmd.name())
 
     def request(self, cmd_list):
         """Process and respond to an active builder's request.  Called
         from the BuildMaster's XML-RPC server."""
 
+        # Reset unavailability counters and reactivate builder if needed
         self._last_contact = time.time()
         self._unavail_count = 0
         if not self._available:
@@ -594,15 +651,19 @@
                 new_cmds.append(cmd)
 
         self._lock.acquire()
+
         # Copy command queue
         self._cmd_queue = self._cmd_queue + new_cmds
         cmd_list = self._cmd_queue[:]
-        # Remove commands that don't require an ack
+
+        # Remove commands that don't require an ack,
+        # since we don't need to keep track of those
         tmp_cmd_queue = []
         for cmd in self._cmd_queue:
             if cmd.need_ack():
                 tmp_cmd_queue.append(cmd)
         self._cmd_queue = tmp_cmd_queue
+
         self._lock.release()
         return cmd_list
 


Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -r1.23 -r1.24
--- BuilderManager.py	9 May 2006 06:04:54 -0000	1.23
+++ BuilderManager.py	9 May 2006 19:10:57 -0000	1.24
@@ -216,7 +216,7 @@
         if jobid and filename and tmpfile:
             archjob = builder.get_archjob(jobid)
             if archjob:
-                upload_dir = archjob.get_upload_dir()
+                upload_dir = archjob.get_result_files_dir()
                 import shutil, urllib
                 destpath = os.path.join(upload_dir, urllib.unquote(filename))
                 dest = file(destpath, "w+b")
@@ -369,7 +369,7 @@
     def any_prepping_builders(self):
         # query each build builder for any jobs that are in the 'prepping' state
         for builder in self._builders:
-            if builder.alive() and builder.any_prepping_jobs():
+            if builder.available() and builder.any_prepping_jobs():
                 return True
         return False
 


Index: PackageJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/PackageJob.py,v
retrieving revision 1.47
retrieving revision 1.48
diff -u -r1.47 -r1.48
--- PackageJob.py	9 May 2006 02:52:58 -0000	1.47
+++ PackageJob.py	9 May 2006 19:10:57 -0000	1.48
@@ -849,10 +849,11 @@
         for job in self.archjobs.values():
             if not job:
                 continue
+            job_result_dir = job.get_result_files_dir()
             for f in job.get_files():
                 if not f.endswith(".rpm"):
                     continue
-                src_file = os.path.join(self._result_dir, job.arch(), f)
+                src_file = os.path.join(job_result_dir, f)
                 if src_file.endswith(".src.rpm"):
                     # Keep an SRPM.  We prefer built SRPMs from builders over
                     # the original SRPM.
@@ -874,16 +875,16 @@
         for job in self.archjobs.values():
             if not job:
                 continue
+            job_result_dir = job.get_result_files_dir()
             for f in job.get_files():
                 if not f.endswith(".rpm"):
                     continue
-                jobarch = job.arch()
-                src_file = os.path.join(self._result_dir, jobarch, f)
+                src_file = os.path.join(job_result_dir, f)
                 verrel = "%s-%s" % (self.ver, self.release)
                 if f.endswith(".src.rpm"):
                     dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, "SRPM")
                 else:
-                    dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, jobarch)
+                    dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, job.arch())
                 self.repofiles[src_file] = os.path.join(dst_path, f)
 
         self._event.clear()




More information about the fedora-extras-commits mailing list