extras-buildsys/server FileDownloader.py, NONE, 1.1 FileServer.py, NONE, 1.1 buildserver.py, NONE, 1.1 client_manager.py, NONE, 1.1 CONFIG.py, 1.2, 1.3 buildjob.py, 1.2, 1.3 buildmaster.py, 1.1.1.1, 1.2 aw_manager.py, 1.2, NONE bm_server.py, 1.2, NONE fileserver.py, 1.1, NONE

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Tue Jun 7 12:10:25 UTC 2005


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv28321/server

Modified Files:
	CONFIG.py buildjob.py buildmaster.py 
Added Files:
	FileDownloader.py FileServer.py buildserver.py 
	client_manager.py 
Removed Files:
	aw_manager.py bm_server.py fileserver.py 
Log Message:
2005-06-07  Dan Williams <dcbw at redhat.com>

    * Rework much of the build system to support HTTP transfer of SRPMs to
      build clients, and of logs & RPMs back to the build server.  Simple
      SRPM building is broken right now but will be fixed soon.
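
      The new flow, roughly: the server stages each SRPM under
      <server_work_dir>/srpm_http_dir and serves it over HTTP on port 8886
      (FileServer.py, started from buildserver.py), the build client fetches
      it from the srpm_url it is handed, and when the build finishes the
      server's FileDownloader.py pulls the logs and RPMs back from URLs the
      client reports.  A minimal, hypothetical sketch of the client-side
      fetch -- the build client code is not part of this commit -- assuming
      the client also uses urlgrabber:

          import os
          import urlgrabber

          def fetch_srpm(srpm_url, work_dir):
              # e.g. srpm_url = "http://buildserver.example.com:8886/devel/95-foo-1.0-1/foo-1.0-1.src.rpm"
              target = os.path.join(work_dir, os.path.basename(srpm_url))
              # urlgrab() writes the file to 'target' and returns the local path
              return urlgrabber.urlgrab(srpm_url, target)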




--- NEW FILE FileDownloader.py ---
#!/usr/bin/python -t
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# Copyright 2005 Dan Williams and Red Hat, Inc.
#

import threading
import urlgrabber
import urllib
import string
import os


def log(stuff=''):
    print stuff


def get_base_filename_from_url(url):
    """ Safely unquotes a URL and gets the base file name from it.
        We're not using urlparse here because it doesn't un-escape stuff """

    unquoted = url
    last_unquoted = None
    count = 5
    
    # Keep unquoting the string until the last two unquote operations
    # produce the same string
    while (unquoted != last_unquoted) and (count > 0):
        last_unquoted = unquoted
        unquoted = urllib.unquote_plus(unquoted)
        count = count - 1

    # If after 5 iterations of unquoting, the strings still aren't the same,
    # something is wrong.
    if (count == 0) and (unquoted != last_unquoted):
        return None

    # Try to grab the filename off the end of the URL
    index = url.rfind('/')
    if index == -1:
        return None
    filename = url[index+1:]

    if not filename.endswith('.rpm') and not filename.endswith('.log'):
        return None

    # FIXME: what other validation can we do here?
    for c in filename:
        # For now, legal characters are '_-.' plus alphanumeric
        if (c == '_') or (c == '-') or (c == '.') or c.isalnum():
            pass
        else:
            return None

    return filename
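
# Example (illustrative):
#   get_base_filename_from_url("http://buildclient.example.com/jobs/42/i386/foo-1.0-1.i386.rpm")
# returns 'foo-1.0-1.i386.rpm'.  URLs whose basename is not a .rpm or .log file,
# or contains characters outside letters, digits, '.', '_' and '-', yield None.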


class FileDownloader(threading.Thread):

    def __init__(self, build_job, url, target_dir):
        self._build_job = build_job
        self._url = url
        self._target = None
        filename = get_base_filename_from_url(self._url)
        if filename:
            self._target = os.path.join(target_dir, filename)
        else:
            print "Couldn't get base filename from url!!  target_dir=%s, url=%s" % (target_dir, url)
        threading.Thread.__init__(self)

    def run(self):
        success = False
        if self._url and self._target:
            os.chdir(os.path.dirname(self._target))
            result = urlgrabber.urlgrab(self._url, self._target)
#            try:
#                # Get the SRPM from the build server
#                result = urlgrabber.urlgrab(self._url, self._target)
#            except Exception, e:
#                pass
#            else:
            if result:
                success = True

        if success:
            self._build_job.set_download_status(self._url, 3)
            log("%s/%s: Finished downloading %s" % (self._build_job.jobid, self._build_job.arch, self._url))
        else:
            # Indicate an error
            self._build_job.set_download_status(self._url, 2)
            log("%s/%s: Failed to download %s" % (self._build_job.jobid, self._build_job.arch, self._url))



--- NEW FILE FileServer.py ---
#!/usr/bin/python -t
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# Copyright 2005 Dan Williams and Red Hat, Inc.
#

import SimpleHTTPServer
import SocketServer
import threading
import os
import urllib
import posixpath
import CONFIG

BaseRequestHandler = SimpleHTTPServer.SimpleHTTPRequestHandler
BaseHttpServer = SocketServer.TCPServer

class HttpRequestHandler(BaseRequestHandler):

    def __init__(self, request, client_address, server):
        self._server = server
        BaseRequestHandler.__init__(self, request, client_address, server)

    def list_directory(self, path):
        self.send_error(404, "No permission to list directory")

    def log_request(self, code='-', size='-'):
        # Don't log requests
        pass

    def translate_path(self, path):
        """Translate a /-separated PATH to the local filename syntax.

        Components that mean special things to the local file system
        (e.g. drive or directory names) are ignored.  (XXX They should
        probably be diagnosed.)

        This code is lifted from SimpleHTTPRequestHandler so that we can
        make sure the request is always based in our download directory,
        not the current directory.
        """
        path = posixpath.normpath(urllib.unquote(path))
        words = path.split('/')
        words = filter(None, words)
        path = self._server.http_dir
        for word in words:
            drive, word = os.path.splitdrive(word)
            head, word = os.path.split(word)
            if word in (os.curdir, os.pardir): continue
            path = os.path.join(path, word)
        return path
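
    # Example (illustrative): with http_dir = "/rpmbuild/srpm_http_dir" (the
    # default server_work_dir plus "srpm_http_dir"), a GET for
    # "/devel/95-foo-1.0-1/foo-1.0-1.src.rpm" maps to
    # "/rpmbuild/srpm_http_dir/devel/95-foo-1.0-1/foo-1.0-1.src.rpm".
    # "." and ".." components are dropped, so requests cannot escape http_dir.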

    def do_GET(self):
        BaseRequestHandler.do_GET(self)
#        try:
#            BaseRequestHandler.do_GET(self)
#        except Exception, e:
#            # We get an exception if the client drops the transfer
#            pass

class ThreadingHttpServer(SocketServer.ThreadingTCPServer):

    def __init__(self, server_address, RequestHandlerClass, http_dir):
        self.protocol_version = "HTTP/1.0"    # Don't want keepalive
        self.allow_reuse_address = 1
        self.http_dir = http_dir
        BaseHttpServer.__init__(self, server_address, RequestHandlerClass)


class FileServer(threading.Thread):

    def __init__(self, address_tuple, http_dir):
        self._server = ThreadingHttpServer(address_tuple, HttpRequestHandler, http_dir)
        threading.Thread.__init__(self)

    def run(self):
        self._server.serve_forever()


--- NEW FILE buildserver.py ---
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# copyright 2005 Duke University
# written by Seth Vidal


import time
import CONFIG
import socket
import os
import SimpleXMLRPCServer
import xmlrpclib
from buildjob import BuildJob
import sqlite
import smtplib
from email.MIMEText import MIMEText
import threading
from buildmaster import BuildMaster
from buildmaster import ensure_build_db_tables
from client_manager import BuildClientManager
import FileServer

def email_result(username, cvs_tag, resultstring, subject=None):
    """send 'resultstring' to username"""
    
    msg = MIMEText(resultstring)
    if not subject:
        subject = 'Build Result: %s' % cvs_tag
    msg['Subject'] = subject
    msg['From'] = CONFIG.get('email_from')
    email_to = '%s@%s' % (username, CONFIG.get('email_to_domain'))
    msg['To'] = email_to
    s = smtplib.SMTP()
    s.connect()
    s.sendmail(CONFIG.get('email_from'), [email_to], msg.as_string())
    s.close()


class XMLRPCBuildMaster:
    def __init__(self, client_manager):
        self.bcm = client_manager
        self.dbcx = sqlite.connect("buildmaster_db", encoding="utf-8",
                                        timeout=2)
        self.curs = self.dbcx.cursor()
        ensure_build_db_tables(self.dbcx)

    def __del__(self):
        self.dbcx.close()

    def enqueue(self, username, package, cvs_tag, target, buildreq=None):
        """ Accept a job to build and stuff it into the job database """

        # FIXME  we should be passing back a tuple of (error/success, reason)

        if CONFIG.get('use_srpm_not_cvs') == True:
            email_result(username, cvs_tag, "Error setting up build for %s on "\
                    "%s: this server builds SRPMs, not CVS checkouts." % (cvs_tag, target))
            return -1

        print "Request to enqueue '%s' tag '%s' for target '%s' (user '%s')" \
                % (package, cvs_tag, target, username)
        targets = CONFIG.get('targets')
        if not targets.has_key(target):
            print "Error setting up build for %s on %s: target does not exist."\
                    % (cvs_tag, target)
            email_result(username, cvs_tag, "Error setting up build for %s on "\
                    "%s: target does not exist." % (cvs_tag, target))
            return -1
        else:
            # Insert request into the database
            self.curs.execute('INSERT INTO jobs (uid, username, package,' \
                    ' cvs_tag, target, buildreq, time_submitted, status)' \
                    ' VALUES (NULL, "%s", "%s", "%s", "%s", "%s", %d, "%s")' \
                    % (username, package, cvs_tag, target, buildreq,        \
                    time.time(), 'waiting'))
            self.dbcx.commit()
        return 0

    def enqueue_srpm(self, username, package, srpm_file, target, buildreq=None):
        """ Accept a job to build from SRPM file and stuff it into the job database """

        # FIXME  we should be passing back a tuple of (error/success, reason)

        if CONFIG.get('use_srpm_not_cvs') == False:
            email_result(username, srpm_file, "Error setting up build for %s on "\
                    "%s: this server builds CVS checkouts, not SRPMS." % (srpm_file, target))
            return -1

        # We limit the database field to 255 chars
        if len(srpm_file) > 255:
            email_result(username, srpm_file, "Error setting up build for %s on "\
                    "%s: try using a shorter path to the SRPM (< 255 chars)." % (srpm_file, target))
            return -1

        print "Request to enqueue '%s' file '%s' for target '%s' (user '%s')" \
                % (package, srpm_file, target, username)
        targets = CONFIG.get('targets')
        if not targets.has_key(target):
            print "Error setting up build for %s on %s: target does not exist."\
                    % (srpm_file, target)
            email_result(username, srpm_file, "Error setting up build for %s on "\
                    "%s: target does not exist." % (srpm_file, target))
            return -1
        else:
            # Insert request into the database
            self.curs.execute('INSERT INTO jobs (uid, username, package,' \
                    ' cvs_tag, target, buildreq, time_submitted, status)' \
                    ' VALUES (NULL, "%s", "%s", "%s", "%s", "%s", %d, "%s")' \
                    % (username, package, srpm_file, target, buildreq,        \
                    time.time(), 'waiting'))
            self.dbcx.commit()
        return 0

    def list_waiting_jobs(self):
        self.curs.execute('SELECT uid, username, package, cvs_tag, target' \
                ' FROM jobs WHERE status="waiting"')
        self.dbcx.commit()
        data = self.curs.fetchall()
        job_list = []
        for row in data:
            job_list.append(list(row))
        return job_list

    def list_building_jobs(self):
        self.curs.execute('SELECT uid, username, package, cvs_tag, target' \
                ' FROM jobs WHERE status="building"')
        self.dbcx.commit()
        data = self.curs.fetchall()
        job_list = []
        for row in data:
            job_list.append(list(row))
        return job_list

    def look_for_clients(self):
        reload(CONFIG)
        print "-----------------------------------------------------"
        print " Looking for Build Clients..."
        self.bcm.update_client_instances()
        print "-----------------------------------------------------\n"
        return 0


class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
    """ XMLRPC server subclass that turns on SO_REUSEADDR """

    def __init__(self, address):
        SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, addr=address, logRequests=False)

    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        self.socket.bind(self.server_address)


if __name__ == '__main__':
    bcm = BuildClientManager()

    # Create the BuildMaster thread
    bm = BuildMaster(bcm)
    bm.start()

    # Create the BuildMaster XMLRPC server
    xmlrpc_bm = XMLRPCBuildMaster(bcm)
    bm_server = MyXMLRPCServer((CONFIG.get('hostname'), 8887))
    bm_server.register_instance(xmlrpc_bm)

    # SRPM fileserver
    http_dir = os.path.join(CONFIG.get('server_work_dir'), "srpm_http_dir")
    srpm_server = FileServer.FileServer((CONFIG.get('hostname'), 8886), http_dir)
    srpm_server.start()

    print "BuildMaster accepting requests on %s:8887.\n" % CONFIG.get('hostname')
    try:
        bm_server.serve_forever()
    except Exception:
        # Make sure the BuildMaster thread shuts down
        print "Shutting down..."
        bm.stop()

    print "Done."
    os._exit(0)
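
A hypothetical example of driving this XML-RPC interface from a submitter
(hostname made up; assumes 'devel' is one of the configured targets).
enqueue_srpm() returns 0 on success and -1 on error, and the job lists come
back as lists of [uid, username, package, cvs_tag, target] rows:

    import xmlrpclib

    server = xmlrpclib.Server("http://buildserver.example.com:8887")
    rc = server.enqueue_srpm("dcbw", "foo", "/path/to/foo-1.0-1.src.rpm", "devel")
    print server.list_waiting_jobs()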




--- NEW FILE client_manager.py ---
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# copyright 2005 Duke University
# written by Seth Vidal

import time
import string
import xmlrpclib
import CONFIG
import sys
import socket
import FileDownloader
import os
import urllib

def result_is_finished(result):
    if result == 'done' or result == 'killed' or result == 'failed':
        return True
    return False


class BuildClientJob:
    """ Tracks a single build instance for a single arch on an BuildClient """

    def __init__(self, bci, parent_job, server, target, arch, srpm_url):
        self.parent_job = parent_job
        self.bci = bci
        self.jobid = None
        self.status = 'initialize'
        self.client_result = ''
        self.target = target
        self.arch = arch
        self.srpm_url = srpm_url
        self._server = server
        self.starttime = None
        self.downloads = {}

    def start(self):
        try:
            self.jobid = self._server.start(self.target, self.arch, self.srpm_url)
        except Exception, e:
            print "Error starting job on host %s\n\t---error: %s" % (self.bci.address(), e)
            self.status = 'failed'
            return False
        else:
            self.starttime = time.time()
            self.status = 'running'
            self.process()
            return True

    def process(self):
        if self.status == 'done':
            return
        elif self.status == 'running':
            result = ''
            try:
                result = self._server.status(self.jobid)
            except Exception, e:
                print "BuildClientJob(%s): got error '%s' from build client while attempting " \
                        "to get its status." % (self.bci.address(), e)
            self.client_result = result

            # if the builder is done, grab list of files to download
            if result_is_finished(result):
                self.status = 'downloading'
                print "%s (%s): Files to download from build client:" % (self.jobid, self.arch)
                for f in self._server.files(self.jobid):
                    uf = urllib.unquote(f)
                    print "     %s" % uf
                    self.downloads[uf] = 0
                print "%s (%s): Done" % (self.jobid, self.arch)
        elif self.status == 'downloading':
            # Start grabbing the next undownloaded file, but only
            # if we aren't already pulling one down
            #
            # Download states:
            #   0: waiting
            #   1: in progress
            #   2: error
            #   3: done
            undownloaded = False
            for url in self.downloads.keys():
                dl_status = self.downloads[url]
                if dl_status == 0:
                    # spawn the download
                    target_dir = os.path.join(self.parent_job.get_stage_dir(), self.arch)
                    if not os.path.exists(target_dir):
                        os.makedirs(target_dir)
                    dl_thread = FileDownloader.FileDownloader(self, url, target_dir)
                    dl_thread.start()
                    undownloaded = True
                    self.downloads[url] = 1
                    break
                elif dl_status == 1:
                    # in progress
                    break
                elif dl_status == 2:
                    # error
                    continue
                elif dl_status == 3:
                    # this one is done
                    continue
            if not undownloaded:
                self.status = 'done'

    def get_status(self):
        return self.status

    def get_builder_result(self):
        return self.client_result

    def get_files(self):
        files = []
        for url in self.downloads.keys():
            fname = FileDownloader.get_base_filename_from_url(url)
            if fname and self.downloads[url] == 3:
                files.append(fname)
        return files

    def set_download_status(self, url, status):
        if self.downloads.has_key(url):
            self.downloads[url] = status

    def server_gone(self):
        if self.status == 'done':
            return
        self.parent_job.job_server_gone(self)

    def die(self):
        self._server.die(self.jobid)
        self.status = 'done'


class BuildClientInstance:
    """ Tracks an single arch on an BuildClient """

    def __init__(self, awm, address, arch):
        self._awm = awm
        self._jobs = []
        self._arch = arch
        self._address = address
        self._server = xmlrpclib.Server(self._address)
        self._unavail_count = 0
        self._cur_job = self._get_cur_job()

    def arch(self):
        return self._arch

    def address(self):
        return self._address
    
    def new_job(self, parent_job, target, srpm_url):
        return BuildClientJob(self, parent_job, self._server, target, self._arch, srpm_url)

    def track_job(self, job):
        self._jobs.append(job)

    def _get_cur_job(self):
        cur_job = None
        try:
            cur_job = self._server.get_cur_job()
        except socket.error, e:
            # Check for "Connection refused" or "Connection reset by peer"
            if e[0] == 111 or e[0] == 104:
                self._unavail_count = self._unavail_count + 1
            else:
                print "bci(%s): got error '%s' from AW while trying to get " \
                        "current job number" % (self._address, e)
        else:
            self._unavail_count = 0
            if cur_job == 0:
                cur_job = None
        return cur_job

    def process(self):
        self._cur_job = self._get_cur_job()

        # Update status of all jobs
        for j in self._jobs:
            j.process()

        # If we haven't been able to contact the BuildClient for a bit, kill build
        # jobs on this BuildClient
        if self._unavail_count > 2:
            for job in self._jobs:
                job.server_gone()
                del job
            # Return 1 to indicate we should be killed
            return 1

        return 0
    
    def available(self):
        if self._cur_job:
            return False
        else:
            return True

class BuildClientManager:
    def __init__(self):
        # List of addresses of possible builders
        self.possible_aw = CONFIG.get('builders')

        # Dict mapping each arch to the BuildClientInstances that can build it,
        # shown here by builder address:
        #   { 'i386':   ['10.0.0.1', '10.0.0.2'],
        #     'x86_64': ['10.0.0.3']
        #   }
        self.running_aw = {}

        print "-----------------------------------------------------"
        print " Looking for BuildClients..."
        self.update_client_instances()
        print "-----------------------------------------------------\n"

    def update_client_instances(self):
        # Figure out which clients are alive, and what they support
        # We create a separate client instance for each arch on each builder,
        # even though both instances talk to the same XMLRPC server on the builder
        for address in self.possible_aw:
            # If the address is already in our running_aw list, skip it
            skip = False
            for bci_list in self.running_aw.values():
                for bci in bci_list:
                    if address == bci.address():
                        skip = True
            if skip == True:
                continue

            arches = None
            server = xmlrpclib.Server(address)
            try:
                arches = server.supported_arches()
            except socket.error, e:
                pass
            if arches:
                arches.append('noarch')
                print "   New AW: '%s' [%s]" % (address, string.join(arches))
                for a in arches:
                    if not self.running_aw.has_key(a):
                        self.running_aw[a] = []
                    bci = BuildClientInstance(self, address, a)
                    self.running_aw[a].append(bci)
            del server

    def process(self):
        """ Allow each BuildClientInstance to update its status and do some processing """
        for bci_list in self.running_aw.values():
            for bci in bci_list:
                if bci.process() == 1:
                    # Remove the BuildClientInstance from our lists
                    print "Removing BuildClient '%s'/%s because it timed out." % (bci.address(), bci.arch())
                    bci_list.remove(bci)

    def track_job(self, job):
        if job:
            job.bci.track_job(job)

    def new_job_on_arch(self, parent_job, target, arch, srpm_url):
        """ Create a job on a free builder for this arch """

        if self.running_aw.has_key(arch):
            for aw in self.running_aw[arch]:
                if aw.available():
                    return aw.new_job(parent_job, target, srpm_url)
        return None



Index: CONFIG.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/CONFIG.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- CONFIG.py	4 Jun 2005 23:14:51 -0000	1.2
+++ CONFIG.py	7 Jun 2005 12:10:23 -0000	1.3
@@ -4,16 +4,21 @@
 config_opts['hostname'] = "localhost"
 config_opts['email_to_domain'] = "redhat.com"
 config_opts['email_from'] = "buildsys at fedoraproject.org"
-config_opts['stages_root'] = "/rpmbuild/extras/stages"
-config_opts['pkg_cvs_root'] = ":gserver:cvs.devel.redhat.com:/cvs/dist"
-config_opts['pkg_cvs_rsh'] = "/usr/kerberos/bin/krsh"
+config_opts['pkg_cvs_root'] = ":pserver:anonymous at cvs.fedora.redhat.com:/cvs/dist"
+config_opts['pkg_cvs_rsh'] = ""
 config_opts['cvs_cmd'] = "/usr/bin/cvs"
 config_opts['make_cmd'] = "/usr/bin/make"
 config_opts['tmpdir'] = "/tmp"
-config_opts['redhat_internal_cvs'] = 1
 config_opts['log_url'] = "http://foo.foo.org/logs/"
 
-config_opts['srpm_http_dir'] = "/tmp/srpm_stage_dir"
+# server_work_dir
+#   - Where logs and finished RPMs are stored
+config_opts['server_work_dir'] = "/rpmbuild"
+
+# repo_dir
+#   - Repository dir of up-to-date RPMs
+config_opts['repo_dir'] = "/repodir"
+
 
 # This option disables pulling from CVS.  Allowing jobs to be submitted
 # as unknown SRPMs from random people may be a security risk, so don't
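
Taken together with the buildjob.py changes below, the new options imply a
layout roughly like this (a sketch assuming the default values; the per-job
subdirectory name comes from _make_stage_dir):

    /rpmbuild/srpm_http_dir/<target>/<uid>-<name>-<ver>-<release>/   SRPMs served to build clients on port 8886
    /rpmbuild/<target>/<uid>-<name>-<ver>-<release>/<arch>/          logs and RPMs downloaded from build clients
    /repodir/<target>/<name>/<ver>-<release>/<arch>/                 finished RPMs; createrepo is run per target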


Index: buildjob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/buildjob.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- buildjob.py	4 Jun 2005 23:14:51 -0000	1.2
+++ buildjob.py	7 Jun 2005 12:10:23 -0000	1.3
@@ -33,11 +33,12 @@
 import xmlrpclib
 import CONFIG
 import socket
-from aw_manager import ArchWelderManager
-from aw_manager import ArchWelderJob
+from client_manager import BuildClientManager
+from client_manager import BuildClientJob
 
 os.environ['CVSROOT'] = CONFIG.get('pkg_cvs_root')
-os.environ['CVS_RSH'] = CONFIG.get('pkg_cvs_rsh')
+if len(CONFIG.get('pkg_cvs_rsh')) > 0:
+    os.environ['CVS_RSH'] = CONFIG.get('pkg_cvs_rsh')
 
 DEBUG = True
 def debugprint(stuff=''):
@@ -65,17 +66,18 @@
 
         
 
+http_dir = os.path.join(CONFIG.get('server_work_dir'), "srpm_http_dir")
+
 class BuildJob:
     """ Controller object for building 1 SRPM on multiple arches """
 
-    def __init__(self, uid, username, package, cvs_tag, target, arch_welder_manager):
+    def __init__(self, uid, username, package, cvs_tag, target, client_manager):
         self.curstage = 'initialize'
-        self.awm = arch_welder_manager
+        self.bcm = client_manager
         self.uid = uid
         self.username = username
         self.starttime = time.time()
         self.endtime = None
-        self.stages_root = CONFIG.get('stages_root')
         self.package = package
         self.target = target
         self.buildarches = []
@@ -135,11 +137,12 @@
         
     def _make_stage_dir(self, rootdir):
         # The dir will look like this:
-        # /builder/devel/finished/foo-3463467347734/1.1.0-23
-        pkgsubdir = '%s-%d/%s-%s' % (self.name, self.starttime, self.ver, self.release)
-        stage_dir = os.path.join(rootdir, self.target, self.curstage, pkgsubdir)
-        if not os.path.exists(stage_dir):
-            os.makedirs(stage_dir)
+        # <rootdir>/devel/95-foo-1.1.0-23
+        pkgsubdir = '%d-%s-%s-%s' % (self.uid, self.name, self.ver, self.release)
+        stage_dir = os.path.join(rootdir, self.target, pkgsubdir)
+        if os.path.exists(stage_dir):
+            shutil.rmtree(stage_dir, ignore_errors=True)
+        os.makedirs(stage_dir)
         return stage_dir
 
         
@@ -148,6 +151,8 @@
         dir_prefix = self.cvs_tag + "-"
         self.checkout_tmpdir = tempfile.mkdtemp(prefix=dir_prefix, dir=CONFIG.get('tmpdir'))
         os.chdir(self.checkout_tmpdir)
+
+        # Checkout the module
         cmd = '%s co -r %s %s' % (CONFIG.get('cvs_cmd'), self.cvs_tag, self.package)
         debugprint("%d: Running %s" % (self.uid, cmd))
         s, o = commands.getstatusoutput(cmd)
@@ -160,14 +165,18 @@
             shutil.rmtree(self.checkout_tmpdir, True)
             return
 
-        if CONFIG.get('redhat_internal_cvs') == 1:
-            os.chdir(os.path.join(self.checkout_tmpdir, self.package))
+        # Just in case the 'common' directory didn't come along for the ride,
+        # get it from CVS
+        pkg_path = os.path.join(self.checkout_tmpdir, self.package)
+        if not os.path.exists(os.path.join(pkg_path, "common")):
+            os.chdir(pkg_path)
             cmd = '%s co common' % CONFIG.get('cvs_cmd')
             debugprint("%d: Running %s" % (self.uid, cmd))
             s, o = commands.getstatusoutput(cmd)
+            os.chdir(self.checkout_tmpdir)
             if s != 0:
                 subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
-                msg = "could not check out common directory for %s from %s - output was:\n %s" % (self.cvs_tag, self.target, o)
+                msg = "could not check out common directory - output was:\n %s" % (self.cvs_tag, self.target, o)
                 self.email_result(resultstring=msg, subject=subj)
                 self.curstage = 'finished'
                 self.failed = True
@@ -177,24 +186,20 @@
     def _make_srpm(self):
         self.curstage = 'make_srpm'
         self.srpm_path = None
-        packagedir = os.path.join(self.checkout_tmpdir, self.package)
-        if not os.path.exists(packagedir):
+        srpm_dir = os.path.join(self.checkout_tmpdir, self.package, self.target)
+        if not os.path.exists(srpm_dir):
             subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
-            msg = "could not find path %s for %s." % (packagedir, self.cvs_tag)
+            msg = "could not find path %s for %s." % (srpm_dir, self.cvs_tag)
             self.email_result(resultstring=msg, subject=subj)
             self.curstage = 'finished'
             self.failed = True
             shutil.rmtree(self.checkout_tmpdir, True)
             return
 
-        if CONFIG.get('redhat_internal_cvs') == 1:
-            make_srpm_dir = os.path.join(packagedir, self.target)
-        else:
-            make_srpm_dir = packagedir
-        os.chdir(make_srpm_dir)
+        os.chdir(srpm_dir)
 
         cmd = '%s srpm' % CONFIG.get('make_cmd')
-        debugprint("%d: Running %s in %s" % (self.uid, cmd, make_srpm_dir))
+        debugprint("%d: Running %s in %s" % (self.uid, cmd, srpm_dir))
         s, o = commands.getstatusoutput(cmd)
         if s != 0:
             subj = 'Prep Error: %s on %s' % (self.cvs_tag, self.target)
@@ -202,7 +207,7 @@
             self.email_result(resultstring=msg, subject=subj)
             self.curstage = 'finished'
             self.failed = True
-            shutil.rmtree(self.checkout_tmpdir, True)
+            #shutil.rmtree(self.checkout_tmpdir, True)
             return
         
         srpmpath = None
@@ -237,28 +242,31 @@
             self.failed = True
             self.curstage = 'finished'
             return
-        self.stage_dir = self._make_stage_dir(self.stages_root)
-        self._createrepo(stage='needsign')
 
+        # Make sure build clients see latest packages
+        self._createrepo()
+
+        self.stage_dir = self._make_stage_dir(CONFIG.get('server_work_dir'))
         for arch in self.buildarches:
             thisdir = os.path.join(self.stage_dir, arch)
             if not os.path.exists(thisdir):
                 os.makedirs(thisdir)
-        
+
+        # Copy the SRPM to the final package product dir
         srpm = os.path.basename(self.srpm_path)
         srpm_in_dir = os.path.join(self.stage_dir, srpm)
         if os.path.exists(srpm_in_dir):
             os.unlink(srpm_in_dir)
         shutil.copy(self.srpm_path, self.stage_dir)
 
-        # Must also copy it to where the build client can get it over HTTP
-        http_pkg_path = self._make_stage_dir(CONFIG.get('srpm_http_dir'))
+        # Must also copy SRPM to where the build client can get it over HTTP
+        http_pkg_path = self._make_stage_dir(http_dir)
         self.srpm_http_path = os.path.join(http_pkg_path, srpm)
         shutil.copy(self.srpm_path, self.srpm_http_path)
         self.srpm_path = srpm_in_dir
 
         # Remove CVS checkout and make_srpm dirs
-        shutil.rmtree(self.checkout_tmpdir, True)
+        shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
 
     def process(self):
         # Advance to next stage based on current stage
@@ -283,43 +291,41 @@
     def job_server_gone(self, job):
         """ Remove a job from our building queue if its server went away """
 
-        print "%d: ArchWelder for %s went away...  Will start new job for %s" % (self.uid, job.arch, job.arch)
+        print "%d: BuildClient for %s went away...  Will start new job for %s" % (self.uid, job.arch, job.arch)
         del self.sub_jobs[job.arch]
 
     def _start_unspawned_builds(self):
         for arch in self.buildarches:
             if not self.sub_jobs.has_key(arch):
                 # Construct SPRM URL
-                srpm_http_base = self.srpm_http_path[len(CONFIG.get('srpm_http_dir')):]
+                srpm_http_base = self.srpm_http_path[len(http_dir):]
                 srpm_url = "http://" + CONFIG.get('hostname') + ":8886" + srpm_http_base
-                print "SRPM URL: " + srpm_url
-                job = self.awm.new_job_on_arch(self, self.target, arch, srpm_url)
+                # print "SRPM URL: " + srpm_url
+                job = self.bcm.new_job_on_arch(self, self.target, arch, srpm_url)
                 if job:
                     if job.start() == True:
-                        self.awm.track_job(job)
+                        self.bcm.track_job(job)
                         self.sub_jobs[arch] = job
-                        print "%s: Started job %s with builder id %s" % (self.uid, self.package, job.jobid)
-                else:
-                    del job
+                        print "%s: Started job %s (%s) with builder id %s" % (self.uid, self.package, arch, job.jobid)
 
     def _monitor(self):
         self.curstage = 'building'
         self._start_unspawned_builds()
 
         have_jobs = False
-        jobs_running = 0
+        jobs_running = False
         for job in self.sub_jobs.values():
             have_jobs = True
-            if job.status == 'done':
+            job_status = job.get_status()
+            if job_status is 'downloading' or job_status is 'running':
+                jobs_running = True
                 continue
-            else:
-                jobs_running = 1
-                # if we'd rather have all builds finish
-                # even if an arch fails we should remove this check
-                if job.status == 'failed' or job.status == 'killed': 
+            elif job_status is 'done':
+                builder_result = job.get_builder_result()
+                if builder_result == 'failed' or builder_result == 'killed': 
                     self.failed = True
 
-        if self.failed or (have_jobs == True and jobs_running == 0):
+        if self.failed or (have_jobs == True and jobs_running == False):
             self.curstage = 'finished'
 
     def _cleanup(self):
@@ -327,71 +333,46 @@
         if self.failed:
             # Kill remaining jobs on other arches
             for job in self.sub_jobs.values():
-                if job.status != 'done' and job.status != 'killed' and job.status != 'failed':
+                job_status = job.get_status()
+                if job_status is 'initialize' or job_status is 'running':
                     job.die()
-            
+
+    def get_stage_dir(self):
+        return self.stage_dir
+
     def _failed(self):
         old_stage = self.curstage
         self.curstage = 'failed'
-        if old_stage is not 'initialize' and old_stage is not 'checkout' and old_stage is not 'make_srpm':
-            old_stage_dir = self.stage_dir
-            dest = self._make_stage_dir(self.stages_root)
-            if os.path.exists(dest):
-                shutil.rmtree(dest, ignore_errors=True)
-            shutil.move(old_stage_dir, dest)
-            for job in self.sub_jobs.values():
-                buildroot = 'fedora-%s-%s-core' % (self.target, job.arch)
-                stage_arch = os.path.join(self.stage_dir, job.arch)
-                build_log = '%s/mach/%s/%s-%s-%s/rpm.log' % (CONFIG.get('tmpdir'), buildroot,
-                                                self.name, self.ver, self.release)
-                if os.path.exists(build_log):
-                    bl = open(build_log, 'r')
-                else:
-                    bl = None
-                if not os.path.exists(stage_arch):
-                    os.makedirs(stage_arch)
-                fn = '%s/%s-%s-%s.failure.log' % (stage_arch, self.name, self.ver, self.release)
-                print "%d: Logfile (%s) is: %s" % (self.uid, job.arch, fn)
-                logfile = open(fn, 'w')
-
-                if job.status == 'killed':
-                    lines = ['Build process terminated due to failure on another arch\n']
-                else:
-                    lines = job.logs()
-
-                for line in lines:
-                    logfile.write(line)
-                if bl:
-                    for line in bl.readlines():
-                        logfile.write(line)
-                    bl.close()
-                logfile.close()
-                
-            # markup status file
-            resultstring = """
-    %s: Build of %s on %s failed to complete on one or more archs. Please see logs at:
-    %s/%s/%s""" % (self.uid, self.name, self.target, CONFIG.get('log_url'), self.target, self.name)
-        else:
-            resultstring = """
+
+        resultstring = """
     %s: Build of %s (%s) on %s failed to complete on one or more archs.
 """ % (self.uid, self.package, self.cvs_tag, self.target)
+
         self.email_result(resultstring)
-        return False
         
     def _succeeded(self):
         self.curstage = 'needsign'
-        old_stage = self.stage_dir
-        dest = self._make_stage_dir(self.stages_root)
-        if os.path.exists(dest):
-            shutil.rmtree(dest, ignore_errors=True)
-        shutil.move(old_stage, dest)
-        # markup status file
+
+        # Copy completed RPMs to repo dir here
+        for job in self.sub_jobs.values():
+            file_list = job.get_files()
+            for f in file_list:
+                if not f.endswith(".rpm"):
+                    continue
+                src_file = os.path.join(self.stage_dir, job.arch, f)
+                verrel = "%s-%s" % (self.ver, self.release)
+                dst_path = os.path.join(CONFIG.get('repo_dir'), self.target, self.name, verrel, job.arch)
+                if not os.path.exists(dst_path):
+                    os.makedirs(dst_path)
+                shutil.copy(src_file, dst_path)
+
         resultstring = """
 %s: Build of %s on %s succeeded.
 """ % (self.uid, self.name, self.target)
         self.email_result(resultstring)
+
+        # Update the repo with new packages
         self._createrepo()
-        return True
 
     def email_result(self, resultstring, subject=None):
         """send 'resultstring' to self.email from self.email_from"""
@@ -410,17 +391,16 @@
         s.sendmail(CONFIG.get('email_from'), [email_to], msg.as_string())
         s.close()
 
-    def _createrepo(self, stage=None):
+    def _createrepo(self):
         # createrepo on the needsign tree for new changes
-        if not stage:
-            stage = self.curstage
-        repodir = os.path.join(self.stages_root, stage, self.target)
-        print "%d: repodir %s" % (self.uid, repodir)
+        repodir = os.path.join(CONFIG.get('repo_dir'), self.target)
+        print "%d: updating repodir %s" % (self.uid, repodir)
         if not os.path.exists(repodir):
             os.makedirs(repodir)
         s, o = commands.getstatusoutput('/usr/bin/createrepo -q %s' % repodir)
         if s != 0:
-            self.curstage = 'failed'
-            raise PrepError(5, 'Error generating repodata for %s: %s' % (repodir, o))
+            print "Createrepo failed!!!"
+            #self.curstage = 'failed'
+            #raise PrepError(5, 'Error generating repodata for %s: %s' % (repodir, o))
 
 


Index: buildmaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/buildmaster.py,v
retrieving revision 1.1.1.1
retrieving revision 1.2
diff -u -r1.1.1.1 -r1.2
--- buildmaster.py	1 Jun 2005 04:15:01 -0000	1.1.1.1
+++ buildmaster.py	7 Jun 2005 12:10:23 -0000	1.2
@@ -42,8 +42,8 @@
 
 
 class BuildMaster(threading.Thread):
-    def __init__(self, arch_welder_manager):
-        self.awm = arch_welder_manager
+    def __init__(self, client_manager):
+        self.bcm = client_manager
         self.building_jobs = []
         self.should_stop = False
         self.dbcx = sqlite.connect("buildmaster_db", encoding="utf-8",
@@ -66,8 +66,8 @@
 
     def run(self):
         while True:
-            # Update all ArchWelders and known jobs
-            self.awm.process()
+            # Update all build clients and known jobs
+            self.bcm.process()
 
             # Allow each job some processing time
             for job in self.building_jobs:
@@ -86,7 +86,7 @@
                 print "%d: Adding (%s/'%s'/%s) to build queue" % (item['uid'], \
                         item['package'], item['cvs_tag'], item['target'])
                 job = BuildJob(item['uid'], item['username'], item['package'],
-                        item['cvs_tag'], item['target'], self.awm)
+                        item['cvs_tag'], item['target'], self.bcm)
                 self.building_jobs.append(job)
 
             time.sleep(5)


--- aw_manager.py DELETED ---


--- bm_server.py DELETED ---


--- fileserver.py DELETED ---



