#!/usr/bin/python # Authors: # Petr Vobornik # # Copyright (C) 2012 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import pycurl import json import StringIO import urllib from collections import OrderedDict # # Script configuration # class ipa_config: def __init__(self): self.domain = 'idm.lab.bos.redhat.com' self.host = 'vm-126' self.user = 'admin' self.password = 'kokos123'#'blablabla' self.verbose = False self.verbose_curl = False self.target_dir = '/home/pvoborni/dev/freeipa/json/' self.target_domain = 'example.com' self.target_hostname = 'dev.example.com' self.methods = None self.groups = None self.display_output = False self.save_output = False self.preserve_output = False self.show_methods = False self.show_groups = False self.cookie_file = '/home/pvoborni/bin/pyjson.cookie' def hostname(self): return self.host+'.'+self.domain # # Globals # true = 'true' false = 'false' ipa_commands = {} #keys: command names, values: commands ipa_groups = {} #keys: group names, values: list of command names def new_command(name, group, method, params, options): command = ipa_command(name) command.init_by_attrs(method, params, options) ipa_commands[name] = command add_to_group(group, name) def copy_command(name, group, command_spec): command = ipa_command(name) command.init_by_command(command_spec) ipa_commands[name] = command add_to_group(group, name) def group(name): ipa_groups[name] = []; def add_to_group(name, command_name): group = ipa_groups[name] group.append(command_name) # # Base class for custom commands # class ipa_command: name = '' method = '' params = [] options = {} def __init__(self, name): self.name = name def create_json_obj(self): json_obj = { "method": self.method, "params": [self.params, self.options], } return json.dumps(json_obj) def init_by_attrs(self, method, params, options): self.method = method self.params = params self.options = options def init_by_command(self, command): self.method = command['method'] params = command['params'] self.params = list(params[0]) self.options = dict(params[1]) def login(self, config): url = 'https://%s/ipa/session/login_password' % config.hostname() request = { 'user': config.user, 'password': config.password } request_data = urllib.urlencode(request) result = StringIO.StringIO() c = pycurl.Curl() c.setopt(pycurl.URL, url) c.setopt(pycurl.HTTPHEADER, [ "Content-Type: application/x-www-form-urlencoded", "Referer: %s" % url ] ) #set POST fields c.setopt(pycurl.POST, 1) c.setopt(pycurl.POSTFIELDS, request_data) c.setopt(pycurl.WRITEFUNCTION, result.write) c.setopt(pycurl.SSL_VERIFYPEER, False) c.setopt(pycurl.COOKIEFILE, config.cookie_file) c.setopt(pycurl.COOKIEJAR, config.cookie_file) if config.verbose_curl: c.setopt(pycurl.VERBOSE, 1) c.perform() def execute(self, config): request_json = self.create_json_obj() url = 'https://%s/ipa/session/json' % config.hostname() result = StringIO.StringIO() c = pycurl.Curl() c.setopt(pycurl.URL, url) c.setopt(pycurl.HTTPHEADER, [ "Content-Type: application/json", "Accept: applicaton/json", "Referer: %s" % url ] ) c.setopt(pycurl.COOKIEFILE, config.cookie_file) c.setopt(pycurl.POST, 1) c.setopt(pycurl.POSTFIELDS, request_json) c.setopt(pycurl.WRITEFUNCTION, result.write) c.setopt(pycurl.SSL_VERIFYPEER, False) if config.verbose_curl: print '\nRequest:\n' print request_json print '\n' c.setopt(pycurl.VERBOSE, 1) c.perform() return result.getvalue() def run(self, config): self.login(config) result = self.execute(config) return result def filename(self): return "%s.json" % self.name # # Custom Commands # ## Users group('user') new_command( 'user_show', 'user', 'user_show', ['karel'], {'all': true, 'rights': true} ) ## Hosts group('host') new_command( 'host_show', 'host', 'host_show', ['vm-125.idm.lab.bos.redhat.com'], {'all': true, 'rights': true} ) ## IPA init group('init') new_command( 'ipa_init', 'init', 'batch', [ {"method":"i18n_messages","params":[[],{}]}, {"method":"user_find","params":[[],{"whoami":true,"all":true}]}, {"method":"env","params":[[],{}]}, {"method":"dns_is_enabled","params":[[],{}]}], {} ) new_command( 'ipa_init_objects', 'init', 'json_metadata', [], {"object":"all"} ) new_command( 'ipa_init_commands', 'init', 'json_metadata', [], {"command":"all"} ) new_command( 'ipa_init_methods', 'init', 'json_metadata', [], {"method":"all"} ) ## Selinuxusermaps group('selinuxusermap') new_command( 'selinuxusermap_find_pkeys', 'selinuxusermap', 'selinuxusermap_find', [], {"all":true,"pkey_only":true,"sizelimit":0} ) copy_command( 'selinuxusermap_get_records', 'selinuxusermap', { "method":"batch", "params":[ [{"method":"selinuxusermap_show","params":[["karel_unconfined"],{"all":true}]}], {} ] } ) new_command( 'selinuxusermap_show', 'selinuxusermap', 'selinuxusermap_show', ["karel_unconfined"], {"all":true,"rights":true} ) copy_command( 'selinuxusermap_details_update', 'selinuxusermap', {"method":"selinuxusermap_mod","params":[["karel_unconfined"],{"all":true,"rights":true,"description":"description"}]} ) copy_command( 'selinuxusermap_add_user','selinuxusermap', {"method":"selinuxusermap_add_user","params":[["karel_unconfined"],{"group":"user-group"}]} ) copy_command( 'selinuxusermap_remove_user','selinuxusermap', {"method":"selinuxusermap_remove_user","params":[["karel_unconfined"],{"group":"user-group"}]} ) copy_command( 'selinuxusermap_add_host','selinuxusermap', {"method":"selinuxusermap_add_host","params":[["karel_unconfined"],{"host":"vm-040.idm.lab.bos.redhat.com"}]} ) copy_command( 'selinuxusermap_remove_host','selinuxusermap', {"method":"selinuxusermap_remove_host","params":[["karel_unconfined"],{"host":"vm-040.idm.lab.bos.redhat.com"}]} ) #dns group('dnsrecord') new_command( 'dnsrecord_show', 'dnsrecord', 'dnsrecord_show', ["idm.lab.bos.redhat.com", "my"], {"all":true,"rights":true,"structured":true} ) new_command( 'dnsrecord_show_old', 'dnsrecord', 'dnsrecord_show', ["idm.lab.bos.redhat.com", "my"], {"all":true,"rights":true} ) new_command( 'dnsrecord_del', 'dnsrecord', 'dnsrecord_del', ["idm.lab.bos.redhat.com", "my"], {"arecord": "10.10.10.10", "structured":true} ) new_command( 'dnsrecord_add', 'dnsrecord', 'dnsrecord_add', ["idm.lab.bos.redhat.com", "my"], {"a_part_ip_address":"10.10.10.15", "structured":true} ) group('dnszone') new_command( 'dnszone_show', 'dnszone', 'dnszone_show', ['idm.lab.bos.redhat.com'], {"all":true,"rights":true} ) #dnsconfig group('dnsconfig') new_command( 'dnsconfig_show', 'dnsconfig', 'dnsconfig_show', [], {"all":true,"rights":true} ) new_command( 'dnsconfig_mod', 'dnsconfig', 'dnsconfig_mod', [], {"all":true,"rights":true, "idnsforwarders": "2001:beef::1"} ) #automember group('automember') new_command( 'automembergroup_add', 'automember', 'automember_add', ["foogroup"], {"type": "group"} ) new_command( 'automemberhostgroup_add', 'automember', 'automember_add', ["foohostgroup"], {"type": "hostgroup"} ) new_command( 'automembergroup_find_pkeys', 'automember', 'automember_find', [], {"type": "group", "all":true,"pkey_only":true,"sizelimit":0} ) new_command( 'automemberhostgroup_find_pkeys', 'automember', 'automember_find', [], {"type": "hostgroup", "all":true,"pkey_only":true,"sizelimit":0} ) copy_command( 'automembergroup_get_records', 'automember', { "method":"batch", "params":[ [{"method":"automember_show", "params":[["foogroup"],{"type": "group", "all":true}]}], {} ] } ) copy_command( 'automemberhostgroup_get_records', 'automember', { "method":"batch", "params":[ [{"method":"automember_show", "params":[["foohostgroup"],{"type": "hostgroup", "all":true}]}], {} ] } ) new_command( 'automembergroup_show', 'automember', 'automember_show', ["foogroup"], {"type": "group", "all":true,"rights":true} ) new_command( 'automemberhostgroup_show', 'automember', 'automember_show', ["foohostgroup"], {"type": "hostgroup", "all":true,"rights":true} ) new_command( 'automembergroup_default_group_show', 'automember', 'automember_default_group_show', [], {"type": "group"} ) new_command( 'automemberhostgroup_default_group_show', 'automember', 'automember_default_group_show', [], {"type": "hostgroup"} ) new_command( 'automembergroup_default_group_set', 'automember', 'automember_default_group_set', [], {"type": "group", "automemberdefaultgroup": "foogroup"} ) new_command( 'automemberhostgroup_default_group_set', 'automember', 'automember_default_group_set', [], {"type": "hostgroup", "automemberdefaultgroup": "foohostgroup"} ) new_command( 'automembergroup_default_group_remove', 'automember', 'automember_default_group_remove', [], {"type": "group"} ) new_command( 'automemberhostgroup_default_group_remove', 'automember', 'automember_default_group_remove', [], {"type": "hostgroup"} ) # Sessions group('session') new_command( 'session_logout', 'session', 'session_logout', [], {} ) # # Formatter # class replace_host_formatter: pairs = OrderedDict() def __init__(self, config): self.config = config self.init_pairs() def init_pairs(self): source_host = self.config.hostname() dest_host = self.config.target_hostname source_dn = self.get_host_dn(source_host) dest_dn = self.get_host_dn(dest_host) source_upper = source_host.upper() dest_upper = dest_host.upper() source_upper_dash = source_upper.replace('.','-') dest_upper_dash = dest_upper.replace('.','-') source_domain = self.config.domain dest_domain = self.config.target_domain self.pairs[source_host] = dest_host self.pairs[source_dn] = dest_dn self.pairs[source_upper] = dest_upper self.pairs[source_upper_dash] = dest_upper_dash self.pairs[source_domain] = dest_domain def get_host_dn(self, hostname): hostname = hostname parts = hostname.split('.') for i, part in enumerate(parts): parts[i] = 'dc='+part dn = ','.join(parts) return dn def format(self, input_str): for k, v in self.pairs.iteritems(): input_str = input_str.replace(k, v) #remove trailing white spaces lines = input_str.split('\n'); count = len(lines) for i, line in enumerate(lines): line = line.rstrip() if i == count - 3: line = line.rstrip(',') # preserve valid JSON for version deletion lines[i] = line del lines[3] # delete principal del lines[count - 3] #delete version input_str = '\n'.join(lines) return input_str # # Program # class ipa_json: def __init__(self): self.config = ipa_config() def parse_args(self): parser = argparse.ArgumentParser(description='Tool for creating ipa .json file for offline Web UI testing') parser.add_argument('--host', metavar='HOST', action='store', default = 'vm-126', help='Base of ipa server host name') parser.add_argument('--domain', metavar='DOMAIN', action='store', default = 'idm.lab.bos.redhat.com', help='Base domain of the ipa server host.') parser.add_argument('--methods', metavar='METHOD', dest='methods', action='store', nargs='+', help='Methods to execute') parser.add_argument('--groups', metavar='GROUP', dest='groups', action='store', nargs='+', help='Method groups to execute') parser.add_argument('-v', dest='verbose', action='store_true', help='More verbose output') parser.add_argument('--vcurl', dest='verbose_curl', action='store_true', help='Verbose curl call') parser.add_argument('--preserve-output', dest='preserve_output', action='store_true', help='Force to preserve output as recieved') parser.add_argument('-d', '--display-output', dest='display_output', action='store_true', help='Display final output') parser.add_argument('-s', '--save-output', dest='save_output', action='store_true', help='Save final output to file') parser.add_argument('--show-methods', metavar='GROUP|all', dest='show_methods', action='store', help='Print available methods') parser.add_argument('--show-groups', dest='show_groups', action='store_true', help='Print available groups') args = parser.parse_args() return args def get_and_run_command(self, command_name): if command_name == '': if self.config.verbose: print 'Command name not specified' return if command_name not in ipa_commands: if self.config.verbose: print 'Not existing command: %s' % command_name return command = ipa_commands[command_name] self.run_single_command(command) def run_single_command(self, command): if self.config.verbose: print 'Source host: %s' % self.config.hostname() print 'Command: %s' % command.filename() print 'Target file: %s%s' % (self.config.target_dir, command.filename()) result = command.run(self.config) if not self.config.preserve_output: formatter = replace_host_formatter(self.config) result = formatter.format(result) if self.config.display_output: print result if self.config.save_output: filename = '%s%s' % (self.config.target_dir, command.filename()) if self.config.verbose: print 'Writing to: %s' % filename f = open(filename,"w") f.write(result) f.close() return def run_command_group(self, name): if name not in ipa_groups: if self.config.verbose: print 'Not existing group: %s' % name return group = ipa_groups[name] self.run_methods(group) def run_methods(self, method_names): for method in method_names: self.get_and_run_command(method) def run_groups(self, group_names): for group in group_names: self.run_command_group(group) def print_methods(self): which = self.config.show_methods methods = None if which == 'all': methods = list(ipa_commands.keys()) elif which in ipa_groups: methods = ipa_groups[which] if methods == None: print 'Not existing group: %s' % which return methods.sort() print 'Methods: %s' % ' '.join(methods) def print_groups(self): groups = list(ipa_groups.keys()) groups.sort() print 'Groups: %s' % ' '.join(groups) def run(self): args = self.parse_args() self.config.__dict__.update(args.__dict__) something_done = False if self.config.methods is not None: self.run_methods(self.config.methods) something_done = True if self.config.groups is not None: self.run_groups(self.config.groups) something_done = True if self.config.show_methods: self.print_methods() something_done = True if self.config.show_groups: self.print_groups() something_done = True if not something_done and self.config.verbose: print 'Nothing to do' return 0 # # Main # if __name__ == '__main__': import argparse program = ipa_json() program.run()