Skip to content

Commit

Permalink
get_certificate: add get_certificate_chain option (#784)
Browse files Browse the repository at this point in the history
* Implement get_certificate_chain option.

* Implement basic tests.

* Add compatibility for current Python 3.13 pre-releases.
  • Loading branch information
felixfontein authored Jul 10, 2024
1 parent 4c26fad commit d50c3cc
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 3 deletions.
90 changes: 87 additions & 3 deletions plugins/modules/get_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,23 @@
type: list
elements: raw
version_added: 2.21.0
get_certificate_chain:
description:
- If set to V(true), will obtain the certificate chain next to the certificate itself.
- The chain as returned by the server can be found in RV(unverified_chain), and the chain that passed validation
in RV(verified_chain).
- B(Note) that this needs B(Python 3.10 or newer). Also note that only Python 3.13 or newer officially supports this.
The module uses internal APIs of Python 3.10, 3.11, and 3.12 to achieve the same. It can be that future versions of
Python 3.10, 3.11, or 3.12 break this.
type: bool
default: false
version_added: 2.21.0
notes:
- When using ca_cert on OS X it has been reported that in some conditions the validate will always succeed.
requirements:
- "Python >= 2.7 when using O(proxy_host)"
- "Python >= 2.7 when using O(proxy_host), and Python >= 3.10 when O(get_certificate_chain=true)"
- "cryptography >= 1.6"
seealso:
Expand Down Expand Up @@ -189,6 +200,26 @@
description: The version number of the certificate.
returned: success
type: str
verified_chain:
description:
- The verified certificate chain retrieved from the port.
- The first entry is always RV(cert).
- The last certificate the root certificate the chain is traced to. If O(ca_cert) is provided this certificate is part of that store;
otherwise it is part of the store used by default by Python.
- Note that RV(unverified_chain) generally does not contain the root certificate, and might contain other certificates that are not part
of the validated chain.
returned: success and O(get_certificate_chain=true)
type: list
elements: str
version_added: 2.21.0
unverified_chain:
description:
- The certificate chain retrieved from the port.
- The first entry is always RV(cert).
returned: success and O(get_certificate_chain=true)
type: list
elements: str
version_added: 2.21.0
'''

EXAMPLES = '''
Expand Down Expand Up @@ -240,6 +271,7 @@
import base64
import traceback
import ssl
import sys

from os.path import isfile
from socket import create_connection, setdefaulttimeout, socket
Expand Down Expand Up @@ -317,6 +349,7 @@ def main():
ciphers=dict(type='list', elements='str'),
asn1_base64=dict(type='bool'),
tls_ctx_options=dict(type='list', elements='raw'),
get_certificate_chain=dict(type='bool', default=False),
),
)

Expand All @@ -330,7 +363,9 @@ def main():
start_tls_server_type = module.params.get('starttls')
ciphers = module.params.get('ciphers')
asn1_base64 = module.params['asn1_base64']
tls_ctx_options = module.params.get('tls_ctx_options')
tls_ctx_options = module.params['tls_ctx_options']
get_certificate_chain = module.params['get_certificate_chain']

if asn1_base64 is None:
module.deprecate(
'The default value `false` for asn1_base64 is deprecated and will change to `true` in '
Expand All @@ -341,6 +376,12 @@ def main():
)
asn1_base64 = False

if get_certificate_chain and sys.version_info < (3, 10):
module.fail_json(
msg='get_certificate_chain=true can only be used with Python 3.10 (Python 3.13+ officially supports this). '
'The Python version used to run the get_certificate module is %s' % sys.version
)

backend = module.params.get('select_crypto_backend')
if backend == 'auto':
# Detection what is possible
Expand Down Expand Up @@ -371,6 +412,9 @@ def main():
if not isfile(ca_cert):
module.fail_json(msg="ca_cert file does not exist")

verified_chain = None
unverified_chain = None

if not HAS_CREATE_DEFAULT_CONTEXT:
# Python < 2.7.9
if proxy_host:
Expand Down Expand Up @@ -450,8 +494,43 @@ def main():
except Exception as e:
module.fail_json(msg="Failed to add {0} to CTX options".format(tls_ctx_option_str or tls_ctx_option_int))

cert = ctx.wrap_socket(sock, server_hostname=server_name or host).getpeercert(True)
tls_sock = ctx.wrap_socket(sock, server_hostname=server_name or host)
cert = tls_sock.getpeercert(True)
cert = DER_cert_to_PEM_cert(cert)

if get_certificate_chain:
if sys.version_info < (3, 13):
# The official way to access this has been added in https:/python/cpython/pull/109113/files.
# We're basically doing the same for older Python versions. The internal API needed for this was added
# in https:/python/cpython/commit/666991fc598bc312d72aff0078ecb553f0a968f1, which was first
# released in Python 3.10.0.
def _convert_chain(chain):
if not chain:
return []
return [c.public_bytes(ssl._ssl.ENCODING_DER) for c in chain]

ssl_obj = tls_sock._sslobj # This is of type ssl._ssl._SSLSocket
verified_der_chain = _convert_chain(ssl_obj.get_verified_chain())
unverified_der_chain = _convert_chain(ssl_obj.get_unverified_chain())
else:
# This works with Python 3.13+

# Unfortunately due to a bug (https:/python/cpython/issues/118658) some early pre-releases of
# Python 3.13 do not return lists of byte strings, but lists of _ssl.Certificate objects. This is going to
# be fixed by https:/python/cpython/pull/118669. For now we convert the certificates ourselves
# if they are not byte strings to work around this.
def _convert_chain(chain):
return [
c if isinstance(c, bytes) else c.public_bytes(ssl._ssl.ENCODING_DER)
for c in chain
]

verified_der_chain = _convert_chain(tls_sock.get_verified_chain())
unverified_der_chain = _convert_chain(tls_sock.get_unverified_chain())

verified_chain = [DER_cert_to_PEM_cert(c) for c in verified_der_chain]
unverified_chain = [DER_cert_to_PEM_cert(c) for c in unverified_der_chain]

except Exception as e:
if proxy_host:
module.fail_json(msg="Failed to get cert via proxy {0}:{1} from {2}:{3}, error: {4}".format(
Expand Down Expand Up @@ -499,6 +578,11 @@ def main():
else:
result['version'] = "unknown"

if verified_chain is not None:
result['verified_chain'] = verified_chain
if unverified_chain is not None:
result['unverified_chain'] = unverified_chain

module.exit_json(**result)


Expand Down
2 changes: 2 additions & 0 deletions tests/integration/targets/get_certificate/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

- set_fact:
skip_tests: false
has_get_certificate_chain: >-
{{ ansible_facts.python_version is version('3.10.0', '>=') }}
- block:

Expand Down
25 changes: 25 additions & 0 deletions tests/integration/targets/get_certificate/tests/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,38 @@
port: 443
select_crypto_backend: "{{ select_crypto_backend }}"
asn1_base64: true
get_certificate_chain: "{{ has_get_certificate_chain }}"
register: result

- assert:
that:
- result is not changed
- result is not failed

- name: Read CA cert
slurp:
src: '{{ remote_tmp_dir }}/temp.pem'
register: cacert
when: has_get_certificate_chain

- name: Validate get_certificate_chain=true results
assert:
that:
- result.verified_chain is sequence
- result.unverified_chain is sequence
- result.verified_chain[0] == result.cert
- result.unverified_chain[0] == result.cert
- result.verified_chain[-1] == cacert.content | b64decode
- result.verified_chain == result.unverified_chain + [cacert.content | b64decode]
when: has_get_certificate_chain

- name: Validate get_certificate_chain=false results
assert:
that:
- result.verified_chain is undefined
- result.unverified_chain is undefined
when: not has_get_certificate_chain

- name: Generate bogus CA privatekey
openssl_privatekey:
path: '{{ remote_tmp_dir }}/bogus_ca.key'
Expand Down

0 comments on commit d50c3cc

Please sign in to comment.