Add ESI service skeleton, and my customer ESI Proxy class

This commit is contained in:
blitzmann
2018-01-11 20:54:33 -05:00
parent 9f7e4e0dc0
commit 9e8166c13d
3 changed files with 285 additions and 0 deletions

View File

@@ -5,3 +5,4 @@ python-dateutil
urllib3
requests == 2.0.0
sqlalchemy == 1.0.5
esipy == 0.3.0

46
service/esi.py Normal file
View File

@@ -0,0 +1,46 @@
# noinspection PyPackageRequirements
import wx
from logbook import Logger
import threading
import copy
import uuid
import time
import eos.db
from eos.enum import Enum
from eos.saveddata.crestchar import CrestChar
import gui.globalEvents as GE
from service.settings import CRESTSettings
from service.server import StoppableHTTPServer, AuthHandler
from service.pycrest.eve import EVE
pyfalog = Logger(__name__)
class Servers(Enum):
TQ = 0
SISI = 1
class CrestModes(Enum):
IMPLICIT = 0
USER = 1
class ESI(object):
# @todo: move this to settings
clientCallback = 'http://localhost:6461'
clientTest = True
_instance = None
@classmethod
def getInstance(cls):
if cls._instance is None:
cls._instance = ESI()
return cls._instance
def __init__(self):
pass

View File

@@ -0,0 +1,238 @@
# -*- encoding: utf-8 -*-
""" EsiPy Security Proxy - An ESI Security class that directs authentication towards a third-party service.
Client key/secret not needed.
"""
from __future__ import absolute_import
import base64
import logging
import time
from requests import Session
from requests.utils import quote
from six.moves.urllib.parse import urlparse
from esipy.events import AFTER_TOKEN_REFRESH
from esipy.exceptions import APIException
LOGGER = logging.getLogger(__name__)
class EsiSecurityProxy(object):
""" Contains all the OAuth2 knowledge for ESI use.
Based on pyswagger Security object, to be used with pyswagger BaseClient
implementation.
"""
def __init__(
self,
redirect_uri,
**kwargs):
""" Init the ESI Security Object
:param redirect_uri: the uri to redirect the user after login into SSO
:param sso_url: the default sso URL used when no "app" is provided
:param esi_url: the default esi URL used for verify endpoint
:param app: (optionnal) the pyswagger app object
:param security_name: (optionnal) the name of the object holding the
informations in the securityDefinitions, used to check authed endpoint
"""
app = kwargs.pop('app', None)
sso_url = kwargs.pop('sso_url', "https://login.eveonline.com")
esi_url = kwargs.pop('esi_url', "https://esi.tech.ccp.is")
self.security_name = kwargs.pop('security_name', 'evesso')
self.redirect_uri = redirect_uri
# we provide app object, so we don't use sso_url
if app is not None:
# check if the security_name exists in the securityDefinition
security = app.root.securityDefinitions.get(
self.security_name,
None
)
if security is None:
raise NameError(
"%s is not defined in the securityDefinitions" %
self.security_name
)
self.oauth_authorize = security.authorizationUrl
# some URL we still need to "manually" define... sadly
# we parse the authUrl so we don't care if it's TQ or SISI.
# https://github.com/ccpgames/esi-issues/issues/92
parsed_uri = urlparse(security.authorizationUrl)
self.oauth_token = '%s://%s/oauth/token' % (
parsed_uri.scheme,
parsed_uri.netloc
)
# no app object is provided, so we use direct URLs
else:
if sso_url is None or sso_url == "":
raise AttributeError("sso_url cannot be None or empty "
"without app parameter")
self.oauth_authorize = '%s/oauth/authorize' % sso_url
self.oauth_token = '%s/oauth/token' % sso_url
# use ESI url for verify, since it's better for caching
if esi_url is None or esi_url == "":
raise AttributeError("esi_url cannot be None or empty")
self.oauth_verify = '%s/verify/' % esi_url
# session request stuff
self._session = Session()
self._session.headers.update({
'Accept': 'application/json',
'User-Agent': (
'EsiPy/Security/ - '
'https://github.com/Kyria/EsiPy'
)
})
# token data
self.refresh_token = None
self.access_token = None
self.token_expiry = None
def __get_oauth_header(self):
""" Return the Bearer Authorization header required in oauth calls
:return: a dict with the authorization header
"""
return {'Authorization': 'Bearer %s' % self.access_token}
def __make_token_request_parameters(self, params):
""" Return the token uri from the securityDefinition
:param params: the data given to the request
:return: the oauth/token uri
"""
request_params = {
'data': params,
'url': self.oauth_token,
}
return request_params
def get_auth_uri(self, state=None, redirect='http://localhost:8080'):
""" Constructs the full auth uri and returns it.
:param state: The state to pass through the auth process
:param redirect: The URI that the proxy server will redirect to
:return: the authorizationUrl with the correct parameters.
"""
return '%s?redirect=%s%s' % (
self.oauth_authorize,
quote(redirect, safe=''),
'&state=%s' % state if state else ''
)
def get_refresh_token_params(self):
""" Return the param object for the post() call to get the access_token
from the refresh_token
:param code: the refresh token
:return: a dict with the url, params and header
"""
if self.refresh_token is None:
raise AttributeError('No refresh token is defined.')
return self.__make_token_request_parameters(
{
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
}
)
def update_token(self, response_json):
""" Update access_token, refresh_token and token_expiry from the
response body.
The response must be converted to a json object before being passed as
a parameter
:param response_json: the response body to use.
"""
self.access_token = response_json['access_token']
self.token_expiry = int(time.time()) + response_json['expires_in']
if 'refresh_token' in response_json:
self.refresh_token = response_json['refresh_token']
def is_token_expired(self, offset=0):
""" Return true if the token is expired.
The offset can be used to change the expiry time:
- positive value decrease the time (sooner)
- negative value increase the time (later)
If the expiry is not set, always return True. This case allow the users
to define a security object, only knowing the refresh_token and get
a new access_token / expiry_time without errors.
:param offset: the expiry offset (in seconds) [default: 0]
:return: boolean true if expired, else false.
"""
if self.token_expiry is None:
return True
return int(time.time()) >= (self.token_expiry - offset)
def refresh(self):
""" Update the auth data (tokens) using the refresh token in auth.
"""
request_data = self.get_refresh_token_params()
res = self._session.post(**request_data)
if res.status_code != 200:
raise APIException(
request_data['url'],
res.status_code,
res.json()
)
json_res = res.json()
self.update_token(json_res)
return json_res
def verify(self):
""" Make a get call to the oauth/verify endpoint to get the user data
:return: the json with the data.
"""
res = self._session.get(
self.oauth_verify,
headers=self.__get_oauth_header()
)
if res.status_code != 200:
raise APIException(
self.oauth_verify,
res.status_code,
res.json()
)
return res.json()
def __call__(self, request):
""" Check if the request need security header and apply them.
Required for pyswagger.core.BaseClient.request().
:param request: the pyswagger request object to check
:return: the updated request.
"""
if not request._security:
return request
if self.is_token_expired():
json_response = self.refresh()
AFTER_TOKEN_REFRESH.send(**json_response)
for security in request._security:
if self.security_name not in security:
LOGGER.warning(
"Missing Securities: [%s]" % ", ".join(security.keys())
)
continue
if self.access_token is not None:
request._p['header'].update(self.__get_oauth_header())
return request