expose the client's cipher suites from the handshake (closes #23186)

This commit is contained in:
Benjamin Peterson 2015-01-07 11:14:26 -06:00
parent e5db863c22
commit 4cb17812d9
5 changed files with 94 additions and 21 deletions

View File

@ -925,6 +925,17 @@ SSL sockets also have the following additional methods and attributes:
version of the SSL protocol that defines its use, and the number of secret version of the SSL protocol that defines its use, and the number of secret
bits being used. If no connection has been established, returns ``None``. bits being used. If no connection has been established, returns ``None``.
.. method:: SSLSocket.shared_ciphers()
Return the list of ciphers shared by the client during the handshake. Each
entry of the returned list is a three-value tuple containing the name of the
cipher, the version of the SSL protocol that defines its use, and the number
of secret bits the cipher uses. :meth:`~SSLSocket.shared_ciphers` returns
``None`` if no connection has been established or the socket is a client
socket.
.. versionadded:: 3.5
.. method:: SSLSocket.compression() .. method:: SSLSocket.compression()
Return the compression algorithm being used as a string, or ``None`` Return the compression algorithm being used as a string, or ``None``
@ -1784,6 +1795,7 @@ provided.
- :meth:`~SSLSocket.getpeercert` - :meth:`~SSLSocket.getpeercert`
- :meth:`~SSLSocket.selected_npn_protocol` - :meth:`~SSLSocket.selected_npn_protocol`
- :meth:`~SSLSocket.cipher` - :meth:`~SSLSocket.cipher`
- :meth:`~SSLSocket.shared_ciphers`
- :meth:`~SSLSocket.compression` - :meth:`~SSLSocket.compression`
- :meth:`~SSLSocket.pending` - :meth:`~SSLSocket.pending`
- :meth:`~SSLSocket.do_handshake` - :meth:`~SSLSocket.do_handshake`

View File

@ -572,6 +572,10 @@ class SSLObject:
ssl_version, secret_bits)``.""" ssl_version, secret_bits)``."""
return self._sslobj.cipher() return self._sslobj.cipher()
def shared_ciphers(self):
"""Return the ciphers shared by the client during the handshake."""
return self._sslobj.shared_ciphers()
def compression(self): def compression(self):
"""Return the current compression algorithm in use, or ``None`` if """Return the current compression algorithm in use, or ``None`` if
compression was not negotiated or not supported by one of the peers.""" compression was not negotiated or not supported by one of the peers."""
@ -784,6 +788,12 @@ class SSLSocket(socket):
else: else:
return self._sslobj.cipher() return self._sslobj.cipher()
def shared_ciphers(self):
self._checkClosed()
if not self._sslobj:
return None
return self._sslobj.shared_ciphers()
def compression(self): def compression(self):
self._checkClosed() self._checkClosed()
if not self._sslobj: if not self._sslobj:

View File

@ -1698,11 +1698,13 @@ class NetworkedBIOTests(unittest.TestCase):
sslobj = ctx.wrap_bio(incoming, outgoing, False, 'svn.python.org') sslobj = ctx.wrap_bio(incoming, outgoing, False, 'svn.python.org')
self.assertIs(sslobj._sslobj.owner, sslobj) self.assertIs(sslobj._sslobj.owner, sslobj)
self.assertIsNone(sslobj.cipher()) self.assertIsNone(sslobj.cipher())
self.assertIsNone(sslobj.shared_ciphers())
self.assertRaises(ValueError, sslobj.getpeercert) self.assertRaises(ValueError, sslobj.getpeercert)
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertIsNone(sslobj.get_channel_binding('tls-unique')) self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
self.assertTrue(sslobj.cipher()) self.assertTrue(sslobj.cipher())
self.assertIsNone(sslobj.shared_ciphers())
self.assertTrue(sslobj.getpeercert()) self.assertTrue(sslobj.getpeercert())
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertTrue(sslobj.get_channel_binding('tls-unique')) self.assertTrue(sslobj.get_channel_binding('tls-unique'))
@ -1776,6 +1778,7 @@ else:
self.close() self.close()
return False return False
else: else:
self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
if self.server.context.verify_mode == ssl.CERT_REQUIRED: if self.server.context.verify_mode == ssl.CERT_REQUIRED:
cert = self.sslconn.getpeercert() cert = self.sslconn.getpeercert()
if support.verbose and self.server.chatty: if support.verbose and self.server.chatty:
@ -1891,6 +1894,7 @@ else:
self.flag = None self.flag = None
self.active = False self.active = False
self.selected_protocols = [] self.selected_protocols = []
self.shared_ciphers = []
self.conn_errors = [] self.conn_errors = []
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
@ -2121,6 +2125,7 @@ else:
}) })
s.close() s.close()
stats['server_npn_protocols'] = server.selected_protocols stats['server_npn_protocols'] = server.selected_protocols
stats['server_shared_ciphers'] = server.shared_ciphers
return stats return stats
def try_protocol_combo(server_protocol, client_protocol, expect_success, def try_protocol_combo(server_protocol, client_protocol, expect_success,
@ -3157,6 +3162,18 @@ else:
self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR') self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
self.assertIn("TypeError", stderr.getvalue()) self.assertIn("TypeError", stderr.getvalue())
def test_shared_ciphers(self):
server_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
client_context.set_ciphers("3DES")
server_context.set_ciphers("3DES:AES")
stats = server_params_test(client_context, server_context)
ciphers = stats['server_shared_ciphers'][0]
self.assertGreater(len(ciphers), 0)
for name, tls_version, bits in ciphers:
self.assertIn("DES-CBC3-", name)
self.assertEqual(bits, 112)
def test_read_write_after_close_raises_valuerror(self): def test_read_write_after_close_raises_valuerror(self):
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED context.verify_mode = ssl.CERT_REQUIRED

View File

@ -199,6 +199,10 @@ Core and Builtins
Library Library
------- -------
- Issue #23186: Add ssl.SSLObject.shared_ciphers() and
ssl.SSLSocket.shared_ciphers() to fetch the client's list ciphers sent at
handshake.
- Issue #23143: Remove compatibility with OpenSSLs older than 0.9.8. - Issue #23143: Remove compatibility with OpenSSLs older than 0.9.8.
- Issue #23132: Improve performance and introspection support of comparison - Issue #23132: Improve performance and introspection support of comparison

View File

@ -1360,54 +1360,83 @@ If the optional argument is True, returns a DER-encoded copy of the\n\
peer certificate, or None if no certificate was provided. This will\n\ peer certificate, or None if no certificate was provided. This will\n\
return the certificate even if it wasn't validated."); return the certificate even if it wasn't validated.");
static PyObject *PySSL_cipher (PySSLSocket *self) { static PyObject *
cipher_to_tuple(const SSL_CIPHER *cipher)
PyObject *retval, *v; {
const SSL_CIPHER *current; const char *cipher_name, *cipher_protocol;
char *cipher_name; PyObject *v, *retval = PyTuple_New(3);
char *cipher_protocol;
if (self->ssl == NULL)
Py_RETURN_NONE;
current = SSL_get_current_cipher(self->ssl);
if (current == NULL)
Py_RETURN_NONE;
retval = PyTuple_New(3);
if (retval == NULL) if (retval == NULL)
return NULL; return NULL;
cipher_name = (char *) SSL_CIPHER_get_name(current); cipher_name = SSL_CIPHER_get_name(cipher);
if (cipher_name == NULL) { if (cipher_name == NULL) {
Py_INCREF(Py_None); Py_INCREF(Py_None);
PyTuple_SET_ITEM(retval, 0, Py_None); PyTuple_SET_ITEM(retval, 0, Py_None);
} else { } else {
v = PyUnicode_FromString(cipher_name); v = PyUnicode_FromString(cipher_name);
if (v == NULL) if (v == NULL)
goto fail0; goto fail;
PyTuple_SET_ITEM(retval, 0, v); PyTuple_SET_ITEM(retval, 0, v);
} }
cipher_protocol = (char *) SSL_CIPHER_get_version(current);
cipher_protocol = SSL_CIPHER_get_version(cipher);
if (cipher_protocol == NULL) { if (cipher_protocol == NULL) {
Py_INCREF(Py_None); Py_INCREF(Py_None);
PyTuple_SET_ITEM(retval, 1, Py_None); PyTuple_SET_ITEM(retval, 1, Py_None);
} else { } else {
v = PyUnicode_FromString(cipher_protocol); v = PyUnicode_FromString(cipher_protocol);
if (v == NULL) if (v == NULL)
goto fail0; goto fail;
PyTuple_SET_ITEM(retval, 1, v); PyTuple_SET_ITEM(retval, 1, v);
} }
v = PyLong_FromLong(SSL_CIPHER_get_bits(current, NULL));
v = PyLong_FromLong(SSL_CIPHER_get_bits(cipher, NULL));
if (v == NULL) if (v == NULL)
goto fail0; goto fail;
PyTuple_SET_ITEM(retval, 2, v); PyTuple_SET_ITEM(retval, 2, v);
return retval; return retval;
fail0: fail:
Py_DECREF(retval); Py_DECREF(retval);
return NULL; return NULL;
} }
static PyObject *PySSL_shared_ciphers(PySSLSocket *self)
{
STACK_OF(SSL_CIPHER) *ciphers;
int i;
PyObject *res;
if (!self->ssl->session || !self->ssl->session->ciphers)
Py_RETURN_NONE;
ciphers = self->ssl->session->ciphers;
res = PyList_New(sk_SSL_CIPHER_num(ciphers));
if (!res)
return NULL;
for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) {
PyObject *tup = cipher_to_tuple(sk_SSL_CIPHER_value(ciphers, i));
if (!tup) {
Py_DECREF(res);
return NULL;
}
PyList_SET_ITEM(res, i, tup);
}
return res;
}
static PyObject *PySSL_cipher (PySSLSocket *self)
{
const SSL_CIPHER *current;
if (self->ssl == NULL)
Py_RETURN_NONE;
current = SSL_get_current_cipher(self->ssl);
if (current == NULL)
Py_RETURN_NONE;
return cipher_to_tuple(current);
}
static PyObject *PySSL_version(PySSLSocket *self) static PyObject *PySSL_version(PySSLSocket *self)
{ {
const char *version; const char *version;
@ -2019,6 +2048,7 @@ static PyMethodDef PySSLMethods[] = {
{"peer_certificate", (PyCFunction)PySSL_peercert, METH_VARARGS, {"peer_certificate", (PyCFunction)PySSL_peercert, METH_VARARGS,
PySSL_peercert_doc}, PySSL_peercert_doc},
{"cipher", (PyCFunction)PySSL_cipher, METH_NOARGS}, {"cipher", (PyCFunction)PySSL_cipher, METH_NOARGS},
{"shared_ciphers", (PyCFunction)PySSL_shared_ciphers, METH_NOARGS},
{"version", (PyCFunction)PySSL_version, METH_NOARGS}, {"version", (PyCFunction)PySSL_version, METH_NOARGS},
#ifdef OPENSSL_NPN_NEGOTIATED #ifdef OPENSSL_NPN_NEGOTIATED
{"selected_npn_protocol", (PyCFunction)PySSL_selected_npn_protocol, METH_NOARGS}, {"selected_npn_protocol", (PyCFunction)PySSL_selected_npn_protocol, METH_NOARGS},