141 lines
5.2 KiB
Python
141 lines
5.2 KiB
Python
import datetime
|
|
import ssl
|
|
import sys
|
|
import warnings
|
|
|
|
from requests.adapters import HTTPAdapter
|
|
|
|
# Prefer the urllib3 package exposed under ``requests.packages``; fall back to
# a top-level ``urllib3`` install when that one cannot be imported. Only
# ImportError triggers the fallback, so unrelated failures (KeyboardInterrupt,
# SystemExit, genuine bugs raised during import) are no longer masked.
try:
    from requests.packages import urllib3
    from requests.packages.urllib3.util import ssl_

    from requests.packages.urllib3.exceptions import (
        SystemTimeWarning,
        SecurityWarning,
    )
    from requests.packages.urllib3.packages.ssl_match_hostname import \
        match_hostname
except ImportError:
    import urllib3
    from urllib3.util import ssl_

    from urllib3.exceptions import (
        SystemTimeWarning,
        SecurityWarning,
    )
    from urllib3.packages.ssl_match_hostname import \
        match_hostname
|
|
|
|
|
|
|
|
|
|
class WeakCiphersHTTPSConnection(
        urllib3.connection.VerifiedHTTPSConnection):  # pragma: no cover
    """Verified HTTPS connection that negotiates with a custom cipher list.

    Python versions >=2.7.9 and >=3.4.1 do not (by default) allow ciphers
    with MD5. The CREST public server only supported
    TLS_RSA_WITH_RC4_128_MD5 (as of 5 Jan 2015), so ``DEFAULT_CIPHERS``
    re-admits that cipher as a last resort while still excluding export
    versions of ciphers.
    """

    # Cipher string handed to ``ssl_wrap_socket`` when the caller does not
    # supply one. MD5 suites are dropped by ``-MD5`` and then only
    # ``RSA+RC4+MD5`` is appended again at the very end of the list.
    DEFAULT_CIPHERS = (
        'ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:'
        'ECDH+HIGH:DH+HIGH:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+HIGH:'
        'RSA+3DES:ECDH+RC4:DH+RC4:RSA+RC4:!aNULL:!eNULL:!EXP:-MD5:RSA+RC4+MD5'
    )

    def __init__(self, host, port, ciphers=None, **kwargs):
        """Remember the cipher list, then defer to the parent constructor.

        :param host: forwarded unchanged to the parent class.
        :param port: forwarded unchanged to the parent class.
        :param ciphers: cipher string to use; ``None`` selects
            ``DEFAULT_CIPHERS``.
        :param kwargs: forwarded unchanged to the parent class.
        """
        if ciphers is None:
            ciphers = self.DEFAULT_CIPHERS
        self.ciphers = ciphers
        super(WeakCiphersHTTPSConnection, self).__init__(host, port, **kwargs)

    def connect(self):
        """Open the socket, wrap it in TLS and verify the peer.

        This is a copy of ``VerifiedHTTPSConnection.connect`` whose only
        purpose is to pass ``self.ciphers`` to ``ssl_wrap_socket``.
        Sets ``self.sock`` and ``self.is_verified``.
        """
        raw_sock = self._new_conn()

        cert_reqs = ssl_.resolve_cert_reqs(self.cert_reqs)
        tls_version = ssl_.resolve_ssl_version(self.ssl_version)

        # Name presented for SNI and checked against the certificate.
        verify_host = self.host
        if getattr(self, '_tunnel_host', None):
            # _tunnel_host was added in Python 2.6.3
            # (See: http://hg.python.org/cpython/rev/0f57b30a152f)
            self.sock = raw_sock
            # _tunnel() calls self._set_hostport(), so self.host is
            # self._tunnel_host afterwards.
            self._tunnel()
            # A tunnelled connection must not be reused.
            self.auto_open = 0

            # Verify against the host we are actually requesting data from.
            verify_host = self._tunnel_host

        if datetime.date.today() < urllib3.connection.RECENT_DATE:
            clock_message = (
                'System time is way off (before {0}). This will probably '
                'lead to SSL verification errors'
            ).format(urllib3.connection.RECENT_DATE)
            warnings.warn(clock_message, SystemTimeWarning)

        # Wrap the socket, verifying against the root certs in ca_certs and
        # restricting negotiation to our cipher list.
        self.sock = ssl_.ssl_wrap_socket(
            raw_sock, self.key_file, self.cert_file,
            cert_reqs=cert_reqs,
            ca_certs=self.ca_certs,
            server_hostname=verify_host,
            ssl_version=tls_version,
            ciphers=self.ciphers,
        )

        if self.assert_fingerprint:
            ssl_.assert_fingerprint(
                self.sock.getpeercert(binary_form=True),
                self.assert_fingerprint,
            )
        elif (cert_reqs != ssl.CERT_NONE
                and self.assert_hostname is not False):
            peer_cert = self.sock.getpeercert()
            if not peer_cert.get('subjectAltName', ()):
                warnings.warn((
                    'Certificate has no `subjectAltName`, falling back to check for a `commonName` for now. '
                    'This feature is being removed by major browsers and deprecated by RFC 2818. '
                    '(See https://github.com/shazow/urllib3/issues/497 for details.)'),
                    SecurityWarning
                )
            match_hostname(peer_cert, self.assert_hostname or verify_host)

        self.is_verified = (cert_reqs == ssl.CERT_REQUIRED
                            or self.assert_fingerprint is not None)
|
|
|
|
|
|
class WeakCiphersHTTPSConnectionPool(urllib3.connectionpool.HTTPSConnectionPool):
    """HTTPS connection pool bound to ``WeakCiphersHTTPSConnection``."""

    # Overrides the parent's connection class so that connections made by
    # this pool use the relaxed cipher list.
    ConnectionCls = WeakCiphersHTTPSConnection
|
|
|
|
|
|
class WeakCiphersPoolManager(urllib3.poolmanager.PoolManager):
    """Pool manager that serves HTTPS hosts from weak-cipher pools."""

    def _new_pool(self, scheme, host, port, request_context=None):
        """Create a connection pool for ``scheme://host:port``.

        :param scheme: URL scheme; only ``'https'`` gets the custom pool.
        :param host: host the pool connects to.
        :param port: port the pool connects to.
        :param request_context: optional per-request pool keyword arguments.
            Later urllib3 1.x releases pass this keyword when they call
            ``_new_pool``; without the parameter the override would raise
            ``TypeError`` there. ``None`` keeps the original behaviour of
            using ``self.connection_pool_kw``.
        :returns: a ``WeakCiphersHTTPSConnectionPool`` for HTTPS, otherwise
            whatever the parent ``PoolManager`` builds.
        """
        parent = super(WeakCiphersPoolManager, self)

        if scheme != 'https':
            if request_context is None:
                # Call with the three-argument form so urllib3 releases
                # that predate ``request_context`` keep working.
                return parent._new_pool(scheme, host, port)
            return parent._new_pool(scheme, host, port,
                                    request_context=request_context)

        if request_context is None:
            pool_kw = self.connection_pool_kw
        else:
            # Work on a copy so the caller's dict is left untouched; host,
            # port and scheme are already passed explicitly below.
            pool_kw = dict(request_context)
            for key in ('scheme', 'host', 'port'):
                pool_kw.pop(key, None)

        return WeakCiphersHTTPSConnectionPool(host, port, **pool_kw)
|
|
|
|
|
|
class WeakCiphersAdapter(HTTPAdapter):
    """Transport adapter that allows us to use TLS_RSA_WITH_RC4_128_MD5."""

    def init_poolmanager(self, connections, maxsize, block=False,
                         **pool_kwargs):
        """Install a ``WeakCiphersPoolManager`` as this adapter's pool manager.

        Rewrite of ``requests.adapters.HTTPAdapter.init_poolmanager`` that
        swaps urllib3's ``PoolManager`` for ``WeakCiphersPoolManager``.

        :param connections: stored as ``_pool_connections`` and passed as
            ``num_pools``.
        :param maxsize: stored as ``_pool_maxsize`` and passed through.
        :param block: stored as ``_pool_block`` and passed through.
        :param pool_kwargs: extra keyword arguments for the pool manager.
        """
        # Keep the settings on the adapter, as the method being rewritten does.
        self._pool_connections = connections
        self._pool_maxsize = maxsize
        self._pool_block = block

        manager = WeakCiphersPoolManager(
            num_pools=connections,
            maxsize=maxsize,
            block=block,
            strict=True,
            **pool_kwargs
        )
        self.poolmanager = manager
|