Pull endpoints from .well-known route

Add request caching (only for meta calls for now)
Add server list (eventually want to support various servers, for now only TQ)
This commit is contained in:
blitzmann
2021-10-18 13:09:16 -04:00
parent 33aa208513
commit abd138a015
7 changed files with 115 additions and 149 deletions

View File

@@ -43,6 +43,7 @@ experimentalFeatures = None
version = None
language = None
API_CLIENT_ID = '095d8cd841ac40b581330919b49fe746'
ESI_CACHE = 'esi_cache'
LOGLEVEL_MAP = {

View File

@@ -38,7 +38,7 @@ class SsoLogin(wx.Dialog):
from service.esi import Esi
self.sEsi = Esi.getInstance()
uri = self.sEsi.getLoginURI(None)
uri = self.sEsi.get_login_uri(None)
webbrowser.open(uri)
@@ -53,7 +53,7 @@ class SsoLoginServer(wx.Dialog):
self.sEsi = Esi.getInstance()
serverAddr = self.sEsi.startServer(port)
uri = self.sEsi.getLoginURI(serverAddr)
uri = self.sEsi.get_login_uri(serverAddr)
bSizer1 = wx.BoxSizer(wx.VERTICAL)
self.mainFrame.Bind(GE.EVT_SSO_LOGIN, self.OnLogin)

View File

@@ -11,4 +11,5 @@ packaging >= 16.8
roman >= 2.0.0
beautifulsoup4 >= 4.6.0
pyyaml >= 5.1
python-jose==3.0.1
python-jose==3.0.1
requests-cache=0.8.1

View File

@@ -11,10 +11,10 @@ import webbrowser
import eos.db
from service.const import EsiLoginMethod, EsiSsoMode
from eos.saveddata.ssocharacter import SsoCharacter
from service.esiAccess import APIException
from service.esiAccess import APIException, SSOError
import gui.globalEvents as GE
from gui.ssoLogin import SsoLogin, SsoLoginServer
from service.server import StoppableHTTPServer, AuthHandler, SSOError
from service.server import StoppableHTTPServer, AuthHandler
from service.settings import EsiSettings
from service.esiAccess import EsiAccess
import gui.mainFrame
@@ -134,24 +134,16 @@ class Esi(EsiAccess):
return 'http://localhost:{}'.format(port)
def handleLogin(self, message):
auth_response = self.auth(message['code'][0])
auth_response, data = self.auth(message['code'][0])
res = self._session.get(
self.oauth_verify,
headers=self.get_oauth_header(auth_response['access_token'])
)
if res.status_code != 200:
raise APIException(
self.oauth_verify,
res.status_code,
res.json()
)
cdata = res.json()
currentCharacter = self.getSsoCharacter(cdata['CharacterName'])
currentCharacter = self.getSsoCharacter(data['name'])
sub_split = data["sub"].split(":")
if (len(sub_split) != 3):
raise SSOError("JWT sub does not contain the expected data. Contents: %s" % data["sub"])
cid = sub_split[-1]
if currentCharacter is None:
currentCharacter = SsoCharacter(cdata['CharacterID'], cdata['CharacterName'], config.getClientSecret())
currentCharacter = SsoCharacter(cid, data['name'], config.getClientSecret())
Esi.update_token(currentCharacter, auth_response)

View File

@@ -10,6 +10,8 @@ Eventually I'll rewrite this to be a bit cleaner and a bit more generic, but for
"""
# noinspection PyPackageRequirements
from collections import namedtuple
from logbook import Logger
import uuid
import time
@@ -20,12 +22,14 @@ import hashlib
import json
from jose import jwt
from jose.exceptions import ExpiredSignatureError, JWTError, JWTClaimsError
import os
import datetime
from service.const import EsiSsoMode, EsiEndpoints
from service.server import SSOError
from service.settings import EsiSettings, NetworkSettings
from datetime import timedelta
from requests_cache import CachedSession
from requests import Session
from urllib.parse import urlencode, quote
@@ -40,6 +44,8 @@ pyfalog = Logger(__name__)
# os.mkdir(cache_path)
#
class SSOError(Exception):
pass
scopes = [
'esi-skills.read_skills.v1',
@@ -47,6 +53,13 @@ scopes = [
'esi-fittings.write_fittings.v1'
]
ApiBase = namedtuple('ApiBase', ['sso', 'esi'])
supported_servers = {
"Tranquility": ApiBase("login.eveonline.com", "esi.evetech.net"),
"Singularity": ApiBase("sisilogin.testeveonline.com", "esi.evetech.net"),
"Serenity": ApiBase("login.evepc.163.com", "esi.evepc.163.com")
}
class APIException(Exception):
""" Exception for SSO related errors """
@@ -82,46 +95,47 @@ class EsiAccess:
self._session.headers.update(self._basicHeaders)
self._session.proxies = NetworkSettings.getInstance().getProxySettingsInRequestsFormat()
# Set up cached session. This is only used for SSO meta data for now, but can be expanded to actually handle
# various ESI caching (using ETag, for example) in the future
cached_session = CachedSession(
os.path.join(config.savePath, config.ESI_CACHE),
backend="sqlite",
cache_control=True, # Use Cache-Control headers for expiration, if available
expire_after=timedelta(days=1), # Otherwise expire responses after one day
stale_if_error=True, # In case of request errors, use stale cache data if possible
)
cached_session.headers.update(self._basicHeaders)
cached_session.proxies = NetworkSettings.getInstance().getProxySettingsInRequestsFormat()
meta_call = cached_session.get("https://%s/.well-known/oauth-authorization-server" % self.server_base.sso)
meta_call.raise_for_status()
self.server_meta = meta_call.json()
jwks_call = cached_session.get(self.server_meta["jwks_uri"])
jwks_call.raise_for_status()
self.jwks = jwks_call.json()
self.server_base: ApiBase = supported_servers[self.settings.get("server")]
@property
def sso_url(self):
# if self.settings.get("ssoMode") == EsiSsoMode.CUSTOM:
return "https://login.eveonline.com/v2"
# return "https://www.pyfa.io"
return 'https://%s/v2' % self.server_base.sso
@property
def esi_url(self):
return "https://esi.evetech.net"
@property
def oauth_verify(self):
return '%s/verify/' % self.esi_url
return 'https://%s' % self.server_base.esi
@property
def oauth_authorize(self):
return '%s/oauth/authorize' % self.sso_url
return self.server_meta["authorization_endpoint"]
@property
def oauth_token(self):
return '%s/oauth/token' % self.sso_url
return self.server_meta["token_endpoint"]
def getDynamicItem(self, typeID, itemID):
return self.get(None, EsiEndpoints.DYNAMIC_ITEM.value, type_id=typeID, item_id=itemID)
def getSkills(self, char):
return self.get(char, EsiEndpoints.CHAR_SKILLS.value, character_id=char.characterID)
def getSecStatus(self, char):
return self.get(char, EsiEndpoints.CHAR.value, character_id=char.characterID)
def getFittings(self, char):
return self.get(char, EsiEndpoints.CHAR_FITTINGS.value, character_id=char.characterID)
def postFitting(self, char, json_str):
# @todo: new fitting ID can be recovered from resp.data,
return self.post(char, EsiEndpoints.CHAR_FITTINGS.value, json_str, character_id=char.characterID)
def delFitting(self, char, fittingID):
return self.delete(char, EsiEndpoints.CHAR_DEL_FIT.value, character_id=char.characterID, fitting_id=fittingID)
@property
def client_id(self):
return self.settings.get('clientID') or config.API_CLIENT_ID
@staticmethod
def update_token(char, tokenResponse):
@@ -131,7 +145,7 @@ class EsiAccess:
if 'refresh_token' in tokenResponse:
char.refreshToken = config.cipher.encrypt(tokenResponse['refresh_token'].encode())
def getLoginURI(self, redirect=None):
def get_login_uri(self, redirect=None):
self.state = str(uuid.uuid4())
# Generate the PKCE code challenge
@@ -148,12 +162,9 @@ class EsiAccess:
}
args = {
# 'pyfa_version': config.version,
# 'login_method': self.settings.get('loginMode'), # todo: encode this into the state
# 'client_hash': config.getClientSecret(),
'response_type': 'code',
'redirect_uri': 'http://127.0.0.1:5500/callback.html',
'client_id': self.settings.get('clientID') or '095d8cd841ac40b581330919b49fe746', # pyfa PKCE app # TODO: move this to some central config location, not hardcoded
'client_id': self.client_id,
'scope': ' '.join(scopes),
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
@@ -165,7 +176,6 @@ class EsiAccess:
urlencode(args)
)
def get_oauth_header(self, token):
""" Return the Bearer Authorization header required in oauth calls
@@ -173,92 +183,54 @@ class EsiAccess:
"""
return {'Authorization': 'Bearer %s' % token}
def get_refresh_token_params(self, refreshToken):
""" Return the param object for the post() call to get the access_token
from the refresh_token
:param code: the refresh token
:return: a dict with the url, params and header
"""
if refreshToken is None:
raise AttributeError('No refresh token is defined.')
data = {
'grant_type': 'refresh_token',
'refresh_token': refreshToken,
}
if self.settings.get('ssoMode') == EsiSsoMode.AUTO:
# data is all we really need, the rest is handled automatically by pyfa.io
return {
'data': data,
'url': self.oauth_token,
}
# otherwise, we need to make the token with the client keys
return self.__make_token_request_parameters(data)
def __get_token_auth_header(self):
""" Return the Basic Authorization header required to get the tokens
:return: a dict with the headers
"""
# encode/decode for py2/py3 compatibility
auth_b64 = "%s:%s" % (self.settings.get('clientID'), self.settings.get('clientSecret'))
auth_b64 = base64.b64encode(auth_b64.encode('latin-1'))
auth_b64 = auth_b64.decode('latin-1')
return {'Authorization': 'Basic %s' % auth_b64}
def __make_token_request_parameters(self, params):
request_params = {
'headers': self.__get_token_auth_header(),
'data': params,
'url': self.oauth_token,
}
return request_params
def get_access_token_request_params(self, code):
return self.__make_token_request_parameters(
{
'grant_type': 'authorization_code',
'code': code,
'client_id': self.settings.get('clientID') or '095d8cd841ac40b581330919b49fe746',
"code_verifier": self.code_verifier
}
)
def auth(self, code):
# todo: handle invalid auth code, or one that has been used already
# todo: properly handle invalid auth code, or one that has been used already
values = {
'grant_type': 'authorization_code',
'code': code,
'client_id': self.settings.get('clientID') or '095d8cd841ac40b581330919b49fe746',
'client_id': self.client_id,
"code_verifier": self.code_verifier
}
res = self.token_call(values)
json_res = res.json();
decoded_jwt = self.validate_eve_jwt(json_res['access_token'])
return json_res, decoded_jwt
def refresh(self, ssoChar):
# todo: properly handle invalid refresh token
values = {
"grant_type": "refresh_token",
"refresh_token": config.cipher.decrypt(ssoChar.refreshToken).decode(),
"client_id": self.client_id,
}
res = self.token_call(values)
json_res = res.json();
self.update_token(ssoChar, json_res)
return json_res
def token_call(self, values):
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Host": "login.eveonline.com",
"Host": self.server_base.sso,
}
res = self._session.post(
"https://login.eveonline.com/v2/oauth/token",
self.server_meta["token_endpoint"],
data=values,
headers=headers,
)
if res.status_code != 200:
raise SSOError(
"https://login.eveonline.com/v2/oauth/token",
raise APIException(
self.server_meta["token_endpoint"],
res.status_code,
res.json()
)
json_res = res.json()
self.validate_eve_jwt(json_res['access_token'])
return json_res
return res
def validate_eve_jwt(self, jwt_token):
"""Validate a JWT token retrieved from the EVE SSO.
@@ -269,19 +241,12 @@ class EsiAccess:
validation errors
"""
jwk_set_url = "https://login.eveonline.com/oauth/jwks"
res = self._session.get(jwk_set_url)
res.raise_for_status()
data = res.json()
try:
jwk_sets = data["keys"]
jwk_sets = self.jwks["keys"]
except KeyError as e:
raise SSOError("Something went wrong when retrieving the JWK set. The returned "
"payload did not have the expected key {}. \nPayload returned "
"from the SSO looks like: {}".format(e, data))
"from the SSO looks like: {}".format(e, self.jwks))
jwk_set = next((item for item in jwk_sets if item["alg"] == "RS256"))
@@ -290,7 +255,7 @@ class EsiAccess:
jwt_token,
jwk_set,
algorithms=jwk_set["alg"],
issuer=["login.eveonline.com", "https://login.eveonline.com"]
issuer=[self.server_base.sso, "https://%s" % self.server_base.sso]
)
except ExpiredSignatureError as e:
raise SSOError("The JWT token has expired: {}").format(str(e))
@@ -300,20 +265,6 @@ class EsiAccess:
raise SSOError("The issuer claim was not from login.eveonline.com or "
"https://login.eveonline.com: {}".format(str(e)))
def refresh(self, ssoChar):
request_data = self.get_refresh_token_params(config.cipher.decrypt(ssoChar.refreshToken).decode())
res = self._session.post(**request_data)
if res.status_code != 200:
raise APIException(
request_data['url'],
res.status_code,
res.json()
)
json_res = res.json()
self.update_token(ssoChar, json_res)
return json_res
def _before_request(self, ssoChar):
self._session.headers.clear()
self._session.headers.update(self._basicHeaders)
@@ -354,3 +305,24 @@ class EsiAccess:
self._before_request(ssoChar)
endpoint = endpoint.format(**kwargs)
return self._after_request(self._session.delete("{}{}".format(self.esi_url, endpoint)))
# todo: move these off to another class which extends this one. This class should only handle the low level
# authentication and
def getDynamicItem(self, typeID, itemID):
return self.get(None, EsiEndpoints.DYNAMIC_ITEM.value, type_id=typeID, item_id=itemID)
def getSkills(self, char):
return self.get(char, EsiEndpoints.CHAR_SKILLS.value, character_id=char.characterID)
def getSecStatus(self, char):
return self.get(char, EsiEndpoints.CHAR.value, character_id=char.characterID)
def getFittings(self, char):
return self.get(char, EsiEndpoints.CHAR_FITTINGS.value, character_id=char.characterID)
def postFitting(self, char, json_str):
# @todo: new fitting ID can be recovered from resp.data,
return self.post(char, EsiEndpoints.CHAR_FITTINGS.value, json_str, character_id=char.characterID)
def delFitting(self, char, fittingID):
return self.delete(char, EsiEndpoints.CHAR_DEL_FIT.value, character_id=char.characterID, fitting_id=fittingID)

View File

@@ -6,10 +6,9 @@ from logbook import Logger
import socketserver
import json
pyfalog = Logger(__name__)
from service.esiAccess import APIException, SSOError
class SSOError(Exception):
pass
pyfalog = Logger(__name__)
# https://github.com/fuzzysteve/CREST-Market-Downloader/
class AuthHandler(http.server.BaseHTTPRequestHandler):
@@ -34,7 +33,7 @@ class AuthHandler(http.server.BaseHTTPRequestHandler):
self.end_headers()
except (KeyboardInterrupt, SystemExit):
raise
except SSOError as ex:
except (SSOError, APIException) as ex:
pyfalog.error("Error logging into EVE")
pyfalog.error(ex)
self.send_response(500)

View File

@@ -374,6 +374,7 @@ class EsiSettings:
"clientID": "",
"clientSecret": "",
"timeout": 60,
"server": "Tranquility",
"exportCharges": True}
self.settings = SettingsProvider.getInstance().getSettings(