extras-buildsys-temp/automation2/server aw_manager.py, NONE, 1.1 bm_server.py, NONE, 1.1 bm_server_config.py, NONE, 1.1 buildjob.py, NONE, 1.1 buildmaster.py, NONE, 1.1

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Wed May 11 19:49:00 UTC 2005


Author: dcbw

Update of /cvs/fedora/extras-buildsys-temp/automation2/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv488/automation2/server

Added Files:
	aw_manager.py bm_server.py bm_server_config.py buildjob.py 
	buildmaster.py 
Log Message:
2005-05-11  Dan Williams  <dcbw at redhat.com>

	* First cut of reworked build system.  Please see the soon-to-be
	committed README file for more details on how it works.




--- NEW FILE aw_manager.py ---
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# copyright 2005 Duke University
# written by Seth Vidal

import time
import string
import xmlrpclib
from bm_server_config import CONFIG
import base64

class ArchWelderJob:
    """ Tracks a single build instance for a single arch on an ArchWelder """

    def __init__(self, awi, server, srpm, target, mydir, arch):
        self.awi = awi
        self.jobid = None
        self.status = None
        self.srpm = srpm
        self.target = target
        self.mydir = mydir
        self.arch = arch
        self._server = server
        self.starttime = None

    def start(self):
        try:
            self.jobid = self._server.start(self.srpm, self.target, self.mydir, self.arch)
        except Exception, e:
            print "Error starting job on host %s\n\t---error: %s" % (self.awi.address(), e)
            return False
        else:
            self.starttime = time.time()
            self.update_status()
            return True

    def update_status(self):
        if self.status == 'failed' or self.status == 'done':
            return

        status = ''
        try:
            status = self._server.status(self.jobid)
        except Exception, e:
            print "XMLAW: got error '%s' from AW during update_status()" % e
        self.status = status
        return self.status

    def die(self):
        self._server.die(self.jobid)
        self.status = 'killed'

    def logs(self):
        result = []
        enclogs = self._server.logs(self.jobid)
        for line in enclogs:
            txt = base64.decodestring(line)
            result.append(txt)
        return result

class ArchWelderInstance:
    """ Tracks an single arch on an ArchWelder """

    def __init__(self, address, arch):
        self._jobs = []
        self._arch = arch
        self._address = address
        self._server = xmlrpclib.Server(self._address)
        self._cur_job = self._get_cur_job()

    def arch(self):
        return self._arch

    def address(self):
        return self._address
    
    def new_job(self, srpm, target, mydir):
        return ArchWelderJob(self, self._server, srpm, target, mydir, self._arch)

    def track_job(self, job):
        self._jobs.append(job)

    def _get_cur_job(self):
        cur_job = None
        try:
            cur_job = self._server.get_cur_job()
        except Exception, e:
            print "XMLAW: got error '%s' from AW during get_cur_job()" % e
        else:
            if cur_job == 0:
                cur_job = None
        return cur_job

    def process(self):
        self._cur_job = self._get_cur_job()

        # Update status of all jobs
        for j in self._jobs:
            j.update_status()
    
    def available(self):
        if self._cur_job:
            return False
        else:
            return True

class ArchWelderManager:
    def __init__(self):
        # List of addresses of possible builders
        self.possible_aw = CONFIG('builders')

        # Dict:  arches => available builders
        # Like so:  [ 'i386':['10.0.0.1', '10.0.0.2'],
        #             'x86_64':['10.0.0.3']
        #           ]
        self.running_aw = {}

        # Figure out which archwelders are alive, and what they support
        # We create a separate archwelder instance for each arch on each builder,
        # even though both instances talk to the same XMLRPC server on the builder
        print "-----------------------------------------------------"
        print " Looking or ArchWelders..."
        for address in self.possible_aw:
            arches = None
            server = xmlrpclib.Server(address)
            try:
                arches = server.supported_arches()
            except Exception, e:
                pass
            if arches:
                arches.append('noarch')
                print "   Found ArchWelder '%s' supporting arches %s." % (address, string.join(arches))
                for a in arches:
                    if not self.running_aw.has_key(a):
                        self.running_aw[a] = []
                    awi = ArchWelderInstance(address, a)
                    self.running_aw[a].append(awi)
            else:
                self.possible_aw.remove(address)
            del server
        print "-----------------------------------------------------\n"

    def process(self):
        for awi_list in self.running_aw.values():
            for awi in awi_list:
                awi.process()

    def track_job(self, job):
        if job:
            awi = job.awi.track_job(job)

    def new_job_on_arch(self, arch, srpm, target, mydir):
        """ Create a job on a free builder for this arch """

        if self.running_aw.has_key(arch):
            for aw in self.running_aw[arch]:
                if aw.available():
                    return aw.new_job(srpm, target, mydir)
        return None



--- NEW FILE bm_server.py ---
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# copyright 2005 Duke University
# written by Seth Vidal


import time
from bm_server_config import CONFIG
import socket
import SimpleXMLRPCServer
import xmlrpclib
from buildjob import BuildJob
from buildjob import PrepError
import sqlite
import smtplib
from email.MIMEText import MIMEText
import threading
from buildmaster import BuildMaster
from buildmaster import ensure_build_db_tables
from aw_manager import ArchWelderManager

def email_result(username, cvs_tag, resultstring, subject=None):
    """send 'resultstring' to username"""
    
    msg = MIMEText(resultstring)
    if not subject:
        subject = 'Build Result: %s' % cvs_tag
    msg['Subject'] = subject
    msg['From'] = CONFIG('email_from')
    email_to = '%s@%s' % (username, CONFIG('email_to_domain'))
    msg['To'] = email_to
    s = smtplib.SMTP()
    s.connect()
    s.sendmail(CONFIG('email_from'), [email_to], msg.as_string())
    s.close()


class XMLRPCBuildMaster:
    def __init__(self, arch_welder_manager):
        self.awm = arch_welder_manager
        self.dbcx = sqlite.connect("buildmaster_db", encoding="utf-8",
                                        timeout=2)
        self.curs = self.dbcx.cursor()
        ensure_build_db_tables(self.dbcx)

    def __del__(self):
        self.dbcx.close()

    def enqueue(self, username, package, cvs_tag, target, buildreq=None):
        """ Accept a job to build and stuff it into the job database """

        print "Request to enqueue '%s' tag '%s' for target '%s' (user '%s')" \
                % (package, cvs_tag, target, username)
        targets = CONFIG('targets')
        if not targets.has_key(target):
            print "Error setting up build for %s on %s: target does not exist."\
                    % (cvs_tag, target)
            email_result(username, cvs_tag, "Error setting up build for %s on "\
                    "%s: target does not exist." % (cvs_tag, target))
            return -1
        else:
            # Insert request into the database
            self.curs.execute('INSERT INTO jobs (uid, username, package,' \
                    ' cvs_tag, target, buildreq, time_submitted, status)' \
                    ' VALUES (NULL, "%s", "%s", "%s", "%s", "%s", %d, "%s")' \
                    % (username, package, cvs_tag, target, buildreq,        \
                    time.time(), 'waiting'))
            self.dbcx.commit()
        return 0

    def list_waiting_jobs(self):
        self.curs.execute('SELECT uid, username, package, cvs_tag, target' \
                ' FROM jobs WHERE status="waiting"')
        self.dbcx.commit()
        data = self.curs.fetchall()
        job_list = []
        for row in data:
            tempX = [ item for item in row]
            job_list.append(tempX)
        return job_list

    def list_building_jobs(self):
        self.curs.execute('SELECT uid, username, package, cvs_tag, target' \
                ' FROM jobs WHERE status="building"')
        self.dbcx.commit()
        data = self.curs.fetchall()
        job_list = []
        for row in data:
            tempX = [ item for item in row]
            job_list.append(tempX)
        return job_list


class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
    """ XMLRPC server subclass that turns on SO_REUSEADDR """

    def __init__(self, address):
        SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, addr=address, logRequests=False)

    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        self.socket.bind(self.server_address)


if __name__ == '__main__':
    # Create ArchWelderManager
    awm = ArchWelderManager()

    # Create the BuildMaster thread
    bm = BuildMaster(awm)
    bm.start()

    # Create the BuildMaster XMLRPC server
    xmlrpc_bm = XMLRPCBuildMaster(awm)
    bm_server = MyXMLRPCServer((CONFIG('hostname'), 8887))
    bm_server.register_instance(xmlrpc_bm)
    print "BuildMaster accepting requests on %s:8887.\n" % CONFIG('hostname')
    try:
        bm_server.serve_forever()
    except Exception:
        # Make sure the BuildMaster thread shuts down
        print "Shutting down..."
        bm.stop()



--- NEW FILE bm_server_config.py ---
# Configuration file for buildmaster.py

config_opts = {}
config_opts['hostname'] = "localhost"
config_opts['email_to_domain'] = "redhat.com"
config_opts['email_from'] = "buildsys at fedoraproject.org"
config_opts['stages_root'] = "/rpmbuild/extras/stages"
config_opts['pkg_cvs_root'] = ":gserver:cvs.devel.redhat.com:/cvs/dist"
config_opts['pkg_cvs_rsh'] = "/usr/kerberos/bin/krsh"
config_opts['cvs_cmd'] = "/usr/bin/cvs"
config_opts['make_cmd'] = "/usr/bin/make"
config_opts['tmpdir'] = "/tmp"
config_opts['redhat_internal_cvs'] = 1
config_opts['log_url'] = "http://foo.foo.org/logs/"

config_opts['targets'] = {  'FC-3' : ['i386', 'x86_64'],
                            'devel' : ['i386']
                         }

config_opts['builders'] = [ 'http://127.0.0.1:8888' ]

def CONFIG (key):
    if config_opts.has_key(key):
        return config_opts[key]
    else:
        print "Bad request for key '%s'" % (key)
        exit (1)


--- NEW FILE buildjob.py ---
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# copyright 2005 Duke University
# written by Seth Vidal


import os
import os.path
import sys
import commands
import time
import popen2
import rpmUtils
import exceptions
import shutil
import tempfile
import smtplib
from email.MIMEText import MIMEText
import string
import SimpleXMLRPCServer
import xmlrpclib
from bm_server_config import CONFIG
import socket
from aw_manager import ArchWelderManager
from aw_manager import ArchWelderJob

os.environ['CVSROOT'] = CONFIG('pkg_cvs_root')
os.environ['CVS_RSH'] = CONFIG('pkg_cvs_rsh')

DEBUG = True
def debugprint(stuff=''):
    if DEBUG:
        print stuff

def log(stuff=''):
    print stuff

class PrepError(exceptions.Exception):
    def __init__(self, errno=0, args=None):
        exceptions.Exception.__init__(self)
        self.args = args
        self.errno = errno
    def __str__(self):
        return self.args
        
class BuildError(exceptions.Exception):
    def __init__(self, errno=0, args=None):
        exceptions.Exception.__init__(self)
        self.args = args
        self.errno = errno
    def __str__(self):
        return self.args

        

class BuildJob:
    """ Controller object for building 1 SRPM on multiple arches """

    def __init__(self, uid, username, package, cvs_tag, target, arch_welder_manager):
        self.awm = arch_welder_manager
        self.uid = uid
        self.curstage = 'initialize'
        self.username = username
        self.starttime = time.time()
        self.endtime = None
        self.stages_root = CONFIG('stages_root')
        self.package = package
        self.cvs_tag = cvs_tag
        self.target = target
        self.buildarches = []
        self.sub_jobs = {}
        self.failed = False

    def get_cur_stage(self):
        return self.curstage
        
    def get_uid(self):
        return self.uid
        
    def arch_handling(self, hdr):
        archs = []
        targets = CONFIG('targets')
        buildable_arches = targets[self.target]
        
        ba = hdr['buildarchs']
        exclusive = hdr['exclusivearch']
        exclude = hdr['excludearch']
        
        if ba == ['noarch']:
            return ba

        # default to building all arches
        tmparchs = []
        for arch in buildable_arches:
            tmparchs.append(arch)

        if ba:
            tmparchs = ba
        else:
            if exclusive:
                tmparchs = exclusive
                
        if exclude:
            for arch in exclude:
                if arch in tmparchs:
                    tmparchs.remove(arch)
        
        # we probably need to check the archs, and make sure they are what 
        # we can build for
        for thisarch in tmparchs:
            if thisarch in buildable_arches:
                archs.append(thisarch)

        return archs

        
    def _make_stage_dir(self):
        self.pkgsubdir = '%s-%d/%s-%s' % (self.name, self.starttime, self.ver, self.release)
        self.target_and_pkg_subdir = '%s/%s' % (self.target, self.pkgsubdir)
        self.stage_dir = os.path.join(self.stages_root, self.curstage, self.target_and_pkg_subdir)
        if not os.path.exists(self.stage_dir):
            os.makedirs(self.stage_dir)
        return self.stage_dir

        
    def _checkout(self):
        self.curstage = 'checkout'
        self.tmpdir = tempfile.mkdtemp(prefix=self.cvs_tag, dir=CONFIG('tmpdir'))
        os.chdir(self.tmpdir)
        cmd = '%s co -r %s %s' % (CONFIG('cvs_cmd'), self.cvs_tag, self.package)
        debugprint("%d: Running %s" % (self.uid, cmd))
        s, o = commands.getstatusoutput(cmd)
        if s != 0:
            subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
            msg = "could not check out %s from %s - output was:\n %s" % (self.cvs_tag, self.target, o)
            self.email_result(resultstring=msg, subject=subj)
            self.curstage = 'finished'
            self.failed = True
            return

        if CONFIG('redhat_internal_cvs') == 1:
            os.chdir(os.path.join(self.tmpdir, self.package))
            cmd = '%s co common' % CONFIG('cvs_cmd')
            debugprint("%d: Running %s" % (self.uid, cmd))
            s, o = commands.getstatusoutput(cmd)
            if s != 0:
                subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
                msg = "could not check out common directory for %s from %s - output was:\n %s" % (self.cvs_tag, self.target, o)
                self.email_result(resultstring=msg, subject=subj)
                self.curstage = 'finished'
                self.failed = True
                return

    def _make_srpm(self):
        self.curstage = 'make_srpm'
        self.srpmpath = None
        packagedir = os.path.join(self.tmpdir, self.package)
        if not os.path.exists(packagedir):
            subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
            msg = "could not find path %s for %s." % (packagedir, self.cvs_tag)
            self.email_result(resultstring=msg, subject=subj)
            self.curstage = 'finished'
            self.failed = True
            return

        if CONFIG('redhat_internal_cvs') == 1:
            make_srpm_dir = os.path.join(packagedir, self.target)
        else:
            make_srpm_dir = packagedir
        os.chdir(make_srpm_dir)

        cmd = '%s srpm' % CONFIG('make_cmd')
        debugprint("%d: Running %s in %s" % (self.uid, cmd, make_srpm_dir))
        s, o = commands.getstatusoutput(cmd)
        if s != 0:
            subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
            msg = "could not make srpm for %s - output was:\n %s" % (self.cvs_tag, o)
            self.email_result(resultstring=msg, subject=subj)
            self.curstage = 'finished'
            self.failed = True
            return
        
        srpmpath = None
        for line in o.split("\n"):
            if line.startswith("Wrote:"):
                line.replace("\n", "")
                (garbage, path) = line.split(':')
                srpmpath = path.strip()
                break
        if not srpmpath:
            subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
            msg = "could not find srpm for %s - output was:\n %s" % (self.cvs_tag, o)
            self.email_result(resultstring=msg, subject=subj)
            self.curstage = 'finished'
            self.failed = True
            return
        self.srpmpath = srpmpath

    def _prep(self):
        self.curstage = 'prep'
        ts = rpmUtils.transaction.initReadOnlyTransaction()
        hdr = rpmUtils.miscutils.hdrFromPackage(ts, self.srpmpath)
        self.name = hdr['name']
        self.ver = hdr['version']
        self.release = hdr['release']
        self.buildarches = self.arch_handling(hdr)
        if len(self.buildarches) == 0:
            self.failed = True
            self.curstage = 'finished'
            del hdr
            del ts
            return;
        self._make_stage_dir()
        self._createrepo(stage='needsign')
        
        for arch in self.buildarches:
            thisdir = os.path.join(self.stage_dir, arch)
            if not os.path.exists(thisdir):
                os.makedirs(thisdir)
        
        srpm = os.path.basename(self.srpmpath)
        srpm_in_dir = os.path.join(self.stage_dir, srpm)
        if os.path.exists(srpm_in_dir):
            os.unlink(srpm_in_dir)
        shutil.move(self.srpmpath, self.stage_dir)
        self.srpmpath = srpm_in_dir
        del hdr
        del ts
        
    def process(self):
        # Advance to next stage based on current stage
        if self.curstage == 'initialize':
            self._checkout()
        elif self.curstage == 'checkout':
            self._make_srpm()
        elif self.curstage == 'make_srpm':
            self._prep()
        elif self.curstage == 'prep' or self.curstage == 'building':
            self._monitor()
        elif self.curstage == 'finished':
            self._cleanup()
        elif self.curstage == 'cleanup':
            if self.failed:
                self._failed()
            else:
                self._succeeded()


    def _start_unspawned_builds(self):
        for arch in self.buildarches:
            if not self.sub_jobs.has_key(arch):
                job = self.awm.new_job_on_arch(arch, self.srpmpath, self.target, self.stage_dir)
                if job:
                    if job.start() == True:
                        self.awm.track_job(job)
                        self.sub_jobs[arch] = job
                        print "%s: Started job %s with builder id %s" % (self.uid, self.package, job.jobid)
                else:
                    print "%d: Waiting for free buildhost on %s" % (self.uid, arch)
                    del job

    def _monitor(self):
        self.curstage = 'building'
        self._start_unspawned_builds()

        have_jobs = False
        jobs_running = 0
        for job in self.sub_jobs.values():
            have_jobs = True
            if job.status == 'done':
                continue
            else:
                jobs_running = 1
                # if we'd rather have all builds finish
                # even if an arch fails we should remove this check
                if job.status == 'failed' or job.status == 'killed': 
                    self.failed = True

        if self.failed or (have_jobs == True and jobs_running == 0):
            self.curstage = 'finished'

    def _cleanup(self):
        self.curstage = 'cleanup'
        if self.failed:
            # Kill remaining jobs on other arches
            for job in self.sub_jobs.values():
                if job.status != 'done' and job.status != 'killed' and job.status != 'failed':
                    job.die()
            
    def _failed(self):
        self.curstage = 'failed'
        old_stage = self.stage_dir
        dest = self._make_stage_dir()
        if os.path.exists(dest):
            shutil.rmtree(dest, ignore_errors=True)
        shutil.move(old_stage, dest)
        for job in self.sub_jobs.values():
            buildroot = 'fedora-%s-%s-core' % (self.target, job.arch)
            stage_arch = os.path.join(self.stage_dir, job.arch)
            build_log = '%s/mach/%s/%s-%s-%s/rpm.log' % (CONFIG('tmpdir'), buildroot,
                                            self.name, self.ver, self.release)
            if os.path.exists(build_log):
                bl = open(build_log, 'r')
            else:
                bl = None
            if not os.path.exists(stage_arch):
                os.makedirs(stage_arch)
            fn = '%s/%s-%s-%s.failure.log' % (stage_arch, self.name, self.ver, self.release)
            print "%d: Logfile (%s) is: %s" % (self.uid, job.arch, fn)
            logfile = open(fn, 'w')

            if job.status == 'killed':
                lines = ['Build process terminated due to failure on another arch\n']
            else:
                lines = job.logs()

            for line in lines:
                logfile.write(line)
            if bl:
                for line in bl.readlines():
                    logfile.write(line)
                bl.close()
            logfile.close()
            
        # markup status file
        resultstring = """
%s: Build of %s on %s failed to complete on one or more archs. Please see logs at:
%s/%s/%s""" % (self.uid, self.name, self.target, CONFIG('log_url'), self.target, self.name)
        self.email_result(resultstring)
        return False
        
    def _succeeded(self):
        self.curstage = 'needsign'
        old_stage = self.stage_dir
        dest = self._make_stage_dir()
        if os.path.exists(dest):
            shutil.rmtree(dest, ignore_errors=True)
        shutil.move(old_stage, dest)
        # markup status file
        resultstring = """
%s: Build of %s on %s succeeded.
""" % (self.uid, self.name, self.target)
        self.email_result(resultstring)
        self._createrepo()
        return True

    def email_result(self, resultstring, subject=None):
        """send 'resultstring' to self.email from self.email_from"""
        
        print "%d:   resultstring: %s" % (self.uid, resultstring)
        return
        msg = MIMEText(resultstring)
        if not subject:
            subject = 'Build Result: %s on %s' % (self.name, self.target)
        msg['Subject'] = subject
        msg['From'] = CONFIG('email_from')
        email_to = '%s@%s' % (self.username, CONFIG('email_to_domain'))
        msg['To'] = email_to
        s = smtplib.SMTP()
        s.connect()
        s.sendmail(CONFIG('email_from'), [email_to], msg.as_string())
        s.close()

    def _createrepo(self, stage=None):
        # createrepo on the needsign tree for new changes
        if not stage:
            stage = self.curstage
        repodir = os.path.join(self.stages_root, stage, self.target)
        print "%d: repodir %s" % (self.uid, repodir)
        if not os.path.exists(repodir):
            os.makedirs(repodir)
        s, o = commands.getstatusoutput('/usr/bin/createrepo -q %s' % repodir)
        if s != 0:
            self.curstage = 'failed'
            raise PrepError(5, 'Error generating repodata for %s: %s' % (repodir, o))




--- NEW FILE buildmaster.py ---
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# copyright 2005 Duke University
# written by Seth Vidal


import time
from bm_server_config import CONFIG
from buildjob import BuildJob
from buildjob import PrepError
import sqlite
import threading


def ensure_build_db_tables(dbcx):
    """ Central routine to create the database table structure """

    curs = dbcx.cursor()
    try:
        curs.execute('SELECT * FROM jobs')
        dbcx.commit()
    except Exception, e:
        # If DB wasn't created, try to create it
        curs.execute('CREATE TABLE jobs (uid INTEGER PRIMARY KEY, ' \
                'username VARCHAR(20), package VARCHAR(50), ' \
                'cvs_tag VARCHAR(75), target VARCHAR(20), ' \
                'buildreq VARCHAR(75), time_submitted BIGINT, ' \
                'status VARCHAR(15))')
        dbcx.commit()


class BuildMaster(threading.Thread):
    def __init__(self, arch_welder_manager):
        self.awm = arch_welder_manager
        self.building_jobs = []
        self.should_stop = False
        self.dbcx = sqlite.connect("buildmaster_db", encoding="utf-8",
                                        timeout=2)
        self.curs = self.dbcx.cursor()
        ensure_build_db_tables(self.dbcx)
        threading.Thread.__init__(self)

    def __del__(self):
        self.dbcx.close()

    def stop(self):
        self.should_stop = True

    def set_job_status(self, job):
        status = job.get_cur_stage()
        job_uid = job.get_uid()
        self.curs.execute('UPDATE jobs SET status="%s" WHERE uid=%d' \
                % (status, job_uid))

    def run(self):
        while True:
            # Update all ArchWelders and known jobs
            self.awm.process()

            # Allow each job some processing time
            for job in self.building_jobs:
                job.process()
                self.set_job_status(job)
                if job.get_cur_stage() == 'failed' or job.get_cur_stage() == 'needsign':
                    print "%d: Job finished." % job.get_uid()
                    self.building_jobs.remove(job)

            # Grab one waiting job from database and start it
            self.curs.execute('SELECT uid, username, package, cvs_tag, target' \
                    ' FROM jobs WHERE status="waiting"')
            self.dbcx.commit()
            item = self.curs.fetchone()
            if item:
                print "%d: Adding (%s/%s/%s) to build queue" % (item['uid'], \
                        item['package'], item['cvs_tag'], item['target'])
                job = BuildJob(item['uid'], item['username'], item['package'],
                        item['cvs_tag'], item['target'], self.awm)
                self.building_jobs.append(job)

            time.sleep(1)
            if self.should_stop == True:
                break





More information about the fedora-extras-commits mailing list