[Freeipa-devel] [PATCH] 0201 Add support for an external trust to Active Directory domain
Alexander Bokovoy
abokovoy at redhat.com
Tue Jun 7 20:25:43 UTC 2016
On Tue, 07 Jun 2016, Alexander Bokovoy wrote:
> > del attrs['ipanttrusttype']
> > + if attributes:
> > + del attrs['ipanttrustattributes']
> > """
> Updated patch is attached.
Another update, forgot one space in the allow_behavior().
I also spent some time and did pep8 fixes for dcerpc.py. I reduced
reported errors down to 22 from 260+. These 22 are for lines longer than
79 characters and I don't want to reduce them further because they are
smaller than 84 characters already.
--
/ Alexander Bokovoy
-------------- next part --------------
From e9e8b4af5bf834aaceb9a7835733bd3641d5f93c Mon Sep 17 00:00:00 2001
From: Alexander Bokovoy <abokovoy at redhat.com>
Date: Mon, 6 Jun 2016 08:55:44 +0300
Subject: [PATCH 1/5] trusts: Add support for an external trust to Active
Directory domain
External trust is a trust that can be created between Active Directory
domains that are in different forests or between an Active Directory
domain. Since FreeIPA does not support non-Kerberos means of
communication, external trust to Windows NT 4.0 or earlier domains is
not supported.
The external trust is not transitive and can be established to any
domain in another forest. This means no access beyond the external
domain is possible via the trust link.
Resolves: https://fedorahosted.org/freeipa/ticket/5743
---
API.txt | 3 ++-
ipaserver/dcerpc.py | 50 +++++++++++++++++++++++++++----------
ipaserver/plugins/trust.py | 61 +++++++++++++++++++++++++++++++++++-----------
3 files changed, 86 insertions(+), 28 deletions(-)
diff --git a/API.txt b/API.txt
index f170930..d5fbc27 100644
--- a/API.txt
+++ b/API.txt
@@ -5208,12 +5208,13 @@ arg: Str('cn', cli_name='name')
option: Str('version?')
output: Output('result')
command: trust_add
-args: 1,14,3
+args: 1,15,3
arg: Str('cn', cli_name='realm')
option: Str('addattr*', cli_name='addattr')
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Int('base_id?', cli_name='base_id')
option: Bool('bidirectional?', cli_name='two_way', default=False)
+option: Bool('external?', cli_name='external', default=False)
option: Int('range_size?', cli_name='range_size')
option: StrEnum('range_type?', cli_name='range_type', values=[u'ipa-ad-trust-posix', u'ipa-ad-trust'])
option: Flag('raw', autofill=True, cli_name='raw', default=False)
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index 25ffbfa..5f56643 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -77,6 +77,11 @@ and Samba4 python bindings.
TRUST_ONEWAY = 1
TRUST_BIDIRECTIONAL = 3
+# Trust join behavior
+# External trust -- allow creating trust to a non-root domain in the forest
+TRUST_JOIN_EXTERNAL = 1
+
+
def is_sid_valid(sid):
try:
security.dom_sid(sid)
@@ -1037,7 +1042,7 @@ class TrustDomainInstance(object):
# We can ignore the error here -- setting up name suffix routes may fail
pass
- def establish_trust(self, another_domain, trustdom_secret, trust_type='bidirectional'):
+ def establish_trust(self, another_domain, trustdom_secret, trust_type='bidirectional', trust_external=False):
"""
Establishes trust between our and another domain
Input: another_domain -- instance of TrustDomainInstance, initialized with #retrieve call
@@ -1060,6 +1065,8 @@ class TrustDomainInstance(object):
info.trust_direction |= lsa.LSA_TRUST_DIRECTION_OUTBOUND
info.trust_type = lsa.LSA_TRUST_TYPE_UPLEVEL
info.trust_attributes = 0
+ if trust_external:
+ info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_NON_TRANSITIVE
try:
dname = lsa.String()
@@ -1096,14 +1103,17 @@ class TrustDomainInstance(object):
# server as that one doesn't support AES encryption types
pass
- try:
- info = self._pipe.QueryTrustedDomainInfo(trustdom_handle, lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX)
- info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_FOREST_TRANSITIVE
- self._pipe.SetInformationTrustedDomain(trustdom_handle, lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX, info)
- except RuntimeError as e:
- root_logger.error('unable to set trust to transitive: %s' % (str(e)))
+ if not trust_external:
+ try:
+ info = self._pipe.QueryTrustedDomainInfo(trustdom_handle,
+ lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX)
+ info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_FOREST_TRANSITIVE
+ self._pipe.SetInformationTrustedDomain(trustdom_handle,
+ lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX, info)
+ except RuntimeError as e:
+ root_logger.error('unable to set trust transitivity status: %s' % (str(e)))
- if self.info['is_pdc']:
+ if self.info['is_pdc'] or trust_external:
self.update_ftinfo(another_domain)
def verify_trust(self, another_domain):
@@ -1262,6 +1272,7 @@ class TrustDomainJoins(object):
self.api = api
self.local_domain = None
self.remote_domain = None
+ self.__allow_behavior = 0
domain_validator = DomainValidator(api)
self.configured = domain_validator.is_configured()
@@ -1271,6 +1282,10 @@ class TrustDomainJoins(object):
self.local_dn = domain_validator.dn
self.__populate_local_domain()
+ def allow_behavior(self, *flags):
+ for f in flags:
+ self.__allow_behavior |= int(f)
+
def __populate_local_domain(self):
# Initialize local domain info using kerberos only
ld = TrustDomainInstance(self.local_flatname)
@@ -1358,14 +1373,19 @@ class TrustDomainJoins(object):
realm_passwd
)
+ trust_external = bool(self.__allow_behavior & TRUST_JOIN_EXTERNAL)
if self.remote_domain.info['dns_domain'] != self.remote_domain.info['dns_forest']:
- raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'], domain=self.remote_domain.info['dns_domain'])
+ if not trust_external:
+ raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'],
+ domain=self.remote_domain.info['dns_domain'])
if not self.remote_domain.read_only:
trustdom_pass = samba.generate_random_password(128, 128)
self.get_realmdomains()
- self.remote_domain.establish_trust(self.local_domain, trustdom_pass, trust_type)
- self.local_domain.establish_trust(self.remote_domain, trustdom_pass, trust_type)
+ self.remote_domain.establish_trust(self.local_domain,
+ trustdom_pass, trust_type, trust_external)
+ self.local_domain.establish_trust(self.remote_domain,
+ trustdom_pass, trust_type, trust_external)
# if trust is inbound, we don't need to verify it because AD DC will respond
# with WERR_NO_SUCH_DOMAIN -- in only does verification for outbound trusts.
result = True
@@ -1381,8 +1401,12 @@ class TrustDomainJoins(object):
if not(isinstance(self.remote_domain, TrustDomainInstance)):
self.populate_remote_domain(realm, realm_server, realm_passwd=None)
+ trust_external = bool(self.__allow_behavior & TRUST_JOIN_EXTERNAL)
if self.remote_domain.info['dns_domain'] != self.remote_domain.info['dns_forest']:
- raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'], domain=self.remote_domain.info['dns_domain'])
+ if not trust_external:
+ raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'],
+ domain=self.remote_domain.info['dns_domain'])
- self.local_domain.establish_trust(self.remote_domain, trustdom_passwd, trust_type)
+ self.local_domain.establish_trust(self.remote_domain,
+ trustdom_passwd, trust_type, trust_external)
return dict(local=self.local_domain, remote=self.remote_domain, verified=False)
diff --git a/ipaserver/plugins/trust.py b/ipaserver/plugins/trust.py
index ee0ab5d..744be93 100644
--- a/ipaserver/plugins/trust.py
+++ b/ipaserver/plugins/trust.py
@@ -62,8 +62,10 @@ except Exception as e:
if api.env.in_server and api.env.context in ['lite', 'server']:
try:
- import ipaserver.dcerpc #pylint: disable=F0401
- from ipaserver.dcerpc import TRUST_ONEWAY, TRUST_BIDIRECTIONAL
+ import ipaserver.dcerpc # pylint: disable=F0401
+ from ipaserver.dcerpc import (TRUST_ONEWAY,
+ TRUST_BIDIRECTIONAL,
+ TRUST_JOIN_EXTERNAL)
import dbus
import dbus.mainloop.glib
_bindings_installed = True
@@ -162,11 +164,18 @@ trust_output_params = (
label=_('Trust type')),
Str('truststatus',
label=_('Trust status')),
+ Str('ipantadditionalsuffixes*',
+ label=_('UPN suffixes')),
)
+# Trust type is a combination of ipanttrusttype and ipanttrustattributes
+# We shift trust attributes by 3 bits to left so bit 0 becomes bit 3 and
+# 2+(1 << 3) becomes 10.
_trust_type_dict = {1 : _('Non-Active Directory domain'),
2 : _('Active Directory domain'),
- 3 : _('RFC4120-compliant Kerberos realm')}
+ 3 : _('RFC4120-compliant Kerberos realm'),
+ 10: _('Non-transitive external trust to a domain in another Active Directory forest')}
+
_trust_direction_dict = {1 : _('Trusting forest'),
2 : _('Trusted forest'),
3 : _('Two-way trust')}
@@ -189,14 +198,17 @@ DBUS_IFACE_TRUST = 'com.redhat.idm.trust'
CRED_STYLE_SAMBA = 1
CRED_STYLE_KERBEROS = 2
-def trust_type_string(level):
+LSA_TRUST_ATTRIBUTE_NON_TRANSITIVE = 0x00000001
+
+def trust_type_string(level, attrs):
"""
Returns a string representing a type of the trust. The original field is an enum:
LSA_TRUST_TYPE_DOWNLEVEL = 0x00000001,
LSA_TRUST_TYPE_UPLEVEL = 0x00000002,
LSA_TRUST_TYPE_MIT = 0x00000003
"""
- string = _trust_type_dict.get(int(level), _trust_type_dict_unknown)
+ transitive = int(attrs) & LSA_TRUST_ATTRIBUTE_NON_TRANSITIVE
+ string = _trust_type_dict.get(int(level) | (transitive << 3), _trust_type_dict_unknown)
return unicode(string)
def trust_direction_string(level):
@@ -677,6 +689,12 @@ sides.
doc=(_('Establish bi-directional trust. By default trust is inbound one-way only.')),
default=False,
),
+ Bool('external?',
+ label=_('External trust'),
+ cli_name='external',
+ doc=(_('Establish external trust to a domain in another forest. The trust is not transitive beyond the domain.')),
+ default=False,
+ ),
)
msg_summary = _('Added Active Directory trust for realm "%(value)s"')
@@ -735,12 +753,15 @@ sides.
fetch_trusted_domains_over_dbus(self.api, self.log, result['value'])
# Format the output into human-readable values
+ attributes = int(result['result'].get('ipanttrustattributes', [0])[0])
result['result']['trusttype'] = [trust_type_string(
- result['result']['ipanttrusttype'][0])]
+ result['result']['ipanttrusttype'][0], attributes)]
result['result']['trustdirection'] = [trust_direction_string(
result['result']['ipanttrustdirection'][0])]
result['result']['truststatus'] = [trust_status_string(
result['verified'])]
+ if attributes:
+ result['result'].pop('ipanttrustattributes', None)
del result['verified']
result['result'].pop('ipanttrustauthoutgoing', None)
@@ -929,6 +950,11 @@ sides.
trust_type = TRUST_ONEWAY
if options.get('bidirectional', False):
trust_type = TRUST_BIDIRECTIONAL
+
+ # If we are forced to establish external trust, allow it
+ if options.get('external', False):
+ self.trustinstance.allow_behavior(TRUST_JOIN_EXTERNAL)
+
# 1. Full access to the remote domain. Use admin credentials and
# generate random trustdom password to do work on both sides
if full_join:
@@ -1033,7 +1059,7 @@ class trust_mod(LDAPUpdate):
class trust_find(LDAPSearch):
__doc__ = _('Search for trusts.')
has_output_params = LDAPSearch.has_output_params + trust_output_params +\
- (Str('ipanttrusttype'),)
+ (Str('ipanttrusttype'), Str('ipanttrustattributes'))
msg_summary = ngettext(
'%(count)d trust matched', '%(count)d trusts matched', 0
@@ -1043,7 +1069,7 @@ class trust_find(LDAPSearch):
# search needs to be done on a sub-tree scope
def pre_callback(self, ldap, filters, attrs_list, base_dn, scope, *args, **options):
# list only trust, not trust domains
- trust_filter = '(ipaNTTrustPartner=*)'
+ trust_filter = '(&(ipaNTTrustPartner=*)(&(objectclass=ipaIDObject)(objectclass=ipaNTTrustedDomain)))'
filter = ldap.combine_filters((filters, trust_filter), rules=ldap.MATCH_ALL)
return (filter, base_dn, ldap.SCOPE_SUBTREE)
@@ -1060,10 +1086,13 @@ class trust_find(LDAPSearch):
for attrs in entries:
# Translate ipanttrusttype to trusttype if --raw not used
- trust_type = attrs.get('ipanttrusttype', [None])[0]
+ trust_type = attrs.single_value.get('ipanttrusttype', None)
+ attributes = attrs.single_value.get('ipanttrustattributes', 0)
if not options.get('raw', False) and trust_type is not None:
- attrs['trusttype'] = trust_type_string(attrs['ipanttrusttype'][0])
+ attrs['trusttype'] = [trust_type_string(trust_type, attributes)]
del attrs['ipanttrusttype']
+ if attributes:
+ del attrs['ipanttrustattributes']
return truncated
@@ -1071,7 +1100,7 @@ class trust_find(LDAPSearch):
class trust_show(LDAPRetrieve):
__doc__ = _('Display information about a trust.')
has_output_params = LDAPRetrieve.has_output_params + trust_output_params +\
- (Str('ipanttrusttype'), Str('ipanttrustdirection'))
+ (Str('ipanttrusttype'), Str('ipanttrustdirection'), Str('ipanttrustattributes'))
def execute(self, *keys, **options):
result = super(trust_show, self).execute(*keys, **options)
@@ -1088,16 +1117,20 @@ class trust_show(LDAPRetrieve):
# if --raw not used
if not options.get('raw', False):
- trust_type = entry_attrs.get('ipanttrusttype', [None])[0]
+ trust_type = entry_attrs.single_value.get('ipanttrusttype', None)
+ attributes = entry_attrs.single_value.get('ipanttrustattributes', 0)
if trust_type is not None:
- entry_attrs['trusttype'] = trust_type_string(trust_type)
+ entry_attrs['trusttype'] = [trust_type_string(trust_type, attributes)]
del entry_attrs['ipanttrusttype']
- dir_str = entry_attrs.get('ipanttrustdirection', [None])[0]
+ dir_str = entry_attrs.single_value.get('ipanttrustdirection', None)
if dir_str is not None:
entry_attrs['trustdirection'] = [trust_direction_string(dir_str)]
del entry_attrs['ipanttrustdirection']
+ if attributes:
+ del entry_attrs['ipanttrustattributes']
+
return dn
--
2.7.4
-------------- next part --------------
From 942364346e930296fba4acfa60e1eda735903ffd Mon Sep 17 00:00:00 2001
From: Alexander Bokovoy <abokovoy at redhat.com>
Date: Tue, 7 Jun 2016 22:41:10 +0300
Subject: [PATCH 7/7] ipaserver/dcerpc: reformat to make the code closer to
pep8
Because Samba Python bindings provide long-named methods and constants,
sometimes it is impossible to fit into 80 columns without causing
damage to readability of the code. This patchset attempts to reduce
pep8 complaints to a minimum.
---
ipaserver/dcerpc.py | 475 +++++++++++++++++++++++++++++++++-------------------
1 file changed, 299 insertions(+), 176 deletions(-)
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index dc25ab7..7e2925b 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -44,10 +44,12 @@ import samba
import random
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.backends import default_backend
+# pylint: disable=F0401
try:
- from ldap.controls import RequestControl as LDAPControl #pylint: disable=F0401
+ from ldap.controls import RequestControl as LDAPControl
except ImportError:
- from ldap.controls import LDAPControl as LDAPControl #pylint: disable=F0401
+ from ldap.controls import LDAPControl as LDAPControl
+# pylint: enable=F0401
import ldap as _ldap
from ipapython.ipaldap import IPAdmin
from ipalib.session import krbccache_dir, krbccache_prefix
@@ -74,7 +76,7 @@ and Samba4 python bindings.
# Both constants can be used as masks against trust direction
# because bi-directional has two lower bits set.
-TRUST_ONEWAY = 1
+TRUST_ONEWAY = 1
TRUST_BIDIRECTIONAL = 3
# Trust join behavior
@@ -91,31 +93,44 @@ def is_sid_valid(sid):
return True
-access_denied_error = errors.ACIError(info=_('CIFS server denied your credentials'))
+access_denied_error = errors.ACIError(
+ info=_('CIFS server denied your credentials'))
dcerpc_error_codes = {
-1073741823:
- errors.RemoteRetrieveError(reason=_('communication with CIFS server was unsuccessful')),
+ errors.RemoteRetrieveError(
+ reason=_('communication with CIFS server was unsuccessful')),
-1073741790: access_denied_error,
-1073741715: access_denied_error,
-1073741614: access_denied_error,
-1073741603:
- errors.ValidationError(name=_('AD domain controller'), error=_('unsupported functional level')),
- -1073741811: # NT_STATUS_INVALID_PARAMETER
+ errors.ValidationError(
+ name=_('AD domain controller'),
+ error=_('unsupported functional level')),
+ -1073741811: # NT_STATUS_INVALID_PARAMETER
errors.RemoteRetrieveError(
- reason=_('AD domain controller complains about communication sequence. It may mean unsynchronized time on both sides, for example')),
- -1073741776: # NT_STATUS_INVALID_PARAMETER_MIX, we simply will skip the binding
+ reason=_('AD domain controller complains about communication '
+ 'sequence. It may mean unsynchronized time on both '
+ 'sides, for example')),
+ -1073741776: # NT_STATUS_INVALID_PARAMETER_MIX,
+ # we simply will skip the binding
access_denied_error,
- -1073741772: # NT_STATUS_OBJECT_NAME_NOT_FOUND
- errors.RemoteRetrieveError(reason=_('CIFS server configuration does not allow access to \\\\pipe\\lsarpc')),
+ -1073741772: # NT_STATUS_OBJECT_NAME_NOT_FOUND
+ errors.RemoteRetrieveError(
+ reason=_('CIFS server configuration does not allow '
+ 'access to \\\\pipe\\lsarpc')),
}
dcerpc_error_messages = {
"NT_STATUS_OBJECT_NAME_NOT_FOUND":
- errors.NotFound(reason=_('Cannot find specified domain or server name')),
+ errors.NotFound(
+ reason=_('Cannot find specified domain or server name')),
"WERR_NO_LOGON_SERVERS":
- errors.RemoteRetrieveError(reason=_('AD DC was unable to reach any IPA domain controller. Most likely it is a DNS or firewall issue')),
+ errors.RemoteRetrieveError(
+ reason=_('AD DC was unable to reach any IPA domain controller. '
+ 'Most likely it is a DNS or firewall issue')),
"NT_STATUS_INVALID_PARAMETER_MIX":
- errors.RequirementError(name=_('At least the domain or IP address should be specified')),
+ errors.RequirementError(
+ name=_('At least the domain or IP address should be specified')),
}
pysss_type_key_translation_dict = {
@@ -126,7 +141,7 @@ pysss_type_key_translation_dict = {
}
-def assess_dcerpc_exception(num=None,message=None):
+def assess_dcerpc_exception(num=None, message=None):
"""
Takes error returned by Samba bindings and converts it into
an IPA error class.
@@ -135,8 +150,9 @@ def assess_dcerpc_exception(num=None,message=None):
return dcerpc_error_codes[num]
if message and message in dcerpc_error_messages:
return dcerpc_error_messages[message]
- reason = _('''CIFS server communication error: code "%(num)s",
- message "%(message)s" (both may be "None")''') % dict(num=num, message=message)
+ reason = _('CIFS server communication error: code "%(num)s", '
+ 'message "%(message)s" (both may be "None")') % \
+ dict(num=num, message=message)
return errors.RemoteRetrieveError(reason=reason)
@@ -182,9 +198,13 @@ class DomainValidator(object):
self._parm = None
def is_configured(self):
- cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn)
+ cn_trust_local = DN(('cn', self.api.env.domain),
+ self.api.env.container_cifsdomains,
+ self.api.env.basedn)
try:
- entry_attrs = self.ldap.get_entry(cn_trust_local, [self.ATTR_FLATNAME, self.ATTR_SID])
+ entry_attrs = self.ldap.get_entry(cn_trust_local,
+ [self.ATTR_FLATNAME,
+ self.ATTR_SID])
self.flatname = entry_attrs[self.ATTR_FLATNAME][0]
self.sid = entry_attrs[self.ATTR_SID][0]
self.dn = entry_attrs.dn
@@ -203,7 +223,8 @@ class DomainValidator(object):
try:
search_kw = {'objectClass': 'ipaNTTrustedDomain'}
- filter = self.ldap.make_filter(search_kw, rules=self.ldap.MATCH_ALL)
+ filter = self.ldap.make_filter(search_kw,
+ rules=self.ldap.MATCH_ALL)
(entries, truncated) = self.ldap.find_entries(
filter=filter,
base_dn=cn_trust,
@@ -216,22 +237,22 @@ class DomainValidator(object):
# domain names as keys and those are generally case-insensitive
result = ipautil.CIDict()
- for entry in entries:
+ for e in entries:
try:
- trust_partner = entry[self.ATTR_TRUST_PARTNER][0]
- flatname_normalized = entry[self.ATTR_FLATNAME][0].lower()
- trusted_sid = entry[self.ATTR_TRUSTED_SID][0]
- except KeyError as e:
+ t_partner = e.single_value.get(self.ATTR_TRUST_PARTNER)
+ fname_norm = e.single_value.get(self.ATTR_FLATNAME).lower()
+ trusted_sid = e.single_value.get(self.ATTR_TRUSTED_SID)
+ except KeyError as exc:
# Some piece of trusted domain info in LDAP is missing
# Skip the domain, but leave log entry for investigation
api.log.warning("Trusted domain '%s' entry misses an "
- "attribute: %s", entry.dn, e)
+ "attribute: %s", e.dn, exc)
continue
- result[trust_partner] = (flatname_normalized,
- security.dom_sid(trusted_sid))
+ result[t_partner] = (fname_norm,
+ security.dom_sid(trusted_sid))
return result
- except errors.NotFound as e:
+ except errors.NotFound as exc:
return []
def set_trusted_domains(self):
@@ -244,21 +265,22 @@ class DomainValidator(object):
# This means we can't check the correctness of a trusted
# domain SIDs
raise errors.ValidationError(name='sid',
- error=_('no trusted domain is configured'))
+ error=_('no trusted domain '
+ 'is configured'))
def get_domain_by_sid(self, sid, exact_match=False):
if not self.domain:
# our domain is not configured or self.is_configured() never run
# reject SIDs as we can't check correctness of them
raise errors.ValidationError(name='sid',
- error=_('domain is not configured'))
+ error=_('domain is not configured'))
# Parse sid string to see if it is really in a SID format
try:
test_sid = security.dom_sid(sid)
except TypeError:
raise errors.ValidationError(name='sid',
- error=_('SID is not valid'))
+ error=_('SID is not valid'))
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
@@ -314,30 +336,34 @@ class DomainValidator(object):
return None
def get_trusted_domain_objects(self, domain=None, flatname=None, filter="",
- attrs=None, scope=_ldap.SCOPE_SUBTREE, basedn=None):
+ attrs=None, scope=_ldap.SCOPE_SUBTREE,
+ basedn=None):
"""
- Search for LDAP objects in a trusted domain specified either by `domain'
- or `flatname'. The actual LDAP search is specified by `filter', `attrs',
- `scope' and `basedn'. When `basedn' is empty, database root DN is used.
+ Search for LDAP objects in a trusted domain specified either by
+ `domain' or `flatname'. The actual LDAP search is specified by
+ `filter', `attrs', `scope' and `basedn'. When `basedn' is empty,
+ database root DN is used.
"""
assert domain is not None or flatname is not None
"""Returns SID for the trusted domain object (user or group only)"""
if not self.domain:
# our domain is not configured or self.is_configured() never run
raise errors.ValidationError(name=_('Trust setup'),
- error=_('Our domain is not configured'))
+ error=_('Our domain is '
+ 'not configured'))
if not self._domains:
self._domains = self.get_trusted_domains()
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
raise errors.ValidationError(name=_('Trust setup'),
- error=_('No trusted domain is not configured'))
+ error=_('No trusted domain is '
+ 'not configured'))
entries = None
if domain is not None:
if domain not in self._domains:
raise errors.ValidationError(name=_('trusted domain object'),
- error= _('domain is not trusted'))
+ error=_('domain is not trusted'))
# Now we have a name to check against our list of trusted domains
entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
elif flatname is not None:
@@ -347,53 +373,65 @@ class DomainValidator(object):
for domain in self._domains:
if self._domains[domain][0] == flatname:
found_flatname = True
- entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
+ entries = self.search_in_dc(domain, filter,
+ attrs, scope, basedn)
if entries:
break
if not found_flatname:
raise errors.ValidationError(name=_('trusted domain object'),
- error= _('no trusted domain matched the specified flat name'))
+ error=_('no trusted domain '
+ 'matched the specified '
+ 'flat name'))
if not entries:
raise errors.NotFound(reason=_('trusted domain object not found'))
return entries
- def get_trusted_domain_object_sid(self, object_name, fallback_to_ldap=True):
+ def get_trusted_domain_object_sid(self, object_name,
+ fallback_to_ldap=True):
result = pysss_nss_idmap.getsidbyname(object_name)
- if object_name in result and (pysss_nss_idmap.SID_KEY in result[object_name]):
+ if object_name in result and \
+ (pysss_nss_idmap.SID_KEY in result[object_name]):
object_sid = result[object_name][pysss_nss_idmap.SID_KEY]
return object_sid
# If fallback to AD DC LDAP is not allowed, bail out
if not fallback_to_ldap:
raise errors.ValidationError(name=_('trusted domain object'),
- error= _('SSSD was unable to resolve the object to a valid SID'))
+ error=_('SSSD was unable to resolve '
+ 'the object to a valid SID'))
# Else, we are going to contact AD DC LDAP
components = normalize_name(object_name)
if not ('domain' in components or 'flatname' in components):
# No domain or realm specified, ambiguous search
- raise errors.ValidationError(name=_('trusted domain object'),
- error= _('Ambiguous search, user domain was not specified'))
+ raise errors.ValidationError(name=_('trusted domain object'),
+ error=_('Ambiguous search, user '
+ 'domain was not specified'))
attrs = ['objectSid']
- filter = '(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' \
- % dict(name=components['name'])
+ filter = '(&(sAMAccountName=%(name)s)' \
+ '(|(objectClass=user)(objectClass=group)))' \
+ % dict(name=components['name'])
scope = _ldap.SCOPE_SUBTREE
entries = self.get_trusted_domain_objects(components.get('domain'),
- components.get('flatname'), filter, attrs, scope)
+ components.get('flatname'),
+ filter, attrs, scope)
if len(entries) > 1:
# Treat non-unique entries as invalid
raise errors.ValidationError(name=_('trusted domain object'),
- error= _('Trusted domain did not return a unique object'))
+ error=_('Trusted domain did not '
+ 'return a unique object'))
sid = self.__sid_to_str(entries[0]['objectSid'][0])
try:
test_sid = security.dom_sid(sid)
return unicode(test_sid)
except TypeError as e:
raise errors.ValidationError(name=_('trusted domain object'),
- error= _('Trusted domain did not return a valid SID for the object'))
+ error=_('Trusted domain did not '
+ 'return a valid SID for '
+ 'the object'))
def get_trusted_domain_object_type(self, name_or_sid):
"""
@@ -443,7 +481,8 @@ class DomainValidator(object):
)
attrs = ['sAMAccountName']
- filter = (r'(&(objectSid=%(sid)s)(|(objectClass=user)(objectClass=group)))'
+ filter = (r'(&(objectSid=%(sid)s)'
+ '(|(objectClass=user)(objectClass=group)))'
% dict(sid=escaped_sid)) # sid in binary
domain = self.get_domain_by_sid(sid)
@@ -454,7 +493,8 @@ class DomainValidator(object):
if len(entries) > 1:
# Treat non-unique entries as invalid
raise errors.ValidationError(name=_('trusted domain object'),
- error=_('Trusted domain did not return a unique object'))
+ error=_('Trusted domain did not '
+ 'return a unique object'))
object_name = (
"%s@%s" % (entries[0].single_value['sAMAccountName'].lower(),
@@ -486,27 +526,31 @@ class DomainValidator(object):
# Now search a trusted domain for a user with this SID
attrs = ['cn']
filter = '(&(objectClass=user)(objectSid=%(sid)s))' \
- % dict(sid=object_name)
+ % dict(sid=object_name)
try:
- entries = self.get_trusted_domain_objects(domain=domain, filter=filter,
- attrs=attrs, scope=_ldap.SCOPE_SUBTREE)
+ entries = self.get_trusted_domain_objects(domain=domain,
+ filter=filter,
+ attrs=attrs,
+ scope=_ldap.SCOPE_SUBTREE)
except errors.NotFound:
raise errors.NotFound(reason=_('trusted domain user not found'))
user_dn = entries[0].dn
elif domain or flatname:
attrs = ['cn']
filter = '(&(sAMAccountName=%(name)s)(objectClass=user))' \
- % dict(name=name)
+ % dict(name=name)
try:
entries = self.get_trusted_domain_objects(domain,
- flatname, filter, attrs, _ldap.SCOPE_SUBTREE)
+ flatname, filter, attrs,
+ _ldap.SCOPE_SUBTREE)
except errors.NotFound:
raise errors.NotFound(reason=_('trusted domain user not found'))
user_dn = entries[0].dn
else:
# No domain or realm specified, ambiguous search
raise errors.ValidationError(name=_('trusted domain object'),
- error= _('Ambiguous search, user domain was not specified'))
+ error=_('Ambiguous search, '
+ 'user domain was not specified'))
# Get SIDs of user object and it's groups
# tokenGroups attribute must be read with a scope BASE for a known user
@@ -514,9 +558,11 @@ class DomainValidator(object):
attrs = ['objectSID', 'tokenGroups']
filter = "(objectClass=user)"
entries = self.get_trusted_domain_objects(domain,
- flatname, filter, attrs, _ldap.SCOPE_BASE, user_dn)
+ flatname, filter, attrs,
+ _ldap.SCOPE_BASE, user_dn)
object_sid = self.__sid_to_str(entries[0]['objectSid'][0])
- group_sids = [self.__sid_to_str(sid) for sid in entries[0]['tokenGroups']]
+ group_sids = [self.__sid_to_str(sid)
+ for sid in entries[0]['tokenGroups']]
return (object_sid, group_sids)
def get_trusted_domain_user_and_groups(self, object_name):
@@ -540,11 +586,14 @@ class DomainValidator(object):
if is_valid_sid:
object_sid = object_name
result = pysss_nss_idmap.getnamebysid(object_name)
- if object_name in result and (pysss_nss_idmap.NAME_KEY in result[object_name]):
- group_list = pysss.getgrouplist(result[object_name][pysss_nss_idmap.NAME_KEY])
+ if object_name in result and \
+ (pysss_nss_idmap.NAME_KEY in result[object_name]):
+ group_list = pysss.getgrouplist(
+ result[object_name][pysss_nss_idmap.NAME_KEY])
else:
result = pysss_nss_idmap.getsidbyname(object_name)
- if object_name in result and (pysss_nss_idmap.SID_KEY in result[object_name]):
+ if object_name in result and \
+ (pysss_nss_idmap.SID_KEY in result[object_name]):
object_sid = result[object_name][pysss_nss_idmap.SID_KEY]
group_list = pysss.getgrouplist(object_name)
@@ -552,7 +601,10 @@ class DomainValidator(object):
return self.__get_trusted_domain_user_and_groups(object_name)
group_sids = pysss_nss_idmap.getsidbyname(group_list)
- return (object_sid, [el[1][pysss_nss_idmap.SID_KEY] for el in group_sids.items()])
+ return (
+ object_sid,
+ [el[1][pysss_nss_idmap.SID_KEY] for el in group_sids.items()]
+ )
def __sid_to_str(self, sid):
"""
@@ -561,12 +613,13 @@ class DomainValidator(object):
"""
sid_rev_num = ord(sid[0])
number_sub_id = ord(sid[1])
- ia = struct.unpack('!Q','\x00\x00'+sid[2:8])[0]
+ ia = struct.unpack('!Q', '\x00\x00'+sid[2:8])[0]
subs = [
- struct.unpack('<I',sid[8+4*i:12+4*i])[0]
+ struct.unpack('<I', sid[8+4*i:12+4*i])[0]
for i in range(number_sub_id)
]
- return u'S-%d-%d-%s' % ( sid_rev_num, ia, '-'.join([str(s) for s in subs]),)
+ return u'S-%d-%d-%s' % (sid_rev_num, ia,
+ '-'.join([str(s) for s in subs]),)
def kinit_as_http(self, domain):
"""
@@ -624,7 +677,7 @@ class DomainValidator(object):
error on ccache initialization
"""
- if self._admin_creds == None:
+ if self._admin_creds is None:
return (None, None)
domain_suffix = domain.replace('.', '-')
@@ -691,7 +744,8 @@ class DomainValidator(object):
ccache_name = None
if self._admin_creds:
- (ccache_name, principal) = self.kinit_as_administrator(info['dns_domain'])
+ (ccache_name,
+ principal) = self.kinit_as_administrator(info['dns_domain'])
if ccache_name:
with ipautil.private_ccache(path=ccache_name):
@@ -736,7 +790,7 @@ class DomainValidator(object):
if not self._creds:
self._parm = param.LoadParm()
- self._parm.load(os.path.join(ipautil.SHARE_DIR,"smb.conf.empty"))
+ self._parm.load(os.path.join(ipautil.SHARE_DIR, "smb.conf.empty"))
self._parm.set('netbios name', self.flatname)
self._creds = credentials.Credentials()
self._creds.set_kerberos_state(credentials.MUST_USE_KERBEROS)
@@ -746,12 +800,14 @@ class DomainValidator(object):
netrc = net.Net(creds=self._creds, lp=self._parm)
finddc_error = None
result = None
+ flags = nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_GC | nbt.NBT_SERVER_CLOSEST
try:
- result = netrc.finddc(domain=domain, flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_GC | nbt.NBT_SERVER_CLOSEST)
+ result = netrc.finddc(domain=domain, flags=flags)
except RuntimeError as e:
try:
# If search of closest GC failed, attempt to find any one
- result = netrc.finddc(domain=domain, flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_GC)
+ flags = nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_GC
+ result = netrc.finddc(domain=domain, flags=flags)
except RuntimeError as e:
finddc_error = e
@@ -789,6 +845,7 @@ class DomainValidator(object):
self._info[domain] = info
return info
+
def string_to_array(what):
return [ord(v) for v in what]
@@ -797,7 +854,7 @@ class TrustDomainInstance(object):
def __init__(self, hostname, creds=None):
self.parm = param.LoadParm()
- self.parm.load(os.path.join(ipautil.SHARE_DIR,"smb.conf.empty"))
+ self.parm.load(os.path.join(ipautil.SHARE_DIR, "smb.conf.empty"))
if len(hostname) > 0:
self.parm.set('netbios name', hostname)
self.creds = creds
@@ -810,14 +867,14 @@ class TrustDomainInstance(object):
self.validation_attempts = 0
def __gen_lsa_connection(self, binding):
- if self.creds is None:
- raise errors.RequirementError(name=_('CIFS credentials object'))
- try:
- result = lsa.lsarpc(binding, self.parm, self.creds)
- return result
- except RuntimeError as e:
- num, message = e.args # pylint: disable=unpacking-non-sequence
- raise assess_dcerpc_exception(num=num, message=message)
+ if self.creds is None:
+ raise errors.RequirementError(name=_('CIFS credentials object'))
+ try:
+ result = lsa.lsarpc(binding, self.parm, self.creds)
+ return result
+ except RuntimeError as e:
+ num, message = e.args # pylint: disable=unpacking-non-sequence
+ raise assess_dcerpc_exception(num=num, message=message)
def init_lsa_pipe(self, remote_host):
"""
@@ -847,30 +904,35 @@ class TrustDomainInstance(object):
# When session key is not available, we just skip this binding
session_attempts = session_attempts + 1
- if self._pipe is None and (attempts + session_attempts) == len(bindings):
+ if self._pipe is None and \
+ (attempts + session_attempts) == len(bindings):
raise errors.ACIError(
- info=_('CIFS server %(host)s denied your credentials') % dict(host=remote_host))
+ info=_('CIFS server %(host)s denied your credentials')
+ % dict(host=remote_host))
if self._pipe is None:
raise errors.RemoteRetrieveError(
- reason=_('Cannot establish LSA connection to %(host)s. Is CIFS server running?') % dict(host=remote_host))
+ reason=_('Cannot establish LSA connection to %(host)s. '
+ 'Is CIFS server running?') % dict(host=remote_host))
self.binding = binding
self.session_key = self._pipe.session_key
def __gen_lsa_bindings(self, remote_host):
"""
- There are multiple transports to issue LSA calls. However, depending on a
- system in use they may be blocked by local operating system policies.
+ There are multiple transports to issue LSA calls. However, depending on
+ a system in use they may be blocked by local operating system policies.
Generate all we can use. init_lsa_pipe() will try them one by one until
there is one working.
- We try NCACN_NP before NCACN_IP_TCP and use SMB2 before SMB1 or defaults.
+ We try NCACN_NP before NCACN_IP_TCP and use SMB2 before SMB1.
"""
transports = (u'ncacn_np', u'ncacn_ip_tcp')
- options = ( u'smb2,print', u'print')
- return [u'%s:%s[%s]' % (t, remote_host, o) for t in transports for o in options]
+ options = (u'smb2,print', u'print')
+ return [u'%s:%s[%s]' % (t, remote_host, o)
+ for t in transports for o in options]
- def retrieve_anonymously(self, remote_host, discover_srv=False, search_pdc=False):
+ def retrieve_anonymously(self, remote_host,
+ discover_srv=False, search_pdc=False):
"""
When retrieving DC information anonymously, we can't get SID of the domain
"""
@@ -896,7 +958,8 @@ class TrustDomainInstance(object):
self.info['is_pdc'] = (result.server_type & nbt.NBT_SERVER_PDC) != 0
# Netlogon response doesn't contain SID of the domain.
- # We need to do rootDSE search with LDAP_SERVER_EXTENDED_DN_OID control to reveal the SID
+ # We need to do rootDSE search with LDAP_SERVER_EXTENDED_DN_OID
+ # control to reveal the SID
ldap_uri = 'ldap://%s' % (result.pdc_dns_name)
conn = _ldap.initialize(ldap_uri)
conn.set_option(_ldap.OPT_SERVER_CONTROLS, [ExtendedDNControl()])
@@ -908,7 +971,7 @@ class TrustDomainInstance(object):
except _ldap.LDAPError as e:
root_logger.error(
"LDAP error when connecting to %(host)s: %(error)s" %
- dict(host=unicode(result.pdc_name), error=str(e)))
+ dict(host=unicode(result.pdc_name), error=str(e)))
except KeyError as e:
root_logger.error("KeyError: {err}, LDAP entry from {host} "
"returned malformed. Your DNS might be "
@@ -930,8 +993,11 @@ class TrustDomainInstance(object):
objectAttribute = lsa.ObjectAttribute()
objectAttribute.sec_qos = lsa.QosInfo()
try:
- self._policy_handle = self._pipe.OpenPolicy2(u"", objectAttribute, security.SEC_FLAG_MAXIMUM_ALLOWED)
- result = self._pipe.QueryInfoPolicy2(self._policy_handle, lsa.LSA_POLICY_INFO_DNS)
+ self._policy_handle = \
+ self._pipe.OpenPolicy2(u"", objectAttribute,
+ security.SEC_FLAG_MAXIMUM_ALLOWED)
+ result = self._pipe.QueryInfoPolicy2(self._policy_handle,
+ lsa.LSA_POLICY_INFO_DNS)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
@@ -944,7 +1010,8 @@ class TrustDomainInstance(object):
self.info['dc'] = remote_host
try:
- result = self._pipe.QueryInfoPolicy2(self._policy_handle, lsa.LSA_POLICY_INFO_ROLE)
+ result = self._pipe.QueryInfoPolicy2(self._policy_handle,
+ lsa.LSA_POLICY_INFO_ROLE)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
@@ -958,18 +1025,18 @@ class TrustDomainInstance(object):
clear_value.size = len(password_blob)
clear_value.password = password_blob
- clear_authentication_information = drsblobs.AuthenticationInformation()
- clear_authentication_information.LastUpdateTime = samba.unix2nttime(int(time.time()))
- clear_authentication_information.AuthType = lsa.TRUST_AUTH_TYPE_CLEAR
- clear_authentication_information.AuthInfo = clear_value
+ clear_authinfo = drsblobs.AuthenticationInformation()
+ clear_authinfo.LastUpdateTime = samba.unix2nttime(int(time.time()))
+ clear_authinfo.AuthType = lsa.TRUST_AUTH_TYPE_CLEAR
+ clear_authinfo.AuthInfo = clear_value
- authentication_information_array = drsblobs.AuthenticationInformationArray()
- authentication_information_array.count = 1
- authentication_information_array.array = [clear_authentication_information]
+ authinfo_array = drsblobs.AuthenticationInformationArray()
+ authinfo_array.count = 1
+ authinfo_array.array = [clear_authinfo]
outgoing = drsblobs.trustAuthInOutBlob()
outgoing.count = 1
- outgoing.current = authentication_information_array
+ outgoing.current = authinfo_array
confounder = [3]*512
for i in range(512):
@@ -983,7 +1050,8 @@ class TrustDomainInstance(object):
trustpass_blob = ndr_pack(trustpass)
- encrypted_trustpass = arcfour_encrypt(self._pipe.session_key, trustpass_blob)
+ encrypted_trustpass = arcfour_encrypt(self._pipe.session_key,
+ trustpass_blob)
auth_blob = lsa.DATA_BUF2()
auth_blob.size = len(encrypted_trustpass)
@@ -993,7 +1061,6 @@ class TrustDomainInstance(object):
auth_info.auth_blob = auth_blob
self.auth_info = auth_info
-
def generate_ftinfo(self, another_domain):
"""
Generates TrustDomainInfoFullInfo2Internal structure
@@ -1032,27 +1099,38 @@ class TrustDomainInstance(object):
# smbd already has the information about itself
ldname = lsa.StringLarge()
ldname.string = another_domain.info['dns_domain']
- collision_info = self._pipe.lsaRSetForestTrustInformation(self._policy_handle,
- ldname,
- lsa.LSA_FOREST_TRUST_DOMAIN_INFO,
- ftinfo, 0)
- if collision_info:
- root_logger.error("When setting forest trust information, got collision info back:\n%s" % (ndr_print(collision_info)))
+ ftlevel = lsa.LSA_FOREST_TRUST_DOMAIN_INFO
+ # RSetForestTrustInformation returns collision information
+ # for trust topology
+ cinfo = self._pipe.lsaRSetForestTrustInformation(
+ self._policy_handle,
+ ldname,
+ ftlevel,
+ ftinfo, 0)
+ if cinfo:
+ root_logger.error("When setting forest trust information, "
+ "got collision info back:\n%s"
+ % (ndr_print(cinfo)))
except RuntimeError as e:
- # We can ignore the error here -- setting up name suffix routes may fail
+ # We can ignore the error here --
+ # setting up name suffix routes may fail
pass
- def establish_trust(self, another_domain, trustdom_secret, trust_type='bidirectional', trust_external=False):
+ def establish_trust(self, another_domain, trustdom_secret,
+ trust_type='bidirectional', trust_external=False):
"""
Establishes trust between our and another domain
- Input: another_domain -- instance of TrustDomainInstance, initialized with #retrieve call
+ Input: another_domain -- instance of TrustDomainInstance,
+ initialized with #retrieve call
trustdom_secret -- shared secred used for the trust
"""
if self.info['name'] == another_domain.info['name']:
# Check that NetBIOS names do not clash
raise errors.ValidationError(name=u'AD Trust Setup',
- error=_('the IPA server and the remote domain cannot share the same '
- 'NetBIOS name: %s') % self.info['name'])
+ error=_('the IPA server and the '
+ 'remote domain cannot share '
+ 'the same NetBIOS name: %s')
+ % self.info['name'])
self.generate_auth(trustdom_secret)
@@ -1071,8 +1149,12 @@ class TrustDomainInstance(object):
try:
dname = lsa.String()
dname.string = another_domain.info['dns_domain']
- res = self._pipe.QueryTrustedDomainInfoByName(self._policy_handle, dname, lsa.LSA_TRUSTED_DOMAIN_INFO_FULL_INFO)
- self._pipe.DeleteTrustedDomain(self._policy_handle, res.info_ex.sid)
+ res = self._pipe.QueryTrustedDomainInfoByName(
+ self._policy_handle,
+ dname,
+ lsa.LSA_TRUSTED_DOMAIN_INFO_FULL_INFO)
+ self._pipe.DeleteTrustedDomain(self._policy_handle,
+ res.info_ex.sid)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
# Ignore anything but access denied (NT_STATUS_ACCESS_DENIED)
@@ -1080,7 +1162,10 @@ class TrustDomainInstance(object):
raise access_denied_error
try:
- trustdom_handle = self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE)
+ trustdom_handle = self._pipe.CreateTrustedDomainEx2(
+ self._policy_handle,
+ info, self.auth_info,
+ security.SEC_STD_DELETE)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
@@ -1089,13 +1174,19 @@ class TrustDomainInstance(object):
# trust settings. Samba insists this has to be done with LSA
# OpenTrustedDomain* calls, it is not enough to have a handle
# returned by the CreateTrustedDomainEx2 call.
- trustdom_handle = self._pipe.OpenTrustedDomainByName(self._policy_handle, dname, security.SEC_FLAG_MAXIMUM_ALLOWED)
+ trustdom_handle = self._pipe.OpenTrustedDomainByName(
+ self._policy_handle,
+ dname,
+ security.SEC_FLAG_MAXIMUM_ALLOWED)
try:
- infoclass = lsa.TrustDomainInfoSupportedEncTypes()
- infoclass.enc_types = security.KERB_ENCTYPE_RC4_HMAC_MD5
- infoclass.enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
- infoclass.enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
- self._pipe.SetInformationTrustedDomain(trustdom_handle, lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES, infoclass)
+ infocls = lsa.TrustDomainInfoSupportedEncTypes()
+ infocls.enc_types = security.KERB_ENCTYPE_RC4_HMAC_MD5
+ infocls.enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
+ infocls.enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
+ self._pipe.SetInformationTrustedDomain(
+ trustdom_handle,
+ lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES,
+ infocls)
except RuntimeError as e:
# We can ignore the error here -- changing enctypes is for
# improved security but the trust will work with default values as
@@ -1105,13 +1196,16 @@ class TrustDomainInstance(object):
if not trust_external:
try:
- info = self._pipe.QueryTrustedDomainInfo(trustdom_handle,
- lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX)
+ info = self._pipe.QueryTrustedDomainInfo(
+ trustdom_handle,
+ lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX)
info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_FOREST_TRANSITIVE
- self._pipe.SetInformationTrustedDomain(trustdom_handle,
- lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX, info)
+ self._pipe.SetInformationTrustedDomain(
+ trustdom_handle,
+ lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX, info)
except RuntimeError as e:
- root_logger.error('unable to set trust transitivity status: %s' % (str(e)))
+ root_logger.error(
+ 'unable to set trust transitivity status: %s' % (str(e)))
if self.info['is_pdc'] or trust_external:
self.update_ftinfo(another_domain)
@@ -1119,12 +1213,13 @@ class TrustDomainInstance(object):
def verify_trust(self, another_domain):
def retrieve_netlogon_info_2(logon_server, domain, function_code, data):
try:
- netr_pipe = netlogon.netlogon(domain.binding, domain.parm, domain.creds)
- result = netr_pipe.netr_LogonControl2Ex(logon_server=logon_server,
+ netr_pipe = netlogon.netlogon(domain.binding,
+ domain.parm, domain.creds)
+ result = netr_pipe.netr_LogonControl2Ex(
+ logon_server=logon_server,
function_code=function_code,
level=2,
- data=data
- )
+ data=data)
return result
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
@@ -1135,9 +1230,11 @@ class TrustDomainInstance(object):
another_domain.info['dns_domain'])
if result and result.flags and netlogon.NETLOGON_VERIFY_STATUS_RETURNED:
- if result.pdc_connection_status[0] != 0 and result.tc_connection_status[0] != 0:
+ if result.pdc_connection_status[0] != 0 and \
+ result.tc_connection_status[0] != 0:
if result.pdc_connection_status[1] == "WERR_ACCESS_DENIED":
- # Most likely AD DC hit another IPA replica which yet has no trust secret replicated
+ # Most likely AD DC hit another IPA replica which
+ # yet has no trust secret replicated
# Sleep and repeat again
self.validation_attempts += 1
@@ -1176,23 +1273,23 @@ class TrustDomainInstance(object):
def fetch_domains(api, mydomain, trustdomain, creds=None, server=None):
trust_flags = dict(
- NETR_TRUST_FLAG_IN_FOREST = 0x00000001,
- NETR_TRUST_FLAG_OUTBOUND = 0x00000002,
- NETR_TRUST_FLAG_TREEROOT = 0x00000004,
- NETR_TRUST_FLAG_PRIMARY = 0x00000008,
- NETR_TRUST_FLAG_NATIVE = 0x00000010,
- NETR_TRUST_FLAG_INBOUND = 0x00000020,
- NETR_TRUST_FLAG_MIT_KRB5 = 0x00000080,
- NETR_TRUST_FLAG_AES = 0x00000100)
+ NETR_TRUST_FLAG_IN_FOREST=0x00000001,
+ NETR_TRUST_FLAG_OUTBOUND=0x00000002,
+ NETR_TRUST_FLAG_TREEROOT=0x00000004,
+ NETR_TRUST_FLAG_PRIMARY=0x00000008,
+ NETR_TRUST_FLAG_NATIVE=0x00000010,
+ NETR_TRUST_FLAG_INBOUND=0x00000020,
+ NETR_TRUST_FLAG_MIT_KRB5=0x00000080,
+ NETR_TRUST_FLAG_AES=0x00000100)
trust_attributes = dict(
- NETR_TRUST_ATTRIBUTE_NON_TRANSITIVE = 0x00000001,
- NETR_TRUST_ATTRIBUTE_UPLEVEL_ONLY = 0x00000002,
- NETR_TRUST_ATTRIBUTE_QUARANTINED_DOMAIN = 0x00000004,
- NETR_TRUST_ATTRIBUTE_FOREST_TRANSITIVE = 0x00000008,
- NETR_TRUST_ATTRIBUTE_CROSS_ORGANIZATION = 0x00000010,
- NETR_TRUST_ATTRIBUTE_WITHIN_FOREST = 0x00000020,
- NETR_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL = 0x00000040)
+ NETR_TRUST_ATTRIBUTE_NON_TRANSITIVE=0x00000001,
+ NETR_TRUST_ATTRIBUTE_UPLEVEL_ONLY=0x00000002,
+ NETR_TRUST_ATTRIBUTE_QUARANTINED_DOMAIN=0x00000004,
+ NETR_TRUST_ATTRIBUTE_FOREST_TRANSITIVE=0x00000008,
+ NETR_TRUST_ATTRIBUTE_CROSS_ORGANIZATION=0x00000010,
+ NETR_TRUST_ATTRIBUTE_WITHIN_FOREST=0x00000020,
+ NETR_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL=0x00000040)
def communicate(td):
td.init_lsa_pipe(td.info['dc'])
@@ -1302,7 +1399,7 @@ class TrustDomainJoins(object):
def allow_behavior(self, *flags):
for f in flags:
- self.__allow_behavior |= int(f)
+ self.__allow_behavior |= int(f)
def __populate_local_domain(self):
# Initialize local domain info using kerberos only
@@ -1314,7 +1411,8 @@ class TrustDomainJoins(object):
ld.retrieve(installutils.get_fqdn())
self.local_domain = ld
- def populate_remote_domain(self, realm, realm_server=None, realm_admin=None, realm_passwd=None):
+ def populate_remote_domain(self, realm, realm_server=None,
+ realm_admin=None, realm_passwd=None):
def get_instance(self):
# Fetch data from foreign domain using password only
rd = TrustDomainInstance('')
@@ -1330,7 +1428,8 @@ class TrustDomainJoins(object):
if realm_server is None:
rd.retrieve_anonymously(realm, discover_srv=True, search_pdc=True)
else:
- rd.retrieve_anonymously(realm_server, discover_srv=False, search_pdc=True)
+ rd.retrieve_anonymously(realm_server,
+ discover_srv=False, search_pdc=True)
rd.read_only = True
if realm_admin and realm_passwd:
if 'name' in rd.info:
@@ -1339,12 +1438,14 @@ class TrustDomainJoins(object):
# realm admin is in DOMAIN\user format
# strip DOMAIN part as we'll enforce the one discovered
realm_admin = names[-1]
- auth_string = u"%s\%s%%%s" % (rd.info['name'], realm_admin, realm_passwd)
+ auth_string = u"%s\%s%%%s" \
+ % (rd.info['name'], realm_admin, realm_passwd)
td = get_instance(self)
td.creds.parse_string(auth_string)
td.creds.set_workstation(self.local_domain.hostname)
if realm_server is None:
- # we must have rd.info['dns_hostname'] then, part of anonymous discovery
+ # we must have rd.info['dns_hostname'] then
+ # as it is part of the anonymous discovery
td.retrieve(rd.info['dns_hostname'])
else:
td.retrieve(realm_server)
@@ -1358,8 +1459,8 @@ class TrustDomainJoins(object):
"""
Generate list of records for forest trust information about
our realm domains. Note that the list generated currently
- includes only top level domains, no exclusion domains, and no TDO objects
- as we handle the latter in a separate way
+ includes only top level domains, no exclusion domains, and
+ no TDO objects as we handle the latter in a separate way
"""
if self.local_domain.read_only:
return
@@ -1367,10 +1468,15 @@ class TrustDomainJoins(object):
self.local_domain.ftinfo_records = []
realm_domains = self.api.Command.realmdomains_show()['result']
- # Use realmdomains' modification timestamp to judge records last update time
- entry = self.api.Backend.ldap2.get_entry(realm_domains['dn'], ['modifyTimestamp'])
+ # Use realmdomains' modification timestamp
+ # to judge records' last update time
+ entry = self.api.Backend.ldap2.get_entry(
+ realm_domains['dn'], ['modifyTimestamp'])
# Convert the timestamp to Windows 64-bit timestamp format
- trust_timestamp = long(time.mktime(entry['modifytimestamp'][0].timetuple())*1e7+116444736000000000)
+ trust_timestamp = long(
+ time.mktime(
+ entry.single_value.get('modifytimestamp').timetuple()
+ )*1e7+116444736000000000)
for dom in realm_domains['associateddomain']:
ftinfo = dict()
@@ -1379,7 +1485,8 @@ class TrustDomainJoins(object):
ftinfo['rec_type'] = lsa.LSA_FOREST_TRUST_TOP_LEVEL_NAME
self.local_domain.ftinfo_records.append(ftinfo)
- def join_ad_full_credentials(self, realm, realm_server, realm_admin, realm_passwd, trust_type):
+ def join_ad_full_credentials(self, realm, realm_server, realm_admin,
+ realm_passwd, trust_type):
if not self.configured:
return None
@@ -1392,24 +1499,33 @@ class TrustDomainJoins(object):
)
trust_external = bool(self.__allow_behavior & TRUST_JOIN_EXTERNAL)
- if self.remote_domain.info['dns_domain'] != self.remote_domain.info['dns_forest']:
+ if self.remote_domain.info['dns_domain'] != \
+ self.remote_domain.info['dns_forest']:
if not trust_external:
- raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'],
- domain=self.remote_domain.info['dns_domain'])
+ raise errors.NotAForestRootError(
+ forest=self.remote_domain.info['dns_forest'],
+ domain=self.remote_domain.info['dns_domain'])
if not self.remote_domain.read_only:
trustdom_pass = samba.generate_random_password(128, 128)
self.get_realmdomains()
self.remote_domain.establish_trust(self.local_domain,
- trustdom_pass, trust_type, trust_external)
+ trustdom_pass,
+ trust_type, trust_external)
self.local_domain.establish_trust(self.remote_domain,
- trustdom_pass, trust_type, trust_external)
- # if trust is inbound, we don't need to verify it because AD DC will respond
- # with WERR_NO_SUCH_DOMAIN -- in only does verification for outbound trusts.
+ trustdom_pass,
+ trust_type, trust_external)
+ # if trust is inbound, we don't need to verify it because
+ # AD DC will respond with WERR_NO_SUCH_DOMAIN --
+ # it only does verification for outbound trusts.
result = True
if trust_type == TRUST_BIDIRECTIONAL:
result = self.remote_domain.verify_trust(self.local_domain)
- return dict(local=self.local_domain, remote=self.remote_domain, verified=result)
+ return dict(
+ local=self.local_domain,
+ remote=self.remote_domain,
+ verified=result
+ )
return None
def join_ad_ipa_half(self, realm, realm_server, trustdom_passwd, trust_type):
@@ -1420,11 +1536,18 @@ class TrustDomainJoins(object):
self.populate_remote_domain(realm, realm_server, realm_passwd=None)
trust_external = bool(self.__allow_behavior & TRUST_JOIN_EXTERNAL)
- if self.remote_domain.info['dns_domain'] != self.remote_domain.info['dns_forest']:
+ if self.remote_domain.info['dns_domain'] != \
+ self.remote_domain.info['dns_forest']:
if not trust_external:
- raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'],
- domain=self.remote_domain.info['dns_domain'])
+ raise errors.NotAForestRootError(
+ forest=self.remote_domain.info['dns_forest'],
+ domain=self.remote_domain.info['dns_domain'])
self.local_domain.establish_trust(self.remote_domain,
- trustdom_passwd, trust_type, trust_external)
- return dict(local=self.local_domain, remote=self.remote_domain, verified=False)
+ trustdom_passwd,
+ trust_type, trust_external)
+ return dict(
+ local=self.local_domain,
+ remote=self.remote_domain,
+ verified=False
+ )
--
2.7.4
More information about the Freeipa-devel
mailing list