[prev in list] [next in list] [prev in thread] [next in thread] 

List:       pypy-svn
Subject:    [pypy-commit] pypy stdlib-2.7.9: Implement SSLContext.get_ca_certs()
From:       amauryfa <noreply () buildbot ! pypy ! org>
Date:       2015-01-30 23:45:15
Message-ID: 20150130234515.A2C511C0FAB () cobra ! cs ! uni-duesseldorf ! de
[Download RAW message or body]

Author: Amaury Forgeot d'Arc <amauryfa@gmail.com>
Branch: stdlib-2.7.9
Changeset: r75595:d5649f3f7a3b
Date: 2015-01-31 00:43 +0100
http://bitbucket.org/pypy/pypy/changeset/d5649f3f7a3b/

Log:	Implement SSLContext.get_ca_certs()

diff --git a/pypy/module/_ssl/interp_ssl.py b/pypy/module/_ssl/interp_ssl.py
--- a/pypy/module/_ssl/interp_ssl.py
+++ b/pypy/module/_ssl/interp_ssl.py
@@ -525,16 +525,7 @@
 
         if der:
             # return cert in DER-encoded format
-            with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as buf_ptr:
-                buf_ptr[0] = lltype.nullptr(rffi.CCHARP.TO)
-                length = libssl_i2d_X509(self.peer_cert, buf_ptr)
-                if length < 0:
-                    raise _ssl_seterror(space, self, length)
-                try:
-                    # this is actually an immutable bytes sequence
-                    return space.wrap(rffi.charpsize2str(buf_ptr[0], length))
-                finally:
-                    libssl_OPENSSL_free(buf_ptr[0])
+            return _certificate_to_der(space, self.peer_cert)
         else:
             verification = libssl_SSL_CTX_get_verify_mode(
                 libssl_SSL_get_SSL_CTX(self.ssl))
@@ -622,37 +613,45 @@
                            _SSLSocket.descr_set_context),
 )
 
+def _certificate_to_der(space, certificate):
+    with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as buf_ptr:
+        buf_ptr[0] = lltype.nullptr(rffi.CCHARP.TO)
+        length = libssl_i2d_X509(certificate, buf_ptr)
+        if length < 0:
+            raise _ssl_seterror(space, None, 0)
+        try:
+            return space.wrap(rffi.charpsize2str(buf_ptr[0], length))
+        finally:
+            libssl_OPENSSL_free(buf_ptr[0])
 
-def _decode_certificate(space, certificate, verbose=False):
+def _decode_certificate(space, certificate):
     w_retval = space.newdict()
 
     w_peer = _create_tuple_for_X509_NAME(
         space, libssl_X509_get_subject_name(certificate))
     space.setitem(w_retval, space.wrap("subject"), w_peer)
 
-    if verbose:
-        w_issuer = _create_tuple_for_X509_NAME(
-            space, libssl_X509_get_issuer_name(certificate))
-        space.setitem(w_retval, space.wrap("issuer"), w_issuer)
+    w_issuer = _create_tuple_for_X509_NAME(
+        space, libssl_X509_get_issuer_name(certificate))
+    space.setitem(w_retval, space.wrap("issuer"), w_issuer)
 
-        space.setitem(w_retval, space.wrap("version"),
-                      space.wrap(libssl_X509_get_version(certificate)))
+    space.setitem(w_retval, space.wrap("version"),
+                  space.wrap(libssl_X509_get_version(certificate)))
 
     biobuf = libssl_BIO_new(libssl_BIO_s_mem())
     try:
 
-        if verbose:
-            libssl_BIO_reset(biobuf)
-            serialNumber = libssl_X509_get_serialNumber(certificate)
-            libssl_i2a_ASN1_INTEGER(biobuf, serialNumber)
-            # should not exceed 20 octets, 160 bits, so buf is big enough
-            with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf:
-                length = libssl_BIO_gets(biobuf, buf, 99)
-                if length < 0:
-                    raise _ssl_seterror(space, None, length)
+        libssl_BIO_reset(biobuf)
+        serialNumber = libssl_X509_get_serialNumber(certificate)
+        libssl_i2a_ASN1_INTEGER(biobuf, serialNumber)
+        # should not exceed 20 octets, 160 bits, so buf is big enough
+        with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf:
+            length = libssl_BIO_gets(biobuf, buf, 99)
+            if length < 0:
+                raise _ssl_seterror(space, None, length)
 
-                w_serial = space.wrap(rffi.charpsize2str(buf, length))
-            space.setitem(w_retval, space.wrap("serialNumber"), w_serial)
+            w_serial = space.wrap(rffi.charpsize2str(buf, length))
+        space.setitem(w_retval, space.wrap("serialNumber"), w_serial)
 
         libssl_BIO_reset(biobuf)
         notBefore = libssl_X509_get_notBefore(certificate)
@@ -977,8 +976,8 @@
     return getattr(space.fromcache(Cache), name)
 
 
-@unwrap_spec(filename=str, verbose=bool)
-def _test_decode_cert(space, filename, verbose=True):
+@unwrap_spec(filename=str)
+def _test_decode_cert(space, filename):
     cert = libssl_BIO_new(libssl_BIO_s_file())
     if not cert:
         raise ssl_error(space, "Can't malloc memory to read file")
@@ -992,7 +991,7 @@
             raise ssl_error(space, "Error decoding PEM-encoded file")
 
         try:
-            return _decode_certificate(space, x, verbose)
+            return _decode_certificate(space, x)
         finally:
             libssl_X509_free(x)
     finally:
@@ -1352,6 +1351,27 @@
 
         self.npn_protocols = SSLNpnProtocols(self.ctx, protos)
 
+    def get_ca_certs_w(self, space, w_binary_form=None):
+        if w_binary_form and space.is_true(w_binary_form):
+            binary_mode = True
+        else:
+            binary_mode = False
+        rlist = []
+        store = libssl_SSL_CTX_get_cert_store(self.ctx)
+        for i in range(libssl_sk_X509_OBJECT_num(store[0].c_objs)):
+            obj = libssl_sk_X509_OBJECT_value(store[0].c_objs, i)
+            if intmask(obj.c_type) != X509_LU_X509:
+                # not a x509 cert
+                continue
+            # CA for any purpose
+            cert = libssl_pypy_X509_OBJECT_data_x509(obj)
+            if not libssl_X509_check_ca(cert):
+                continue
+            if binary_mode:
+                rlist.append(_certificate_to_der(space, cert))
+            else:
+                rlist.append(_decode_certificate(space, cert))
+        return space.newlist(rlist)
 
 _SSLContext.typedef = TypeDef(
     "_ssl._SSLContext",
@@ -1364,6 +1384,7 @@
     load_verify_locations=interp2app(_SSLContext.load_verify_locations_w),
     set_default_verify_paths=interp2app(_SSLContext.descr_set_default_verify_paths),
     _set_npn_protocols=interp2app(_SSLContext.set_npn_protocols_w),
+    get_ca_certs=interp2app(_SSLContext.get_ca_certs_w),
 
     options=GetSetProperty(_SSLContext.descr_get_options,
                            _SSLContext.descr_set_options),
diff --git a/pypy/module/_ssl/test/test_ssl.py b/pypy/module/_ssl/test/test_ssl.py
--- a/pypy/module/_ssl/test/test_ssl.py
+++ b/pypy/module/_ssl/test/test_ssl.py
@@ -294,6 +294,9 @@
         tmpfile = udir / "cert.passwd.pem"
         tmpfile.write(SSL_CERTIFICATE_PROTECTED)
         cls.w_cert_protected = cls.space.wrap(str(tmpfile))
+        tmpfile = udir / "python.org.pem"
+        tmpfile.write(SVN_PYTHON_ORG_ROOT_CERT)
+        cls.w_python_org_cert = cls.space.wrap(str(tmpfile))
         cls.w_dh512 = cls.space.wrap(os.path.join(
             os.path.dirname(__file__), 'dh512.pem'))
 
@@ -327,6 +330,17 @@
         ctx.load_verify_locations(cadata=cacert_pem)
         assert ctx.cert_store_stats()["x509_ca"]
 
+    def test_get_ca_certs(self):
+        import _ssl
+        ctx = _ssl._SSLContext(_ssl.PROTOCOL_TLSv1)
+        ctx.load_verify_locations(self.keycert)
+        assert ctx.get_ca_certs() == []
+        ctx.load_verify_locations(self.python_org_cert)
+        certs = ctx.get_ca_certs()
+        assert len(certs) == 1
+        print(certs)
+        assert len(certs[0]['issuer']) == 4
+
     def test_load_dh_params(self):
         import _ssl
         ctx = _ssl._SSLContext(_ssl.PROTOCOL_TLSv1)
@@ -500,3 +514,46 @@
 CRlNBAAlvhKzO7Clpf9l0YKBEfraJByX
 -----END CERTIFICATE-----
 """
+SVN_PYTHON_ORG_ROOT_CERT = """
+-----BEGIN CERTIFICATE-----
+MIIHPTCCBSWgAwIBAgIBADANBgkqhkiG9w0BAQQFADB5MRAwDgYDVQQKEwdSb290
+IENBMR4wHAYDVQQLExVodHRwOi8vd3d3LmNhY2VydC5vcmcxIjAgBgNVBAMTGUNB
+IENlcnQgU2lnbmluZyBBdXRob3JpdHkxITAfBgkqhkiG9w0BCQEWEnN1cHBvcnRA
+Y2FjZXJ0Lm9yZzAeFw0wMzAzMzAxMjI5NDlaFw0zMzAzMjkxMjI5NDlaMHkxEDAO
+BgNVBAoTB1Jvb3QgQ0ExHjAcBgNVBAsTFWh0dHA6Ly93d3cuY2FjZXJ0Lm9yZzEi
+MCAGA1UEAxMZQ0EgQ2VydCBTaWduaW5nIEF1dGhvcml0eTEhMB8GCSqGSIb3DQEJ
+ARYSc3VwcG9ydEBjYWNlcnQub3JnMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
+CgKCAgEAziLA4kZ97DYoB1CW8qAzQIxL8TtmPzHlawI229Z89vGIj053NgVBlfkJ
+8BLPRoZzYLdufujAWGSuzbCtRRcMY/pnCujW0r8+55jE8Ez64AO7NV1sId6eINm6
+zWYyN3L69wj1x81YyY7nDl7qPv4coRQKFWyGhFtkZip6qUtTefWIonvuLwphK42y
+fk1WpRPs6tqSnqxEQR5YYGUFZvjARL3LlPdCfgv3ZWiYUQXw8wWRBB0bF4LsyFe7
+w2t6iPGwcswlWyCR7BYCEo8y6RcYSNDHBS4CMEK4JZwFaz+qOqfrU0j36NK2B5jc
+G8Y0f3/JHIJ6BVgrCFvzOKKrF11myZjXnhCLotLddJr3cQxyYN/Nb5gznZY0dj4k
+epKwDpUeb+agRThHqtdB7Uq3EvbXG4OKDy7YCbZZ16oE/9KTfWgu3YtLq1i6L43q
+laegw1SJpfvbi1EinbLDvhG+LJGGi5Z4rSDTii8aP8bQUWWHIbEZAWV/RRyH9XzQ
+QUxPKZgh/TMfdQwEUfoZd9vUFBzugcMd9Zi3aQaRIt0AUMyBMawSB3s42mhb5ivU
+fslfrejrckzzAeVLIL+aplfKkQABi6F1ITe1Yw1nPkZPcCBnzsXWWdsC4PDSy826
+YreQQejdIOQpvGQpQsgi3Hia/0PsmBsJUUtaWsJx8cTLc6nloQsCAwEAAaOCAc4w
+ggHKMB0GA1UdDgQWBBQWtTIb1Mfz4OaO873SsDrusjkY0TCBowYDVR0jBIGbMIGY
+gBQWtTIb1Mfz4OaO873SsDrusjkY0aF9pHsweTEQMA4GA1UEChMHUm9vdCBDQTEe
+MBwGA1UECxMVaHR0cDovL3d3dy5jYWNlcnQub3JnMSIwIAYDVQQDExlDQSBDZXJ0
+IFNpZ25pbmcgQXV0aG9yaXR5MSEwHwYJKoZIhvcNAQkBFhJzdXBwb3J0QGNhY2Vy
+dC5vcmeCAQAwDwYDVR0TAQH/BAUwAwEB/zAyBgNVHR8EKzApMCegJaAjhiFodHRw
+czovL3d3dy5jYWNlcnQub3JnL3Jldm9rZS5jcmwwMAYJYIZIAYb4QgEEBCMWIWh0
+dHBzOi8vd3d3LmNhY2VydC5vcmcvcmV2b2tlLmNybDA0BglghkgBhvhCAQgEJxYl
+aHR0cDovL3d3dy5jYWNlcnQub3JnL2luZGV4LnBocD9pZD0xMDBWBglghkgBhvhC
+AQ0ESRZHVG8gZ2V0IHlvdXIgb3duIGNlcnRpZmljYXRlIGZvciBGUkVFIGhlYWQg
+b3ZlciB0byBodHRwOi8vd3d3LmNhY2VydC5vcmcwDQYJKoZIhvcNAQEEBQADggIB
+ACjH7pyCArpcgBLKNQodgW+JapnM8mgPf6fhjViVPr3yBsOQWqy1YPaZQwGjiHCc
+nWKdpIevZ1gNMDY75q1I08t0AoZxPuIrA2jxNGJARjtT6ij0rPtmlVOKTV39O9lg
+18p5aTuxZZKmxoGCXJzN600BiqXfEVWqFcofN8CCmHBh22p8lqOOLlQ+TyGpkO/c
+gr/c6EWtTZBzCDyUZbAEmXZ/4rzCahWqlwQ3JNgelE5tDlG+1sSPypZt90Pf6DBl
+Jzt7u0NDY8RD97LsaMzhGY4i+5jhe1o+ATc7iwiwovOVThrLm82asduycPAtStvY
+sONvRUgzEv/+PDIqVPfE94rwiCPCR/5kenHA0R6mY7AHfqQv0wGP3J8rtsYIqQ+T
+SCX8Ev2fQtzzxD72V7DX3WnRBnc0CkvSyqD/HMaMyRa+xMwyN2hzXwj7UfdJUzYF
+CpUCTPJ5GhD22Dp1nPMd8aINcGeGG7MW9S/lpOt5hvk9C8JzC6WZrG/8Z7jlLwum
+GCSNe9FINSkYQKyTYOGWhlC0elnYjyELn8+CkcY7v2vcB5G5l1YjqrZslMZIBjzk
+zk6q5PYvCdxTby78dOs6Y5nCpqyJvKeyRKANihDjbPIky/qbn3BHLt4Ui9SyIAmW
+omTxJBzcoTWcFbLUvFUufQb1nA5V9FrWk9p2rSVzTMVD
+-----END CERTIFICATE-----
+"""
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
https://mail.python.org/mailman/listinfo/pypy-commit
[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic