extras-buildsys/server ArchJob.py,1.32,1.33 Builder.py,1.42,1.43

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Sat May 20 05:10:10 UTC 2006


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv26467/server

Modified Files:
	ArchJob.py Builder.py 
Log Message:
2006-05-20  Dan Williams  <dcbw at redhat.com>

    * server/Builder.py
      builder/Builder.py
        - Get more of the passive builder functionality working
        - Consolidate some code into base Builder object in the server
        - Don't hang a job if it fails before we've had a chance to contact
            the builder after starting it




Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -r1.32 -r1.33
--- ArchJob.py	14 May 2006 05:43:07 -0000	1.32
+++ ArchJob.py	20 May 2006 05:10:08 -0000	1.33
@@ -191,6 +191,10 @@
         self._builder.remove_suspend_listener(self)
         self._set_status(AJ_STATUS_DONE)
 
+    def _handle_builder_finished(self):
+        self._set_status(AJ_STATUS_DOWNLOADING)
+        self._builder.request_job_files(self._id)
+
     def _status_queued(self):
         pass
 
@@ -201,7 +205,10 @@
         if self._builder_status == 'downloaded':
             self._set_status(AJ_STATUS_REPO_WAIT)
             self._repo.request_unlock(self)
-            
+
+        if self._builder_finished():
+            self._handle_builder_finished()
+
     def _status_repo_wait(self):
         pass
 
@@ -217,13 +224,15 @@
         if self._builder_status != 'downloaded':
             self._set_status(AJ_STATUS_RUNNING)
 
+        if self._builder_finished():
+            self._handle_builder_finished()
+
     def _status_running(self):
         if self._builder_status != 'prepping':
             self._prepping = False
 
         if self._builder_finished():
-            self._set_status(AJ_STATUS_DOWNLOADING)
-            self._builder.request_job_files(self._id)
+            self._handle_builder_finished()
 
     def get_result_files_dir(self):
         result_dir = os.path.join(self._parent.get_stage_dir(), self._target_dict['arch'])


Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -r1.42 -r1.43
--- Builder.py	16 May 2006 15:49:56 -0000	1.42
+++ Builder.py	20 May 2006 05:10:08 -0000	1.43
@@ -59,6 +59,7 @@
         self._seq_gen = Commands.SequenceGenerator()
         self._lock = threading.Lock()
         self._cmd_queue = []
+        self._ack_pending_list = []
         self._suspend_listeners = []
         self._status_listeners = []
         self._ip = None
@@ -159,6 +160,10 @@
         self._prepping_jobs = False
         self._when_died = time.time()
         self._jobs = {}
+        self._cmd_queue = []
+        self._ack_pending_list = []
+        self._free_slots = 0
+        self._ip = None
 
         self._notify_suspend_listeners(reason, msg)
 
@@ -255,10 +260,10 @@
 
         old_cmd = None
         self._lock.acquire()
-        for queued_cmd in self._cmd_queue:
-            if queued_cmd.seq() == ack.acked_seq() and isinstance(queued_cmd, old_cmd_type):
-                old_cmd = queued_cmd
-                self._cmd_queue.remove(queued_cmd)
+        for cmd in self._ack_pending_list[:]:
+            if cmd.seq() == ack.acked_seq() and isinstance(cmd, old_cmd_type):
+                old_cmd = cmd
+                self._ack_pending_list.remove(cmd)
                 break
         self._lock.release()
         return old_cmd
@@ -416,6 +421,46 @@
             self._certs['ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
             self._certs['peer_ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
 
+    def _download_cb(self, result, (archjob, urls), msg):
+        """Notify archjob of the result of its download request."""
+        if result == FileTransfer.FT_RESULT_FAILED:
+            print "Builder Error (%s): result files download failed for %s (%s). " \
+                    " '%s'" % (self._address, archjob.archjob_id(), archjob.arch(), msg)
+
+        files = {}
+        result_files_dir = archjob.get_result_files_dir()
+        for url in urls:
+            try:
+                fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log'])
+            except FileDownloader.FileNameException, exc:
+                print "Error in JobFilesAck for %s: %s" % (url, exc)
+                continue
+            if result == FileTransfer.FT_RESULT_SUCCESS:
+                fpath = os.path.join(result_files_dir, fname)
+                if os.path.exists(fpath):
+                    files[fname] = FileTransfer.FT_RESULT_SUCCESS
+                else:
+                    files[fname] = FileTransfer.FT_RESULT_FAILED
+            else:
+                files[fname] = FileTransfer.FT_RESULT_FAILED            
+        archjob.download_cb(files)
+
+    def _handle_job_files_ack(self, cmd):
+        (archjob, urls) = self._decompose_job_files_ack(cmd)
+        if not archjob:
+            return
+
+        if not urls or not len(urls):
+            archjob.download_cb({})
+            return
+
+        # Basic sanity checks; whether the files exist, etc
+        result_files_dir = archjob.get_result_files_dir()
+        downloader = FileDownloader.FileDownloader(urls, result_files_dir,
+                ['.rpm', '.log'], self._certs)
+        downloader.set_callback(self._download_cb, (archjob, urls))
+        downloader.start()
+
     def _send_commands(self):
         """Send queued commands to the builder, and then get it's list
         of reply commands."""
@@ -432,13 +477,15 @@
         # Copy command queue
         self._lock.acquire()
         self._cmd_queue = self._cmd_queue + new_cmds
-        cmd_list = self._cmd_queue
-        # FIXME: deal with keeping ack-requiring cmds around
+        cmd_list = self._cmd_queue[:]
+        for cmd in self._cmd_queue:
+            if cmd.need_ack():
+                self._ack_pending_list.append(cmd)
         self._cmd_queue = []
         self._lock.release()
 
         # The actual XML-RPC request runs in a different thread because SSL
-        # calls sometimes hang 
+        # calls sometimes hang
         req = PassiveBuilderRequest(self._address, self._certs, cmd_list)
         curtime = time.time()
         req.start()
@@ -509,7 +556,6 @@
                 self._get_ip()
 
                 # Try to talk to the builder
-                print "builder contact"
                 cmd_list = self._send_commands()
                 if cmd_list:
                     # Builder is alive
@@ -537,14 +583,7 @@
     def _handle_builder_suspend(self, reason, msg):
         Builder._handle_builder_suspend(self, reason, msg)
         self._ping_interval = self._BUILDER_UNAVAIL_PING_INTERVAL
-        self._ip = None
         self._target_list = None
-        # Set free slots to zero so we don't send the
-        # builder a job on first contact
-        self._free_slots = 0
-
-        # Clear out the command queue; we start clean
-        self._cmd_queue = []
 
     def _handle_builder_reactivate(self):
         mail = True
@@ -571,9 +610,6 @@
     def __init__(self, manager, cfg, address, weight):
         Builder.__init__(self, manager, cfg, address, weight, TYPE_ACTIVE)
 
-    def _init_builder(self, target_list):
-        self._target_list = target_list
-
     def _handle_job_files_ack(self, cmd):
         (archjob, urls) = self._decompose_job_files_ack(cmd)
         if not archjob:
@@ -641,13 +677,11 @@
         self._cmd_queue = self._cmd_queue + new_cmds
         cmd_list = self._cmd_queue[:]
 
-        # Remove commands that don't require an ack,
-        # since we don't need to keep track of those
-        tmp_cmd_queue = []
+        # Keep around commands that need an ack
         for cmd in self._cmd_queue:
             if cmd.need_ack():
-                tmp_cmd_queue.append(cmd)
-        self._cmd_queue = tmp_cmd_queue
+                self._ack_pending_list.append(cmd)
+        self._cmd_queue = []
 
         self._lock.release()
         return cmd_list
@@ -678,11 +712,6 @@
     def _handle_builder_suspend(self, reason, msg):
         Builder._handle_builder_suspend(self, reason, msg)
         self._last_contact = 0
-        self._ip = None
-
-        # Clear out the command queue; we start clean when the
-        # builder contacts us again
-        self._cmd_queue = []
 
     def _handle_builder_reactivate(self, cmd_list):
         # Grab an updated target list from the command stream when
@@ -710,5 +739,5 @@
 
         self._lock.acquire()
         Builder._handle_builder_reactivate(self, mail=mail)
-        self._init_builder(target_list)
+        self._target_list = target_list
         self._lock.release()




More information about the fedora-extras-commits mailing list