extras-buildsys/server ArchJob.py, 1.30, 1.31 Builder.py, 1.39, 1.40 BuilderManager.py, 1.23, 1.24 PackageJob.py, 1.47, 1.48
Daniel Williams (dcbw)
fedora-extras-commits at redhat.com
Tue May 9 19:10:59 UTC 2006
Author: dcbw
Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv29423/server
Modified Files:
ArchJob.py Builder.py BuilderManager.py PackageJob.py
Log Message:
2006-05-09 Dan Williams <dcbw at redhat.com>
* builder/Builder.py
- Handle the JobFiles request
* common/Commands.py
- Check arguments on PlgCommandJobFiles
* server/ArchJob.py
- pylint cleanups
- Rework result files download code; archjob now requests result files
from the builder, which knows how to handle builder-type specific
result files operations
- get_upload_dir() renamed to get_result_files_dir()
* server/Builder.py
- pylint cleanups
- Move dispatching of common commands into the base Builder class
- (_decompose_job_files_ack): new function; extract and return info
from a JobFilesAck command
- (_handle_job_files_ack): new function; handle a JobFilesAck in the
builder-type specific manner. For Active builders, we don't have
to do much since the file was uploaded to us by the builder itself
* server/BuilderManager.py
- (any_prepping_builders): fix usage of builder.alive() -> builder.available()
* server/PackageJob.py
- Small cleanup of result files bits
Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.30
retrieving revision 1.31
diff -u -r1.30 -r1.31
--- ArchJob.py 9 May 2006 06:04:54 -0000 1.30
+++ ArchJob.py 9 May 2006 19:10:57 -0000 1.31
@@ -15,25 +15,13 @@
# Copyright 2005 Dan Williams <dcbw at redhat.com> and Red Hat, Inc.
import time
-import string
-import xmlrpclib
-import sys
import socket
import os
import threading
import urllib
import OpenSSL
-from plague import FileDownloader
-from plague import CommonErrors
+from plague import FileTransfer
-DL_RETRIES = 'retries'
-DL_STATUS = 'status'
-DL_WAIT_TIME = 'wait_time'
-
-STATUS_WAITING = 'waiting'
-STATUS_ERROR = 'error'
-STATUS_INPROGRESS = 'in-progress'
-STATUS_DONE = 'done'
class ArchJob:
""" Tracks a single build instance for a single arch on a builder """
@@ -50,8 +38,7 @@
self._internal_failure = False
self._target_dict = target_dict
self._builder_gone = False
- self._download_lock = threading.Lock()
- self._downloads = {}
+ self._result_files = {}
self._starttime = time.time()
self._endtime = 0
self._die = False
@@ -66,12 +53,12 @@
self._failure_noticed = True
def _builder_finished(self):
- if self._builder_status == 'done' or self._builder_status == 'killed' or self._builder_status == 'failed' or self._builder_status == 'orphaned':
+ if self._builder_status in ['done', 'killed', 'failed', 'orphaned']:
return True
return False
def builder_failed(self):
- if self._builder_status == 'killed' or self._builder_status == 'failed':
+ if self._builder_status in ['killed', 'failed']:
return True
return False
@@ -129,23 +116,6 @@
self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
del attrdict
- def _dl_files(self):
- files = []
- success = False
- try:
- files = self._server.files(self.jobid)
- success = True
- except socket.error, e:
- if not CommonErrors.canIgnoreSocketError(e):
- print "%s (%s/%s): [ %s ] Unknown error getting file list: '%s'" % (self.par_job.uid,
- self.par_job.package, self._target_dict['arch'], self._builder.address(), e)
- except socket.timeout, e:
- print "%s (%s/%s): [ %s ] Timeout getting file list: '%s'" % (self.par_job.uid,
- self.par_job.package, self._target_dict['arch'], self._builder.address(), e)
- except xmlrpclib.ProtocolError, e:
- pass
- return (success, files)
-
def _is_done_status(self):
if self._status == 'done':
return True
@@ -178,111 +148,50 @@
if self._builder_status != 'prepping':
self._prepping = False
- # if the builder is done, grab list of files to download
if self._builder_finished():
- (success, files) = self._dl_files()
- if success:
- self._set_status('downloading')
- for f in files:
- uf = urllib.unquote(f)
- dl_dict = {}
- dl_dict[DL_STATUS] = STATUS_WAITING
- dl_dict[DL_RETRIES] = 0
- dl_dict[DL_WAIT_TIME] = 0
- self._downloads[uf] = dl_dict
-
- def dl_callback(self, status, cb_data, err_msg):
- url = cb_data
- self._download_lock.acquire()
- dl_dict = self._downloads[url]
- if status == 'done':
- dl_dict[DL_STATUS] = STATUS_DONE
- elif status == 'failed':
- # Retry the download up to 10 times, then fail it
- if dl_dict[DL_RETRIES] >= 10:
- dl_dict[DL_STATUS] = STATUS_ERROR
- else:
- print "%s (%s/%s): Failed to retrieve %s (attempt %d) (Error %s), trying again..." % (self.par_job.uid,
- self.par_job.package, self._target_dict['arch'], url, dl_dict[DL_RETRIES], err_msg)
- dl_dict[DL_STATUS] = STATUS_WAITING
- dl_dict[DL_WAIT_TIME] = 5 # Wait a bit before trying again
- dl_dict[DL_RETRIES] = dl_dict[DL_RETRIES] + 1
- self._download_lock.release()
+ self._set_status('downloading')
+ self._builder.request_job_files(self.jobid)
- def _print_downloaded_files(self):
+ def get_result_files_dir(self):
+ result_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
+ if not os.path.exists(result_dir):
+ os.makedirs(result_dir)
+ return result_dir
+
+ def _print_downloaded_files(self, files):
file_string = ""
- ndownloads = len(self._downloads.keys())
- for url in self._downloads.keys():
- filename = os.path.basename(url)
- string = "'" + filename + "'"
- dl_dict = self._downloads[url]
- if dl_dict[DL_STATUS] == STATUS_ERROR:
+ nresults = len(files.keys())
+ for fname in files.keys():
+ string = "'" + fname + "'"
+ if files[fname] == FileTransfer.FT_RESULT_FAILED:
string = string + " (failed)"
file_string = file_string + string
- if url != self._downloads.keys()[ndownloads - 1]:
+ if fname != files.keys()[nresults - 1]:
file_string = file_string + ", "
print "%s (%s/%s): Build result files - [ %s ]" % (self.par_job.uid,
self.par_job.package, self._target_dict['arch'], file_string)
- def get_upload_dir(self):
- upload_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
- if not os.path.exists(upload_dir):
- os.makedirs(upload_dir)
- return upload_dir
+ def download_cb(self, files):
+ """Called by the Builder to notify us that our job's files are available.
+ The files argument should be a list of _filenames_, not paths. All files
+ are assumed to be in the directory returned by get_result_files_dir."""
+ if len(files.keys()) == 0:
+ self._download_failed = True
+ else:
+ for fname in files.keys():
+ if files[fname] == FileTransfer.FT_RESULT_FAILED:
+ self._download_failed = True
+ self._result_files = files
+ self._print_downloaded_files(self._result_files)
+
+ self._endtime = time.time()
+ self._set_status('done')
+ self.par_job.wake()
def _status_downloading(self):
- # Start grabbing the next undownloaded file, but only
- # if we aren't already pulling one down
- undownloaded = False
- failed = False
- self._download_lock.acquire()
- for url in self._downloads.keys():
- dl_dict = self._downloads[url]
- dl_status = dl_dict[DL_STATUS]
- if dl_status == STATUS_WAITING:
- # If the download got retried due to a previous
- # download error, we may have to wait a bit
- if dl_dict[DL_WAIT_TIME] > 0:
- dl_dict[DL_WAIT_TIME] = dl_dict[DL_WAIT_TIME] - 1
- undownloaded = True
- continue
-
- # Otherwise, spawn the download thread to grab the file
- target_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
- if not os.path.exists(target_dir):
- os.makedirs(target_dir)
-
- try:
- dl_thread = FileDownloader.FileDownloader(self.dl_callback, url, url,
- target_dir, ['.rpm', '.log'], self._certs)
- dl_thread.start()
- undownloaded = True
- dl_dict[DL_STATUS] = STATUS_INPROGRESS
- except FileDownloader.FileNameException, e:
- print "%s (%s/%s): [ %s ] Bad file name error getting %s: '%s'" % (self.par_job.uid,
- self.par_job.package, self._target_dict['arch'], self._builder.address(), url, e)
- # Hard error, we don't retry this one
- dl_dict[DL_STATUS] = STATUS_ERROR
- break
- elif dl_status == STATUS_INPROGRESS:
- undownloaded = True
- break
- elif dl_status == STATUS_ERROR:
- failed = True
- continue
- elif dl_status == STATUS_DONE:
- continue
- self._download_lock.release()
-
- # All done downloading?
- if not undownloaded:
- self._print_downloaded_files()
- self._endtime = time.time()
- if failed:
- self._download_failed = True
- self._set_status('done')
- self.par_job.wake()
+ # Wait to be notified that our files are downloaded
+ pass
def process(self):
if self._is_done_status():
@@ -323,20 +232,11 @@
return self._status
def get_files(self):
- """ Return a list of base filenames we got from the builder """
+ """Return a list of result files that we got from the builder"""
files = []
- for url in self._downloads.keys():
- try:
- fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log'])
- dl_dict = self._downloads[url]
- dl_status = dl_dict[DL_STATUS]
- if dl_status == STATUS_DONE:
- files.append(fname)
- except FileDownloader.FileNameException, e:
- # Just ignore the file then
- print "%s (%s/%s): Illegal file name. Error: '%s', URL: %s" % (self.par_job.uid,
- self.par_job.package, self._target_dict['arch'], e, url)
- pass
+ for fname in self._result_files.keys():
+ if self._result_files[fname] == FileTransfer.FT_RESULT_SUCCESS:
+ files.append(fname)
return files
def builder_gone(self):
Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -r1.39 -r1.40
--- Builder.py 9 May 2006 06:09:53 -0000 1.39
+++ Builder.py 9 May 2006 19:10:57 -0000 1.40
@@ -17,18 +17,17 @@
import time
import string
import xmlrpclib
-import sys
import socket
import os
import urllib
import threading
from plague import Commands
from plague import XMLRPCServerProxy
-from plague import CommonErrors
+from plague import FileDownloader
+from plague import FileTransfer
import OpenSSL
import ArchJob
import EmailUtils
-import Config
from plague import DebugUtils
SUSPEND_NONE = 'none'
@@ -214,7 +213,6 @@
# and its not done, explicitly get its status
job = self._jobs[jobid]
if jobid not in reported_uniqids and job.get_status() != 'done':
- print "Requesting job status for %s" % jobid
new_cmds.append(Commands.PlgCommandJobStatus(jobid, self._seq_gen.next()))
# Check for prepping jobs
@@ -252,6 +250,9 @@
parent.add_arch_job(archjob)
def _handle_job_status_ack(self, ack):
+ """Handle a job status ack by telling the job object
+ what the builder said its status was."""
+
old_cmd = self._find_and_remove_cmd_for_ack(ack, Commands.PlgCommandJobStatus)
if old_cmd:
archjob_id = ack.archjob_id()
@@ -259,6 +260,51 @@
job = self._jobs[archjob_id]
job.set_builder_job_status(status)
+ def _decompose_job_files_ack(self, ack):
+ """Extract and return the archjob and file list referred to
+ by a JobFilesAck command, or (None, None) if no matching
+ JobFiles request is pending."""
+ old_cmd = self._find_and_remove_cmd_for_ack(ack, Commands.PlgCommandJobFiles)
+ if old_cmd:
+ archjob_id = ack.archjob_id()
+ files = ack.files()
+ job = self._jobs[archjob_id]
+ return (job, files)
+ return (None, None)
+
+ def _dispatch_common_command(self, cmd):
+ """Handle commands that are common to all builder types."""
+ handled = True
+ if isinstance(cmd, Commands.PlgCommandSlots):
+ self._lock.acquire()
+ self._free_slots = cmd.free_slots()
+ self._num_slots = cmd.max_slots()
+ self._lock.release()
+ elif isinstance(cmd, Commands.PlgCommandTargets):
+ self._lock.acquire()
+ self._target_list = cmd.targets()
+ self._lock.release()
+ elif isinstance(cmd, Commands.PlgCommandNewJobAck):
+ self._handle_new_job_ack(cmd)
+ elif isinstance(cmd, Commands.PlgCommandBuildingJobs):
+ status_reqs = self._handle_building_jobs(cmd)
+ # Add any additional status requests onto our pending command queue
+ if len(status_reqs) > 0:
+ self._lock.acquire()
+ self._cmd_queue = self._cmd_queue + status_reqs
+ self._lock.release()
+ elif isinstance(cmd, Commands.PlgCommandJobStatusAck):
+ self._handle_job_status_ack(cmd)
+ else:
+ handled = False
+ return handled
+
+ def request_job_files(self, archjob_id):
+ """Construct and send a request for a job's files."""
+ cmd = Commands.PlgCommandJobFiles(archjob_id, self._seq_gen.next())
+ self._lock.acquire()
+ self._cmd_queue.append(cmd)
+ self._lock.release()
+
# HACK: This class is a hack to work around SSL hanging issues,
# which cause the whole server to grind to a halt
@@ -543,38 +589,49 @@
self._cmd_queue.append(cmd)
self._lock.release()
- def request_job_files(self, archjob):
- pass
+ def _handle_job_files_ack(self, cmd):
+ (archjob, urls) = self._decompose_job_files_ack(cmd)
+ if not archjob:
+ return
+
+ if not urls or not len(urls):
+ archjob.download_cb({})
+ return
+
+ # Basic sanity checks; whether the files exist, etc
+ result_files_dir = archjob.get_result_files_dir()
+ files = {}
+ for url in urls:
+ try:
+ fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log'])
+ except FileDownloader.FileNameException, exc:
+ print "Error in JobFilesAck for %s: %s" % (url, exc)
+ continue
+ fpath = os.path.join(result_files_dir, fname)
+ if os.path.exists(fpath):
+ files[fname] = FileTransfer.FT_RESULT_SUCCESS
+ else:
+ files[fname] = FileTransfer.FT_RESULT_FAILED
+ archjob.download_cb(files)
def _dispatch_command(self, cmd):
- name = cmd.name()
- if isinstance(cmd, Commands.PlgCommandSlots):
- self._lock.acquire()
- self._free_slots = cmd.free_slots()
- self._num_slots = cmd.max_slots()
- self._lock.release()
- elif isinstance(cmd, Commands.PlgCommandTargets):
- self._lock.acquire()
- self._target_list = cmd.targets()
- self._lock.release()
- elif isinstance(cmd, Commands.PlgCommandNewJobAck):
- self._handle_new_job_ack(cmd)
- elif isinstance(cmd, Commands.PlgCommandBuildingJobs):
- status_reqs = self._handle_building_jobs(cmd)
- # Add any additional status requests onto our pending command queue
- if len(status_reqs) > 0:
- self._lock.acquire()
- self._cmd_queue = self._cmd_queue + status_reqs
- self._lock.release()
- elif isinstance(cmd, Commands.PlgCommandJobStatusAck):
- self._handle_job_status_ack(cmd)
- else:
+ """Dispatch one command. We let the superclass dispatch the common
+ commands, and handle only those that need action specific to the
+ Active builder type."""
+ handled = self._dispatch_common_command(cmd)
+ if not handled:
+ if isinstance(cmd, Commands.PlgCommandJobFilesAck):
+ self._handle_job_files_ack(cmd)
+ handled = True
+
+ if not handled:
print "Builder Error (%s): unhandled command '%s'" % (self._address, cmd.name())
def request(self, cmd_list):
"""Process and respond to an active builder's request. Called
from the BuildMaster's XML-RPC server."""
+ # Reset unavailability counters and reactivate builder if needed
self._last_contact = time.time()
self._unavail_count = 0
if not self._available:
@@ -594,15 +651,19 @@
new_cmds.append(cmd)
self._lock.acquire()
+
# Copy command queue
self._cmd_queue = self._cmd_queue + new_cmds
cmd_list = self._cmd_queue[:]
- # Remove commands that don't require an ack
+
+ # Remove commands that don't require an ack,
+ # since we don't need to keep track of those
tmp_cmd_queue = []
for cmd in self._cmd_queue:
if cmd.need_ack():
tmp_cmd_queue.append(cmd)
self._cmd_queue = tmp_cmd_queue
+
self._lock.release()
return cmd_list
Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -r1.23 -r1.24
--- BuilderManager.py 9 May 2006 06:04:54 -0000 1.23
+++ BuilderManager.py 9 May 2006 19:10:57 -0000 1.24
@@ -216,7 +216,7 @@
if jobid and filename and tmpfile:
archjob = builder.get_archjob(jobid)
if archjob:
- upload_dir = archjob.get_upload_dir()
+ upload_dir = archjob.get_result_files_dir()
import shutil, urllib
destpath = os.path.join(upload_dir, urllib.unquote(filename))
dest = file(destpath, "w+b")
@@ -369,7 +369,7 @@
def any_prepping_builders(self):
# query each build builder for any jobs that are in the 'prepping' state
for builder in self._builders:
- if builder.alive() and builder.any_prepping_jobs():
+ if builder.available() and builder.any_prepping_jobs():
return True
return False
Index: PackageJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/PackageJob.py,v
retrieving revision 1.47
retrieving revision 1.48
diff -u -r1.47 -r1.48
--- PackageJob.py 9 May 2006 02:52:58 -0000 1.47
+++ PackageJob.py 9 May 2006 19:10:57 -0000 1.48
@@ -849,10 +849,11 @@
for job in self.archjobs.values():
if not job:
continue
+ job_result_dir = job.get_result_files_dir()
for f in job.get_files():
if not f.endswith(".rpm"):
continue
- src_file = os.path.join(self._result_dir, job.arch(), f)
+ src_file = os.path.join(job_result_dir, f)
if src_file.endswith(".src.rpm"):
# Keep an SRPM. We prefer built SRPMs from builders over
# the original SRPM.
@@ -874,16 +875,16 @@
for job in self.archjobs.values():
if not job:
continue
+ job_result_dir = job.get_result_files_dir()
for f in job.get_files():
if not f.endswith(".rpm"):
continue
- jobarch = job.arch()
- src_file = os.path.join(self._result_dir, jobarch, f)
+ src_file = os.path.join(job_result_dir, f)
verrel = "%s-%s" % (self.ver, self.release)
if f.endswith(".src.rpm"):
dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, "SRPM")
else:
- dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, jobarch)
+ dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, job.arch())
self.repofiles[src_file] = os.path.join(dst_path, f)
self._event.clear()
More information about the fedora-extras-commits
mailing list