extras-buildsys/server ArchJob.py, 1.8, 1.9 Builder.py, 1.9, 1.10 BuilderManager.py, 1.9, 1.10 PackageJob.py, 1.18, 1.19 UserInterface.py, 1.42, 1.43
Daniel Williams (dcbw)
fedora-extras-commits at redhat.com
Sat Aug 6 02:35:09 UTC 2005
Author: dcbw
Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv11669/server
Modified Files:
ArchJob.py Builder.py BuilderManager.py PackageJob.py
UserInterface.py
Log Message:
2005-08-05 Dan Williams <dcbw at redhat.com>
* Rework builder tracking code to always keep Builder objects around, and
to mark them as active/unavailable rather than dropping unavailable
builders. Unavailable builders are pinged every 5 minutes to see if
they are alive or not. Admins can still manually ping builders.
* Remove lots of locking code between the Builders, ArchJobs, and the
BuildMaster since it was only relevant when pyOpenSSL still sucked.
Communication with the builder's XMLRPC server only happens from each
Builder object's thread now.
* Consolidate job-killing code in both the ArchJob and the PackageJob.
Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- ArchJob.py 25 Jul 2005 19:24:24 -0000 1.8
+++ ArchJob.py 6 Aug 2005 02:35:07 -0000 1.9
@@ -52,6 +52,8 @@
self.downloads = {}
self.starttime = time.time()
self.endtime = 0
+ self._die = False
+ self._die_lock = threading.Lock()
def _builder_finished(self):
if self.builder_status == 'done' or self.builder_status == 'killed' or self.builder_status == 'failed' or self.builder_status == 'orphaned':
@@ -109,7 +111,6 @@
del attrdict
def _send_repo_unlocked(self):
- self.builder.xmlrpc_lock_acquire()
try:
self._server.repo_unlocked(self.jobid)
except socket.error, e:
@@ -118,11 +119,9 @@
self.par_job.package, self.arch, self.bci.address(), e)
except xmlrpclib.ProtocolError, e:
pass
- self.builder.xmlrpc_lock_release()
def _dl_files(self):
files = []
- self.builder.xmlrpc_lock_acquire()
try:
files = self._server.files(self.jobid)
except socket.error, e:
@@ -131,13 +130,22 @@
self.par_job.package, self.arch, self.bci.address(), e)
except xmlrpclib.ProtocolError, e:
pass
- self.builder.xmlrpc_lock_release()
return files
def process(self):
if self.status == 'done':
return
- elif self.status == 'running':
+
+ # If we're supposed to die, tell the builder and clean up
+ self._die_lock.acquire()
+ should_die = self._die
+ self._die_lock.release()
+ if should_die:
+ self._server.die(self.jobid)
+ self._set_status('done')
+ return
+
+ if self.status == 'running':
# Builders pause before they enter the 'prep' state (which accesses
# the repo for this target), and wait for the server to allow them
# to proceed when the repo is unlocked.
@@ -247,10 +255,10 @@
self.par_job.remove_arch_job(self)
def die(self):
+ # Can be called from other threads
if self.status == 'initialize' or self.status == 'running':
- self.builder.xmlrpc_lock_acquire()
- self._server.die(self.jobid)
- self.builder.xmlrpc_lock_release()
- self._set_status('done')
+ self._die_lock.acquire()
+ self._die = True
+ self._die_lock.release()
Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- Builder.py 2 Aug 2005 00:58:16 -0000 1.9
+++ Builder.py 6 Aug 2005 02:35:07 -0000 1.10
@@ -25,6 +25,7 @@
from plague import CommonErrors
import OpenSSL
import ArchJob
+import EmailUtils
# Load in the config
execfile("/etc/plague/server/CONFIG.py")
@@ -37,57 +38,58 @@
certs['peer_ca_cert'] = config_opts['ca_cert']
-# Python's xmlrpclib & httplib are a bit dodgy with
-# threads, so lets lock xmlrpc operations
-builder_xmlrpc_lock = threading.Lock()
-
-
class Builder(threading.Thread):
""" Tracks all jobs on a builder instance """
+ _BUILDER_PING_INTERVAL = 60 * 5 # In seconds
+
def __init__(self, manager, address):
self._cur_jobid = None
self._manager = manager
self._jobs = {}
self._address = address
+ self._alive = True
+ self._stop = False
+ self._prepping_jobs = False
+ self._unavail_count = 0
+ self._target_arches = {}
+ self._ping_timeout = 0
+ self._cur_ping_interval = self._BUILDER_PING_INTERVAL
+ self._when_died = 0
+
if config_opts['ssl_builders']:
self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, certs)
else:
self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, None)
self._server_lock = threading.Lock()
- self._unavail_count = 0
- self._target_arches = []
- builder_xmlrpc_lock.acquire()
- try:
- self._target_arches = self._server.supported_target_arches()
- for target in self._target_arches.keys():
- self._target_arches[target].append('noarch')
- except socket.error:
- builder_xmlrpc_lock.release()
- raise RuntimeError
- builder_xmlrpc_lock.release()
- self._alive = True
- self._stop = False
- self._prepping_jobs = False
+ (self._alive, arches) = self._ping_builder()
+ if self._alive:
+ self._init_builder(arches)
+
+ threading.Thread.__init__(self)
+
+ def _init_builder(self, arches):
+ self._target_arches = arches
+ for target in self._target_arches.keys():
+ self._target_arches[target].append('noarch')
# Kill any jobs currently running on the builder
(jobid, status) = self._get_cur_job_and_status()
if jobid and jobid != 0:
- builder_xmlrpc_lock.acquire()
try:
self._server.die(jobid)
except:
pass
- builder_xmlrpc_lock.release()
-
- threading.Thread.__init__(self)
- def xmlrpc_lock_acquire(self):
- builder_xmlrpc_lock.acquire()
-
- def xmlrpc_lock_release(self):
- builder_xmlrpc_lock.release()
+ def _ping_builder(self):
+ target_arches = {}
+ alive = True
+ try:
+ target_arches = self._server.supported_target_arches()
+ except socket.error:
+ alive = False
+ return (alive, target_arches)
def arches(self, target):
arches = None
@@ -107,34 +109,34 @@
return self._address
def alive(self):
- """
- Is the builder responding to requests?
- """
+ """ Is the builder responding to requests? """
return self._alive
def start_job(self, par_job, target, arch, srpm_url):
+ self._server_lock.acquire()
if not self.available():
+ self._server_lock.release()
raise RuntimeError
if not self._target_arches.has_key(target) or len(self._target_arches[target]) == 0:
+ self._server_lock.release()
raise RuntimeError
if not arch in self._target_arches[target]:
+ self._server_lock.release()
raise RuntimeError
- builder_xmlrpc_lock.acquire()
- self._server_lock.acquire()
try:
jobid = self._server.start(target, arch, srpm_url)
except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, xmlrpclib.ProtocolError):
jobid = 0
- self._server_lock.release()
- builder_xmlrpc_lock.release()
if jobid == 0:
+ self._server_lock.release()
raise RuntimeError
job = ArchJob.ArchJob(self, self._server, par_job, jobid, target, arch)
self._jobs[jobid] = job
self._update_cur_job()
+ self._server_lock.release()
return job
@@ -142,16 +144,12 @@
jobid = None
status = None
- builder_xmlrpc_lock.acquire()
- self._server_lock.acquire()
try:
(jobid, status) = self._server.get_cur_job()
except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, xmlrpclib.ProtocolError):
self._unavail_count = self._unavail_count + 1
else:
self._unavail_count = 0
- self._server_lock.release()
- builder_xmlrpc_lock.release()
return (jobid, status)
@@ -175,24 +173,76 @@
def stop(self):
self._stop = True
+ def ping_asap(self):
+ # Reduce the ping interval to ping the builder right away
+ self._cur_ping_interval = 0
+
+ def _handle_builder_suspend(self):
+ for jobid in self._jobs.keys():
+ job = self._jobs[jobid]
+ job.builder_gone()
+ del self._jobs[jobid]
+ self._jobs = {}
+ self._alive = False
+ self._unavail_count = 0
+ self._prepping_jobs = False
+ self._ping_timeout = time.time()
+ self._when_died = time.time()
+ # Reset current ping interval to default
+ self._cur_ping_interval = self._BUILDER_PING_INTERVAL
+
+ # Notify admins
+ print "Suspending builder '%s' because it timed out." % self._address
+ subject = "Builder Timeout: %s" % self._address
+ msg = "The builder '%s' timed out and was suspended." % self._address
+ for addr in config_opts['admin_emails']:
+ EmailUtils.email_result(addr, msg, subject)
+
+ def _handle_builder_reactivate(self, target_arches):
+ self._alive = True
+ self._ping_timeout = 0
+
+ self._init_builder(target_arches)
+
+ print "Re-activating builder '%s'." % self._address
+ subject = "Builder Re-activated: %s" % self._address
+ msg = """The builder '%s' was re-activated.
+
+ Suspended at: %s
+ Re-Enabled at: %s
+""" % (self._address, time.ctime(self._when_died), time.ctime(time.time()))
+ for addr in config_opts['admin_emails']:
+ EmailUtils.email_result(addr, msg, subject)
+ self._when_died = 0
+
def run(self):
while not self._stop:
- self._update_cur_job()
+ self._server_lock.acquire()
- # Kill all jobs on the client if it went away
- if self._unavail_count > 2:
- for jobid in self._jobs.keys():
- job = self._jobs[jobid]
- job.builder_gone()
- del self._jobs[jobid]
- self._alive = False
- self._stop = True
- self._manager.builder_gone()
- continue
-
- # Update status of all jobs
- for j in self._jobs.values():
- j.process()
+ if self._alive:
+ self._update_cur_job()
+
+ if self._unavail_count > 2:
+ # Kill all jobs on the client if it went away
+ self._handle_builder_suspend()
+ else:
+ # Update status of all archjobs on this builder
+ for j in self._jobs.values():
+ j.process()
+ else:
+ # Ping the builder every so often to see if it responds again
+ if time.time() > (self._ping_timeout + self._cur_ping_interval):
+ (alive, target_arches) = self._ping_builder()
+ if alive:
+ self._handle_builder_reactivate(target_arches)
+ else:
+ # Wait and ping again
+ self._ping_timeout = time.time()
+
+ # Reset current ping interval to default
+ self._cur_ping_interval = self._BUILDER_PING_INTERVAL
+
+ self._server_lock.release()
time.sleep(3)
@@ -218,9 +268,12 @@
if not arch in arches:
arches.append(arch)
builder_dict['arches'] = arches
- if self._cur_jobid:
- builder_dict['status'] = 'building'
+ if self._alive:
+ if self._cur_jobid:
+ builder_dict['status'] = 'building'
+ else:
+ builder_dict['status'] = 'idle'
else:
- builder_dict['status'] = 'idle'
+ builder_dict['status'] = 'unavailable'
return builder_dict
Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- BuilderManager.py 29 Jul 2005 06:05:37 -0000 1.9
+++ BuilderManager.py 6 Aug 2005 02:35:07 -0000 1.10
@@ -40,17 +40,22 @@
self._builders_lock.acquire()
self.possible_builders = config_opts['builders']
self._builders_lock.release()
- self.running_builders = []
- builder_list = self.update_builders()
+
+ self._builders = []
+ self.add_new_builders()
# Print out builder list when starting up
print "\nBuilders:"
- print "-" * 60
- for builder in builder_list:
- string = " " + builder['address']
- string = string + " " * (40 - len(builder['address']))
- for arch in builder['arches']:
+ print "-" * 90
+ for builder in self._builders:
+ string = " " + builder.address()
+ string = string + " " * (40 - len(builder.address()))
+ builder_dict = builder.to_dict()
+ for arch in builder_dict['arches']:
string = string + arch + " "
+ string = string + " " * (80 - len(string))
+ string = string + builder_dict['status']
+ del builder_dict
print string
print ""
@@ -60,23 +65,20 @@
self._have_work = False
def __del__(self):
- for builder in self.running_builders:
+ for builder in self._builders:
builder.stop()
time.sleep(2)
- for builder in self.running_builders:
- del builder
def set_build_master(self, build_master):
self._build_master = build_master
- def update_builders(self):
+ def add_new_builders(self):
self._builders_lock.acquire()
# Load in any new builders from the config file
execfile("/etc/plague/server/CONFIG.py")
self.possible_builders = config_opts['builders']
- builder_list = []
for address in self.possible_builders:
# If the address is "https" but we aren't set up for SSL, exit
if address.startswith('https') and not config_opts['ssl_builders']:
@@ -86,79 +88,46 @@
print "Builder address (%s) starts with 'http', but the 'ssl_builders' option is set to True." % address
os._exit(1)
- # If the address is already in our running_builders list, skip it
+ # If the address is already in our _builders list, skip it
skip = False
- for builder in self.running_builders:
+ for builder in self._builders:
if address == builder.address():
skip = True
if skip == True:
continue
- # Try to connect to builder and add it to our builder
- # list if we can
- try:
- builder = Builder.Builder(self, address)
- except RuntimeError:
- pass
- else:
- builder_list.append(builder.to_dict())
- builder.start()
- self.running_builders.append(builder)
+ # Add the builder to our build list
+ builder = Builder.Builder(self, address)
+ builder.start()
+ self._builders.append(builder)
self._builders_lock.release()
- return builder_list
+
+ def ping_suspended_builders(self):
+ self._builders_lock.acquire()
+ for builder in self._builders:
+ if not builder.alive():
+ builder.ping_asap()
+ self._builders_lock.release()
def list_builders(self):
builder_list = []
- for builder in self.running_builders:
+ for builder in self._builders:
builder_list.append(builder.to_dict())
-
- # Add unavailable builders
- for builder in self.possible_builders:
- found = False
- for tmp in builder_list:
- if builder == tmp['address']:
- found = True
- if found:
- continue
- builder_dict = {}
- builder_dict['address'] = builder
- builder_dict['arches'] = []
- builder_dict['status'] = "unavailable"
- builder_list.append(builder_dict)
-
return builder_list
def have_work(self, paused):
avail = False
- for builder in self.running_builders:
+ for builder in self._builders:
if builder.available():
avail = True
if not paused and len(self._queue) > 0 and avail:
return True
return self._have_work
- def builder_gone(self):
- self._have_work = True
-
def process(self, paused):
self._have_work = False
- # Deal with dead/unreachable builders
- for builder in self.running_builders:
- if not builder.alive():
- print "Removing builder '%s' because it timed out." % builder.address()
-
- # Notify admins
- subject = "Builder Timeout: %s" % builder.address()
- msg = "The builder '%s' timed out and was removed from the active builder list." % builder.address()
- for addr in config_opts['admin_emails']:
- EmailUtils.email_result(addr, msg, subject)
-
- # Forget about the builder
- builder.stop()
- self.running_builders.remove(builder)
-
# Don't queue any new jobs if we are paused
if paused:
return
@@ -173,7 +142,7 @@
self._queue.remove(req)
continue
# Find a free builder for this request
- for builder in self.running_builders:
+ for builder in self._builders:
if builder.available() and builder.can_build_arch_on_target(req['arch'], req['target']):
try:
job = builder.start_job(parent, req['target'], req['arch'], req['srpm_url'])
@@ -213,7 +182,7 @@
def any_prepping_builders(self):
# query each build builder for any jobs that are in the 'prepping' state
- for builder in self.running_builders:
+ for builder in self._builders:
if builder.alive() and builder.any_prepping_jobs():
return True
return False
Index: PackageJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/PackageJob.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- PackageJob.py 4 Aug 2005 20:27:12 -0000 1.18
+++ PackageJob.py 6 Aug 2005 02:35:07 -0000 1.19
@@ -137,6 +137,8 @@
self.archjobs = {}
self._archjobs_lock = threading.Lock()
self._event = threading.Event()
+ self._killer = None
+ self._die = False
first_stage = 'initialize'
if self.no_cvs == True:
@@ -449,13 +451,27 @@
return False
def die(self, username):
- # Kill any building jobs
- resultstring = "%s (%s): Build on target %s was killed by %s." % (self.uid, self.name, self.target, username)
+ self._killer = username
+ self._die = True
+ self.wake()
+ def _handle_death(self):
+ resultstring = "%s (%s): Build on target %s was killed by %s." % (self.uid, self.name, self.target, self._killer)
self.result = 'killed'
self._set_cur_stage('finished', resultstring)
self.email_result(self.username, resultstring)
+ # Kill any building jobs
+ self._kill_all_archjobs()
+
+ # Wake us up if the Controller thread is still running
+ if not self._event.isSet():
+ self._event.set()
+
+ self.endtime = time.time()
+ self.bm.notify_job_done(self)
+
+ def _kill_all_archjobs(self):
self._archjobs_lock.acquire()
for job in self.archjobs.values():
if job:
@@ -463,9 +479,6 @@
self.archjobs = {}
self._archjobs_lock.release()
- self.endtime = time.time()
- self.bm.notify_job_done(self)
-
def wake(self):
self._event.set()
@@ -473,6 +486,10 @@
if self.is_done():
return
+ if self._die:
+ self._handle_death()
+ return
+
try:
func = getattr(self, "_stage_%s" % self.curstage)
if func():
@@ -494,11 +511,7 @@
msg = "%s\n-------------------------------------------------\n\n%s\n" % (msg, logtail)
self.email_result(self.username, resultstring=msg, subject=subj)
# Kill remaining jobs on other arches
- self._archjobs_lock.acquire()
- for job in self.archjobs.values():
- if job:
- job.die()
- self._archjobs_lock.release()
+ self._kill_all_archjobs()
self._stage_failed(e.msg)
def _stage_building(self):
Index: UserInterface.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/UserInterface.py,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -r1.42 -r1.43
--- UserInterface.py 5 Aug 2005 16:24:13 -0000 1.42
+++ UserInterface.py 6 Aug 2005 02:35:07 -0000 1.43
@@ -452,8 +452,10 @@
def update_builders(self):
- execfile("/etc/plague/server/CONFIG.py")
- builder_list = self._builder_manager.update_builders()
+ self._builder_manager.add_new_builders()
+ self._builder_manager.ping_suspended_builders()
+ time.sleep(2)
+ builder_list = self._builder_manager.list_builders()
return (0, "Success.", builder_list)
More information about the fedora-extras-commits
mailing list