extras-buildsys/server ArchJob.py,1.32,1.33 Builder.py,1.42,1.43
Daniel Williams (dcbw)
fedora-extras-commits at redhat.com
Sat May 20 05:10:10 UTC 2006
Author: dcbw
Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv26467/server
Modified Files:
ArchJob.py Builder.py
Log Message:
2006-05-20 Dan Williams <dcbw at redhat.com>
* server/Builder.py
builder/Builder.py
- Make passive builders work more
- Consolidate some code into base Builder object in the server
- Don't hang a job if it fails before we've had a chance to contact
the builder after starting it
Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -r1.32 -r1.33
--- ArchJob.py 14 May 2006 05:43:07 -0000 1.32
+++ ArchJob.py 20 May 2006 05:10:08 -0000 1.33
@@ -191,6 +191,10 @@
self._builder.remove_suspend_listener(self)
self._set_status(AJ_STATUS_DONE)
+ def _handle_builder_finished(self):
+ self._set_status(AJ_STATUS_DOWNLOADING)
+ self._builder.request_job_files(self._id)
+
def _status_queued(self):
pass
@@ -201,7 +205,10 @@
if self._builder_status == 'downloaded':
self._set_status(AJ_STATUS_REPO_WAIT)
self._repo.request_unlock(self)
-
+
+ if self._builder_finished():
+ self._handle_builder_finished()
+
def _status_repo_wait(self):
pass
@@ -217,13 +224,15 @@
if self._builder_status != 'downloaded':
self._set_status(AJ_STATUS_RUNNING)
+ if self._builder_finished():
+ self._handle_builder_finished()
+
def _status_running(self):
if self._builder_status != 'prepping':
self._prepping = False
if self._builder_finished():
- self._set_status(AJ_STATUS_DOWNLOADING)
- self._builder.request_job_files(self._id)
+ self._handle_builder_finished()
def get_result_files_dir(self):
result_dir = os.path.join(self._parent.get_stage_dir(), self._target_dict['arch'])
Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -r1.42 -r1.43
--- Builder.py 16 May 2006 15:49:56 -0000 1.42
+++ Builder.py 20 May 2006 05:10:08 -0000 1.43
@@ -59,6 +59,7 @@
self._seq_gen = Commands.SequenceGenerator()
self._lock = threading.Lock()
self._cmd_queue = []
+ self._ack_pending_list = []
self._suspend_listeners = []
self._status_listeners = []
self._ip = None
@@ -159,6 +160,10 @@
self._prepping_jobs = False
self._when_died = time.time()
self._jobs = {}
+ self._cmd_queue = []
+ self._ack_pending_list = []
+ self._free_slots = 0
+ self._ip = None
self._notify_suspend_listeners(reason, msg)
@@ -255,10 +260,10 @@
old_cmd = None
self._lock.acquire()
- for queued_cmd in self._cmd_queue:
- if queued_cmd.seq() == ack.acked_seq() and isinstance(queued_cmd, old_cmd_type):
- old_cmd = queued_cmd
- self._cmd_queue.remove(queued_cmd)
+ for cmd in self._ack_pending_list[:]:
+ if cmd.seq() == ack.acked_seq() and isinstance(cmd, old_cmd_type):
+ old_cmd = cmd
+ self._ack_pending_list.remove(cmd)
break
self._lock.release()
return old_cmd
@@ -416,6 +421,46 @@
self._certs['ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
self._certs['peer_ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
+ def _download_cb(self, result, (archjob, urls), msg):
+ """Notify archjob of the result of its download request."""
+ if result == FileTransfer.FT_RESULT_FAILED:
+ print "Builder Error (%s): result files download failed for %s (%s). " \
+ " '%s'" % (self._address, archjob.archjob_id(), archjob.arch(), msg)
+
+ files = {}
+ result_files_dir = archjob.get_result_files_dir()
+ for url in urls:
+ try:
+ fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log'])
+ except FileDownloader.FileNameException, exc:
+ print "Error in JobFilesAck for %s: %s" % (url, exc)
+ continue
+ if result == FileTransfer.FT_RESULT_SUCCESS:
+ fpath = os.path.join(result_files_dir, fname)
+ if os.path.exists(fpath):
+ files[fname] = FileTransfer.FT_RESULT_SUCCESS
+ else:
+ files[fname] = FileTransfer.FT_RESULT_FAILED
+ else:
+ files[fname] = FileTransfer.FT_RESULT_FAILED
+ archjob.download_cb(files)
+
+ def _handle_job_files_ack(self, cmd):
+ (archjob, urls) = self._decompose_job_files_ack(cmd)
+ if not archjob:
+ return
+
+ if not urls or not len(urls):
+ archjob.download_cb({})
+ return
+
+ # Basic sanity checks; whether the files exist, etc
+ result_files_dir = archjob.get_result_files_dir()
+ downloader = FileDownloader.FileDownloader(urls, result_files_dir,
+ ['.rpm', '.log'], self._certs)
+ downloader.set_callback(self._download_cb, (archjob, urls))
+ downloader.start()
+
def _send_commands(self):
"""Send queued commands to the builder, and then get its list
of reply commands."""
@@ -432,13 +477,15 @@
# Copy command queue
self._lock.acquire()
self._cmd_queue = self._cmd_queue + new_cmds
- cmd_list = self._cmd_queue
- # FIXME: deal with keeping ack-requiring cmds around
+ cmd_list = self._cmd_queue[:]
+ for cmd in self._cmd_queue:
+ if cmd.need_ack():
+ self._ack_pending_list.append(cmd)
self._cmd_queue = []
self._lock.release()
# The actual XML-RPC request runs in a different thread because SSL
- # calls sometimes hang
+ # calls sometimes hang
req = PassiveBuilderRequest(self._address, self._certs, cmd_list)
curtime = time.time()
req.start()
@@ -509,7 +556,6 @@
self._get_ip()
# Try to talk to the builder
- print "builder contact"
cmd_list = self._send_commands()
if cmd_list:
# Builder is alive
@@ -537,14 +583,7 @@
def _handle_builder_suspend(self, reason, msg):
Builder._handle_builder_suspend(self, reason, msg)
self._ping_interval = self._BUILDER_UNAVAIL_PING_INTERVAL
- self._ip = None
self._target_list = None
- # Set free slots to zero so we don't send the
- # builder a job on first contact
- self._free_slots = 0
-
- # Clear out the command queue; we start clean
- self._cmd_queue = []
def _handle_builder_reactivate(self):
mail = True
@@ -571,9 +610,6 @@
def __init__(self, manager, cfg, address, weight):
Builder.__init__(self, manager, cfg, address, weight, TYPE_ACTIVE)
- def _init_builder(self, target_list):
- self._target_list = target_list
-
def _handle_job_files_ack(self, cmd):
(archjob, urls) = self._decompose_job_files_ack(cmd)
if not archjob:
@@ -641,13 +677,11 @@
self._cmd_queue = self._cmd_queue + new_cmds
cmd_list = self._cmd_queue[:]
- # Remove commands that don't require an ack,
- # since we don't need to keep track of those
- tmp_cmd_queue = []
+ # Keep around commands that need an ack
for cmd in self._cmd_queue:
if cmd.need_ack():
- tmp_cmd_queue.append(cmd)
- self._cmd_queue = tmp_cmd_queue
+ self._ack_pending_list.append(cmd)
+ self._cmd_queue = []
self._lock.release()
return cmd_list
@@ -678,11 +712,6 @@
def _handle_builder_suspend(self, reason, msg):
Builder._handle_builder_suspend(self, reason, msg)
self._last_contact = 0
- self._ip = None
-
- # Clear out the command queue; we start clean when the
- # builder contacts us again
- self._cmd_queue = []
def _handle_builder_reactivate(self, cmd_list):
# Grab an updated target list from the command stream when
@@ -710,5 +739,5 @@
self._lock.acquire()
Builder._handle_builder_reactivate(self, mail=mail)
- self._init_builder(target_list)
+ self._target_list = target_list
self._lock.release()
More information about the fedora-extras-commits
mailing list