From d50c3cc9442fc706407be4a5bf57d387066be916 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Wed, 10 Jul 2024 21:51:30 +0200 Subject: [PATCH] get_certificate: add get_certificate_chain option (#784) * Implement get_certificate_chain option. * Implement basic tests. * Add compatibility for current Python 3.13 pre-releases. --- plugins/modules/get_certificate.py | 90 ++++++++++++++++++- .../targets/get_certificate/tasks/main.yml | 2 + .../get_certificate/tests/validate.yml | 25 ++++++ 3 files changed, 114 insertions(+), 3 deletions(-) diff --git a/plugins/modules/get_certificate.py b/plugins/modules/get_certificate.py index 7a9c7736c..7de6e114e 100644 --- a/plugins/modules/get_certificate.py +++ b/plugins/modules/get_certificate.py @@ -107,12 +107,23 @@ type: list elements: raw version_added: 2.21.0 + get_certificate_chain: + description: + - If set to V(true), will obtain the certificate chain next to the certificate itself. + - The chain as returned by the server can be found in RV(unverified_chain), and the chain that passed validation + in RV(verified_chain). + - B(Note) that this needs B(Python 3.10 or newer). Also note that only Python 3.13 or newer officially supports this. + The module uses internal APIs of Python 3.10, 3.11, and 3.12 to achieve the same. It can be that future versions of + Python 3.10, 3.11, or 3.12 break this. + type: bool + default: false + version_added: 2.21.0 notes: - When using ca_cert on OS X it has been reported that in some conditions the validate will always succeed. requirements: - - "Python >= 2.7 when using O(proxy_host)" + - "Python >= 2.7 when using O(proxy_host), and Python >= 3.10 when O(get_certificate_chain=true)" - "cryptography >= 1.6" seealso: @@ -189,6 +200,26 @@ description: The version number of the certificate. returned: success type: str +verified_chain: + description: + - The verified certificate chain retrieved from the port. + - The first entry is always RV(cert). + - The last certificate the root certificate the chain is traced to. If O(ca_cert) is provided this certificate is part of that store; + otherwise it is part of the store used by default by Python. + - Note that RV(unverified_chain) generally does not contain the root certificate, and might contain other certificates that are not part + of the validated chain. + returned: success and O(get_certificate_chain=true) + type: list + elements: str + version_added: 2.21.0 +unverified_chain: + description: + - The certificate chain retrieved from the port. + - The first entry is always RV(cert). + returned: success and O(get_certificate_chain=true) + type: list + elements: str + version_added: 2.21.0 ''' EXAMPLES = ''' @@ -240,6 +271,7 @@ import base64 import traceback import ssl +import sys from os.path import isfile from socket import create_connection, setdefaulttimeout, socket @@ -317,6 +349,7 @@ def main(): ciphers=dict(type='list', elements='str'), asn1_base64=dict(type='bool'), tls_ctx_options=dict(type='list', elements='raw'), + get_certificate_chain=dict(type='bool', default=False), ), ) @@ -330,7 +363,9 @@ def main(): start_tls_server_type = module.params.get('starttls') ciphers = module.params.get('ciphers') asn1_base64 = module.params['asn1_base64'] - tls_ctx_options = module.params.get('tls_ctx_options') + tls_ctx_options = module.params['tls_ctx_options'] + get_certificate_chain = module.params['get_certificate_chain'] + if asn1_base64 is None: module.deprecate( 'The default value `false` for asn1_base64 is deprecated and will change to `true` in ' @@ -341,6 +376,12 @@ def main(): ) asn1_base64 = False + if get_certificate_chain and sys.version_info < (3, 10): + module.fail_json( + msg='get_certificate_chain=true can only be used with Python 3.10 (Python 3.13+ officially supports this). ' + 'The Python version used to run the get_certificate module is %s' % sys.version + ) + backend = module.params.get('select_crypto_backend') if backend == 'auto': # Detection what is possible @@ -371,6 +412,9 @@ def main(): if not isfile(ca_cert): module.fail_json(msg="ca_cert file does not exist") + verified_chain = None + unverified_chain = None + if not HAS_CREATE_DEFAULT_CONTEXT: # Python < 2.7.9 if proxy_host: @@ -450,8 +494,43 @@ def main(): except Exception as e: module.fail_json(msg="Failed to add {0} to CTX options".format(tls_ctx_option_str or tls_ctx_option_int)) - cert = ctx.wrap_socket(sock, server_hostname=server_name or host).getpeercert(True) + tls_sock = ctx.wrap_socket(sock, server_hostname=server_name or host) + cert = tls_sock.getpeercert(True) cert = DER_cert_to_PEM_cert(cert) + + if get_certificate_chain: + if sys.version_info < (3, 13): + # The official way to access this has been added in https://github.com/python/cpython/pull/109113/files. + # We're basically doing the same for older Python versions. The internal API needed for this was added + # in https://github.com/python/cpython/commit/666991fc598bc312d72aff0078ecb553f0a968f1, which was first + # released in Python 3.10.0. + def _convert_chain(chain): + if not chain: + return [] + return [c.public_bytes(ssl._ssl.ENCODING_DER) for c in chain] + + ssl_obj = tls_sock._sslobj # This is of type ssl._ssl._SSLSocket + verified_der_chain = _convert_chain(ssl_obj.get_verified_chain()) + unverified_der_chain = _convert_chain(ssl_obj.get_unverified_chain()) + else: + # This works with Python 3.13+ + + # Unfortunately due to a bug (https://github.com/python/cpython/issues/118658) some early pre-releases of + # Python 3.13 do not return lists of byte strings, but lists of _ssl.Certificate objects. This is going to + # be fixed by https://github.com/python/cpython/pull/118669. For now we convert the certificates ourselves + # if they are not byte strings to work around this. + def _convert_chain(chain): + return [ + c if isinstance(c, bytes) else c.public_bytes(ssl._ssl.ENCODING_DER) + for c in chain + ] + + verified_der_chain = _convert_chain(tls_sock.get_verified_chain()) + unverified_der_chain = _convert_chain(tls_sock.get_unverified_chain()) + + verified_chain = [DER_cert_to_PEM_cert(c) for c in verified_der_chain] + unverified_chain = [DER_cert_to_PEM_cert(c) for c in unverified_der_chain] + except Exception as e: if proxy_host: module.fail_json(msg="Failed to get cert via proxy {0}:{1} from {2}:{3}, error: {4}".format( @@ -499,6 +578,11 @@ def main(): else: result['version'] = "unknown" + if verified_chain is not None: + result['verified_chain'] = verified_chain + if unverified_chain is not None: + result['unverified_chain'] = unverified_chain + module.exit_json(**result) diff --git a/tests/integration/targets/get_certificate/tasks/main.yml b/tests/integration/targets/get_certificate/tasks/main.yml index cd5b93979..c7826b505 100644 --- a/tests/integration/targets/get_certificate/tasks/main.yml +++ b/tests/integration/targets/get_certificate/tasks/main.yml @@ -10,6 +10,8 @@ - set_fact: skip_tests: false + has_get_certificate_chain: >- + {{ ansible_facts.python_version is version('3.10.0', '>=') }} - block: diff --git a/tests/integration/targets/get_certificate/tests/validate.yml b/tests/integration/targets/get_certificate/tests/validate.yml index 29ca26873..8835f0105 100644 --- a/tests/integration/targets/get_certificate/tests/validate.yml +++ b/tests/integration/targets/get_certificate/tests/validate.yml @@ -123,6 +123,7 @@ port: 443 select_crypto_backend: "{{ select_crypto_backend }}" asn1_base64: true + get_certificate_chain: "{{ has_get_certificate_chain }}" register: result - assert: @@ -130,6 +131,30 @@ - result is not changed - result is not failed +- name: Read CA cert + slurp: + src: '{{ remote_tmp_dir }}/temp.pem' + register: cacert + when: has_get_certificate_chain + +- name: Validate get_certificate_chain=true results + assert: + that: + - result.verified_chain is sequence + - result.unverified_chain is sequence + - result.verified_chain[0] == result.cert + - result.unverified_chain[0] == result.cert + - result.verified_chain[-1] == cacert.content | b64decode + - result.verified_chain == result.unverified_chain + [cacert.content | b64decode] + when: has_get_certificate_chain + +- name: Validate get_certificate_chain=false results + assert: + that: + - result.verified_chain is undefined + - result.unverified_chain is undefined + when: not has_get_certificate_chain + - name: Generate bogus CA privatekey openssl_privatekey: path: '{{ remote_tmp_dir }}/bogus_ca.key'