Skip to content

Commit

Permalink
Improve handling of IDNA/Unicode domains (#436)
Browse files Browse the repository at this point in the history
* Prepare IDNA/Unicode conversion code. Use to normalize input.

* Use IDNA library first (IDNA2008) and Python's IDNA2003 implementation as a fallback.

* Make sure idna is installed.

* Add changelog fragment.

* 'punycode' → 'idna'.

* Add name_encoding options and tests.

* Avoid invalid character for IDNA2008.

* Linting.

* Forgot to upate value.

* Work around cryptography bug. Fix port handling for URIs.

* Forgot other place sensitive to cryptography bug.

* Forgot one. (Will likely still fail.)

* Decode IDNA in _compress_entry() to avoid comparison screw-ups.

* Work around Python 3.5 problem in Ansible 2.9's default test container.

* Update changelog fragment.

* Fix error, add tests.

* Python 2 compatibility.

* Update requirements.
  • Loading branch information
felixfontein authored May 9, 2022
1 parent 90efcc1 commit 4cf9515
Show file tree
Hide file tree
Showing 20 changed files with 479 additions and 31 deletions.
12 changes: 12 additions & 0 deletions changelogs/fragments/436-idns.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
minor_changes:
- "Support automatic conversion for Internalionalized Domain Names (IDNs).
When passing general names, for example Subject Altenative Names to ``community.crypto.openssl_csr``, these will automatically be converted to IDNA.
Conversion will be done per label to IDNA2008 if possible, and IDNA2003 if IDNA2008 conversion fails for that label.
Note that IDNA conversion requires `the Python idna library <https://pypi.org/project/idna/>`_ to be installed.
Please note that depending on which versions of the cryptography library are used, it could try to process the converted IDNA
another time with the Python ``idna`` library and reject IDNA2003 encoded values. Using a new enough ``cryptography`` version avoids this
(https:/ansible-collections/community.crypto/issues/426, https:/ansible-collections/community.crypto/pull/436)."
- "openssl_csr_info - add ``name_encoding`` option to control the encoding (IDNA, Unicode) used to return domain names in general names (https:/ansible-collections/community.crypto/pull/436)."
- "x509_certificate_info - add ``name_encoding`` option to control the encoding (IDNA, Unicode) used to return domain names in general names (https:/ansible-collections/community.crypto/pull/436)."
- "x509_crl - add ``name_encoding`` option to control the encoding (IDNA, Unicode) used to return domain names in general names (https:/ansible-collections/community.crypto/pull/436)."
- "x509_crl_info - add ``name_encoding`` option to control the encoding (IDNA, Unicode) used to return domain names in general names (https:/ansible-collections/community.crypto/pull/436)."
30 changes: 30 additions & 0 deletions plugins/doc_fragments/name_encoding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-

# Copyright: (c) 2022, Felix Fontein <[email protected]>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type


class ModuleDocFragment(object):
DOCUMENTATION = r'''
options:
name_encoding:
description:
- How to encode names (DNS names, URIs, email addresses) in return values.
- C(ignore) will use the encoding returned by the backend.
- C(idna) will convert all labels of domain names to IDNA encoding.
IDNA2008 will be preferred, and IDNA2003 will be used if IDNA2008 encoding fails.
- C(unicode) will convert all labels of domain names to Unicode.
IDNA2008 will be preferred, and IDNA2003 will be used if IDNA2008 decoding fails.
- B(Note) that C(idna) and C(unicode) require the L(idna Python library,https://pypi.org/project/idna/) to be installed.
type: str
default: ignore
choices:
- ignore
- idna
- unicode
requirements:
- If I(name_encoding) is set to another value than C(ignore), the L(idna Python library,https://pypi.org/project/idna/) needs to be installed.
'''
4 changes: 2 additions & 2 deletions plugins/module_utils/crypto/cryptography_crl.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ def cryptography_decode_revoked_certificate(cert):
return result


def cryptography_dump_revoked(entry):
def cryptography_dump_revoked(entry, idn_rewrite='ignore'):
return {
'serial_number': entry['serial_number'],
'revocation_date': entry['revocation_date'].strftime(TIMESTAMP_FORMAT),
'issuer':
[cryptography_decode_name(issuer) for issuer in entry['issuer']]
[cryptography_decode_name(issuer, idn_rewrite=idn_rewrite) for issuer in entry['issuer']]
if entry['issuer'] is not None else None,
'issuer_critical': entry['issuer_critical'],
'reason': REVOCATION_REASON_MAP_INVERSE.get(entry['reason']) if entry['reason'] is not None else None,
Expand Down
103 changes: 96 additions & 7 deletions plugins/module_utils/crypto/cryptography_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import binascii
import re
import sys
import traceback

from ansible.module_utils.common.text.converters import to_text, to_bytes
from ansible.module_utils.six.moves.urllib.parse import urlparse, urlunparse, ParseResult

from ._asn1 import serialize_asn1_string_as_der

from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
Expand Down Expand Up @@ -80,6 +83,16 @@
# Error handled in the calling module.
_load_pkcs12 = None

try:
import idna

HAS_IDNA = True
except ImportError:
HAS_IDNA = False
IDNA_IMP_ERROR = traceback.format_exc()

from ansible.module_utils.basic import missing_required_lib

from .basic import (
CRYPTOGRAPHY_HAS_DSA_SIGN,
CRYPTOGRAPHY_HAS_EC_SIGN,
Expand Down Expand Up @@ -359,23 +372,97 @@ def cryptography_parse_relative_distinguished_name(rdn):
return cryptography.x509.RelativeDistinguishedName(names)


def _is_ascii(value):
'''Check whether the Unicode string `value` contains only ASCII characters.'''
try:
value.encode("ascii")
return True
except UnicodeEncodeError:
return False


def _adjust_idn(value, idn_rewrite):
if idn_rewrite == 'ignore' or not value:
return value
if idn_rewrite == 'idna' and _is_ascii(value):
return value
if idn_rewrite not in ('idna', 'unicode'):
raise ValueError('Invalid value for idn_rewrite: "{0}"'.format(idn_rewrite))
if not HAS_IDNA:
raise OpenSSLObjectError(
missing_required_lib('idna', reason='to transform {what} DNS name "{name}" to {dest}'.format(
name=value,
what='IDNA' if idn_rewrite == 'unicode' else 'Unicode',
dest='Unicode' if idn_rewrite == 'unicode' else 'IDNA',
)))
# Since IDNA does not like '*' or empty labels (except one empty label at the end),
# we split and let IDNA only handle labels that are neither empty or '*'.
parts = value.split(u'.')
for index, part in enumerate(parts):
if part in (u'', u'*'):
continue
try:
if idn_rewrite == 'idna':
parts[index] = idna.encode(part).decode('ascii')
elif idn_rewrite == 'unicode' and part.startswith(u'xn--'):
parts[index] = idna.decode(part)
except idna.IDNAError as exc2008:
try:
if idn_rewrite == 'idna':
parts[index] = part.encode('idna').decode('ascii')
elif idn_rewrite == 'unicode' and part.startswith(u'xn--'):
parts[index] = part.encode('ascii').decode('idna')
except Exception as exc2003:
raise OpenSSLObjectError(
u'Error while transforming part "{part}" of {what} DNS name "{name}" to {dest}.'
u' IDNA2008 transformation resulted in "{exc2008}", IDNA2003 transformation resulted in "{exc2003}".'.format(
part=part,
name=value,
what='IDNA' if idn_rewrite == 'unicode' else 'Unicode',
dest='Unicode' if idn_rewrite == 'unicode' else 'IDNA',
exc2003=exc2003,
exc2008=exc2008,
))
return u'.'.join(parts)


def _adjust_idn_email(value, idn_rewrite):
idx = value.find(u'@')
if idx < 0:
return value
return u'{0}@{1}'.format(value[:idx], _adjust_idn(value[idx + 1:], idn_rewrite))


def _adjust_idn_url(value, idn_rewrite):
url = urlparse(value)
host = _adjust_idn(url.hostname, idn_rewrite)
if url.username is not None and url.password is not None:
host = u'{0}:{1}@{2}'.format(url.username, url.password, host)
elif url.username is not None:
host = u'{0}@{1}'.format(url.username, host)
if url.port is not None:
host = u'{0}:{1}'.format(host, url.port)
return urlunparse(
ParseResult(scheme=url.scheme, netloc=host, path=url.path, params=url.params, query=url.query, fragment=url.fragment))


def cryptography_get_name(name, what='Subject Alternative Name'):
'''
Given a name string, returns a cryptography x509.GeneralName object.
Raises an OpenSSLObjectError if the name is unknown or cannot be parsed.
'''
try:
if name.startswith('DNS:'):
return x509.DNSName(to_text(name[4:]))
return x509.DNSName(_adjust_idn(to_text(name[4:]), 'idna'))
if name.startswith('IP:'):
address = to_text(name[3:])
if '/' in address:
return x509.IPAddress(ipaddress.ip_network(address))
return x509.IPAddress(ipaddress.ip_address(address))
if name.startswith('email:'):
return x509.RFC822Name(to_text(name[6:]))
return x509.RFC822Name(_adjust_idn_email(to_text(name[6:]), 'idna'))
if name.startswith('URI:'):
return x509.UniformResourceIdentifier(to_text(name[4:]))
return x509.UniformResourceIdentifier(_adjust_idn_url(to_text(name[4:]), 'idna'))
if name.startswith('RID:'):
m = re.match(r'^([0-9]+(?:\.[0-9]+)*)$', to_text(name[4:]))
if not m:
Expand Down Expand Up @@ -422,21 +509,23 @@ def _dn_escape_value(value):
return value


def cryptography_decode_name(name):
def cryptography_decode_name(name, idn_rewrite='ignore'):
'''
Given a cryptography x509.GeneralName object, returns a string.
Raises an OpenSSLObjectError if the name is not supported.
'''
if idn_rewrite not in ('ignore', 'idna', 'unicode'):
raise AssertionError('idn_rewrite must be one of "ignore", "idna", or "unicode"')
if isinstance(name, x509.DNSName):
return u'DNS:{0}'.format(name.value)
return u'DNS:{0}'.format(_adjust_idn(name.value, idn_rewrite))
if isinstance(name, x509.IPAddress):
if isinstance(name.value, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
return u'IP:{0}/{1}'.format(name.value.network_address.compressed, name.value.prefixlen)
return u'IP:{0}'.format(name.value.compressed)
if isinstance(name, x509.RFC822Name):
return u'email:{0}'.format(name.value)
return u'email:{0}'.format(_adjust_idn_email(name.value, idn_rewrite))
if isinstance(name, x509.UniformResourceIdentifier):
return u'URI:{0}'.format(name.value)
return u'URI:{0}'.format(_adjust_idn_url(name.value, idn_rewrite))
if isinstance(name, x509.DirectoryName):
# According to https://datatracker.ietf.org/doc/html/rfc4514.html#section-2.1 the
# list needs to be reversed, and joined by commas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
"""Validate the supplied cert, using the cryptography backend"""
def __init__(self, module, content):
super(CertificateInfoRetrievalCryptography, self).__init__(module, 'cryptography', content)
self.name_encoding = module.params.get('name_encoding', 'ignore')

def _get_der_bytes(self):
return self.cert.public_bytes(serialization.Encoding.DER)
Expand Down Expand Up @@ -309,7 +310,7 @@ def _get_ocsp_must_staple(self):
def _get_subject_alt_name(self):
try:
san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
result = [cryptography_decode_name(san) for san in san_ext.value]
result = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in san_ext.value]
return result, san_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
Expand Down Expand Up @@ -341,7 +342,7 @@ def _get_authority_key_identifier(self):
ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
issuer = None
if ext.value.authority_cert_issuer is not None:
issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer]
issuer = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in ext.value.authority_cert_issuer]
return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number
except cryptography.x509.ExtensionNotFound:
return None, None, None
Expand Down
3 changes: 2 additions & 1 deletion plugins/module_utils/crypto/module_backends/crl_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, module, content, list_revoked_certificates=True):
self.module = module
self.content = content
self.list_revoked_certificates = list_revoked_certificates
self.name_encoding = module.params.get('name_encoding', 'ignore')

def get_info(self):
self.crl_pem = identify_pem_format(self.content)
Expand Down Expand Up @@ -86,7 +87,7 @@ def get_info(self):
result['revoked_certificates'] = []
for cert in self.crl:
entry = cryptography_decode_revoked_certificate(cert)
result['revoked_certificates'].append(cryptography_dump_revoked(entry))
result['revoked_certificates'].append(cryptography_dump_revoked(entry, idn_rewrite=self.name_encoding))

return result

Expand Down
9 changes: 5 additions & 4 deletions plugins/module_utils/crypto/module_backends/csr_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval):
"""Validate the supplied CSR, using the cryptography backend"""
def __init__(self, module, content, validate_signature):
super(CSRInfoRetrievalCryptography, self).__init__(module, 'cryptography', content, validate_signature)
self.name_encoding = module.params.get('name_encoding', 'ignore')

def _get_subject_ordered(self):
result = []
Expand Down Expand Up @@ -256,16 +257,16 @@ def _get_ocsp_must_staple(self):
def _get_subject_alt_name(self):
try:
san_ext = self.csr.extensions.get_extension_for_class(x509.SubjectAlternativeName)
result = [cryptography_decode_name(san) for san in san_ext.value]
result = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in san_ext.value]
return result, san_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False

def _get_name_constraints(self):
try:
nc_ext = self.csr.extensions.get_extension_for_class(x509.NameConstraints)
permitted = [cryptography_decode_name(san) for san in nc_ext.value.permitted_subtrees or []]
excluded = [cryptography_decode_name(san) for san in nc_ext.value.excluded_subtrees or []]
permitted = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in nc_ext.value.permitted_subtrees or []]
excluded = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in nc_ext.value.excluded_subtrees or []]
return permitted, excluded, nc_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, None, False
Expand All @@ -291,7 +292,7 @@ def _get_authority_key_identifier(self):
ext = self.csr.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
issuer = None
if ext.value.authority_cert_issuer is not None:
issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer]
issuer = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in ext.value.authority_cert_issuer]
return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number
except cryptography.x509.ExtensionNotFound:
return None, None, None
Expand Down
10 changes: 9 additions & 1 deletion plugins/modules/openssl_csr_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
default: auto
choices: [ auto, cryptography ]
extends_documentation_fragment:
- community.crypto.name_encoding
seealso:
- module: community.crypto.openssl_csr
- module: community.crypto.openssl_csr_pipe
Expand Down Expand Up @@ -124,7 +127,9 @@
returned: success
type: bool
subject_alt_name:
description: Entries in the C(subject_alt_name) extension, or C(none) if extension is not present.
description:
- Entries in the C(subject_alt_name) extension, or C(none) if extension is not present.
- See I(name_encoding) for how IDNs are handled.
returned: success
type: list
elements: str
Expand Down Expand Up @@ -152,6 +157,7 @@
description:
- List of excluded subtrees the CA cannot sign certificates for.
- Is C(none) if extension is not present.
- See I(name_encoding) for how IDNs are handled.
returned: success
type: list
elements: str
Expand Down Expand Up @@ -281,6 +287,7 @@
description:
- The CSR's authority cert issuer as a list of general names.
- Is C(none) if the C(AuthorityKeyIdentifier) extension is not present.
- See I(name_encoding) for how IDNs are handled.
returned: success
type: list
elements: str
Expand Down Expand Up @@ -312,6 +319,7 @@ def main():
argument_spec=dict(
path=dict(type='path'),
content=dict(type='str'),
name_encoding=dict(type='str', default='ignore', choices=['ignore', 'idna', 'unicode']),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography']),
),
required_one_of=(
Expand Down
9 changes: 8 additions & 1 deletion plugins/modules/x509_certificate_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
default: auto
choices: [ auto, cryptography ]
extends_documentation_fragment:
- community.crypto.name_encoding
notes:
- All timestamp values are provided in ASN.1 TIME format, in other words, following the C(YYYYMMDDHHMMSSZ) pattern.
They are all in UTC.
Expand Down Expand Up @@ -168,7 +171,9 @@
returned: success
type: bool
subject_alt_name:
description: Entries in the C(subject_alt_name) extension, or C(none) if extension is not present.
description:
- Entries in the C(subject_alt_name) extension, or C(none) if extension is not present.
- See I(name_encoding) for how IDNs are handled.
returned: success
type: list
elements: str
Expand Down Expand Up @@ -355,6 +360,7 @@
description:
- The certificate's authority cert issuer as a list of general names.
- Is C(none) if the C(AuthorityKeyIdentifier) extension is not present.
- See I(name_encoding) for how IDNs are handled.
returned: success
type: list
elements: str
Expand Down Expand Up @@ -397,6 +403,7 @@ def main():
path=dict(type='path'),
content=dict(type='str'),
valid_at=dict(type='dict'),
name_encoding=dict(type='str', default='ignore', choices=['ignore', 'idna', 'unicode']),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography']),
),
required_one_of=(
Expand Down
Loading

0 comments on commit 4cf9515

Please sign in to comment.