236 lines
7.9 KiB
Python
236 lines
7.9 KiB
Python
# -*- encoding: utf-8 -*-
|
|
""" EsiPy Security Proxy - An ESI Security class that directs authentication towards a third-party service.
|
|
Client key/secret not needed.
|
|
"""
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import base64
|
|
import logging
|
|
import time
|
|
|
|
from requests import Session
|
|
from requests.utils import quote
|
|
from six.moves.urllib.parse import urlparse
|
|
|
|
from esipy.events import AFTER_TOKEN_REFRESH
|
|
from esipy.exceptions import APIException
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class EsiSecurityProxy(object):
|
|
""" Contains all the OAuth2 knowledge for ESI use.
|
|
Based on pyswagger Security object, to be used with pyswagger BaseClient
|
|
implementation.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
**kwargs):
|
|
""" Init the ESI Security Object
|
|
|
|
:param sso_url: the default sso URL used when no "app" is provided
|
|
:param esi_url: the default esi URL used for verify endpoint
|
|
:param app: (optionnal) the pyswagger app object
|
|
:param security_name: (optionnal) the name of the object holding the
|
|
informations in the securityDefinitions, used to check authed endpoint
|
|
"""
|
|
|
|
app = kwargs.pop('app', None)
|
|
sso_url = kwargs.pop('sso_url', "https://login.eveonline.com")
|
|
esi_url = kwargs.pop('esi_url', "https://esi.tech.ccp.is")
|
|
|
|
self.security_name = kwargs.pop('security_name', 'evesso')
|
|
|
|
# we provide app object, so we don't use sso_url
|
|
if app is not None:
|
|
# check if the security_name exists in the securityDefinition
|
|
security = app.root.securityDefinitions.get(
|
|
self.security_name,
|
|
None
|
|
)
|
|
if security is None:
|
|
raise NameError(
|
|
"%s is not defined in the securityDefinitions" %
|
|
self.security_name
|
|
)
|
|
|
|
self.oauth_authorize = security.authorizationUrl
|
|
|
|
# some URL we still need to "manually" define... sadly
|
|
# we parse the authUrl so we don't care if it's TQ or SISI.
|
|
# https://github.com/ccpgames/esi-issues/issues/92
|
|
parsed_uri = urlparse(security.authorizationUrl)
|
|
self.oauth_token = '%s://%s/oauth/token' % (
|
|
parsed_uri.scheme,
|
|
parsed_uri.netloc
|
|
)
|
|
|
|
# no app object is provided, so we use direct URLs
|
|
else:
|
|
if sso_url is None or sso_url == "":
|
|
raise AttributeError("sso_url cannot be None or empty "
|
|
"without app parameter")
|
|
|
|
self.oauth_authorize = '%s/oauth/authorize' % sso_url
|
|
self.oauth_token = '%s/oauth/token' % sso_url
|
|
|
|
# use ESI url for verify, since it's better for caching
|
|
if esi_url is None or esi_url == "":
|
|
raise AttributeError("esi_url cannot be None or empty")
|
|
self.oauth_verify = '%s/verify/' % esi_url
|
|
|
|
# session request stuff
|
|
self._session = Session()
|
|
self._session.headers.update({
|
|
'Accept': 'application/json',
|
|
'User-Agent': (
|
|
'EsiPy/Security/ - '
|
|
'https://github.com/Kyria/EsiPy'
|
|
)
|
|
})
|
|
|
|
# token data
|
|
self.refresh_token = None
|
|
self.access_token = None
|
|
self.token_expiry = None
|
|
|
|
def __get_oauth_header(self):
|
|
""" Return the Bearer Authorization header required in oauth calls
|
|
|
|
:return: a dict with the authorization header
|
|
"""
|
|
return {'Authorization': 'Bearer %s' % self.access_token}
|
|
|
|
def __make_token_request_parameters(self, params):
|
|
""" Return the token uri from the securityDefinition
|
|
|
|
:param params: the data given to the request
|
|
:return: the oauth/token uri
|
|
"""
|
|
request_params = {
|
|
'data': params,
|
|
'url': self.oauth_token,
|
|
}
|
|
|
|
return request_params
|
|
|
|
def get_auth_uri(self, state=None, redirect='http://localhost:8080'):
|
|
""" Constructs the full auth uri and returns it.
|
|
|
|
:param state: The state to pass through the auth process
|
|
:param redirect: The URI that the proxy server will redirect to
|
|
:return: the authorizationUrl with the correct parameters.
|
|
"""
|
|
|
|
return '%s?redirect=%s%s' % (
|
|
self.oauth_authorize,
|
|
quote(redirect, safe=''),
|
|
'&state=%s' % state if state else ''
|
|
)
|
|
|
|
def get_refresh_token_params(self):
|
|
""" Return the param object for the post() call to get the access_token
|
|
from the refresh_token
|
|
|
|
:param code: the refresh token
|
|
:return: a dict with the url, params and header
|
|
"""
|
|
if self.refresh_token is None:
|
|
raise AttributeError('No refresh token is defined.')
|
|
|
|
return self.__make_token_request_parameters(
|
|
{
|
|
'grant_type': 'refresh_token',
|
|
'refresh_token': self.refresh_token,
|
|
}
|
|
)
|
|
|
|
def update_token(self, response_json):
|
|
""" Update access_token, refresh_token and token_expiry from the
|
|
response body.
|
|
The response must be converted to a json object before being passed as
|
|
a parameter
|
|
|
|
:param response_json: the response body to use.
|
|
"""
|
|
self.access_token = response_json['access_token']
|
|
self.token_expiry = int(time.time()) + response_json['expires_in']
|
|
|
|
if 'refresh_token' in response_json:
|
|
self.refresh_token = response_json['refresh_token']
|
|
|
|
def is_token_expired(self, offset=0):
|
|
""" Return true if the token is expired.
|
|
|
|
The offset can be used to change the expiry time:
|
|
- positive value decrease the time (sooner)
|
|
- negative value increase the time (later)
|
|
If the expiry is not set, always return True. This case allow the users
|
|
to define a security object, only knowing the refresh_token and get
|
|
a new access_token / expiry_time without errors.
|
|
|
|
:param offset: the expiry offset (in seconds) [default: 0]
|
|
:return: boolean true if expired, else false.
|
|
"""
|
|
if self.token_expiry is None:
|
|
return True
|
|
return int(time.time()) >= (self.token_expiry - offset)
|
|
|
|
def refresh(self):
|
|
""" Update the auth data (tokens) using the refresh token in auth.
|
|
"""
|
|
request_data = self.get_refresh_token_params()
|
|
res = self._session.post(**request_data)
|
|
if res.status_code != 200:
|
|
raise APIException(
|
|
request_data['url'],
|
|
res.status_code,
|
|
res.json()
|
|
)
|
|
json_res = res.json()
|
|
self.update_token(json_res)
|
|
return json_res
|
|
|
|
def verify(self):
|
|
""" Make a get call to the oauth/verify endpoint to get the user data
|
|
|
|
:return: the json with the data.
|
|
"""
|
|
res = self._session.get(
|
|
self.oauth_verify,
|
|
headers=self.__get_oauth_header()
|
|
)
|
|
if res.status_code != 200:
|
|
raise APIException(
|
|
self.oauth_verify,
|
|
res.status_code,
|
|
res.json()
|
|
)
|
|
return res.json()
|
|
|
|
def __call__(self, request):
|
|
""" Check if the request need security header and apply them.
|
|
Required for pyswagger.core.BaseClient.request().
|
|
|
|
:param request: the pyswagger request object to check
|
|
:return: the updated request.
|
|
"""
|
|
if not request._security:
|
|
return request
|
|
|
|
if self.is_token_expired():
|
|
json_response = self.refresh()
|
|
AFTER_TOKEN_REFRESH.send(**json_response)
|
|
|
|
for security in request._security:
|
|
if self.security_name not in security:
|
|
LOGGER.warning(
|
|
"Missing Securities: [%s]" % ", ".join(security.keys())
|
|
)
|
|
continue
|
|
if self.access_token is not None:
|
|
request._p['header'].update(self.__get_oauth_header())
|
|
|
|
return request
|