[prev in list] [next in list] [prev in thread] [next in thread]
List: pypy-svn
Subject: [pypy-commit] pypy stdlib-2.7.9: Implement SSLContext.get_ca_certs()
From: amauryfa <noreply () buildbot ! pypy ! org>
Date: 2015-01-30 23:45:15
Message-ID: 20150130234515.A2C511C0FAB () cobra ! cs ! uni-duesseldorf ! de
[Download RAW message or body]
Author: Amaury Forgeot d'Arc <amauryfa@gmail.com>
Branch: stdlib-2.7.9
Changeset: r75595:d5649f3f7a3b
Date: 2015-01-31 00:43 +0100
http://bitbucket.org/pypy/pypy/changeset/d5649f3f7a3b/
Log: Implement SSLContext.get_ca_certs()
diff --git a/pypy/module/_ssl/interp_ssl.py b/pypy/module/_ssl/interp_ssl.py
--- a/pypy/module/_ssl/interp_ssl.py
+++ b/pypy/module/_ssl/interp_ssl.py
@@ -525,16 +525,7 @@
if der:
# return cert in DER-encoded format
- with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as buf_ptr:
- buf_ptr[0] = lltype.nullptr(rffi.CCHARP.TO)
- length = libssl_i2d_X509(self.peer_cert, buf_ptr)
- if length < 0:
- raise _ssl_seterror(space, self, length)
- try:
- # this is actually an immutable bytes sequence
- return space.wrap(rffi.charpsize2str(buf_ptr[0], length))
- finally:
- libssl_OPENSSL_free(buf_ptr[0])
+ return _certificate_to_der(space, self.peer_cert)
else:
verification = libssl_SSL_CTX_get_verify_mode(
libssl_SSL_get_SSL_CTX(self.ssl))
@@ -622,37 +613,45 @@
_SSLSocket.descr_set_context),
)
+def _certificate_to_der(space, certificate):
+ with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as buf_ptr:
+ buf_ptr[0] = lltype.nullptr(rffi.CCHARP.TO)
+ length = libssl_i2d_X509(certificate, buf_ptr)
+ if length < 0:
+ raise _ssl_seterror(space, None, 0)
+ try:
+ return space.wrap(rffi.charpsize2str(buf_ptr[0], length))
+ finally:
+ libssl_OPENSSL_free(buf_ptr[0])
-def _decode_certificate(space, certificate, verbose=False):
+def _decode_certificate(space, certificate):
w_retval = space.newdict()
w_peer = _create_tuple_for_X509_NAME(
space, libssl_X509_get_subject_name(certificate))
space.setitem(w_retval, space.wrap("subject"), w_peer)
- if verbose:
- w_issuer = _create_tuple_for_X509_NAME(
- space, libssl_X509_get_issuer_name(certificate))
- space.setitem(w_retval, space.wrap("issuer"), w_issuer)
+ w_issuer = _create_tuple_for_X509_NAME(
+ space, libssl_X509_get_issuer_name(certificate))
+ space.setitem(w_retval, space.wrap("issuer"), w_issuer)
- space.setitem(w_retval, space.wrap("version"),
- space.wrap(libssl_X509_get_version(certificate)))
+ space.setitem(w_retval, space.wrap("version"),
+ space.wrap(libssl_X509_get_version(certificate)))
biobuf = libssl_BIO_new(libssl_BIO_s_mem())
try:
- if verbose:
- libssl_BIO_reset(biobuf)
- serialNumber = libssl_X509_get_serialNumber(certificate)
- libssl_i2a_ASN1_INTEGER(biobuf, serialNumber)
- # should not exceed 20 octets, 160 bits, so buf is big enough
- with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf:
- length = libssl_BIO_gets(biobuf, buf, 99)
- if length < 0:
- raise _ssl_seterror(space, None, length)
+ libssl_BIO_reset(biobuf)
+ serialNumber = libssl_X509_get_serialNumber(certificate)
+ libssl_i2a_ASN1_INTEGER(biobuf, serialNumber)
+ # should not exceed 20 octets, 160 bits, so buf is big enough
+ with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf:
+ length = libssl_BIO_gets(biobuf, buf, 99)
+ if length < 0:
+ raise _ssl_seterror(space, None, length)
- w_serial = space.wrap(rffi.charpsize2str(buf, length))
- space.setitem(w_retval, space.wrap("serialNumber"), w_serial)
+ w_serial = space.wrap(rffi.charpsize2str(buf, length))
+ space.setitem(w_retval, space.wrap("serialNumber"), w_serial)
libssl_BIO_reset(biobuf)
notBefore = libssl_X509_get_notBefore(certificate)
@@ -977,8 +976,8 @@
return getattr(space.fromcache(Cache), name)
-@unwrap_spec(filename=str, verbose=bool)
-def _test_decode_cert(space, filename, verbose=True):
+@unwrap_spec(filename=str)
+def _test_decode_cert(space, filename):
cert = libssl_BIO_new(libssl_BIO_s_file())
if not cert:
raise ssl_error(space, "Can't malloc memory to read file")
@@ -992,7 +991,7 @@
raise ssl_error(space, "Error decoding PEM-encoded file")
try:
- return _decode_certificate(space, x, verbose)
+ return _decode_certificate(space, x)
finally:
libssl_X509_free(x)
finally:
@@ -1352,6 +1351,27 @@
self.npn_protocols = SSLNpnProtocols(self.ctx, protos)
+ def get_ca_certs_w(self, space, w_binary_form=None):
+ if w_binary_form and space.is_true(w_binary_form):
+ binary_mode = True
+ else:
+ binary_mode = False
+ rlist = []
+ store = libssl_SSL_CTX_get_cert_store(self.ctx)
+ for i in range(libssl_sk_X509_OBJECT_num(store[0].c_objs)):
+ obj = libssl_sk_X509_OBJECT_value(store[0].c_objs, i)
+ if intmask(obj.c_type) != X509_LU_X509:
+ # not a x509 cert
+ continue
+ # CA for any purpose
+ cert = libssl_pypy_X509_OBJECT_data_x509(obj)
+ if not libssl_X509_check_ca(cert):
+ continue
+ if binary_mode:
+ rlist.append(_certificate_to_der(space, cert))
+ else:
+ rlist.append(_decode_certificate(space, cert))
+ return space.newlist(rlist)
_SSLContext.typedef = TypeDef(
"_ssl._SSLContext",
@@ -1364,6 +1384,7 @@
load_verify_locations=interp2app(_SSLContext.load_verify_locations_w),
set_default_verify_paths=interp2app(_SSLContext.descr_set_default_verify_paths),
_set_npn_protocols=interp2app(_SSLContext.set_npn_protocols_w),
+ get_ca_certs=interp2app(_SSLContext.get_ca_certs_w),
options=GetSetProperty(_SSLContext.descr_get_options,
_SSLContext.descr_set_options),
diff --git a/pypy/module/_ssl/test/test_ssl.py b/pypy/module/_ssl/test/test_ssl.py
--- a/pypy/module/_ssl/test/test_ssl.py
+++ b/pypy/module/_ssl/test/test_ssl.py
@@ -294,6 +294,9 @@
tmpfile = udir / "cert.passwd.pem"
tmpfile.write(SSL_CERTIFICATE_PROTECTED)
cls.w_cert_protected = cls.space.wrap(str(tmpfile))
+ tmpfile = udir / "python.org.pem"
+ tmpfile.write(SVN_PYTHON_ORG_ROOT_CERT)
+ cls.w_python_org_cert = cls.space.wrap(str(tmpfile))
cls.w_dh512 = cls.space.wrap(os.path.join(
os.path.dirname(__file__), 'dh512.pem'))
@@ -327,6 +330,17 @@
ctx.load_verify_locations(cadata=cacert_pem)
assert ctx.cert_store_stats()["x509_ca"]
+ def test_get_ca_certs(self):
+ import _ssl
+ ctx = _ssl._SSLContext(_ssl.PROTOCOL_TLSv1)
+ ctx.load_verify_locations(self.keycert)
+ assert ctx.get_ca_certs() == []
+ ctx.load_verify_locations(self.python_org_cert)
+ certs = ctx.get_ca_certs()
+ assert len(certs) == 1
+ print(certs)
+ assert len(certs[0]['issuer']) == 4
+
def test_load_dh_params(self):
import _ssl
ctx = _ssl._SSLContext(_ssl.PROTOCOL_TLSv1)
@@ -500,3 +514,46 @@
CRlNBAAlvhKzO7Clpf9l0YKBEfraJByX
-----END CERTIFICATE-----
"""
+SVN_PYTHON_ORG_ROOT_CERT = """
+-----BEGIN CERTIFICATE-----
+MIIHPTCCBSWgAwIBAgIBADANBgkqhkiG9w0BAQQFADB5MRAwDgYDVQQKEwdSb290
+IENBMR4wHAYDVQQLExVodHRwOi8vd3d3LmNhY2VydC5vcmcxIjAgBgNVBAMTGUNB
+IENlcnQgU2lnbmluZyBBdXRob3JpdHkxITAfBgkqhkiG9w0BCQEWEnN1cHBvcnRA
+Y2FjZXJ0Lm9yZzAeFw0wMzAzMzAxMjI5NDlaFw0zMzAzMjkxMjI5NDlaMHkxEDAO
+BgNVBAoTB1Jvb3QgQ0ExHjAcBgNVBAsTFWh0dHA6Ly93d3cuY2FjZXJ0Lm9yZzEi
+MCAGA1UEAxMZQ0EgQ2VydCBTaWduaW5nIEF1dGhvcml0eTEhMB8GCSqGSIb3DQEJ
+ARYSc3VwcG9ydEBjYWNlcnQub3JnMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
+CgKCAgEAziLA4kZ97DYoB1CW8qAzQIxL8TtmPzHlawI229Z89vGIj053NgVBlfkJ
+8BLPRoZzYLdufujAWGSuzbCtRRcMY/pnCujW0r8+55jE8Ez64AO7NV1sId6eINm6
+zWYyN3L69wj1x81YyY7nDl7qPv4coRQKFWyGhFtkZip6qUtTefWIonvuLwphK42y
+fk1WpRPs6tqSnqxEQR5YYGUFZvjARL3LlPdCfgv3ZWiYUQXw8wWRBB0bF4LsyFe7
+w2t6iPGwcswlWyCR7BYCEo8y6RcYSNDHBS4CMEK4JZwFaz+qOqfrU0j36NK2B5jc
+G8Y0f3/JHIJ6BVgrCFvzOKKrF11myZjXnhCLotLddJr3cQxyYN/Nb5gznZY0dj4k
+epKwDpUeb+agRThHqtdB7Uq3EvbXG4OKDy7YCbZZ16oE/9KTfWgu3YtLq1i6L43q
+laegw1SJpfvbi1EinbLDvhG+LJGGi5Z4rSDTii8aP8bQUWWHIbEZAWV/RRyH9XzQ
+QUxPKZgh/TMfdQwEUfoZd9vUFBzugcMd9Zi3aQaRIt0AUMyBMawSB3s42mhb5ivU
+fslfrejrckzzAeVLIL+aplfKkQABi6F1ITe1Yw1nPkZPcCBnzsXWWdsC4PDSy826
+YreQQejdIOQpvGQpQsgi3Hia/0PsmBsJUUtaWsJx8cTLc6nloQsCAwEAAaOCAc4w
+ggHKMB0GA1UdDgQWBBQWtTIb1Mfz4OaO873SsDrusjkY0TCBowYDVR0jBIGbMIGY
+gBQWtTIb1Mfz4OaO873SsDrusjkY0aF9pHsweTEQMA4GA1UEChMHUm9vdCBDQTEe
+MBwGA1UECxMVaHR0cDovL3d3dy5jYWNlcnQub3JnMSIwIAYDVQQDExlDQSBDZXJ0
+IFNpZ25pbmcgQXV0aG9yaXR5MSEwHwYJKoZIhvcNAQkBFhJzdXBwb3J0QGNhY2Vy
+dC5vcmeCAQAwDwYDVR0TAQH/BAUwAwEB/zAyBgNVHR8EKzApMCegJaAjhiFodHRw
+czovL3d3dy5jYWNlcnQub3JnL3Jldm9rZS5jcmwwMAYJYIZIAYb4QgEEBCMWIWh0
+dHBzOi8vd3d3LmNhY2VydC5vcmcvcmV2b2tlLmNybDA0BglghkgBhvhCAQgEJxYl
+aHR0cDovL3d3dy5jYWNlcnQub3JnL2luZGV4LnBocD9pZD0xMDBWBglghkgBhvhC
+AQ0ESRZHVG8gZ2V0IHlvdXIgb3duIGNlcnRpZmljYXRlIGZvciBGUkVFIGhlYWQg
+b3ZlciB0byBodHRwOi8vd3d3LmNhY2VydC5vcmcwDQYJKoZIhvcNAQEEBQADggIB
+ACjH7pyCArpcgBLKNQodgW+JapnM8mgPf6fhjViVPr3yBsOQWqy1YPaZQwGjiHCc
+nWKdpIevZ1gNMDY75q1I08t0AoZxPuIrA2jxNGJARjtT6ij0rPtmlVOKTV39O9lg
+18p5aTuxZZKmxoGCXJzN600BiqXfEVWqFcofN8CCmHBh22p8lqOOLlQ+TyGpkO/c
+gr/c6EWtTZBzCDyUZbAEmXZ/4rzCahWqlwQ3JNgelE5tDlG+1sSPypZt90Pf6DBl
+Jzt7u0NDY8RD97LsaMzhGY4i+5jhe1o+ATc7iwiwovOVThrLm82asduycPAtStvY
+sONvRUgzEv/+PDIqVPfE94rwiCPCR/5kenHA0R6mY7AHfqQv0wGP3J8rtsYIqQ+T
+SCX8Ev2fQtzzxD72V7DX3WnRBnc0CkvSyqD/HMaMyRa+xMwyN2hzXwj7UfdJUzYF
+CpUCTPJ5GhD22Dp1nPMd8aINcGeGG7MW9S/lpOt5hvk9C8JzC6WZrG/8Z7jlLwum
+GCSNe9FINSkYQKyTYOGWhlC0elnYjyELn8+CkcY7v2vcB5G5l1YjqrZslMZIBjzk
+zk6q5PYvCdxTby78dOs6Y5nCpqyJvKeyRKANihDjbPIky/qbn3BHLt4Ui9SyIAmW
+omTxJBzcoTWcFbLUvFUufQb1nA5V9FrWk9p2rSVzTMVD
+-----END CERTIFICATE-----
+"""
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
https://mail.python.org/mailman/listinfo/pypy-commit
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic