#!/usr/bin/python
# Copyright (C) 2006 Red Hat, Inc
# Author by Dan Walsh,
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# avc_snap - this script is launched by the auditd and listens on
# stdin for Audit messages.  It then searches for AVC Messages and assembles
# them into a python dictionary for later processing by the AVC Plugins
#
#
import os
import select
import string
import struct
import syslog

# audit python provided by audit-python package
# avc python provided by policycoreutils package
import audit
import avc

#
# Collect a list of audit messages until a timeout or the audit signature changes
# Check to see if the list contains an AUDIT_AVC message
# If yes
#    create the dictionary using serules.
#    For now just syslog the dictionary,
#    eventually will call analyze and process avc message
# else
#    discard
#
class avc_snap:
    """Group audit records by signature and syslog the AVC ones.

    Records read from stdin are accumulated in ``audit_list`` for as long
    as their first whitespace-separated token (the "signature") stays the
    same.  When the signature changes, or when no input arrives within the
    select timeout, the accumulated group is handed to ``out()``.
    """

    # Header that precedes every payload on stdin: four native ints
    # (version, size, type, length).
    _HEADER_FORMAT = "iiii"

    def __init__(self):
        # List of (record type, payload words) tuples for the current group.
        self.audit_list = []
        # Signature shared by every record currently in audit_list.
        self.cur_sig = ""

    def is_avc(self):
        """Return True if any collected record has type audit.AUDIT_AVC."""
        return any(record[0] == audit.AUDIT_AVC for record in self.audit_list)

    def out(self):
        """Flush the current group of records.

        If the group contains an AVC record, every record's words are logged
        and the concatenated words are passed to ``avc.SERules.translate``;
        each key/value pair of every entry in ``rules.AVCS`` is then logged.
        Groups without an AVC record are discarded.  The group is always
        emptied afterwards.
        """
        if self.is_avc():
            rules = avc.SERules()
            words = []
            for _record_type, data_list in self.audit_list:
                words += data_list
                syslog.syslog("data %s" % " ".join(data_list))
            rules.translate(words)
            for avc_entry in rules.AVCS:
                for key in avc_entry.keys():
                    syslog.syslog("%s->%s" % (key, avc_entry[key]))
        self.audit_list = []

    def process(self, type, data):
        """Add one audit record to the current group.

        type -- numeric audit record type taken from the message header.
        data -- record payload; the first whitespace-separated token is the
                signature, the remaining tokens are stored with the record.

        A payload whose signature differs from the current one first flushes
        the pending group through ``out()``.  Empty or whitespace-only
        payloads carry no signature and are ignored.
        """
        if not isinstance(data, str):
            # os.read()/struct.unpack() hand back bytes on Python 3.
            data = data.decode("utf-8", "replace")
        data_list = data.split()
        if not data_list:
            # Nothing to index; previously this raised IndexError, which
            # run() does not catch, and took the whole daemon down.
            return
        new_sig = data_list[0]
        if self.audit_list and new_sig != self.cur_sig:
            self.out()
        self.cur_sig = new_sig
        self.audit_list.append((type, data_list[1:]))

    def run(self):
        """Read audit messages from stdin (fd 0) until it reaches EOF.

        Each message is a header of four ints followed by the payload.  If no
        message arrives within 5 seconds the pending group is flushed.
        Unpacking and type errors are logged and the loop continues.
        """
        header_size = struct.calcsize(self._HEADER_FORMAT)
        while True:
            readable, _writable, _errored = select.select([0], [], [], 5)
            try:
                if 0 in readable:
                    theline = os.read(0, 16 + audit.MAX_AUDIT_MESSAGE_LENGTH)
                    if not theline:
                        # EOF: the writer closed the pipe.  select() would
                        # report fd 0 readable forever, so flush and stop
                        # instead of spinning on struct errors.
                        self.out()
                        return
                    numremain = len(theline) - header_size
                    fmt = "%s%ds" % (self._HEADER_FORMAT, numremain)
                    _version, _size, record_type, length, data = struct.unpack(
                        fmt, theline)
                    # syslog.syslog("append type=%d data %s" % (record_type, data[:length]))
                    self.process(record_type, data[:length])
                else:
                    # Timeout: no more records are coming for this group.
                    self.out()
            # Format the exception itself rather than e.args: "%s" % e.args
            # raises TypeError whenever args does not hold exactly one item.
            except struct.error as e:
                syslog.syslog("struct exception %s " % (e,))
            except TypeError as e:
                syslog.syslog("Type exception %s " % (e,))


if __name__ == "__main__":
    try:
        snap = avc_snap()
        snap.run()
    except IOError as e:
        # IOError args is usually (errno, strerror); "%s" % e.args would fail.
        syslog.syslog("IOError exception %s" % (e,))
    except Exception as e:
        syslog.syslog("Unexpected exception %s " % (e,))