Spaces:
Runtime error
Runtime error
| import os | |
| import socket | |
| from errno import errorcode | |
| from functools import partial, wraps | |
| from itertools import chain, count | |
| from sys import platform | |
| from weakref import WeakValueDictionary | |
| from OpenSSL._util import ( | |
| UNSPECIFIED as _UNSPECIFIED, | |
| exception_from_error_queue as _exception_from_error_queue, | |
| ffi as _ffi, | |
| lib as _lib, | |
| make_assert as _make_assert, | |
| no_zero_allocator as _no_zero_allocator, | |
| path_bytes as _path_bytes, | |
| text_to_bytes_and_warn as _text_to_bytes_and_warn, | |
| ) | |
| from OpenSSL.crypto import ( | |
| FILETYPE_PEM, | |
| PKey, | |
| X509, | |
| X509Name, | |
| X509Store, | |
| _PassphraseHelper, | |
| ) | |
| __all__ = [ | |
| "OPENSSL_VERSION_NUMBER", | |
| "SSLEAY_VERSION", | |
| "SSLEAY_CFLAGS", | |
| "SSLEAY_PLATFORM", | |
| "SSLEAY_DIR", | |
| "SSLEAY_BUILT_ON", | |
| "OPENSSL_VERSION", | |
| "OPENSSL_CFLAGS", | |
| "OPENSSL_PLATFORM", | |
| "OPENSSL_DIR", | |
| "OPENSSL_BUILT_ON", | |
| "SENT_SHUTDOWN", | |
| "RECEIVED_SHUTDOWN", | |
| "SSLv23_METHOD", | |
| "TLSv1_METHOD", | |
| "TLSv1_1_METHOD", | |
| "TLSv1_2_METHOD", | |
| "TLS_METHOD", | |
| "TLS_SERVER_METHOD", | |
| "TLS_CLIENT_METHOD", | |
| "DTLS_METHOD", | |
| "DTLS_SERVER_METHOD", | |
| "DTLS_CLIENT_METHOD", | |
| "SSL3_VERSION", | |
| "TLS1_VERSION", | |
| "TLS1_1_VERSION", | |
| "TLS1_2_VERSION", | |
| "TLS1_3_VERSION", | |
| "OP_NO_SSLv2", | |
| "OP_NO_SSLv3", | |
| "OP_NO_TLSv1", | |
| "OP_NO_TLSv1_1", | |
| "OP_NO_TLSv1_2", | |
| "MODE_RELEASE_BUFFERS", | |
| "OP_SINGLE_DH_USE", | |
| "OP_SINGLE_ECDH_USE", | |
| "OP_EPHEMERAL_RSA", | |
| "OP_MICROSOFT_SESS_ID_BUG", | |
| "OP_NETSCAPE_CHALLENGE_BUG", | |
| "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG", | |
| "OP_SSLREF2_REUSE_CERT_TYPE_BUG", | |
| "OP_MICROSOFT_BIG_SSLV3_BUFFER", | |
| "OP_MSIE_SSLV2_RSA_PADDING", | |
| "OP_SSLEAY_080_CLIENT_DH_BUG", | |
| "OP_TLS_D5_BUG", | |
| "OP_TLS_BLOCK_PADDING_BUG", | |
| "OP_DONT_INSERT_EMPTY_FRAGMENTS", | |
| "OP_CIPHER_SERVER_PREFERENCE", | |
| "OP_TLS_ROLLBACK_BUG", | |
| "OP_PKCS1_CHECK_1", | |
| "OP_PKCS1_CHECK_2", | |
| "OP_NETSCAPE_CA_DN_BUG", | |
| "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG", | |
| "OP_NO_COMPRESSION", | |
| "OP_NO_QUERY_MTU", | |
| "OP_COOKIE_EXCHANGE", | |
| "OP_NO_TICKET", | |
| "OP_ALL", | |
| "VERIFY_PEER", | |
| "VERIFY_FAIL_IF_NO_PEER_CERT", | |
| "VERIFY_CLIENT_ONCE", | |
| "VERIFY_NONE", | |
| "SESS_CACHE_OFF", | |
| "SESS_CACHE_CLIENT", | |
| "SESS_CACHE_SERVER", | |
| "SESS_CACHE_BOTH", | |
| "SESS_CACHE_NO_AUTO_CLEAR", | |
| "SESS_CACHE_NO_INTERNAL_LOOKUP", | |
| "SESS_CACHE_NO_INTERNAL_STORE", | |
| "SESS_CACHE_NO_INTERNAL", | |
| "SSL_ST_CONNECT", | |
| "SSL_ST_ACCEPT", | |
| "SSL_ST_MASK", | |
| "SSL_CB_LOOP", | |
| "SSL_CB_EXIT", | |
| "SSL_CB_READ", | |
| "SSL_CB_WRITE", | |
| "SSL_CB_ALERT", | |
| "SSL_CB_READ_ALERT", | |
| "SSL_CB_WRITE_ALERT", | |
| "SSL_CB_ACCEPT_LOOP", | |
| "SSL_CB_ACCEPT_EXIT", | |
| "SSL_CB_CONNECT_LOOP", | |
| "SSL_CB_CONNECT_EXIT", | |
| "SSL_CB_HANDSHAKE_START", | |
| "SSL_CB_HANDSHAKE_DONE", | |
| "Error", | |
| "WantReadError", | |
| "WantWriteError", | |
| "WantX509LookupError", | |
| "ZeroReturnError", | |
| "SysCallError", | |
| "NO_OVERLAPPING_PROTOCOLS", | |
| "SSLeay_version", | |
| "Session", | |
| "Context", | |
| "Connection", | |
| ] | |
| OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER | |
| OPENSSL_VERSION = SSLEAY_VERSION = _lib.OPENSSL_VERSION | |
| OPENSSL_CFLAGS = SSLEAY_CFLAGS = _lib.OPENSSL_CFLAGS | |
| OPENSSL_PLATFORM = SSLEAY_PLATFORM = _lib.OPENSSL_PLATFORM | |
| OPENSSL_DIR = SSLEAY_DIR = _lib.OPENSSL_DIR | |
| OPENSSL_BUILT_ON = SSLEAY_BUILT_ON = _lib.OPENSSL_BUILT_ON | |
| SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN | |
| RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN | |
| SSLv23_METHOD = 3 | |
| TLSv1_METHOD = 4 | |
| TLSv1_1_METHOD = 5 | |
| TLSv1_2_METHOD = 6 | |
| TLS_METHOD = 7 | |
| TLS_SERVER_METHOD = 8 | |
| TLS_CLIENT_METHOD = 9 | |
| DTLS_METHOD = 10 | |
| DTLS_SERVER_METHOD = 11 | |
| DTLS_CLIENT_METHOD = 12 | |
| try: | |
| SSL3_VERSION = _lib.SSL3_VERSION | |
| TLS1_VERSION = _lib.TLS1_VERSION | |
| TLS1_1_VERSION = _lib.TLS1_1_VERSION | |
| TLS1_2_VERSION = _lib.TLS1_2_VERSION | |
| TLS1_3_VERSION = _lib.TLS1_3_VERSION | |
| except AttributeError: | |
| # Hardcode constants for cryptography < 3.4, see | |
| # https://github.com/pyca/pyopenssl/pull/985#issuecomment-775186682 | |
| SSL3_VERSION = 768 | |
| TLS1_VERSION = 769 | |
| TLS1_1_VERSION = 770 | |
| TLS1_2_VERSION = 771 | |
| TLS1_3_VERSION = 772 | |
| OP_NO_SSLv2 = _lib.SSL_OP_NO_SSLv2 | |
| OP_NO_SSLv3 = _lib.SSL_OP_NO_SSLv3 | |
| OP_NO_TLSv1 = _lib.SSL_OP_NO_TLSv1 | |
| OP_NO_TLSv1_1 = _lib.SSL_OP_NO_TLSv1_1 | |
| OP_NO_TLSv1_2 = _lib.SSL_OP_NO_TLSv1_2 | |
| try: | |
| OP_NO_TLSv1_3 = _lib.SSL_OP_NO_TLSv1_3 | |
| __all__.append("OP_NO_TLSv1_3") | |
| except AttributeError: | |
| pass | |
| MODE_RELEASE_BUFFERS = _lib.SSL_MODE_RELEASE_BUFFERS | |
| OP_SINGLE_DH_USE = _lib.SSL_OP_SINGLE_DH_USE | |
| OP_SINGLE_ECDH_USE = _lib.SSL_OP_SINGLE_ECDH_USE | |
| OP_EPHEMERAL_RSA = _lib.SSL_OP_EPHEMERAL_RSA | |
| OP_MICROSOFT_SESS_ID_BUG = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG | |
| OP_NETSCAPE_CHALLENGE_BUG = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG | |
| OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG = ( | |
| _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG | |
| ) | |
| OP_SSLREF2_REUSE_CERT_TYPE_BUG = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG | |
| OP_MICROSOFT_BIG_SSLV3_BUFFER = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER | |
| OP_MSIE_SSLV2_RSA_PADDING = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING | |
| OP_SSLEAY_080_CLIENT_DH_BUG = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG | |
| OP_TLS_D5_BUG = _lib.SSL_OP_TLS_D5_BUG | |
| OP_TLS_BLOCK_PADDING_BUG = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG | |
| OP_DONT_INSERT_EMPTY_FRAGMENTS = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS | |
| OP_CIPHER_SERVER_PREFERENCE = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE | |
| OP_TLS_ROLLBACK_BUG = _lib.SSL_OP_TLS_ROLLBACK_BUG | |
| OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1 | |
| OP_PKCS1_CHECK_2 = _lib.SSL_OP_PKCS1_CHECK_2 | |
| OP_NETSCAPE_CA_DN_BUG = _lib.SSL_OP_NETSCAPE_CA_DN_BUG | |
| OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG = ( | |
| _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG | |
| ) | |
| OP_NO_COMPRESSION = _lib.SSL_OP_NO_COMPRESSION | |
| OP_NO_QUERY_MTU = _lib.SSL_OP_NO_QUERY_MTU | |
| OP_COOKIE_EXCHANGE = _lib.SSL_OP_COOKIE_EXCHANGE | |
| OP_NO_TICKET = _lib.SSL_OP_NO_TICKET | |
| try: | |
| OP_NO_RENEGOTIATION = _lib.SSL_OP_NO_RENEGOTIATION | |
| __all__.append("OP_NO_RENEGOTIATION") | |
| except AttributeError: | |
| pass | |
| try: | |
| OP_IGNORE_UNEXPECTED_EOF = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF | |
| __all__.append("OP_IGNORE_UNEXPECTED_EOF") | |
| except AttributeError: | |
| pass | |
| OP_ALL = _lib.SSL_OP_ALL | |
| VERIFY_PEER = _lib.SSL_VERIFY_PEER | |
| VERIFY_FAIL_IF_NO_PEER_CERT = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT | |
| VERIFY_CLIENT_ONCE = _lib.SSL_VERIFY_CLIENT_ONCE | |
| VERIFY_NONE = _lib.SSL_VERIFY_NONE | |
| SESS_CACHE_OFF = _lib.SSL_SESS_CACHE_OFF | |
| SESS_CACHE_CLIENT = _lib.SSL_SESS_CACHE_CLIENT | |
| SESS_CACHE_SERVER = _lib.SSL_SESS_CACHE_SERVER | |
| SESS_CACHE_BOTH = _lib.SSL_SESS_CACHE_BOTH | |
| SESS_CACHE_NO_AUTO_CLEAR = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR | |
| SESS_CACHE_NO_INTERNAL_LOOKUP = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP | |
| SESS_CACHE_NO_INTERNAL_STORE = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE | |
| SESS_CACHE_NO_INTERNAL = _lib.SSL_SESS_CACHE_NO_INTERNAL | |
| SSL_ST_CONNECT = _lib.SSL_ST_CONNECT | |
| SSL_ST_ACCEPT = _lib.SSL_ST_ACCEPT | |
| SSL_ST_MASK = _lib.SSL_ST_MASK | |
| SSL_CB_LOOP = _lib.SSL_CB_LOOP | |
| SSL_CB_EXIT = _lib.SSL_CB_EXIT | |
| SSL_CB_READ = _lib.SSL_CB_READ | |
| SSL_CB_WRITE = _lib.SSL_CB_WRITE | |
| SSL_CB_ALERT = _lib.SSL_CB_ALERT | |
| SSL_CB_READ_ALERT = _lib.SSL_CB_READ_ALERT | |
| SSL_CB_WRITE_ALERT = _lib.SSL_CB_WRITE_ALERT | |
| SSL_CB_ACCEPT_LOOP = _lib.SSL_CB_ACCEPT_LOOP | |
| SSL_CB_ACCEPT_EXIT = _lib.SSL_CB_ACCEPT_EXIT | |
| SSL_CB_CONNECT_LOOP = _lib.SSL_CB_CONNECT_LOOP | |
| SSL_CB_CONNECT_EXIT = _lib.SSL_CB_CONNECT_EXIT | |
| SSL_CB_HANDSHAKE_START = _lib.SSL_CB_HANDSHAKE_START | |
| SSL_CB_HANDSHAKE_DONE = _lib.SSL_CB_HANDSHAKE_DONE | |
| # Taken from https://golang.org/src/crypto/x509/root_linux.go | |
| _CERTIFICATE_FILE_LOCATIONS = [ | |
| "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc. | |
| "/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL 6 | |
| "/etc/ssl/ca-bundle.pem", # OpenSUSE | |
| "/etc/pki/tls/cacert.pem", # OpenELEC | |
| "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # CentOS/RHEL 7 | |
| ] | |
| _CERTIFICATE_PATH_LOCATIONS = [ | |
| "/etc/ssl/certs", # SLES10/SLES11 | |
| ] | |
| # These values are compared to output from cffi's ffi.string so they must be | |
| # byte strings. | |
| _CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs" | |
| _CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem" | |
| class Error(Exception): | |
| """ | |
| An error occurred in an `OpenSSL.SSL` API. | |
| """ | |
| _raise_current_error = partial(_exception_from_error_queue, Error) | |
| _openssl_assert = _make_assert(Error) | |
| class WantReadError(Error): | |
| pass | |
| class WantWriteError(Error): | |
| pass | |
| class WantX509LookupError(Error): | |
| pass | |
| class ZeroReturnError(Error): | |
| pass | |
| class SysCallError(Error): | |
| pass | |
| class _CallbackExceptionHelper: | |
| """ | |
| A base class for wrapper classes that allow for intelligent exception | |
| handling in OpenSSL callbacks. | |
| :ivar list _problems: Any exceptions that occurred while executing in a | |
| context where they could not be raised in the normal way. Typically | |
| this is because OpenSSL has called into some Python code and requires a | |
| return value. The exceptions are saved to be raised later when it is | |
| possible to do so. | |
| """ | |
| def __init__(self): | |
| self._problems = [] | |
| def raise_if_problem(self): | |
| """ | |
| Raise an exception from the OpenSSL error queue or that was previously | |
| captured whe running a callback. | |
| """ | |
| if self._problems: | |
| try: | |
| _raise_current_error() | |
| except Error: | |
| pass | |
| raise self._problems.pop(0) | |
| class _VerifyHelper(_CallbackExceptionHelper): | |
| """ | |
| Wrap a callback such that it can be used as a certificate verification | |
| callback. | |
| """ | |
| def __init__(self, callback): | |
| _CallbackExceptionHelper.__init__(self) | |
| def wrapper(ok, store_ctx): | |
| x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx) | |
| _lib.X509_up_ref(x509) | |
| cert = X509._from_raw_x509_ptr(x509) | |
| error_number = _lib.X509_STORE_CTX_get_error(store_ctx) | |
| error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx) | |
| index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx() | |
| ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index) | |
| connection = Connection._reverse_mapping[ssl] | |
| try: | |
| result = callback( | |
| connection, cert, error_number, error_depth, ok | |
| ) | |
| except Exception as e: | |
| self._problems.append(e) | |
| return 0 | |
| else: | |
| if result: | |
| _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK) | |
| return 1 | |
| else: | |
| return 0 | |
| self.callback = _ffi.callback( | |
| "int (*)(int, X509_STORE_CTX *)", wrapper | |
| ) | |
| NO_OVERLAPPING_PROTOCOLS = object() | |
| class _ALPNSelectHelper(_CallbackExceptionHelper): | |
| """ | |
| Wrap a callback such that it can be used as an ALPN selection callback. | |
| """ | |
| def __init__(self, callback): | |
| _CallbackExceptionHelper.__init__(self) | |
| def wrapper(ssl, out, outlen, in_, inlen, arg): | |
| try: | |
| conn = Connection._reverse_mapping[ssl] | |
| # The string passed to us is made up of multiple | |
| # length-prefixed bytestrings. We need to split that into a | |
| # list. | |
| instr = _ffi.buffer(in_, inlen)[:] | |
| protolist = [] | |
| while instr: | |
| encoded_len = instr[0] | |
| proto = instr[1 : encoded_len + 1] | |
| protolist.append(proto) | |
| instr = instr[encoded_len + 1 :] | |
| # Call the callback | |
| outbytes = callback(conn, protolist) | |
| any_accepted = True | |
| if outbytes is NO_OVERLAPPING_PROTOCOLS: | |
| outbytes = b"" | |
| any_accepted = False | |
| elif not isinstance(outbytes, bytes): | |
| raise TypeError( | |
| "ALPN callback must return a bytestring or the " | |
| "special NO_OVERLAPPING_PROTOCOLS sentinel value." | |
| ) | |
| # Save our callback arguments on the connection object to make | |
| # sure that they don't get freed before OpenSSL can use them. | |
| # Then, return them in the appropriate output parameters. | |
| conn._alpn_select_callback_args = [ | |
| _ffi.new("unsigned char *", len(outbytes)), | |
| _ffi.new("unsigned char[]", outbytes), | |
| ] | |
| outlen[0] = conn._alpn_select_callback_args[0][0] | |
| out[0] = conn._alpn_select_callback_args[1] | |
| if not any_accepted: | |
| return _lib.SSL_TLSEXT_ERR_NOACK | |
| return _lib.SSL_TLSEXT_ERR_OK | |
| except Exception as e: | |
| self._problems.append(e) | |
| return _lib.SSL_TLSEXT_ERR_ALERT_FATAL | |
| self.callback = _ffi.callback( | |
| ( | |
| "int (*)(SSL *, unsigned char **, unsigned char *, " | |
| "const unsigned char *, unsigned int, void *)" | |
| ), | |
| wrapper, | |
| ) | |
| class _OCSPServerCallbackHelper(_CallbackExceptionHelper): | |
| """ | |
| Wrap a callback such that it can be used as an OCSP callback for the server | |
| side. | |
| Annoyingly, OpenSSL defines one OCSP callback but uses it in two different | |
| ways. For servers, that callback is expected to retrieve some OCSP data and | |
| hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK, | |
| SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback | |
| is expected to check the OCSP data, and returns a negative value on error, | |
| 0 if the response is not acceptable, or positive if it is. These are | |
| mutually exclusive return code behaviours, and they mean that we need two | |
| helpers so that we always return an appropriate error code if the user's | |
| code throws an exception. | |
| Given that we have to have two helpers anyway, these helpers are a bit more | |
| helpery than most: specifically, they hide a few more of the OpenSSL | |
| functions so that the user has an easier time writing these callbacks. | |
| This helper implements the server side. | |
| """ | |
| def __init__(self, callback): | |
| _CallbackExceptionHelper.__init__(self) | |
| def wrapper(ssl, cdata): | |
| try: | |
| conn = Connection._reverse_mapping[ssl] | |
| # Extract the data if any was provided. | |
| if cdata != _ffi.NULL: | |
| data = _ffi.from_handle(cdata) | |
| else: | |
| data = None | |
| # Call the callback. | |
| ocsp_data = callback(conn, data) | |
| if not isinstance(ocsp_data, bytes): | |
| raise TypeError("OCSP callback must return a bytestring.") | |
| # If the OCSP data was provided, we will pass it to OpenSSL. | |
| # However, we have an early exit here: if no OCSP data was | |
| # provided we will just exit out and tell OpenSSL that there | |
| # is nothing to do. | |
| if not ocsp_data: | |
| return 3 # SSL_TLSEXT_ERR_NOACK | |
| # OpenSSL takes ownership of this data and expects it to have | |
| # been allocated by OPENSSL_malloc. | |
| ocsp_data_length = len(ocsp_data) | |
| data_ptr = _lib.OPENSSL_malloc(ocsp_data_length) | |
| _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data | |
| _lib.SSL_set_tlsext_status_ocsp_resp( | |
| ssl, data_ptr, ocsp_data_length | |
| ) | |
| return 0 | |
| except Exception as e: | |
| self._problems.append(e) | |
| return 2 # SSL_TLSEXT_ERR_ALERT_FATAL | |
| self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper) | |
| class _OCSPClientCallbackHelper(_CallbackExceptionHelper): | |
| """ | |
| Wrap a callback such that it can be used as an OCSP callback for the client | |
| side. | |
| Annoyingly, OpenSSL defines one OCSP callback but uses it in two different | |
| ways. For servers, that callback is expected to retrieve some OCSP data and | |
| hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK, | |
| SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback | |
| is expected to check the OCSP data, and returns a negative value on error, | |
| 0 if the response is not acceptable, or positive if it is. These are | |
| mutually exclusive return code behaviours, and they mean that we need two | |
| helpers so that we always return an appropriate error code if the user's | |
| code throws an exception. | |
| Given that we have to have two helpers anyway, these helpers are a bit more | |
| helpery than most: specifically, they hide a few more of the OpenSSL | |
| functions so that the user has an easier time writing these callbacks. | |
| This helper implements the client side. | |
| """ | |
| def __init__(self, callback): | |
| _CallbackExceptionHelper.__init__(self) | |
| def wrapper(ssl, cdata): | |
| try: | |
| conn = Connection._reverse_mapping[ssl] | |
| # Extract the data if any was provided. | |
| if cdata != _ffi.NULL: | |
| data = _ffi.from_handle(cdata) | |
| else: | |
| data = None | |
| # Get the OCSP data. | |
| ocsp_ptr = _ffi.new("unsigned char **") | |
| ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr) | |
| if ocsp_len < 0: | |
| # No OCSP data. | |
| ocsp_data = b"" | |
| else: | |
| # Copy the OCSP data, then pass it to the callback. | |
| ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:] | |
| valid = callback(conn, ocsp_data, data) | |
| # Return 1 on success or 0 on error. | |
| return int(bool(valid)) | |
| except Exception as e: | |
| self._problems.append(e) | |
| # Return negative value if an exception is hit. | |
| return -1 | |
| self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper) | |
| class _CookieGenerateCallbackHelper(_CallbackExceptionHelper): | |
| def __init__(self, callback): | |
| _CallbackExceptionHelper.__init__(self) | |
| def wrapper(ssl, out, outlen): | |
| try: | |
| conn = Connection._reverse_mapping[ssl] | |
| cookie = callback(conn) | |
| out[0 : len(cookie)] = cookie | |
| outlen[0] = len(cookie) | |
| return 1 | |
| except Exception as e: | |
| self._problems.append(e) | |
| # "a zero return value can be used to abort the handshake" | |
| return 0 | |
| self.callback = _ffi.callback( | |
| "int (*)(SSL *, unsigned char *, unsigned int *)", | |
| wrapper, | |
| ) | |
| class _CookieVerifyCallbackHelper(_CallbackExceptionHelper): | |
| def __init__(self, callback): | |
| _CallbackExceptionHelper.__init__(self) | |
| def wrapper(ssl, c_cookie, cookie_len): | |
| try: | |
| conn = Connection._reverse_mapping[ssl] | |
| return callback(conn, bytes(c_cookie[0:cookie_len])) | |
| except Exception as e: | |
| self._problems.append(e) | |
| return 0 | |
| self.callback = _ffi.callback( | |
| "int (*)(SSL *, unsigned char *, unsigned int)", | |
| wrapper, | |
| ) | |
| def _asFileDescriptor(obj): | |
| fd = None | |
| if not isinstance(obj, int): | |
| meth = getattr(obj, "fileno", None) | |
| if meth is not None: | |
| obj = meth() | |
| if isinstance(obj, int): | |
| fd = obj | |
| if not isinstance(fd, int): | |
| raise TypeError("argument must be an int, or have a fileno() method.") | |
| elif fd < 0: | |
| raise ValueError( | |
| "file descriptor cannot be a negative integer (%i)" % (fd,) | |
| ) | |
| return fd | |
| def OpenSSL_version(type): | |
| """ | |
| Return a string describing the version of OpenSSL in use. | |
| :param type: One of the :const:`OPENSSL_` constants defined in this module. | |
| """ | |
| return _ffi.string(_lib.OpenSSL_version(type)) | |
| SSLeay_version = OpenSSL_version | |
| def _make_requires(flag, error): | |
| """ | |
| Builds a decorator that ensures that functions that rely on OpenSSL | |
| functions that are not present in this build raise NotImplementedError, | |
| rather than AttributeError coming out of cryptography. | |
| :param flag: A cryptography flag that guards the functions, e.g. | |
| ``Cryptography_HAS_NEXTPROTONEG``. | |
| :param error: The string to be used in the exception if the flag is false. | |
| """ | |
| def _requires_decorator(func): | |
| if not flag: | |
| def explode(*args, **kwargs): | |
| raise NotImplementedError(error) | |
| return explode | |
| else: | |
| return func | |
| return _requires_decorator | |
| _requires_alpn = _make_requires( | |
| _lib.Cryptography_HAS_ALPN, "ALPN not available" | |
| ) | |
| _requires_keylog = _make_requires( | |
| getattr(_lib, "Cryptography_HAS_KEYLOG", None), "Key logging not available" | |
| ) | |
| class Session: | |
| """ | |
| A class representing an SSL session. A session defines certain connection | |
| parameters which may be re-used to speed up the setup of subsequent | |
| connections. | |
| .. versionadded:: 0.14 | |
| """ | |
| pass | |
| class Context: | |
| """ | |
| :class:`OpenSSL.SSL.Context` instances define the parameters for setting | |
| up new SSL connections. | |
| :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD, | |
| DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD. | |
| SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should | |
| not be used. | |
| """ | |
| _methods = { | |
| SSLv23_METHOD: (_lib.TLS_method, None), | |
| TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION), | |
| TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION), | |
| TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION), | |
| TLS_METHOD: (_lib.TLS_method, None), | |
| TLS_SERVER_METHOD: (_lib.TLS_server_method, None), | |
| TLS_CLIENT_METHOD: (_lib.TLS_client_method, None), | |
| DTLS_METHOD: (_lib.DTLS_method, None), | |
| DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None), | |
| DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None), | |
| } | |
| def __init__(self, method): | |
| if not isinstance(method, int): | |
| raise TypeError("method must be an integer") | |
| try: | |
| method_func, version = self._methods[method] | |
| except KeyError: | |
| raise ValueError("No such protocol") | |
| method_obj = method_func() | |
| _openssl_assert(method_obj != _ffi.NULL) | |
| context = _lib.SSL_CTX_new(method_obj) | |
| _openssl_assert(context != _ffi.NULL) | |
| context = _ffi.gc(context, _lib.SSL_CTX_free) | |
| self._context = context | |
| self._passphrase_helper = None | |
| self._passphrase_callback = None | |
| self._passphrase_userdata = None | |
| self._verify_helper = None | |
| self._verify_callback = None | |
| self._info_callback = None | |
| self._keylog_callback = None | |
| self._tlsext_servername_callback = None | |
| self._app_data = None | |
| self._alpn_select_helper = None | |
| self._alpn_select_callback = None | |
| self._ocsp_helper = None | |
| self._ocsp_callback = None | |
| self._ocsp_data = None | |
| self._cookie_generate_helper = None | |
| self._cookie_verify_helper = None | |
| self.set_mode(_lib.SSL_MODE_ENABLE_PARTIAL_WRITE) | |
| if version is not None: | |
| self.set_min_proto_version(version) | |
| self.set_max_proto_version(version) | |
| def set_min_proto_version(self, version): | |
| """ | |
| Set the minimum supported protocol version. Setting the minimum | |
| version to 0 will enable protocol versions down to the lowest version | |
| supported by the library. | |
| If the underlying OpenSSL build is missing support for the selected | |
| version, this method will raise an exception. | |
| """ | |
| _openssl_assert( | |
| _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1 | |
| ) | |
| def set_max_proto_version(self, version): | |
| """ | |
| Set the maximum supported protocol version. Setting the maximum | |
| version to 0 will enable protocol versions up to the highest version | |
| supported by the library. | |
| If the underlying OpenSSL build is missing support for the selected | |
| version, this method will raise an exception. | |
| """ | |
| _openssl_assert( | |
| _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1 | |
| ) | |
| def load_verify_locations(self, cafile, capath=None): | |
| """ | |
| Let SSL know where we can find trusted certificates for the certificate | |
| chain. Note that the certificates have to be in PEM format. | |
| If capath is passed, it must be a directory prepared using the | |
| ``c_rehash`` tool included with OpenSSL. Either, but not both, of | |
| *pemfile* or *capath* may be :data:`None`. | |
| :param cafile: In which file we can find the certificates (``bytes`` or | |
| ``unicode``). | |
| :param capath: In which directory we can find the certificates | |
| (``bytes`` or ``unicode``). | |
| :return: None | |
| """ | |
| if cafile is None: | |
| cafile = _ffi.NULL | |
| else: | |
| cafile = _path_bytes(cafile) | |
| if capath is None: | |
| capath = _ffi.NULL | |
| else: | |
| capath = _path_bytes(capath) | |
| load_result = _lib.SSL_CTX_load_verify_locations( | |
| self._context, cafile, capath | |
| ) | |
| if not load_result: | |
| _raise_current_error() | |
| def _wrap_callback(self, callback): | |
| def wrapper(size, verify, userdata): | |
| return callback(size, verify, self._passphrase_userdata) | |
| return _PassphraseHelper( | |
| FILETYPE_PEM, wrapper, more_args=True, truncate=True | |
| ) | |
| def set_passwd_cb(self, callback, userdata=None): | |
| """ | |
| Set the passphrase callback. This function will be called | |
| when a private key with a passphrase is loaded. | |
| :param callback: The Python callback to use. This must accept three | |
| positional arguments. First, an integer giving the maximum length | |
| of the passphrase it may return. If the returned passphrase is | |
| longer than this, it will be truncated. Second, a boolean value | |
| which will be true if the user should be prompted for the | |
| passphrase twice and the callback should verify that the two values | |
| supplied are equal. Third, the value given as the *userdata* | |
| parameter to :meth:`set_passwd_cb`. The *callback* must return | |
| a byte string. If an error occurs, *callback* should return a false | |
| value (e.g. an empty string). | |
| :param userdata: (optional) A Python object which will be given as | |
| argument to the callback | |
| :return: None | |
| """ | |
| if not callable(callback): | |
| raise TypeError("callback must be callable") | |
| self._passphrase_helper = self._wrap_callback(callback) | |
| self._passphrase_callback = self._passphrase_helper.callback | |
| _lib.SSL_CTX_set_default_passwd_cb( | |
| self._context, self._passphrase_callback | |
| ) | |
| self._passphrase_userdata = userdata | |
| def set_default_verify_paths(self): | |
| """ | |
| Specify that the platform provided CA certificates are to be used for | |
| verification purposes. This method has some caveats related to the | |
| binary wheels that cryptography (pyOpenSSL's primary dependency) ships: | |
| * macOS will only load certificates using this method if the user has | |
| the ``openssl@1.1`` `Homebrew <https://brew.sh>`_ formula installed | |
| in the default location. | |
| * Windows will not work. | |
| * manylinux1 cryptography wheels will work on most common Linux | |
| distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects the | |
| manylinux1 wheel and attempts to load roots via a fallback path. | |
| :return: None | |
| """ | |
| # SSL_CTX_set_default_verify_paths will attempt to load certs from | |
| # both a cafile and capath that are set at compile time. However, | |
| # it will first check environment variables and, if present, load | |
| # those paths instead | |
| set_result = _lib.SSL_CTX_set_default_verify_paths(self._context) | |
| _openssl_assert(set_result == 1) | |
| # After attempting to set default_verify_paths we need to know whether | |
| # to go down the fallback path. | |
| # First we'll check to see if any env vars have been set. If so, | |
| # we won't try to do anything else because the user has set the path | |
| # themselves. | |
| dir_env_var = _ffi.string(_lib.X509_get_default_cert_dir_env()).decode( | |
| "ascii" | |
| ) | |
| file_env_var = _ffi.string( | |
| _lib.X509_get_default_cert_file_env() | |
| ).decode("ascii") | |
| if not self._check_env_vars_set(dir_env_var, file_env_var): | |
| default_dir = _ffi.string(_lib.X509_get_default_cert_dir()) | |
| default_file = _ffi.string(_lib.X509_get_default_cert_file()) | |
| # Now we check to see if the default_dir and default_file are set | |
| # to the exact values we use in our manylinux1 builds. If they are | |
| # then we know to load the fallbacks | |
| if ( | |
| default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR | |
| and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE | |
| ): | |
| # This is manylinux1, let's load our fallback paths | |
| self._fallback_default_verify_paths( | |
| _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS | |
| ) | |
| def _check_env_vars_set(self, dir_env_var, file_env_var): | |
| """ | |
| Check to see if the default cert dir/file environment vars are present. | |
| :return: bool | |
| """ | |
| return ( | |
| os.environ.get(file_env_var) is not None | |
| or os.environ.get(dir_env_var) is not None | |
| ) | |
| def _fallback_default_verify_paths(self, file_path, dir_path): | |
| """ | |
| Default verify paths are based on the compiled version of OpenSSL. | |
| However, when pyca/cryptography is compiled as a manylinux1 wheel | |
| that compiled location can potentially be wrong. So, like Go, we | |
| will try a predefined set of paths and attempt to load roots | |
| from there. | |
| :return: None | |
| """ | |
| for cafile in file_path: | |
| if os.path.isfile(cafile): | |
| self.load_verify_locations(cafile) | |
| break | |
| for capath in dir_path: | |
| if os.path.isdir(capath): | |
| self.load_verify_locations(None, capath) | |
| break | |
| def use_certificate_chain_file(self, certfile): | |
| """ | |
| Load a certificate chain from a file. | |
| :param certfile: The name of the certificate chain file (``bytes`` or | |
| ``unicode``). Must be PEM encoded. | |
| :return: None | |
| """ | |
| certfile = _path_bytes(certfile) | |
| result = _lib.SSL_CTX_use_certificate_chain_file( | |
| self._context, certfile | |
| ) | |
| if not result: | |
| _raise_current_error() | |
| def use_certificate_file(self, certfile, filetype=FILETYPE_PEM): | |
| """ | |
| Load a certificate from a file | |
| :param certfile: The name of the certificate file (``bytes`` or | |
| ``unicode``). | |
| :param filetype: (optional) The encoding of the file, which is either | |
| :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is | |
| :const:`FILETYPE_PEM`. | |
| :return: None | |
| """ | |
| certfile = _path_bytes(certfile) | |
| if not isinstance(filetype, int): | |
| raise TypeError("filetype must be an integer") | |
| use_result = _lib.SSL_CTX_use_certificate_file( | |
| self._context, certfile, filetype | |
| ) | |
| if not use_result: | |
| _raise_current_error() | |
| def use_certificate(self, cert): | |
| """ | |
| Load a certificate from a X509 object | |
| :param cert: The X509 object | |
| :return: None | |
| """ | |
| # Mirrored at Connection.use_certificate | |
| if not isinstance(cert, X509): | |
| raise TypeError("cert must be an X509 instance") | |
| use_result = _lib.SSL_CTX_use_certificate(self._context, cert._x509) | |
| if not use_result: | |
| _raise_current_error() | |
| def add_extra_chain_cert(self, certobj): | |
| """ | |
| Add certificate to chain | |
| :param certobj: The X509 certificate object to add to the chain | |
| :return: None | |
| """ | |
| if not isinstance(certobj, X509): | |
| raise TypeError("certobj must be an X509 instance") | |
| copy = _lib.X509_dup(certobj._x509) | |
| add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy) | |
| if not add_result: | |
| # TODO: This is untested. | |
| _lib.X509_free(copy) | |
| _raise_current_error() | |
| def _raise_passphrase_exception(self): | |
| if self._passphrase_helper is not None: | |
| self._passphrase_helper.raise_if_problem(Error) | |
| _raise_current_error() | |
| def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED): | |
| """ | |
| Load a private key from a file | |
| :param keyfile: The name of the key file (``bytes`` or ``unicode``) | |
| :param filetype: (optional) The encoding of the file, which is either | |
| :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is | |
| :const:`FILETYPE_PEM`. | |
| :return: None | |
| """ | |
| keyfile = _path_bytes(keyfile) | |
| if filetype is _UNSPECIFIED: | |
| filetype = FILETYPE_PEM | |
| elif not isinstance(filetype, int): | |
| raise TypeError("filetype must be an integer") | |
| use_result = _lib.SSL_CTX_use_PrivateKey_file( | |
| self._context, keyfile, filetype | |
| ) | |
| if not use_result: | |
| self._raise_passphrase_exception() | |
| def use_privatekey(self, pkey): | |
| """ | |
| Load a private key from a PKey object | |
| :param pkey: The PKey object | |
| :return: None | |
| """ | |
| # Mirrored at Connection.use_privatekey | |
| if not isinstance(pkey, PKey): | |
| raise TypeError("pkey must be a PKey instance") | |
| use_result = _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey) | |
| if not use_result: | |
| self._raise_passphrase_exception() | |
| def check_privatekey(self): | |
| """ | |
| Check if the private key (loaded with :meth:`use_privatekey`) matches | |
| the certificate (loaded with :meth:`use_certificate`) | |
| :return: :data:`None` (raises :exc:`Error` if something's wrong) | |
| """ | |
| if not _lib.SSL_CTX_check_private_key(self._context): | |
| _raise_current_error() | |
| def load_client_ca(self, cafile): | |
| """ | |
| Load the trusted certificates that will be sent to the client. Does | |
| not actually imply any of the certificates are trusted; that must be | |
| configured separately. | |
| :param bytes cafile: The path to a certificates file in PEM format. | |
| :return: None | |
| """ | |
| ca_list = _lib.SSL_load_client_CA_file( | |
| _text_to_bytes_and_warn("cafile", cafile) | |
| ) | |
| _openssl_assert(ca_list != _ffi.NULL) | |
| _lib.SSL_CTX_set_client_CA_list(self._context, ca_list) | |
| def set_session_id(self, buf): | |
| """ | |
| Set the session id to *buf* within which a session can be reused for | |
| this Context object. This is needed when doing session resumption, | |
| because there is no way for a stored session to know which Context | |
| object it is associated with. | |
| :param bytes buf: The session id. | |
| :returns: None | |
| """ | |
| buf = _text_to_bytes_and_warn("buf", buf) | |
| _openssl_assert( | |
| _lib.SSL_CTX_set_session_id_context(self._context, buf, len(buf)) | |
| == 1 | |
| ) | |
| def set_session_cache_mode(self, mode): | |
| """ | |
| Set the behavior of the session cache used by all connections using | |
| this Context. The previously set mode is returned. See | |
| :const:`SESS_CACHE_*` for details about particular modes. | |
| :param mode: One or more of the SESS_CACHE_* flags (combine using | |
| bitwise or) | |
| :returns: The previously set caching mode. | |
| .. versionadded:: 0.14 | |
| """ | |
| if not isinstance(mode, int): | |
| raise TypeError("mode must be an integer") | |
| return _lib.SSL_CTX_set_session_cache_mode(self._context, mode) | |
| def get_session_cache_mode(self): | |
| """ | |
| Get the current session cache mode. | |
| :returns: The currently used cache mode. | |
| .. versionadded:: 0.14 | |
| """ | |
| return _lib.SSL_CTX_get_session_cache_mode(self._context) | |
| def set_verify(self, mode, callback=None): | |
| """ | |
| Set the verification flags for this Context object to *mode* and | |
| specify that *callback* should be used for verification callbacks. | |
| :param mode: The verify mode, this should be one of | |
| :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If | |
| :const:`VERIFY_PEER` is used, *mode* can be OR:ed with | |
| :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and | |
| :const:`VERIFY_CLIENT_ONCE` to further control the behaviour. | |
| :param callback: The optional Python verification callback to use. | |
| This should take five arguments: A Connection object, an X509 | |
| object, and three integer variables, which are in turn potential | |
| error number, error depth and return code. *callback* should | |
| return True if verification passes and False otherwise. | |
| If omitted, OpenSSL's default verification is used. | |
| :return: None | |
| See SSL_CTX_set_verify(3SSL) for further details. | |
| """ | |
| if not isinstance(mode, int): | |
| raise TypeError("mode must be an integer") | |
| if callback is None: | |
| self._verify_helper = None | |
| self._verify_callback = None | |
| _lib.SSL_CTX_set_verify(self._context, mode, _ffi.NULL) | |
| else: | |
| if not callable(callback): | |
| raise TypeError("callback must be callable") | |
| self._verify_helper = _VerifyHelper(callback) | |
| self._verify_callback = self._verify_helper.callback | |
| _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback) | |
| def set_verify_depth(self, depth): | |
| """ | |
| Set the maximum depth for the certificate chain verification that shall | |
| be allowed for this Context object. | |
| :param depth: An integer specifying the verify depth | |
| :return: None | |
| """ | |
| if not isinstance(depth, int): | |
| raise TypeError("depth must be an integer") | |
| _lib.SSL_CTX_set_verify_depth(self._context, depth) | |
| def get_verify_mode(self): | |
| """ | |
| Retrieve the Context object's verify mode, as set by | |
| :meth:`set_verify`. | |
| :return: The verify mode | |
| """ | |
| return _lib.SSL_CTX_get_verify_mode(self._context) | |
| def get_verify_depth(self): | |
| """ | |
| Retrieve the Context object's verify depth, as set by | |
| :meth:`set_verify_depth`. | |
| :return: The verify depth | |
| """ | |
| return _lib.SSL_CTX_get_verify_depth(self._context) | |
| def load_tmp_dh(self, dhfile): | |
| """ | |
| Load parameters for Ephemeral Diffie-Hellman | |
| :param dhfile: The file to load EDH parameters from (``bytes`` or | |
| ``unicode``). | |
| :return: None | |
| """ | |
| dhfile = _path_bytes(dhfile) | |
| bio = _lib.BIO_new_file(dhfile, b"r") | |
| if bio == _ffi.NULL: | |
| _raise_current_error() | |
| bio = _ffi.gc(bio, _lib.BIO_free) | |
| dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) | |
| dh = _ffi.gc(dh, _lib.DH_free) | |
| res = _lib.SSL_CTX_set_tmp_dh(self._context, dh) | |
| _openssl_assert(res == 1) | |
| def set_tmp_ecdh(self, curve): | |
| """ | |
| Select a curve to use for ECDHE key exchange. | |
| :param curve: A curve object to use as returned by either | |
| :meth:`OpenSSL.crypto.get_elliptic_curve` or | |
| :meth:`OpenSSL.crypto.get_elliptic_curves`. | |
| :return: None | |
| """ | |
| _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY()) | |
| def set_cipher_list(self, cipher_list): | |
| """ | |
| Set the list of ciphers to be used in this context. | |
| See the OpenSSL manual for more information (e.g. | |
| :manpage:`ciphers(1)`). | |
| :param bytes cipher_list: An OpenSSL cipher string. | |
| :return: None | |
| """ | |
| cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list) | |
| if not isinstance(cipher_list, bytes): | |
| raise TypeError("cipher_list must be a byte string.") | |
| _openssl_assert( | |
| _lib.SSL_CTX_set_cipher_list(self._context, cipher_list) == 1 | |
| ) | |
| # In OpenSSL 1.1.1 setting the cipher list will always return TLS 1.3 | |
| # ciphers even if you pass an invalid cipher. Applications (like | |
| # Twisted) have tests that depend on an error being raised if an | |
| # invalid cipher string is passed, but without the following check | |
| # for the TLS 1.3 specific cipher suites it would never error. | |
| tmpconn = Connection(self, None) | |
| if tmpconn.get_cipher_list() == [ | |
| "TLS_AES_256_GCM_SHA384", | |
| "TLS_CHACHA20_POLY1305_SHA256", | |
| "TLS_AES_128_GCM_SHA256", | |
| ]: | |
| raise Error( | |
| [ | |
| ( | |
| "SSL routines", | |
| "SSL_CTX_set_cipher_list", | |
| "no cipher match", | |
| ), | |
| ], | |
| ) | |
| def set_client_ca_list(self, certificate_authorities): | |
| """ | |
| Set the list of preferred client certificate signers for this server | |
| context. | |
| This list of certificate authorities will be sent to the client when | |
| the server requests a client certificate. | |
| :param certificate_authorities: a sequence of X509Names. | |
| :return: None | |
| .. versionadded:: 0.10 | |
| """ | |
| name_stack = _lib.sk_X509_NAME_new_null() | |
| _openssl_assert(name_stack != _ffi.NULL) | |
| try: | |
| for ca_name in certificate_authorities: | |
| if not isinstance(ca_name, X509Name): | |
| raise TypeError( | |
| "client CAs must be X509Name objects, not %s " | |
| "objects" % (type(ca_name).__name__,) | |
| ) | |
| copy = _lib.X509_NAME_dup(ca_name._name) | |
| _openssl_assert(copy != _ffi.NULL) | |
| push_result = _lib.sk_X509_NAME_push(name_stack, copy) | |
| if not push_result: | |
| _lib.X509_NAME_free(copy) | |
| _raise_current_error() | |
| except Exception: | |
| _lib.sk_X509_NAME_free(name_stack) | |
| raise | |
| _lib.SSL_CTX_set_client_CA_list(self._context, name_stack) | |
| def add_client_ca(self, certificate_authority): | |
| """ | |
| Add the CA certificate to the list of preferred signers for this | |
| context. | |
| The list of certificate authorities will be sent to the client when the | |
| server requests a client certificate. | |
| :param certificate_authority: certificate authority's X509 certificate. | |
| :return: None | |
| .. versionadded:: 0.10 | |
| """ | |
| if not isinstance(certificate_authority, X509): | |
| raise TypeError("certificate_authority must be an X509 instance") | |
| add_result = _lib.SSL_CTX_add_client_CA( | |
| self._context, certificate_authority._x509 | |
| ) | |
| _openssl_assert(add_result == 1) | |
| def set_timeout(self, timeout): | |
| """ | |
| Set the timeout for newly created sessions for this Context object to | |
| *timeout*. The default value is 300 seconds. See the OpenSSL manual | |
| for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`). | |
| :param timeout: The timeout in (whole) seconds | |
| :return: The previous session timeout | |
| """ | |
| if not isinstance(timeout, int): | |
| raise TypeError("timeout must be an integer") | |
| return _lib.SSL_CTX_set_timeout(self._context, timeout) | |
| def get_timeout(self): | |
| """ | |
| Retrieve session timeout, as set by :meth:`set_timeout`. The default | |
| is 300 seconds. | |
| :return: The session timeout | |
| """ | |
| return _lib.SSL_CTX_get_timeout(self._context) | |
| def set_info_callback(self, callback): | |
| """ | |
| Set the information callback to *callback*. This function will be | |
| called from time to time during SSL handshakes. | |
| :param callback: The Python callback to use. This should take three | |
| arguments: a Connection object and two integers. The first integer | |
| specifies where in the SSL handshake the function was called, and | |
| the other the return code from a (possibly failed) internal | |
| function call. | |
| :return: None | |
| """ | |
| def wrapper(ssl, where, return_code): | |
| callback(Connection._reverse_mapping[ssl], where, return_code) | |
| self._info_callback = _ffi.callback( | |
| "void (*)(const SSL *, int, int)", wrapper | |
| ) | |
| _lib.SSL_CTX_set_info_callback(self._context, self._info_callback) | |
| def set_keylog_callback(self, callback): | |
| """ | |
| Set the TLS key logging callback to *callback*. This function will be | |
| called whenever TLS key material is generated or received, in order | |
| to allow applications to store this keying material for debugging | |
| purposes. | |
| :param callback: The Python callback to use. This should take two | |
| arguments: a Connection object and a bytestring that contains | |
| the key material in the format used by NSS for its SSLKEYLOGFILE | |
| debugging output. | |
| :return: None | |
| """ | |
| def wrapper(ssl, line): | |
| line = _ffi.string(line) | |
| callback(Connection._reverse_mapping[ssl], line) | |
| self._keylog_callback = _ffi.callback( | |
| "void (*)(const SSL *, const char *)", wrapper | |
| ) | |
| _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback) | |
| def get_app_data(self): | |
| """ | |
| Get the application data (supplied via :meth:`set_app_data()`) | |
| :return: The application data | |
| """ | |
| return self._app_data | |
| def set_app_data(self, data): | |
| """ | |
| Set the application data (will be returned from get_app_data()) | |
| :param data: Any Python object | |
| :return: None | |
| """ | |
| self._app_data = data | |
| def get_cert_store(self): | |
| """ | |
| Get the certificate store for the context. This can be used to add | |
| "trusted" certificates without using the | |
| :meth:`load_verify_locations` method. | |
| :return: A X509Store object or None if it does not have one. | |
| """ | |
| store = _lib.SSL_CTX_get_cert_store(self._context) | |
| if store == _ffi.NULL: | |
| # TODO: This is untested. | |
| return None | |
| pystore = X509Store.__new__(X509Store) | |
| pystore._store = store | |
| return pystore | |
| def set_options(self, options): | |
| """ | |
| Add options. Options set before are not cleared! | |
| This method should be used with the :const:`OP_*` constants. | |
| :param options: The options to add. | |
| :return: The new option bitmask. | |
| """ | |
| if not isinstance(options, int): | |
| raise TypeError("options must be an integer") | |
| return _lib.SSL_CTX_set_options(self._context, options) | |
| def set_mode(self, mode): | |
| """ | |
| Add modes via bitmask. Modes set before are not cleared! This method | |
| should be used with the :const:`MODE_*` constants. | |
| :param mode: The mode to add. | |
| :return: The new mode bitmask. | |
| """ | |
| if not isinstance(mode, int): | |
| raise TypeError("mode must be an integer") | |
| return _lib.SSL_CTX_set_mode(self._context, mode) | |
| def set_tlsext_servername_callback(self, callback): | |
| """ | |
| Specify a callback function to be called when clients specify a server | |
| name. | |
| :param callback: The callback function. It will be invoked with one | |
| argument, the Connection instance. | |
| .. versionadded:: 0.13 | |
| """ | |
| def wrapper(ssl, alert, arg): | |
| callback(Connection._reverse_mapping[ssl]) | |
| return 0 | |
| self._tlsext_servername_callback = _ffi.callback( | |
| "int (*)(SSL *, int *, void *)", wrapper | |
| ) | |
| _lib.SSL_CTX_set_tlsext_servername_callback( | |
| self._context, self._tlsext_servername_callback | |
| ) | |
| def set_tlsext_use_srtp(self, profiles): | |
| """ | |
| Enable support for negotiating SRTP keying material. | |
| :param bytes profiles: A colon delimited list of protection profile | |
| names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``. | |
| :return: None | |
| """ | |
| if not isinstance(profiles, bytes): | |
| raise TypeError("profiles must be a byte string.") | |
| _openssl_assert( | |
| _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0 | |
| ) | |
| def set_alpn_protos(self, protos): | |
| """ | |
| Specify the protocols that the client is prepared to speak after the | |
| TLS connection has been negotiated using Application Layer Protocol | |
| Negotiation. | |
| :param protos: A list of the protocols to be offered to the server. | |
| This list should be a Python list of bytestrings representing the | |
| protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. | |
| """ | |
| # Different versions of OpenSSL are inconsistent about how they handle | |
| # empty proto lists (see #1043), so we avoid the problem entirely by | |
| # rejecting them ourselves. | |
| if not protos: | |
| raise ValueError("at least one protocol must be specified") | |
| # Take the list of protocols and join them together, prefixing them | |
| # with their lengths. | |
| protostr = b"".join( | |
| chain.from_iterable((bytes((len(p),)), p) for p in protos) | |
| ) | |
| # Build a C string from the list. We don't need to save this off | |
| # because OpenSSL immediately copies the data out. | |
| input_str = _ffi.new("unsigned char[]", protostr) | |
| # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html: | |
| # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos() | |
| # return 0 on success, and non-0 on failure. | |
| # WARNING: these functions reverse the return value convention. | |
| _openssl_assert( | |
| _lib.SSL_CTX_set_alpn_protos( | |
| self._context, input_str, len(protostr) | |
| ) | |
| == 0 | |
| ) | |
| def set_alpn_select_callback(self, callback): | |
| """ | |
| Specify a callback function that will be called on the server when a | |
| client offers protocols using ALPN. | |
| :param callback: The callback function. It will be invoked with two | |
| arguments: the Connection, and a list of offered protocols as | |
| bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It can return | |
| one of those bytestrings to indicate the chosen protocol, the | |
| empty bytestring to terminate the TLS connection, or the | |
| :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered | |
| protocol was selected, but that the connection should not be | |
| aborted. | |
| """ | |
| self._alpn_select_helper = _ALPNSelectHelper(callback) | |
| self._alpn_select_callback = self._alpn_select_helper.callback | |
| _lib.SSL_CTX_set_alpn_select_cb( | |
| self._context, self._alpn_select_callback, _ffi.NULL | |
| ) | |
| def _set_ocsp_callback(self, helper, data): | |
| """ | |
| This internal helper does the common work for | |
| ``set_ocsp_server_callback`` and ``set_ocsp_client_callback``, which is | |
| almost all of it. | |
| """ | |
| self._ocsp_helper = helper | |
| self._ocsp_callback = helper.callback | |
| if data is None: | |
| self._ocsp_data = _ffi.NULL | |
| else: | |
| self._ocsp_data = _ffi.new_handle(data) | |
| rc = _lib.SSL_CTX_set_tlsext_status_cb( | |
| self._context, self._ocsp_callback | |
| ) | |
| _openssl_assert(rc == 1) | |
| rc = _lib.SSL_CTX_set_tlsext_status_arg(self._context, self._ocsp_data) | |
| _openssl_assert(rc == 1) | |
| def set_ocsp_server_callback(self, callback, data=None): | |
| """ | |
| Set a callback to provide OCSP data to be stapled to the TLS handshake | |
| on the server side. | |
| :param callback: The callback function. It will be invoked with two | |
| arguments: the Connection, and the optional arbitrary data you have | |
| provided. The callback must return a bytestring that contains the | |
| OCSP data to staple to the handshake. If no OCSP data is available | |
| for this connection, return the empty bytestring. | |
| :param data: Some opaque data that will be passed into the callback | |
| function when called. This can be used to avoid needing to do | |
| complex data lookups or to keep track of what context is being | |
| used. This parameter is optional. | |
| """ | |
| helper = _OCSPServerCallbackHelper(callback) | |
| self._set_ocsp_callback(helper, data) | |
| def set_ocsp_client_callback(self, callback, data=None): | |
| """ | |
| Set a callback to validate OCSP data stapled to the TLS handshake on | |
| the client side. | |
| :param callback: The callback function. It will be invoked with three | |
| arguments: the Connection, a bytestring containing the stapled OCSP | |
| assertion, and the optional arbitrary data you have provided. The | |
| callback must return a boolean that indicates the result of | |
| validating the OCSP data: ``True`` if the OCSP data is valid and | |
| the certificate can be trusted, or ``False`` if either the OCSP | |
| data is invalid or the certificate has been revoked. | |
| :param data: Some opaque data that will be passed into the callback | |
| function when called. This can be used to avoid needing to do | |
| complex data lookups or to keep track of what context is being | |
| used. This parameter is optional. | |
| """ | |
| helper = _OCSPClientCallbackHelper(callback) | |
| self._set_ocsp_callback(helper, data) | |
| def set_cookie_generate_callback(self, callback): | |
| self._cookie_generate_helper = _CookieGenerateCallbackHelper(callback) | |
| _lib.SSL_CTX_set_cookie_generate_cb( | |
| self._context, | |
| self._cookie_generate_helper.callback, | |
| ) | |
| def set_cookie_verify_callback(self, callback): | |
| self._cookie_verify_helper = _CookieVerifyCallbackHelper(callback) | |
| _lib.SSL_CTX_set_cookie_verify_cb( | |
| self._context, | |
| self._cookie_verify_helper.callback, | |
| ) | |
| class Connection: | |
| _reverse_mapping = WeakValueDictionary() | |
| def __init__(self, context, socket=None): | |
| """ | |
| Create a new Connection object, using the given OpenSSL.SSL.Context | |
| instance and socket. | |
| :param context: An SSL Context to use for this connection | |
| :param socket: The socket to use for transport layer | |
| """ | |
| if not isinstance(context, Context): | |
| raise TypeError("context must be a Context instance") | |
| ssl = _lib.SSL_new(context._context) | |
| self._ssl = _ffi.gc(ssl, _lib.SSL_free) | |
| # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns | |
| # an SSL_ERROR_WANT_READ when processing a non-application data packet | |
| # even though there is still data on the underlying transport. | |
| # See https://github.com/openssl/openssl/issues/6234 for more details. | |
| _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY) | |
| self._context = context | |
| self._app_data = None | |
| # References to strings used for Application Layer Protocol | |
| # Negotiation. These strings get copied at some point but it's well | |
| # after the callback returns, so we have to hang them somewhere to | |
| # avoid them getting freed. | |
| self._alpn_select_callback_args = None | |
| # Reference the verify_callback of the Context. This ensures that if | |
| # set_verify is called again after the SSL object has been created we | |
| # do not point to a dangling reference | |
| self._verify_helper = context._verify_helper | |
| self._verify_callback = context._verify_callback | |
| # And likewise for the cookie callbacks | |
| self._cookie_generate_helper = context._cookie_generate_helper | |
| self._cookie_verify_helper = context._cookie_verify_helper | |
| self._reverse_mapping[self._ssl] = self | |
| if socket is None: | |
| self._socket = None | |
| # Don't set up any gc for these, SSL_free will take care of them. | |
| self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem()) | |
| _openssl_assert(self._into_ssl != _ffi.NULL) | |
| self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem()) | |
| _openssl_assert(self._from_ssl != _ffi.NULL) | |
| _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl) | |
| else: | |
| self._into_ssl = None | |
| self._from_ssl = None | |
| self._socket = socket | |
| set_result = _lib.SSL_set_fd( | |
| self._ssl, _asFileDescriptor(self._socket) | |
| ) | |
| _openssl_assert(set_result == 1) | |
| def __getattr__(self, name): | |
| """ | |
| Look up attributes on the wrapped socket object if they are not found | |
| on the Connection object. | |
| """ | |
| if self._socket is None: | |
| raise AttributeError( | |
| "'%s' object has no attribute '%s'" | |
| % (self.__class__.__name__, name) | |
| ) | |
| else: | |
| return getattr(self._socket, name) | |
| def _raise_ssl_error(self, ssl, result): | |
| if self._context._verify_helper is not None: | |
| self._context._verify_helper.raise_if_problem() | |
| if self._context._alpn_select_helper is not None: | |
| self._context._alpn_select_helper.raise_if_problem() | |
| if self._context._ocsp_helper is not None: | |
| self._context._ocsp_helper.raise_if_problem() | |
| error = _lib.SSL_get_error(ssl, result) | |
| if error == _lib.SSL_ERROR_WANT_READ: | |
| raise WantReadError() | |
| elif error == _lib.SSL_ERROR_WANT_WRITE: | |
| raise WantWriteError() | |
| elif error == _lib.SSL_ERROR_ZERO_RETURN: | |
| raise ZeroReturnError() | |
| elif error == _lib.SSL_ERROR_WANT_X509_LOOKUP: | |
| # TODO: This is untested. | |
| raise WantX509LookupError() | |
| elif error == _lib.SSL_ERROR_SYSCALL: | |
| if _lib.ERR_peek_error() == 0: | |
| if result < 0: | |
| if platform == "win32": | |
| errno = _ffi.getwinerror()[0] | |
| else: | |
| errno = _ffi.errno | |
| if errno != 0: | |
| raise SysCallError(errno, errorcode.get(errno)) | |
| raise SysCallError(-1, "Unexpected EOF") | |
| else: | |
| # TODO: This is untested. | |
| _raise_current_error() | |
| elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0: | |
| # In 3.0.x an unexpected EOF no longer triggers syscall error | |
| # but we want to maintain compatibility so we check here and | |
| # raise syscall if it is an EOF. Since we're not actually sure | |
| # what else could raise SSL_ERROR_SSL we check for the presence | |
| # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING | |
| # and if it's not present we just raise an error, which matches | |
| # the behavior before we added this elif section | |
| peeked_error = _lib.ERR_peek_error() | |
| reason = _lib.ERR_GET_REASON(peeked_error) | |
| if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING: | |
| _openssl_assert( | |
| reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING | |
| ) | |
| _lib.ERR_clear_error() | |
| raise SysCallError(-1, "Unexpected EOF") | |
| else: | |
| _raise_current_error() | |
| elif error == _lib.SSL_ERROR_NONE: | |
| pass | |
| else: | |
| _raise_current_error() | |
| def get_context(self): | |
| """ | |
| Retrieve the :class:`Context` object associated with this | |
| :class:`Connection`. | |
| """ | |
| return self._context | |
| def set_context(self, context): | |
| """ | |
| Switch this connection to a new session context. | |
| :param context: A :class:`Context` instance giving the new session | |
| context to use. | |
| """ | |
| if not isinstance(context, Context): | |
| raise TypeError("context must be a Context instance") | |
| _lib.SSL_set_SSL_CTX(self._ssl, context._context) | |
| self._context = context | |
| def get_servername(self): | |
| """ | |
| Retrieve the servername extension value if provided in the client hello | |
| message, or None if there wasn't one. | |
| :return: A byte string giving the server name or :data:`None`. | |
| .. versionadded:: 0.13 | |
| """ | |
| name = _lib.SSL_get_servername( | |
| self._ssl, _lib.TLSEXT_NAMETYPE_host_name | |
| ) | |
| if name == _ffi.NULL: | |
| return None | |
| return _ffi.string(name) | |
| def set_verify(self, mode, callback=None): | |
| """ | |
| Override the Context object's verification flags for this specific | |
| connection. See :py:meth:`Context.set_verify` for details. | |
| """ | |
| if not isinstance(mode, int): | |
| raise TypeError("mode must be an integer") | |
| if callback is None: | |
| self._verify_helper = None | |
| self._verify_callback = None | |
| _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL) | |
| else: | |
| if not callable(callback): | |
| raise TypeError("callback must be callable") | |
| self._verify_helper = _VerifyHelper(callback) | |
| self._verify_callback = self._verify_helper.callback | |
| _lib.SSL_set_verify(self._ssl, mode, self._verify_callback) | |
| def get_verify_mode(self): | |
| """ | |
| Retrieve the Connection object's verify mode, as set by | |
| :meth:`set_verify`. | |
| :return: The verify mode | |
| """ | |
| return _lib.SSL_get_verify_mode(self._ssl) | |
| def use_certificate(self, cert): | |
| """ | |
| Load a certificate from a X509 object | |
| :param cert: The X509 object | |
| :return: None | |
| """ | |
| # Mirrored from Context.use_certificate | |
| if not isinstance(cert, X509): | |
| raise TypeError("cert must be an X509 instance") | |
| use_result = _lib.SSL_use_certificate(self._ssl, cert._x509) | |
| if not use_result: | |
| _raise_current_error() | |
| def use_privatekey(self, pkey): | |
| """ | |
| Load a private key from a PKey object | |
| :param pkey: The PKey object | |
| :return: None | |
| """ | |
| # Mirrored from Context.use_privatekey | |
| if not isinstance(pkey, PKey): | |
| raise TypeError("pkey must be a PKey instance") | |
| use_result = _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey) | |
| if not use_result: | |
| self._context._raise_passphrase_exception() | |
| def set_ciphertext_mtu(self, mtu): | |
| """ | |
| For DTLS, set the maximum UDP payload size (*not* including IP/UDP | |
| overhead). | |
| Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent | |
| OpenSSL from spontaneously clearing this. | |
| :param mtu: An integer giving the maximum transmission unit. | |
| .. versionadded:: 21.1 | |
| """ | |
| _lib.SSL_set_mtu(self._ssl, mtu) | |
| def get_cleartext_mtu(self): | |
| """ | |
| For DTLS, get the maximum size of unencrypted data you can pass to | |
| :meth:`write` without exceeding the MTU (as passed to | |
| :meth:`set_ciphertext_mtu`). | |
| :return: The effective MTU as an integer. | |
| .. versionadded:: 21.1 | |
| """ | |
| if not hasattr(_lib, "DTLS_get_data_mtu"): | |
| raise NotImplementedError("requires OpenSSL 1.1.1 or better") | |
| return _lib.DTLS_get_data_mtu(self._ssl) | |
| def set_tlsext_host_name(self, name): | |
| """ | |
| Set the value of the servername extension to send in the client hello. | |
| :param name: A byte string giving the name. | |
| .. versionadded:: 0.13 | |
| """ | |
| if not isinstance(name, bytes): | |
| raise TypeError("name must be a byte string") | |
| elif b"\0" in name: | |
| raise TypeError("name must not contain NUL byte") | |
| # XXX I guess this can fail sometimes? | |
| _lib.SSL_set_tlsext_host_name(self._ssl, name) | |
| def pending(self): | |
| """ | |
| Get the number of bytes that can be safely read from the SSL buffer | |
| (**not** the underlying transport buffer). | |
| :return: The number of bytes available in the receive buffer. | |
| """ | |
| return _lib.SSL_pending(self._ssl) | |
| def send(self, buf, flags=0): | |
| """ | |
| Send data on the connection. NOTE: If you get one of the WantRead, | |
| WantWrite or WantX509Lookup exceptions on this, you have to call the | |
| method again with the SAME buffer. | |
| :param buf: The string, buffer or memoryview to send | |
| :param flags: (optional) Included for compatibility with the socket | |
| API, the value is ignored | |
| :return: The number of bytes written | |
| """ | |
| # Backward compatibility | |
| buf = _text_to_bytes_and_warn("buf", buf) | |
| with _ffi.from_buffer(buf) as data: | |
| # check len(buf) instead of len(data) for testability | |
| if len(buf) > 2147483647: | |
| raise ValueError( | |
| "Cannot send more than 2**31-1 bytes at once." | |
| ) | |
| result = _lib.SSL_write(self._ssl, data, len(data)) | |
| self._raise_ssl_error(self._ssl, result) | |
| return result | |
| write = send | |
| def sendall(self, buf, flags=0): | |
| """ | |
| Send "all" data on the connection. This calls send() repeatedly until | |
| all data is sent. If an error occurs, it's impossible to tell how much | |
| data has been sent. | |
| :param buf: The string, buffer or memoryview to send | |
| :param flags: (optional) Included for compatibility with the socket | |
| API, the value is ignored | |
| :return: The number of bytes written | |
| """ | |
| buf = _text_to_bytes_and_warn("buf", buf) | |
| with _ffi.from_buffer(buf) as data: | |
| left_to_send = len(buf) | |
| total_sent = 0 | |
| while left_to_send: | |
| # SSL_write's num arg is an int, | |
| # so we cannot send more than 2**31-1 bytes at once. | |
| result = _lib.SSL_write( | |
| self._ssl, data + total_sent, min(left_to_send, 2147483647) | |
| ) | |
| self._raise_ssl_error(self._ssl, result) | |
| total_sent += result | |
| left_to_send -= result | |
| return total_sent | |
| def recv(self, bufsiz, flags=None): | |
| """ | |
| Receive data on the connection. | |
| :param bufsiz: The maximum number of bytes to read | |
| :param flags: (optional) The only supported flag is ``MSG_PEEK``, | |
| all other flags are ignored. | |
| :return: The string read from the Connection | |
| """ | |
| buf = _no_zero_allocator("char[]", bufsiz) | |
| if flags is not None and flags & socket.MSG_PEEK: | |
| result = _lib.SSL_peek(self._ssl, buf, bufsiz) | |
| else: | |
| result = _lib.SSL_read(self._ssl, buf, bufsiz) | |
| self._raise_ssl_error(self._ssl, result) | |
| return _ffi.buffer(buf, result)[:] | |
| read = recv | |
| def recv_into(self, buffer, nbytes=None, flags=None): | |
| """ | |
| Receive data on the connection and copy it directly into the provided | |
| buffer, rather than creating a new string. | |
| :param buffer: The buffer to copy into. | |
| :param nbytes: (optional) The maximum number of bytes to read into the | |
| buffer. If not present, defaults to the size of the buffer. If | |
| larger than the size of the buffer, is reduced to the size of the | |
| buffer. | |
| :param flags: (optional) The only supported flag is ``MSG_PEEK``, | |
| all other flags are ignored. | |
| :return: The number of bytes read into the buffer. | |
| """ | |
| if nbytes is None: | |
| nbytes = len(buffer) | |
| else: | |
| nbytes = min(nbytes, len(buffer)) | |
| # We need to create a temporary buffer. This is annoying, it would be | |
| # better if we could pass memoryviews straight into the SSL_read call, | |
| # but right now we can't. Revisit this if CFFI gets that ability. | |
| buf = _no_zero_allocator("char[]", nbytes) | |
| if flags is not None and flags & socket.MSG_PEEK: | |
| result = _lib.SSL_peek(self._ssl, buf, nbytes) | |
| else: | |
| result = _lib.SSL_read(self._ssl, buf, nbytes) | |
| self._raise_ssl_error(self._ssl, result) | |
| # This strange line is all to avoid a memory copy. The buffer protocol | |
| # should allow us to assign a CFFI buffer to the LHS of this line, but | |
| # on CPython 3.3+ that segfaults. As a workaround, we can temporarily | |
| # wrap it in a memoryview. | |
| buffer[:result] = memoryview(_ffi.buffer(buf, result)) | |
| return result | |
| def _handle_bio_errors(self, bio, result): | |
| if _lib.BIO_should_retry(bio): | |
| if _lib.BIO_should_read(bio): | |
| raise WantReadError() | |
| elif _lib.BIO_should_write(bio): | |
| # TODO: This is untested. | |
| raise WantWriteError() | |
| elif _lib.BIO_should_io_special(bio): | |
| # TODO: This is untested. I think io_special means the socket | |
| # BIO has a not-yet connected socket. | |
| raise ValueError("BIO_should_io_special") | |
| else: | |
| # TODO: This is untested. | |
| raise ValueError("unknown bio failure") | |
| else: | |
| # TODO: This is untested. | |
| _raise_current_error() | |
| def bio_read(self, bufsiz): | |
| """ | |
| If the Connection was created with a memory BIO, this method can be | |
| used to read bytes from the write end of that memory BIO. Many | |
| Connection methods will add bytes which must be read in this manner or | |
| the buffer will eventually fill up and the Connection will be able to | |
| take no further actions. | |
| :param bufsiz: The maximum number of bytes to read | |
| :return: The string read. | |
| """ | |
| if self._from_ssl is None: | |
| raise TypeError("Connection sock was not None") | |
| if not isinstance(bufsiz, int): | |
| raise TypeError("bufsiz must be an integer") | |
| buf = _no_zero_allocator("char[]", bufsiz) | |
| result = _lib.BIO_read(self._from_ssl, buf, bufsiz) | |
| if result <= 0: | |
| self._handle_bio_errors(self._from_ssl, result) | |
| return _ffi.buffer(buf, result)[:] | |
| def bio_write(self, buf): | |
| """ | |
| If the Connection was created with a memory BIO, this method can be | |
| used to add bytes to the read end of that memory BIO. The Connection | |
| can then read the bytes (for example, in response to a call to | |
| :meth:`recv`). | |
| :param buf: The string to put into the memory BIO. | |
| :return: The number of bytes written | |
| """ | |
| buf = _text_to_bytes_and_warn("buf", buf) | |
| if self._into_ssl is None: | |
| raise TypeError("Connection sock was not None") | |
| with _ffi.from_buffer(buf) as data: | |
| result = _lib.BIO_write(self._into_ssl, data, len(data)) | |
| if result <= 0: | |
| self._handle_bio_errors(self._into_ssl, result) | |
| return result | |
| def renegotiate(self): | |
| """ | |
| Renegotiate the session. | |
| :return: True if the renegotiation can be started, False otherwise | |
| :rtype: bool | |
| """ | |
| if not self.renegotiate_pending(): | |
| _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1) | |
| return True | |
| return False | |
| def do_handshake(self): | |
| """ | |
| Perform an SSL handshake (usually called after :meth:`renegotiate` or | |
| one of :meth:`set_accept_state` or :meth:`set_connect_state`). This can | |
| raise the same exceptions as :meth:`send` and :meth:`recv`. | |
| :return: None. | |
| """ | |
| result = _lib.SSL_do_handshake(self._ssl) | |
| self._raise_ssl_error(self._ssl, result) | |
| def renegotiate_pending(self): | |
| """ | |
| Check if there's a renegotiation in progress, it will return False once | |
| a renegotiation is finished. | |
| :return: Whether there's a renegotiation in progress | |
| :rtype: bool | |
| """ | |
| return _lib.SSL_renegotiate_pending(self._ssl) == 1 | |
| def total_renegotiations(self): | |
| """ | |
| Find out the total number of renegotiations. | |
| :return: The number of renegotiations. | |
| :rtype: int | |
| """ | |
| return _lib.SSL_total_renegotiations(self._ssl) | |
| def connect(self, addr): | |
| """ | |
| Call the :meth:`connect` method of the underlying socket and set up SSL | |
| on the socket, using the :class:`Context` object supplied to this | |
| :class:`Connection` object at creation. | |
| :param addr: A remote address | |
| :return: What the socket's connect method returns | |
| """ | |
| _lib.SSL_set_connect_state(self._ssl) | |
| return self._socket.connect(addr) | |
| def connect_ex(self, addr): | |
| """ | |
| Call the :meth:`connect_ex` method of the underlying socket and set up | |
| SSL on the socket, using the Context object supplied to this Connection | |
| object at creation. Note that if the :meth:`connect_ex` method of the | |
| socket doesn't return 0, SSL won't be initialized. | |
| :param addr: A remove address | |
| :return: What the socket's connect_ex method returns | |
| """ | |
| connect_ex = self._socket.connect_ex | |
| self.set_connect_state() | |
| return connect_ex(addr) | |
| def accept(self): | |
| """ | |
| Call the :meth:`accept` method of the underlying socket and set up SSL | |
| on the returned socket, using the Context object supplied to this | |
| :class:`Connection` object at creation. | |
| :return: A *(conn, addr)* pair where *conn* is the new | |
| :class:`Connection` object created, and *address* is as returned by | |
| the socket's :meth:`accept`. | |
| """ | |
| client, addr = self._socket.accept() | |
| conn = Connection(self._context, client) | |
| conn.set_accept_state() | |
| return (conn, addr) | |
| def DTLSv1_listen(self): | |
| """ | |
| Call the OpenSSL function DTLSv1_listen on this connection. See the | |
| OpenSSL manual for more details. | |
| :return: None | |
| """ | |
| # Possible future extension: return the BIO_ADDR in some form. | |
| bio_addr = _lib.BIO_ADDR_new() | |
| try: | |
| result = _lib.DTLSv1_listen(self._ssl, bio_addr) | |
| finally: | |
| _lib.BIO_ADDR_free(bio_addr) | |
| # DTLSv1_listen is weird. A zero return value means 'didn't find a | |
| # ClientHello with valid cookie, but keep trying'. So basically | |
| # WantReadError. But it doesn't work correctly with _raise_ssl_error. | |
| # So we raise it manually instead. | |
| if self._cookie_generate_helper is not None: | |
| self._cookie_generate_helper.raise_if_problem() | |
| if self._cookie_verify_helper is not None: | |
| self._cookie_verify_helper.raise_if_problem() | |
| if result == 0: | |
| raise WantReadError() | |
| if result < 0: | |
| self._raise_ssl_error(self._ssl, result) | |
| def bio_shutdown(self): | |
| """ | |
| If the Connection was created with a memory BIO, this method can be | |
| used to indicate that *end of file* has been reached on the read end of | |
| that memory BIO. | |
| :return: None | |
| """ | |
| if self._from_ssl is None: | |
| raise TypeError("Connection sock was not None") | |
| _lib.BIO_set_mem_eof_return(self._into_ssl, 0) | |
| def shutdown(self): | |
| """ | |
| Send the shutdown message to the Connection. | |
| :return: True if the shutdown completed successfully (i.e. both sides | |
| have sent closure alerts), False otherwise (in which case you | |
| call :meth:`recv` or :meth:`send` when the connection becomes | |
| readable/writeable). | |
| """ | |
| result = _lib.SSL_shutdown(self._ssl) | |
| if result < 0: | |
| self._raise_ssl_error(self._ssl, result) | |
| elif result > 0: | |
| return True | |
| else: | |
| return False | |
| def get_cipher_list(self): | |
| """ | |
| Retrieve the list of ciphers used by the Connection object. | |
| :return: A list of native cipher strings. | |
| """ | |
| ciphers = [] | |
| for i in count(): | |
| result = _lib.SSL_get_cipher_list(self._ssl, i) | |
| if result == _ffi.NULL: | |
| break | |
| ciphers.append(_ffi.string(result).decode("utf-8")) | |
| return ciphers | |
| def get_client_ca_list(self): | |
| """ | |
| Get CAs whose certificates are suggested for client authentication. | |
| :return: If this is a server connection, the list of certificate | |
| authorities that will be sent or has been sent to the client, as | |
| controlled by this :class:`Connection`'s :class:`Context`. | |
| If this is a client connection, the list will be empty until the | |
| connection with the server is established. | |
| .. versionadded:: 0.10 | |
| """ | |
| ca_names = _lib.SSL_get_client_CA_list(self._ssl) | |
| if ca_names == _ffi.NULL: | |
| # TODO: This is untested. | |
| return [] | |
| result = [] | |
| for i in range(_lib.sk_X509_NAME_num(ca_names)): | |
| name = _lib.sk_X509_NAME_value(ca_names, i) | |
| copy = _lib.X509_NAME_dup(name) | |
| _openssl_assert(copy != _ffi.NULL) | |
| pyname = X509Name.__new__(X509Name) | |
| pyname._name = _ffi.gc(copy, _lib.X509_NAME_free) | |
| result.append(pyname) | |
| return result | |
| def makefile(self, *args, **kwargs): | |
| """ | |
| The makefile() method is not implemented, since there is no dup | |
| semantics for SSL connections | |
| :raise: NotImplementedError | |
| """ | |
| raise NotImplementedError( | |
| "Cannot make file object of OpenSSL.SSL.Connection" | |
| ) | |
| def get_app_data(self): | |
| """ | |
| Retrieve application data as set by :meth:`set_app_data`. | |
| :return: The application data | |
| """ | |
| return self._app_data | |
| def set_app_data(self, data): | |
| """ | |
| Set application data | |
| :param data: The application data | |
| :return: None | |
| """ | |
| self._app_data = data | |
| def get_shutdown(self): | |
| """ | |
| Get the shutdown state of the Connection. | |
| :return: The shutdown state, a bitvector of SENT_SHUTDOWN, | |
| RECEIVED_SHUTDOWN. | |
| """ | |
| return _lib.SSL_get_shutdown(self._ssl) | |
| def set_shutdown(self, state): | |
| """ | |
| Set the shutdown state of the Connection. | |
| :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN. | |
| :return: None | |
| """ | |
| if not isinstance(state, int): | |
| raise TypeError("state must be an integer") | |
| _lib.SSL_set_shutdown(self._ssl, state) | |
| def get_state_string(self): | |
| """ | |
| Retrieve a verbose string detailing the state of the Connection. | |
| :return: A string representing the state | |
| :rtype: bytes | |
| """ | |
| return _ffi.string(_lib.SSL_state_string_long(self._ssl)) | |
| def server_random(self): | |
| """ | |
| Retrieve the random value used with the server hello message. | |
| :return: A string representing the state | |
| """ | |
| session = _lib.SSL_get_session(self._ssl) | |
| if session == _ffi.NULL: | |
| return None | |
| length = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0) | |
| _openssl_assert(length > 0) | |
| outp = _no_zero_allocator("unsigned char[]", length) | |
| _lib.SSL_get_server_random(self._ssl, outp, length) | |
| return _ffi.buffer(outp, length)[:] | |
| def client_random(self): | |
| """ | |
| Retrieve the random value used with the client hello message. | |
| :return: A string representing the state | |
| """ | |
| session = _lib.SSL_get_session(self._ssl) | |
| if session == _ffi.NULL: | |
| return None | |
| length = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0) | |
| _openssl_assert(length > 0) | |
| outp = _no_zero_allocator("unsigned char[]", length) | |
| _lib.SSL_get_client_random(self._ssl, outp, length) | |
| return _ffi.buffer(outp, length)[:] | |
| def master_key(self): | |
| """ | |
| Retrieve the value of the master key for this session. | |
| :return: A string representing the state | |
| """ | |
| session = _lib.SSL_get_session(self._ssl) | |
| if session == _ffi.NULL: | |
| return None | |
| length = _lib.SSL_SESSION_get_master_key(session, _ffi.NULL, 0) | |
| _openssl_assert(length > 0) | |
| outp = _no_zero_allocator("unsigned char[]", length) | |
| _lib.SSL_SESSION_get_master_key(session, outp, length) | |
| return _ffi.buffer(outp, length)[:] | |
| def export_keying_material(self, label, olen, context=None): | |
| """ | |
| Obtain keying material for application use. | |
| :param: label - a disambiguating label string as described in RFC 5705 | |
| :param: olen - the length of the exported key material in bytes | |
| :param: context - a per-association context value | |
| :return: the exported key material bytes or None | |
| """ | |
| outp = _no_zero_allocator("unsigned char[]", olen) | |
| context_buf = _ffi.NULL | |
| context_len = 0 | |
| use_context = 0 | |
| if context is not None: | |
| context_buf = context | |
| context_len = len(context) | |
| use_context = 1 | |
| success = _lib.SSL_export_keying_material( | |
| self._ssl, | |
| outp, | |
| olen, | |
| label, | |
| len(label), | |
| context_buf, | |
| context_len, | |
| use_context, | |
| ) | |
| _openssl_assert(success == 1) | |
| return _ffi.buffer(outp, olen)[:] | |
| def sock_shutdown(self, *args, **kwargs): | |
| """ | |
| Call the :meth:`shutdown` method of the underlying socket. | |
| See :manpage:`shutdown(2)`. | |
| :return: What the socket's shutdown() method returns | |
| """ | |
| return self._socket.shutdown(*args, **kwargs) | |
| def get_certificate(self): | |
| """ | |
| Retrieve the local certificate (if any) | |
| :return: The local certificate | |
| """ | |
| cert = _lib.SSL_get_certificate(self._ssl) | |
| if cert != _ffi.NULL: | |
| _lib.X509_up_ref(cert) | |
| return X509._from_raw_x509_ptr(cert) | |
| return None | |
| def get_peer_certificate(self): | |
| """ | |
| Retrieve the other side's certificate (if any) | |
| :return: The peer's certificate | |
| """ | |
| cert = _lib.SSL_get_peer_certificate(self._ssl) | |
| if cert != _ffi.NULL: | |
| return X509._from_raw_x509_ptr(cert) | |
| return None | |
| def _cert_stack_to_list(cert_stack): | |
| """ | |
| Internal helper to convert a STACK_OF(X509) to a list of X509 | |
| instances. | |
| """ | |
| result = [] | |
| for i in range(_lib.sk_X509_num(cert_stack)): | |
| cert = _lib.sk_X509_value(cert_stack, i) | |
| _openssl_assert(cert != _ffi.NULL) | |
| res = _lib.X509_up_ref(cert) | |
| _openssl_assert(res >= 1) | |
| pycert = X509._from_raw_x509_ptr(cert) | |
| result.append(pycert) | |
| return result | |
| def get_peer_cert_chain(self): | |
| """ | |
| Retrieve the other side's certificate (if any) | |
| :return: A list of X509 instances giving the peer's certificate chain, | |
| or None if it does not have one. | |
| """ | |
| cert_stack = _lib.SSL_get_peer_cert_chain(self._ssl) | |
| if cert_stack == _ffi.NULL: | |
| return None | |
| return self._cert_stack_to_list(cert_stack) | |
| def get_verified_chain(self): | |
| """ | |
| Retrieve the verified certificate chain of the peer including the | |
| peer's end entity certificate. It must be called after a session has | |
| been successfully established. If peer verification was not successful | |
| the chain may be incomplete, invalid, or None. | |
| :return: A list of X509 instances giving the peer's verified | |
| certificate chain, or None if it does not have one. | |
| .. versionadded:: 20.0 | |
| """ | |
| # OpenSSL 1.1+ | |
| cert_stack = _lib.SSL_get0_verified_chain(self._ssl) | |
| if cert_stack == _ffi.NULL: | |
| return None | |
| return self._cert_stack_to_list(cert_stack) | |
| def want_read(self): | |
| """ | |
| Checks if more data has to be read from the transport layer to complete | |
| an operation. | |
| :return: True iff more data has to be read | |
| """ | |
| return _lib.SSL_want_read(self._ssl) | |
| def want_write(self): | |
| """ | |
| Checks if there is data to write to the transport layer to complete an | |
| operation. | |
| :return: True iff there is data to write | |
| """ | |
| return _lib.SSL_want_write(self._ssl) | |
| def set_accept_state(self): | |
| """ | |
| Set the connection to work in server mode. The handshake will be | |
| handled automatically by read/write. | |
| :return: None | |
| """ | |
| _lib.SSL_set_accept_state(self._ssl) | |
| def set_connect_state(self): | |
| """ | |
| Set the connection to work in client mode. The handshake will be | |
| handled automatically by read/write. | |
| :return: None | |
| """ | |
| _lib.SSL_set_connect_state(self._ssl) | |
| def get_session(self): | |
| """ | |
| Returns the Session currently used. | |
| :return: An instance of :class:`OpenSSL.SSL.Session` or | |
| :obj:`None` if no session exists. | |
| .. versionadded:: 0.14 | |
| """ | |
| session = _lib.SSL_get1_session(self._ssl) | |
| if session == _ffi.NULL: | |
| return None | |
| pysession = Session.__new__(Session) | |
| pysession._session = _ffi.gc(session, _lib.SSL_SESSION_free) | |
| return pysession | |
| def set_session(self, session): | |
| """ | |
| Set the session to be used when the TLS/SSL connection is established. | |
| :param session: A Session instance representing the session to use. | |
| :returns: None | |
| .. versionadded:: 0.14 | |
| """ | |
| if not isinstance(session, Session): | |
| raise TypeError("session must be a Session instance") | |
| result = _lib.SSL_set_session(self._ssl, session._session) | |
| _openssl_assert(result == 1) | |
| def _get_finished_message(self, function): | |
| """ | |
| Helper to implement :meth:`get_finished` and | |
| :meth:`get_peer_finished`. | |
| :param function: Either :data:`SSL_get_finished`: or | |
| :data:`SSL_get_peer_finished`. | |
| :return: :data:`None` if the desired message has not yet been | |
| received, otherwise the contents of the message. | |
| :rtype: :class:`bytes` or :class:`NoneType` | |
| """ | |
| # The OpenSSL documentation says nothing about what might happen if the | |
| # count argument given is zero. Specifically, it doesn't say whether | |
| # the output buffer may be NULL in that case or not. Inspection of the | |
| # implementation reveals that it calls memcpy() unconditionally. | |
| # Section 7.1.4, paragraph 1 of the C standard suggests that | |
| # memcpy(NULL, source, 0) is not guaranteed to produce defined (let | |
| # alone desirable) behavior (though it probably does on just about | |
| # every implementation...) | |
| # | |
| # Allocate a tiny buffer to pass in (instead of just passing NULL as | |
| # one might expect) for the initial call so as to be safe against this | |
| # potentially undefined behavior. | |
| empty = _ffi.new("char[]", 0) | |
| size = function(self._ssl, empty, 0) | |
| if size == 0: | |
| # No Finished message so far. | |
| return None | |
| buf = _no_zero_allocator("char[]", size) | |
| function(self._ssl, buf, size) | |
| return _ffi.buffer(buf, size)[:] | |
| def get_finished(self): | |
| """ | |
| Obtain the latest TLS Finished message that we sent. | |
| :return: The contents of the message or :obj:`None` if the TLS | |
| handshake has not yet completed. | |
| :rtype: :class:`bytes` or :class:`NoneType` | |
| .. versionadded:: 0.15 | |
| """ | |
| return self._get_finished_message(_lib.SSL_get_finished) | |
| def get_peer_finished(self): | |
| """ | |
| Obtain the latest TLS Finished message that we received from the peer. | |
| :return: The contents of the message or :obj:`None` if the TLS | |
| handshake has not yet completed. | |
| :rtype: :class:`bytes` or :class:`NoneType` | |
| .. versionadded:: 0.15 | |
| """ | |
| return self._get_finished_message(_lib.SSL_get_peer_finished) | |
| def get_cipher_name(self): | |
| """ | |
| Obtain the name of the currently used cipher. | |
| :returns: The name of the currently used cipher or :obj:`None` | |
| if no connection has been established. | |
| :rtype: :class:`unicode` or :class:`NoneType` | |
| .. versionadded:: 0.15 | |
| """ | |
| cipher = _lib.SSL_get_current_cipher(self._ssl) | |
| if cipher == _ffi.NULL: | |
| return None | |
| else: | |
| name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher)) | |
| return name.decode("utf-8") | |
| def get_cipher_bits(self): | |
| """ | |
| Obtain the number of secret bits of the currently used cipher. | |
| :returns: The number of secret bits of the currently used cipher | |
| or :obj:`None` if no connection has been established. | |
| :rtype: :class:`int` or :class:`NoneType` | |
| .. versionadded:: 0.15 | |
| """ | |
| cipher = _lib.SSL_get_current_cipher(self._ssl) | |
| if cipher == _ffi.NULL: | |
| return None | |
| else: | |
| return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL) | |
| def get_cipher_version(self): | |
| """ | |
| Obtain the protocol version of the currently used cipher. | |
| :returns: The protocol name of the currently used cipher | |
| or :obj:`None` if no connection has been established. | |
| :rtype: :class:`unicode` or :class:`NoneType` | |
| .. versionadded:: 0.15 | |
| """ | |
| cipher = _lib.SSL_get_current_cipher(self._ssl) | |
| if cipher == _ffi.NULL: | |
| return None | |
| else: | |
| version = _ffi.string(_lib.SSL_CIPHER_get_version(cipher)) | |
| return version.decode("utf-8") | |
| def get_protocol_version_name(self): | |
| """ | |
| Retrieve the protocol version of the current connection. | |
| :returns: The TLS version of the current connection, for example | |
| the value for TLS 1.2 would be ``TLSv1.2``or ``Unknown`` | |
| for connections that were not successfully established. | |
| :rtype: :class:`unicode` | |
| """ | |
| version = _ffi.string(_lib.SSL_get_version(self._ssl)) | |
| return version.decode("utf-8") | |
| def get_protocol_version(self): | |
| """ | |
| Retrieve the SSL or TLS protocol version of the current connection. | |
| :returns: The TLS version of the current connection. For example, | |
| it will return ``0x769`` for connections made over TLS version 1. | |
| :rtype: :class:`int` | |
| """ | |
| version = _lib.SSL_version(self._ssl) | |
| return version | |
| def set_alpn_protos(self, protos): | |
| """ | |
| Specify the client's ALPN protocol list. | |
| These protocols are offered to the server during protocol negotiation. | |
| :param protos: A list of the protocols to be offered to the server. | |
| This list should be a Python list of bytestrings representing the | |
| protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. | |
| """ | |
| # Different versions of OpenSSL are inconsistent about how they handle | |
| # empty proto lists (see #1043), so we avoid the problem entirely by | |
| # rejecting them ourselves. | |
| if not protos: | |
| raise ValueError("at least one protocol must be specified") | |
| # Take the list of protocols and join them together, prefixing them | |
| # with their lengths. | |
| protostr = b"".join( | |
| chain.from_iterable((bytes((len(p),)), p) for p in protos) | |
| ) | |
| # Build a C string from the list. We don't need to save this off | |
| # because OpenSSL immediately copies the data out. | |
| input_str = _ffi.new("unsigned char[]", protostr) | |
| # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html: | |
| # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos() | |
| # return 0 on success, and non-0 on failure. | |
| # WARNING: these functions reverse the return value convention. | |
| _openssl_assert( | |
| _lib.SSL_set_alpn_protos(self._ssl, input_str, len(protostr)) == 0 | |
| ) | |
| def get_alpn_proto_negotiated(self): | |
| """ | |
| Get the protocol that was negotiated by ALPN. | |
| :returns: A bytestring of the protocol name. If no protocol has been | |
| negotiated yet, returns an empty bytestring. | |
| """ | |
| data = _ffi.new("unsigned char **") | |
| data_len = _ffi.new("unsigned int *") | |
| _lib.SSL_get0_alpn_selected(self._ssl, data, data_len) | |
| if not data_len: | |
| return b"" | |
| return _ffi.buffer(data[0], data_len[0])[:] | |
| def request_ocsp(self): | |
| """ | |
| Called to request that the server sends stapled OCSP data, if | |
| available. If this is not called on the client side then the server | |
| will not send OCSP data. Should be used in conjunction with | |
| :meth:`Context.set_ocsp_client_callback`. | |
| """ | |
| rc = _lib.SSL_set_tlsext_status_type( | |
| self._ssl, _lib.TLSEXT_STATUSTYPE_ocsp | |
| ) | |
| _openssl_assert(rc == 1) | |