extras-buildsys/builder Builder.py,1.3,1.4 BuilderMock.py,1.2,1.3
Daniel Williams (dcbw)
fedora-extras-commits at redhat.com
Tue May 9 02:57:14 UTC 2006
Author: dcbw
Update of /cvs/fedora/extras-buildsys/builder
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv359/builder
Modified Files:
Builder.py BuilderMock.py
Log Message:
2006-05-08 Dan Williams <dcbw at redhat.com>
* builder/Builder.py
- pylint cleanups
- make SRPM downloads and uploads work
* builder/BuilderMock.py
- pylint cleanups
- remove log()
- make download and upload work
* server/Builder.py
- Don't send multiple UnlockRepo commands
- Actually let archjobs have some processing time
Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/builder/Builder.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- Builder.py 5 May 2006 02:10:38 -0000 1.3
+++ Builder.py 9 May 2006 02:57:11 -0000 1.4
@@ -20,20 +20,22 @@
import socket
import time
import threading
-import sha
-import exceptions
+import urllib
import xmlrpclib
import OpenSSL
from plague import Commands
from plague import AuthedXMLRPCServer
from plague import HTTPServer
from plague import XMLRPCServerProxy
+from plague import FileDownloader
+from plague import FileUploader
import Config
import BuilderMock
-class BuilderInitException(Exception): pass
+class BuilderInitException(Exception):
+ pass
def get_hostname(cfg, bind_all):
cfg_hostname = cfg.get_str("Passive", "hostname")
@@ -49,10 +51,10 @@
import commands
max_jobs = 1
cmd = "/usr/bin/getconf _NPROCESSORS_ONLN"
- (s, o) = commands.getstatusoutput(cmd)
- if s == 0:
+ (status, output) = commands.getstatusoutput(cmd)
+ if status == 0:
try:
- max_jobs = int(o)
+ max_jobs = int(output)
except ValueError:
pass
return max_jobs
@@ -62,75 +64,6 @@
sys.stdout.flush()
-class PassiveBuilderRequestHandler:
- def __init__(self, cfg, builder):
- self._builder = builder
- self._all_jobs = {} # unique id => awclass instance
- self._building_jobs_lock = threading.Lock()
- self._building_jobs = []
- self._cfg = cfg
-
- def _log(self, string):
- if self._cfg.get_bool("General", "debug"):
- print string
-
- def notify_job_done(self, archjob):
- self._building_jobs_lock.acquire()
- if archjob in self._building_jobs:
- self._building_jobs.remove(archjob)
- self._building_jobs_lock.release()
-
- def die(self, uniqid):
- try:
- job = self._all_jobs[uniqid]
- job.die()
- except KeyError:
- pass
- return 0
-
- def files(self, uniqid):
- try:
- job = self._all_jobs[uniqid]
- return job.files()
- except KeyError:
- pass
- return []
-
- def repo_unlocked(self, uniqid):
- try:
- job = self._all_jobs[uniqid]
- job.unlock_repo()
- except KeyError:
- pass
- return 0
-
- def building_jobs(self):
- jobs = {}
- self._building_jobs_lock.acquire()
- building = 0
- for job in self._building_jobs:
- jobs[job.uniqid()] = job.status()
- building = building + 1
- free = self._max_jobs - building
- self._building_jobs_lock.release()
- return (jobs, free)
-
- def num_slots(self):
- (free_slots, max_slots) = self._builder.slots()
- return max_slots
-
- def job_status(self, uniqid):
- try:
- job = self._all_jobs[uniqid]
- return job.status()
- except KeyError:
- pass
- return ''
-
- def supported_targets(self):
- return self._builder.supported_targets()
-
-
class Builder(object):
""" Abstract builder base object """
def __init__(self, cfg):
@@ -174,13 +107,13 @@
bcs.die(jobid)
# wait for the jobs to clean up before quitting
- log("Waiting for running jobs to stop...")
+ self._log("Waiting for running jobs to stop...")
while True:
(building_jobs, free) = bcs.building_jobs()
if len(building_jobs.keys()) == 0:
break
try:
- log(".")
+ self._log(".")
time.sleep(0.5)
except KeyboardInterrupt:
break
@@ -193,12 +126,18 @@
def supported_targets(self):
targets = []
- for t in self._cfg.targets():
- td = t.target_dict()
- td['supported_arches'] = t.arches()
- targets.append(td)
+ for target in self._cfg.targets():
+ target_dict = target.target_dict()
+ target_dict['supported_arches'] = target.arches()
+ targets.append(target_dict)
return targets
+ def notify_job_done(self, archjob):
+ self._building_jobs_lock.acquire()
+ if archjob in self._building_jobs:
+ self._building_jobs.remove(archjob)
+ self._building_jobs_lock.release()
+
def _get_target_cfg(self, target_dict):
target_cfg = None
@@ -239,10 +178,10 @@
uniqid = -1
msg = "Success"
- (free, max) = self.slots()
- if free <= 0:
+ (free_slots, max_slots) = self.slots()
+ if free_slots <= 0:
msg = "Error: Tried to build '%s' on target %s when already building" \
- " maximum (%d) jobs" % (srpm_url, target_str, max)
+ " maximum (%d) jobs" % (srpm_url, target_str, max_slots)
self._log(msg)
return (uniqid, msg)
@@ -263,16 +202,15 @@
filename = os.path.basename(srpm_url)
msg = "%s: started %s on %s arch %s at time %d" % (uniqid, filename,
target_str, target_dict['arch'], archjob.starttime())
-# job.start()
- except (OSError, TypeError), err:
+ archjob.start()
+ except (OSError, TypeError), exc:
msg = "Failed request for %s on %s: '%s'" % (srpm_url,
- target_str, err)
+ target_str, exc)
self._log(msg)
return (uniqid, msg)
def _get_building_jobs_cmd(self):
- jobs_list = []
cmd = Commands.PlgCommandBuildingJobs(self._seq_gen.next())
self._building_jobs_lock.acquire()
for job in self._building_jobs:
@@ -283,7 +221,7 @@
def _handle_unlock_repo_request(self, cmd):
uniqid = cmd.archjob_id()
self._building_jobs_lock.acquire()
- for item in self._building_jobs:
+ for job in self._building_jobs:
if job.uniqid() == uniqid:
job.unlock_repo()
self._building_jobs_lock.release()
@@ -294,10 +232,74 @@
uniqid = cmd.archjob_id()
job = self._all_jobs[uniqid]
reply = Commands.PlgCommandJobStatusAck(uniqid, job.status(), cmd.seq(), self._seq_gen.next())
- except KeyError, e:
+ except KeyError:
pass
return reply
+
+class PassiveBuilderRequestHandler:
+ def __init__(self, cfg, builder):
+ self._builder = builder
+ self._all_jobs = {} # unique id => awclass instance
+ self._building_jobs_lock = threading.Lock()
+ self._building_jobs = []
+ self._cfg = cfg
+
+ def _log(self, string):
+ if self._cfg.get_bool("General", "debug"):
+ print string
+
+ def die(self, uniqid):
+ try:
+ job = self._all_jobs[uniqid]
+ job.die()
+ except KeyError:
+ pass
+ return 0
+
+ def files(self, uniqid):
+ try:
+ job = self._all_jobs[uniqid]
+ return job.files()
+ except KeyError:
+ pass
+ return []
+
+ def repo_unlocked(self, uniqid):
+ try:
+ job = self._all_jobs[uniqid]
+ job.unlock_repo()
+ except KeyError:
+ pass
+ return 0
+
+ def building_jobs(self):
+ jobs = {}
+ self._building_jobs_lock.acquire()
+ building = 0
+ for job in self._building_jobs:
+ jobs[job.uniqid()] = job.status()
+ building = building + 1
+ free = self._max_jobs - building
+ self._building_jobs_lock.release()
+ return (jobs, free)
+
+ def num_slots(self):
+ (free_slots, max_slots) = self._builder.slots()
+ return max_slots
+
+ def job_status(self, uniqid):
+ try:
+ job = self._all_jobs[uniqid]
+ return job.status()
+ except KeyError:
+ pass
+ return ''
+
+ def supported_targets(self):
+ return self._builder.supported_targets()
+
+
class PassiveBuilder(Builder):
"""
Passive builders initiate no communication of their own. They wait
@@ -317,15 +319,15 @@
self._http_server = HTTPServer.PlgHTTPServerManager((hostname, port), work_dir, self._certs)
self._http_server.start()
- log("Binding to address '%s' with arches: [%s]\n" % (hostname, string.join(build_arches, ",")))
+ self._log("Binding to address '%s' with arches: [%s]\n" % (hostname, string.join(build_arches, ",")))
xmlrpc_port = cfg.get_int("Passive", "xmlrpc_port")
try:
if cfg.get_bool("SSL", "use_ssl") == True:
self._xmlrpc_server = AuthedXMLRPCServer.AuthedSSLXMLRPCServer((hostname, xmlrpc_port), None, self._certs)
else:
self._xmlrpc_server = AuthedXMLRPCServer.AuthedXMLRPCServer((hostname, xmlrpc_port), None)
- except socket.error, e:
- if e[0] == 98:
+ except socket.error, exc:
+ if exc[0] == 98:
raise BuilderInitException("Error: couldn't bind to address '%s:%s'. " \
"Is the builder already running?\n" % (hostname, xmlrpc_port))
@@ -339,6 +341,12 @@
except KeyboardInterrupt:
pass
+ def download_srpm(self, url, target_dir, dl_callback, cb_data):
+ """For passive builders, the server uploads the RPM to the builder.
+ Therefore, we already have it. Move it from the HTTP server's upload
+ directory to the requested target_dir, if the SRPM exists."""
+ pass
+
def _stop_servers(self):
self._http_server.stop()
self._xmlrpc_server.stop()
@@ -373,8 +381,8 @@
self.response = self._server.request(cmd_stream)
except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
self.failed = True
- except xmlrpclib.Fault, e:
- print "Builder Error (%s) in request(): server replied '%s'" % (self._address, e)
+ except xmlrpclib.Fault, exc:
+ print "Builder Error (%s) in request(): server replied '%s'" % (self._address, exc)
self.failed = True
self.done = True
@@ -414,6 +422,30 @@
except KeyboardInterrupt:
pass
+ def download_srpm(self, url, target_dir, dl_callback, cb_data=None):
+ """Download an SRPM from the build server. Only used by BuilderMock
+ objects."""
+ downloader = FileDownloader.FileDownloader(url, target_dir, ['.src.rpm'],
+ self._certs)
+ downloader.set_callback(dl_callback, url)
+ downloader.start()
+ return downloader
+
+ def upload_files(self, files, ul_callback, cb_data=None):
+ server = self._cfg.get_str("Active", "server")
+ (urltype, urlrest) = urllib.splittype(server)
+ (server, urlrest) = urllib.splithost(urlrest)
+ if self._cfg.get_bool("SSL", "use_ssl"):
+ url = "https://" + server
+ else:
+ url = "http://" + server
+ url = url + ":%d/upload" % self._cfg.get_int("Active", "fileserver_port")
+ uploader = FileUploader.FileUploader(url, files, 'filedata', None,
+ self._certs)
+ uploader.set_callback(ul_callback, cb_data)
+ uploader.start()
+ return uploader
+
def _get_default_commands(self):
"""Return a python list of serialized commands that the builder
sends to the server every time it contacts the server."""
@@ -427,8 +459,8 @@
# always send free & max slots
next_seq = self._seq_gen.next()
- (free, max) = self.slots()
- cmd = Commands.PlgCommandSlots(free, max, next_seq)
+ (free_slots, max_slots) = self.slots()
+ cmd = Commands.PlgCommandSlots(free_slots, max_slots, next_seq)
defcmds.append(cmd)
defcmds.append(self._get_building_jobs_cmd())
@@ -455,7 +487,7 @@
time.sleep(0.5)
if req.done and not req.failed:
- self.queued_cmds = []
+ self._queued_cmds = []
return req.response
return None
@@ -474,7 +506,7 @@
self._queued_cmds.append(reply)
def _process_server_response(self, response):
- """Process the server's response."""
+ """Process the server's response command stream."""
if not response:
# Something went wrong...
Index: BuilderMock.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/builder/BuilderMock.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- BuilderMock.py 3 May 2006 04:04:27 -0000 1.2
+++ BuilderMock.py 9 May 2006 02:57:11 -0000 1.3
@@ -20,25 +20,20 @@
import os
-import socket
import shutil
import string
import fcntl
import urllib
import errno
import exceptions
-import threading
import sha
import time
from plague import ExecUtils
from plague import FileDownloader
+from plague import FileTransfer
-def log(string):
- sys.stdout.write(string)
- sys.stdout.flush()
-
def get_url_for_file(cfg, file_path):
""" Return a URL pointing to a particular file in our work dir """
@@ -57,9 +52,9 @@
return urllib.quote(full_url)
def _generate_uniqid(target_str, srpm_url):
- sum = sha.new()
- sum.update('%d %s %s' % (time.time(), target_str, srpm_url))
- return sum.hexdigest()
+ sha_hash = sha.new()
+ sha_hash.update('%d %s %s' % (time.time(), target_str, srpm_url))
+ return sha_hash.hexdigest()
class BuilderMock(threading.Thread):
@@ -68,7 +63,7 @@
def __init__(self, controller, target_cfg, buildarch, srpm_url):
self._controller = controller
- self.buildarch = buildarch
+ self._buildarch = buildarch
self._starttime = time.time()
self._endtime = 0
self._mockstarttime = 0
@@ -77,7 +72,7 @@
self._die = False
self._repo_locked = True
self._repo_wait_start = 0
- self._files = []
+ self._files = {}
self._childpid = 0
self._target_cfg = target_cfg
self._builder_cfg = target_cfg.parent_cfg()
@@ -87,7 +82,10 @@
self._mock_config = None
self._done_status = ''
self._mock_log = None
- self.buildroot = self._target_cfg.mock_config()
+ self._buildroot = self._target_cfg.mock_config()
+ self._downloader = None
+ self._uploader = None
+ self.arch_command = ""
self._work_dir = self._builder_cfg.get_str("Directories", "builder_work_dir")
self._result_dir = os.path.join(self._work_dir, self._uniqid, "result")
@@ -120,6 +118,11 @@
self._done_status = 'killed'
self._log("Killing build process...\n")
+ if self._downloader:
+ self._downloader.cancel()
+ if self._uploader:
+ self._uploader.cancel()
+
# Don't try to kill a running cleanup process
if self._status != 'cleanup':
# Kill a running non-cleanup mock process, if any
@@ -145,36 +148,16 @@
self._log("Killed.\n");
- def _log(self, string):
- if string and self._log_fd:
- self._log_fd.write(string)
+ def _log(self, msg):
+ if msg and self._log_fd:
+ self._log_fd.write(msg)
self._log_fd.flush()
os.fsync(self._log_fd.fileno())
if self._builder_cfg.get_bool("General", "debug"):
s = "%s: " % self._uniqid
- sys.stdout.write(s + string)
+ sys.stdout.write(s + msg)
sys.stdout.flush()
- def dl_callback(self, dl_status, cb_data, err_msg):
- url = cb_data
- if dl_status == 'done':
- self._status = 'downloaded'
- self._log("Retrieved %s.\n" % url)
- elif dl_status == 'failed':
- # If job was cancelled, just return
- if self.is_done_status():
- return
-
- # Retry up to 5 times
- self._srpm_tries = self._srpm_tries + 1
- if self._srpm_tries >= 5:
- self._status = 'failed'
- self._log("ERROR: Failed to retrieve %s.\n" % url)
- else:
- # retry the download
- self._status = 'init'
- self._log("ERROR: Failed to retrieve %s on attempt %d (%s). Trying again...\n" % (url, self._srpm_tries, err_msg))
-
def _copy_mock_output_to_log(self):
if self._mock_log and os.path.exists(self._mock_log):
ml = open(self._mock_log, "r")
@@ -205,9 +188,9 @@
cmd = os.path.abspath(arg_list[0])
args.append(builder_cmd)
args.append("-r")
- args.append(self.buildroot)
+ args.append(self._buildroot)
args.append("--arch")
- args.append(self.buildarch)
+ args.append(self._buildarch)
args.append("--resultdir=%s" % self._result_dir)
args.append("--statedir=%s" % self._state_dir)
args.append("--uniqueext=%s" % self._uniqid)
@@ -234,7 +217,7 @@
args.append("clean")
args.append("--uniqueext=%s" % self._uniqid)
args.append("-r")
- args.append(self.buildroot)
+ args.append(self._buildroot)
self._log(" %s\n" % string.join(args))
self._childpid = ExecUtils.exec_with_redirect(cmd, args, None, None, None)
@@ -279,7 +262,7 @@
while True:
try:
f.seek(0, 0)
- string = f.read(4)
+ mockstat = f.read(4)
except OSError, e:
if e.errno == errno.EAGAIN:
try:
@@ -288,12 +271,11 @@
pass
continue
else:
- if len(string) < 4:
+ if len(mockstat) < 4:
continue
break
f.close()
- string = string.lower()
- return string
+ return mockstat.lower()
def _read_mock_config(self):
mockconfigfile = os.path.join(self._result_dir, 'mockconfig.log')
@@ -310,17 +292,32 @@
f.close()
return contents
+ def dl_callback(self, dl_status, cb_data, err_msg=None):
+ url = cb_data
+ if dl_status == FileTransfer.FT_RESULT_SUCCESS:
+ self._status = 'downloaded'
+ self._log("Retrieved %s.\n" % url)
+ elif dl_status == FileTransfer.FT_RESULT_FAILED:
+ # If job was cancelled, just return
+ if self.is_done_status():
+ return
+ self._status = 'failed'
+ self._log("ERROR: Failed to retrieve %s.\n" % url)
+ elif dl_status == FileTransfer.FT_RESULT_CANCELED:
+ # Ignore cancelation
+ pass
+ self._downloader = None
+
def _status_init(self):
self._log("Starting download of %s.\n" % self._srpm_url)
self._status = 'downloading'
target_dir = os.path.dirname(self._srpm_path)
try:
- dl_thread = FileDownloader.FileDownloader(self.dl_callback, self._srpm_url, self._srpm_url,
- target_dir, ['.src.rpm'], certs)
- dl_thread.start()
+ self._downloader = self._controller.download_srpm(self._srpm_url,
+ target_dir, self.dl_callback, self._srpm_url)
except FileDownloader.FileNameException, e:
self._status = 'failed'
- self._log("ERROR: Failed to begin SRPM download. Error: '%s' URL: %s\n" % (e, self._srpm_url))
+ self._log("ERROR: Failed to begin SRPM download. Error: '%s' URL: %s\n" % (e, self._srpm_url))
def _status_downloading(self):
pass
@@ -360,7 +357,7 @@
# something is wrong
self._watch_mock('failed', 'failed')
if self._status != 'prepping':
- return
+ return
# We need to make sure that mock has dumped the status file within a certain
# amount of time, otherwise we can't tell what it's doing
@@ -400,7 +397,7 @@
if source_dir.endswith(os.path.join(self._uniqid, "source")):
shutil.rmtree(source_dir, ignore_errors=True)
- # Ensure child process is reaped, if any
+ # Ensure child process is reaped if it's still around
if self._childpid:
try:
self._log("Waiting for child process %d to exit.\n" % self._childpid)
@@ -410,9 +407,21 @@
pass
self._copy_mock_output_to_log()
-
self._files = self._find_files()
+ self._uploader = self._controller.upload_files(self._files, self.ul_callback, None)
+ self._status = "uploading"
+
+ def _ul_callback(self, status, cb_data, msg):
+ if status == FileTransfer.FT_RESULT_SUCCESS:
+ pass
+ elif status == FileTransfer.FT_RESULT_FAILED:
+ self._done_status = 'failed'
+ self._log("Job failed because files could not be uploaded: %s" % msg)
self._status = self._done_status
+ self._uploader = None
+
+ def _status_uploading(self):
+ pass
def _job_done(self):
self._log("-----------------------\n")
@@ -437,7 +446,7 @@
Target: %s
UID: %s
Architecture: %s
- SRPM: %s\n\n""" % (time.asctime(time.localtime(self._starttime)), target_str, self._uniqid, self.buildarch, self._srpm_url))
+ SRPM: %s\n\n""" % (time.asctime(time.localtime(self._starttime)), target_str, self._uniqid, self._buildarch, self._srpm_url))
try:
srpm_filename = FileDownloader.get_base_filename_from_url(self._srpm_url, ['.src.rpm'])
@@ -474,13 +483,24 @@
self._log("\n")
self._log("Output File List:\n")
self._log("-----------------\n")
- for f in files_in_dir:
- file_url = get_url_for_file(self._builder_cfg, os.path.join(self._result_dir, f))
- if file_url:
- file_list.append(file_url)
- self._log(" Output File: %s\n" % urllib.unquote(file_url))
+ log_files = []
+ rpms = []
+ # sort into logs first, rpms later
+ for fname in files_in_dir:
+ fpath = os.path.join(self._result_dir, fname)
+ if fpath.endswith(".log"):
+ log_files.append(fpath)
else:
- self._log(" Error: Couldn't get file URL for file %s\n" % f)
+ rpms.append(fpath)
+
+ # Dump file list to log
+ file_list = log_files + rpms
+ i = 1
+ num_files = len(file_list)
+ for fpath in file_list:
+ self._log(" File (%d of %d): %s\n" % (i, num_files,
+ os.path.basename(fpath)))
+ i = i + 1
self._log("-----------------\n")
return file_list
More information about the fedora-extras-commits
mailing list