Merge pull request #2363 from pyfa-org/sso_v2

Sso v2
This commit is contained in:
Ryan Holmes
2021-10-26 17:22:46 -04:00
committed by GitHub
11 changed files with 331 additions and 432 deletions

View File

@@ -43,7 +43,9 @@ experimentalFeatures = None
version = None
language = None
API_CLIENT_ID = '095d8cd841ac40b581330919b49fe746'
ESI_CACHE = 'esi_cache'
SSO_CALLBACK = 'https://pyfa-org.github.io/Pyfa/callback'
LOGLEVEL_MAP = {
"critical": CRITICAL,

View File

@@ -37,118 +37,19 @@ class PFEsiPref(PreferenceView):
rbSizer = wx.BoxSizer(wx.HORIZONTAL)
self.rbMode = wx.RadioBox(panel, -1, _t("Login Authentication Method"), wx.DefaultPosition, wx.DefaultSize,
[_t('Local Server'), _t('Manual')], 1, wx.RA_SPECIFY_COLS)
self.rbMode.SetItemToolTip(0, _t("This options starts a local webserver that the web application will call back to"
self.rbMode.SetItemToolTip(0, _t("This option starts a local webserver that EVE SSO Server will call back to"
" with information about the character login."))
self.rbMode.SetItemToolTip(1, _t("This option prompts users to copy and paste information from the web application "
"to allow for character login. Use this if having issues with the local server."))
self.rbSsoMode = wx.RadioBox(panel, -1, _t("SSO Mode"), wx.DefaultPosition, wx.DefaultSize,
[_t('pyfa.io'), _t('Custom application')], 1, wx.RA_SPECIFY_COLS)
self.rbSsoMode.SetItemToolTip(0, _t("This options routes SSO Logins through pyfa.io, allowing you to easily login "
"without any configuration. When in doubt, use this option."))
self.rbSsoMode.SetItemToolTip(1, _t("This option goes through EVE SSO directly, but requires more configuration. Use "
"this if pyfa.io is blocked for some reason, or if you do not wish to route data throguh pyfa.io."))
self.rbMode.SetItemToolTip(1, _t("This option prompts users to copy and paste information to allow for"
" character login. Use this if having issues with the local server."))
self.rbMode.SetSelection(self.settings.get('loginMode'))
self.rbSsoMode.SetSelection(self.settings.get('ssoMode'))
rbSizer.Add(self.rbSsoMode, 1, wx.ALL, 5)
rbSizer.Add(self.rbMode, 1, wx.TOP | wx.RIGHT, 5)
self.rbMode.Bind(wx.EVT_RADIOBOX, self.OnModeChange)
self.rbSsoMode.Bind(wx.EVT_RADIOBOX, self.OnSSOChange)
mainSizer.Add(rbSizer, 1, wx.ALL | wx.EXPAND, 0)
detailsTitle = wx.StaticText(panel, wx.ID_ANY, _t("Custom Application"), wx.DefaultPosition, wx.DefaultSize, 0)
detailsTitle.Wrap(-1)
detailsTitle.SetFont(wx.Font(12, 70, 90, 90, False, wx.EmptyString))
mainSizer.Add(detailsTitle, 0, wx.ALL, 5)
mainSizer.Add(wx.StaticLine(panel, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.LI_HORIZONTAL), 0,
wx.EXPAND, 5)
fgAddrSizer = wx.FlexGridSizer(2, 2, 0, 0)
fgAddrSizer.AddGrowableCol(1)
fgAddrSizer.SetFlexibleDirection(wx.BOTH)
fgAddrSizer.SetNonFlexibleGrowMode(wx.FLEX_GROWMODE_SPECIFIED)
self.stSetID = wx.StaticText(panel, wx.ID_ANY, _t("Client ID:"), wx.DefaultPosition, wx.DefaultSize, 0)
self.stSetID.Wrap(-1)
fgAddrSizer.Add(self.stSetID, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 5)
self.inputClientID = wx.TextCtrl(panel, wx.ID_ANY, self.settings.get('clientID'), wx.DefaultPosition,
wx.DefaultSize, 0)
fgAddrSizer.Add(self.inputClientID, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL | wx.EXPAND, 5)
self.stSetSecret = wx.StaticText(panel, wx.ID_ANY, _t("Client Secret:"), wx.DefaultPosition, wx.DefaultSize, 0)
self.stSetSecret.Wrap(-1)
fgAddrSizer.Add(self.stSetSecret, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 5)
self.inputClientSecret = wx.TextCtrl(panel, wx.ID_ANY, self.settings.get('clientSecret'), wx.DefaultPosition,
wx.DefaultSize, 0)
fgAddrSizer.Add(self.inputClientSecret, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL | wx.EXPAND, 5)
self.inputClientID.Bind(wx.EVT_TEXT, self.OnClientDetailChange)
self.inputClientSecret.Bind(wx.EVT_TEXT, self.OnClientDetailChange)
mainSizer.Add(fgAddrSizer, 0, wx.EXPAND, 5)
# self.stTimout = wx.StaticText(panel, wx.ID_ANY, "Timeout (seconds):", wx.DefaultPosition, wx.DefaultSize, 0)
# self.stTimout.Wrap(-1)
#
# timeoutSizer.Add(self.stTimout, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 5)
# self.intTimeout = IntCtrl(panel, max=300000, limited=True, value=self.settings.get('timeout'))
# timeoutSizer.Add(self.intTimeout, 0, wx.ALL, 5)
# self.intTimeout.Bind(wx.lib.intctrl.EVT_INT, self.OnTimeoutChange)
#
# mainSizer.Add(timeoutSizer, 0, wx.ALL | wx.EXPAND, 0)
# detailsTitle = wx.StaticText(panel, wx.ID_ANY, "CREST client details", wx.DefaultPosition, wx.DefaultSize, 0)
# detailsTitle.Wrap(-1)
# detailsTitle.SetFont(wx.Font(12, 70, 90, 90, False, wx.EmptyString))
#
# mainSizer.Add(detailsTitle, 0, wx.ALL, 5)
# mainSizer.Add(wx.StaticLine(panel, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.LI_HORIZONTAL), 0,
# wx.EXPAND, 5)
# fgAddrSizer = wx.FlexGridSizer(2, 2, 0, 0)
# fgAddrSizer.AddGrowableCol(1)
# fgAddrSizer.SetFlexibleDirection(wx.BOTH)
# fgAddrSizer.SetNonFlexibleGrowMode(wx.FLEX_GROWMODE_SPECIFIED)
#
# self.stSetID = wx.StaticText(panel, wx.ID_ANY, "Client ID:", wx.DefaultPosition, wx.DefaultSize, 0)
# self.stSetID.Wrap(-1)
# fgAddrSizer.Add(self.stSetID, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 5)
#
# self.inputClientID = wx.TextCtrl(panel, wx.ID_ANY, self.settings.get('clientID'), wx.DefaultPosition,
# wx.DefaultSize, 0)
#
# fgAddrSizer.Add(self.inputClientID, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL | wx.EXPAND, 5)
#
# self.stSetSecret = wx.StaticText(panel, wx.ID_ANY, "Client Secret:", wx.DefaultPosition, wx.DefaultSize, 0)
# self.stSetSecret.Wrap(-1)
#
# fgAddrSizer.Add(self.stSetSecret, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 5)
#
# self.inputClientSecret = wx.TextCtrl(panel, wx.ID_ANY, self.settings.get('clientSecret'), wx.DefaultPosition,
# wx.DefaultSize, 0)
#
# fgAddrSizer.Add(self.inputClientSecret, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL | wx.EXPAND, 5)
#
# self.btnApply = wx.Button(panel, wx.ID_ANY, "Save Client Settings", wx.DefaultPosition, wx.DefaultSize, 0)
# self.btnApply.Bind(wx.EVT_BUTTON, self.OnBtnApply)
#
# mainSizer.Add(fgAddrSizer, 0, wx.EXPAND, 5)
# mainSizer.Add(self.btnApply, 0, wx.ALIGN_RIGHT, 5)
# self.ToggleProxySettings(self.settings.get('loginMode'))
self.ToggleSSOMode(self.settings.get('ssoMode'))
panel.SetSizer(mainSizer)
panel.Layout()
@@ -158,32 +59,6 @@ class PFEsiPref(PreferenceView):
def OnModeChange(self, event):
self.settings.set('loginMode', event.GetInt())
def OnSSOChange(self, event):
self.settings.set('ssoMode', event.GetInt())
self.ToggleSSOMode(event.GetInt())
def ToggleSSOMode(self, mode):
if mode:
self.stSetID.Enable()
self.inputClientID.Enable()
self.stSetSecret.Enable()
self.inputClientSecret.Enable()
self.rbMode.Disable()
else:
self.stSetID.Disable()
self.inputClientID.Disable()
self.stSetSecret.Disable()
self.inputClientSecret.Disable()
self.rbMode.Enable()
def OnClientDetailChange(self, evt):
self.settings.set('clientID', self.inputClientID.GetValue().strip())
self.settings.set('clientSecret', self.inputClientSecret.GetValue().strip())
# sEsi = Esi.getInstance()
# sEsi.delAllCharacters()
#
def getImage(self):
return BitmapLoader.getBitmap("eve", "gui")

View File

@@ -42,6 +42,7 @@ from gui.contextMenu import ContextMenu
from gui.utils.clipboard import fromClipboard, toClipboard
from service.character import Character
from service.esi import Esi
from service.esiAccess import APIException
from service.fit import Fit
from service.market import Market
@@ -888,14 +889,7 @@ class APIView(wx.Panel):
def fetchCallback(e=None):
if e:
pyfalog.warn("Error fetching skill information for character for __fetchCallback")
exc_type, exc_value, exc_trace = e
if config.debug:
exc_value = ''.join(traceback.format_exception(exc_type, exc_value, exc_trace))
pyfalog.warn(exc_value)
wx.MessageBox(
_t("Error fetching skill information"),
_t("Error"), wx.ICON_ERROR | wx.STAY_ON_TOP)
SkillFetchExceptionHandler(e)
else:
wx.MessageBox(
_t("Successfully fetched skills"), _t("Success"), wx.ICON_INFORMATION | wx.STAY_ON_TOP)
@@ -926,3 +920,24 @@ class SecStatusDialog(wx.Dialog):
self.Layout()
self.Center(wx.BOTH)
class SkillFetchExceptionHandler:
def __init__(self, e):
from gui.esiFittings import ESIExceptionHandler
exc_type, exc_value, exc_trace = e
if config.debug:
exc_value = ''.join(traceback.format_exception(exc_type, exc_value, exc_trace))
pyfalog.warn(exc_value)
try:
try:
raise exc_value
except APIException as ex:
pyfalog.error(ex)
ESIExceptionHandler(ex)
except Exception as ex:
pyfalog.error(ex)
wx.MessageBox(
_t("Error fetching skill information"),
_t("Error"), wx.ICON_ERROR | wx.STAY_ON_TOP)

View File

@@ -163,15 +163,9 @@ class CharacterSelection(wx.Panel):
if e is None:
self.refreshCharacterList()
else:
from gui.characterEditor import SkillFetchExceptionHandler
pyfalog.warn("Error fetching skill information for character for refreshAPICallback")
exc_type, exc_value, exc_trace = e
if config.debug:
exc_value = ''.join(traceback.format_exception(exc_type, exc_value, exc_trace))
pyfalog.warn(exc_value)
wx.MessageBox(
_t("Error fetching skill information"),
_t("Error"), wx.ICON_ERROR | wx.STAY_ON_TOP)
SkillFetchExceptionHandler(e)
def charChanged(self, event):
fitID = self.mainFrame.getActiveFit()

View File

@@ -9,6 +9,7 @@ import config
import gui.globalEvents as GE
from eos.db import getItem
from eos.saveddata.cargo import Cargo
import gui.mainFrame
from gui.auxWindow import AuxiliaryFrame
from gui.display import Display
from gui.characterEditor import APIView
@@ -132,7 +133,7 @@ class EveFittings(AuxiliaryFrame):
except APIException as ex:
# Can't do this in a finally because then it obscures the message dialog
del waitDialog # noqa: F821
ESIExceptionHandler(self, ex)
ESIExceptionHandler(ex)
except (KeyboardInterrupt, SystemExit):
raise
except Exception as ex:
@@ -149,6 +150,7 @@ class EveFittings(AuxiliaryFrame):
self.mainFrame._openAfterImport(fits)
def deleteFitting(self, event):
self.statusbar.SetStatusText("")
sEsi = Esi.getInstance()
selection = self.fitView.fitSelection
if not selection:
@@ -164,16 +166,26 @@ class EveFittings(AuxiliaryFrame):
if activeChar is None:
return
try:
sEsi.delFitting(activeChar, data['fitting_id'])
# repopulate the fitting list
self.fitTree.populateSkillTree(self.fittings)
self.fitView.update([])
try:
sEsi.delFitting(activeChar, data['fitting_id'])
# repopulate the fitting list
self.fitTree.populateSkillTree(self.fittings)
self.fitView.update([])
except APIException as ex:
pyfalog.error(ex)
self.statusbar.SetStatusText("Failed to delete fit: ESI error {} received - {}".format(ex.status_code, ex.response["error"]))
try:
ESIExceptionHandler(ex)
except:
# don't need to do anything - we should already have error code in the status
pass
except requests.exceptions.ConnectionError:
msg = _t("Connection error, please check your internet connection")
pyfalog.error(msg)
self.statusbar.SetStatusText(msg)
def deleteAllFittings(self, event):
self.statusbar.SetStatusText("")
sEsi = Esi.getInstance()
activeChar = self.getActiveCharacter()
if activeChar is None:
@@ -186,20 +198,30 @@ class EveFittings(AuxiliaryFrame):
) as dlg:
if dlg.ShowModal() == wx.ID_YES:
try:
for fit in self.fittings:
sEsi.delFitting(activeChar, fit['fitting_id'])
anyDeleted = True
try:
for fit in self.fittings:
sEsi.delFitting(activeChar, fit['fitting_id'])
anyDeleted = True
except APIException as ex:
pyfalog.error(ex)
if anyDeleted:
msg = "Some fits were not deleted: ESI error {} received - {}".format(ex.status_code,
ex.response["error"])
else:
msg = "Failed to delete fits: ESI error {} received - {}".format(ex.status_code,
ex.response["error"])
pyfalog.error(msg)
self.statusbar.SetStatusText(msg)
try:
ESIExceptionHandler(ex)
except:
# don't need to do anything - we should already have error code in the status
pass
except requests.exceptions.ConnectionError:
msg = "Connection error, please check your internet connection"
pyfalog.error(msg)
self.statusbar.SetStatusText(msg)
except APIException as ex:
if anyDeleted:
msg = "Some fits were not deleted: ESI error {} received".format(ex.status_code)
else:
msg = "Failed to delete fits: ESI error {} received".format(ex.status_code)
pyfalog.error(msg)
self.statusbar.SetStatusText(msg)
# repopulate the fitting list
self.fitTree.populateSkillTree(self.fittings)
self.fitView.update([])
@@ -222,15 +244,33 @@ class ESIServerExceptionHandler:
class ESIExceptionHandler:
# todo: make this a generate excetpion handler for all calls
def __init__(self, parentWindow, ex):
if ex.response['error'].startswith('Token is not valid') or ex.response['error'] == 'invalid_token': # todo: this seems messy, figure out a better response
def __init__(self, ex):
# raise ex
if ex.response['error'].startswith('Token is not valid') \
or ex.response['error'] == 'invalid_token' \
or ex.response['error'] == 'invalid_grant': # todo: this seems messy, figure out a better response
pyfalog.error(ex)
with wx.MessageDialog(
parentWindow,
gui.mainFrame.MainFrame.getInstance(),
_t("There was an error validating characters' SSO token. Please try "
"logging into the character again to reset the token."),
_t("Invalid Token"),
wx.OK | wx.ICON_ERROR
wx.OK | wx.ICON_ERROR | wx.CANCEL
) as dlg:
dlg.SetOKLabel("Manage ESI Characters")
ret = dlg.ShowModal()
if ret == wx.ID_OK:
SsoCharacterMgmt.openOne(parent=gui.mainFrame.MainFrame.getInstance())
# todo: spawn manage esi characters
pass
elif ex.response['error'].startswith('Timeout contacting'):
pyfalog.error(ex)
with wx.MessageDialog(
gui.mainFrame.MainFrame.getInstance(),
"HTTP %s: %s\n\n" % (ex.status_code, ex.response['error'])
+ _t("The server took too long to response. Please try again in a moment."),
_t("Timeout"),
wx.OK | wx.ICON_ERROR
) as dlg:
dlg.ShowModal()
else:
@@ -335,9 +375,9 @@ class ExportToEve(AuxiliaryFrame):
pyfalog.warning(msg)
self.statusbar.SetStatusText(msg, 1)
return
res = sEsi.postFitting(activeChar, data)
try:
res = sEsi.postFitting(activeChar, data)
res.raise_for_status()
self.statusbar.SetStatusText("", 0)
self.statusbar.SetStatusText(res.reason, 1)
@@ -346,19 +386,19 @@ class ExportToEve(AuxiliaryFrame):
pyfalog.error(msg)
self.statusbar.SetStatusText(_t("ERROR"), 0)
self.statusbar.SetStatusText(msg, 1)
except ESIExportException as ex:
except APIException as ex:
pyfalog.error(ex)
self.statusbar.SetStatusText(_t("ERROR"), 0)
self.statusbar.SetStatusText("{} - {}".format(res.status_code, res.reason), 1)
except APIException as ex:
self.statusbar.SetStatusText("HTTP {} - {}".format(ex.status_code, ex.response["error"]), 1)
try:
ESIExceptionHandler(self, ex)
except (KeyboardInterrupt, SystemExit):
raise
except Exception as ex:
self.statusbar.SetStatusText(_t("ERROR"), 0)
self.statusbar.SetStatusText("{} - {}".format(res.status_code, res.reason), 1)
pyfalog.error(ex)
ESIExceptionHandler(ex)
except:
# don't need to do anything - we should already get the error in ex.response
pass
except Exception as ex:
self.statusbar.SetStatusText(_t("ERROR"), 0)
self.statusbar.SetStatusText("Unknown error", 1)
pyfalog.error(ex)
class SsoCharacterMgmt(AuxiliaryFrame):

View File

@@ -38,7 +38,7 @@ class SsoLogin(wx.Dialog):
from service.esi import Esi
self.sEsi = Esi.getInstance()
uri = self.sEsi.getLoginURI(None)
uri = self.sEsi.get_login_uri(None)
webbrowser.open(uri)
@@ -53,7 +53,7 @@ class SsoLoginServer(wx.Dialog):
self.sEsi = Esi.getInstance()
serverAddr = self.sEsi.startServer(port)
uri = self.sEsi.getLoginURI(serverAddr)
uri = self.sEsi.get_login_uri(serverAddr)
bSizer1 = wx.BoxSizer(wx.VERTICAL)
self.mainFrame.Bind(GE.EVT_SSO_LOGIN, self.OnLogin)

View File

@@ -11,3 +11,5 @@ packaging >= 16.8
roman >= 2.0.0
beautifulsoup4 >= 4.6.0
pyyaml >= 5.1
python-jose==3.0.1
requests-cache==0.8.1

View File

@@ -11,7 +11,7 @@ import webbrowser
import eos.db
from service.const import EsiLoginMethod, EsiSsoMode
from eos.saveddata.ssocharacter import SsoCharacter
from service.esiAccess import APIException
from service.esiAccess import APIException, GenericSsoError
import gui.globalEvents as GE
from gui.ssoLogin import SsoLogin, SsoLoginServer
from service.server import StoppableHTTPServer, AuthHandler
@@ -102,13 +102,14 @@ class Esi(EsiAccess):
def login(self):
# always start the local server if user is using client details. Otherwise, start only if they choose to do so.
if self.settings.get('ssoMode') == EsiSsoMode.CUSTOM or self.settings.get('loginMode') == EsiLoginMethod.SERVER:
with gui.ssoLogin.SsoLoginServer(6461 if self.settings.get('ssoMode') == EsiSsoMode.CUSTOM else 0) as dlg:
if self.settings.get('loginMode') == EsiLoginMethod.SERVER:
with gui.ssoLogin.SsoLoginServer(0) as dlg:
dlg.ShowModal()
else:
with gui.ssoLogin.SsoLogin() as dlg:
if dlg.ShowModal() == wx.ID_OK:
self.handleLogin({'SSOInfo': [dlg.ssoInfoCtrl.Value.strip()]})
message = json.loads(base64.b64decode(dlg.ssoInfoCtrl.Value.strip()))
self.handleLogin(message)
def stopServer(self):
pyfalog.debug("Stopping Server")
@@ -134,31 +135,16 @@ class Esi(EsiAccess):
return 'http://localhost:{}'.format(port)
def handleLogin(self, message):
auth_response, data = self.auth(message['code'])
# we already have authenticated stuff for the auto mode
if self.settings.get('ssoMode') == EsiSsoMode.AUTO:
ssoInfo = message['SSOInfo'][0]
auth_response = json.loads(base64.b64decode(ssoInfo))
else:
# otherwise, we need to fetch the information
auth_response = self.auth(message['code'][0])
res = self._session.get(
self.oauth_verify,
headers=self.get_oauth_header(auth_response['access_token'])
)
if res.status_code != 200:
raise APIException(
self.oauth_verify,
res.status_code,
res.json()
)
cdata = res.json()
currentCharacter = self.getSsoCharacter(cdata['CharacterName'])
currentCharacter = self.getSsoCharacter(data['name'])
sub_split = data["sub"].split(":")
if (len(sub_split) != 3):
raise GenericSsoError("JWT sub does not contain the expected data. Contents: %s" % data["sub"])
cid = sub_split[-1]
if currentCharacter is None:
currentCharacter = SsoCharacter(cdata['CharacterID'], cdata['CharacterName'], config.getClientSecret())
currentCharacter = SsoCharacter(cid, data['name'], config.getClientSecret())
Esi.update_token(currentCharacter, auth_response)
@@ -169,11 +155,17 @@ class Esi(EsiAccess):
def handleServerLogin(self, message):
if not message:
raise Exception("Could not parse out querystring parameters.")
raise GenericSsoError("Could not parse out querystring parameters.")
if message['state'][0] != self.state:
try:
state_enc = message['state']
state = json.loads(base64.b64decode(state_enc))['state']
except Exception:
raise GenericSsoError("There was a problem decoding state parameter.")
if state != self.state:
pyfalog.warn("OAUTH state mismatch")
raise Exception("OAUTH State Mismatch.")
raise GenericSsoError("OAUTH State Mismatch.")
pyfalog.debug("Handling SSO login with: {0}", message)

View File

@@ -1,49 +1,49 @@
"""
A lot of the inspiration (and straight up code copying!) for this class comes from EsiPy <https://github.com/Kyria/EsiPy>
Much of the credit goes to the maintainer of that package, Kyria <tweetfleet slack: @althalus>. The reasoning for no
longer using EsiPy was due to it's reliance on pyswagger, which has caused a bit of a headache in how it operates on a
low level.
Eventually I'll rewrite this to be a bit cleaner and a bit more generic, but for now, it works!
"""
# noinspection PyPackageRequirements
from collections import namedtuple
from logbook import Logger
import uuid
import time
import config
import base64
import secrets
import hashlib
import json
from jose import jwt
from jose.exceptions import ExpiredSignatureError, JWTError, JWTClaimsError
import os
import datetime
from service.const import EsiSsoMode, EsiEndpoints
from service.settings import EsiSettings, NetworkSettings
from datetime import timedelta
from requests_cache import CachedSession
from requests import Session
from urllib.parse import urlencode, quote
from urllib.parse import urlencode
pyfalog = Logger(__name__)
# todo: reimplement Caching for calls
# from esipy.cache import FileCache
# file_cache = FileCache(cache_path)
# cache_path = os.path.join(config.savePath, config.ESI_CACHE)
#
# if not os.path.exists(cache_path):
# os.mkdir(cache_path)
#
scopes = [
'esi-skills.read_skills.v1',
'esi-fittings.read_fittings.v1',
'esi-fittings.write_fittings.v1'
]
ApiBase = namedtuple('ApiBase', ['sso', 'esi'])
supported_servers = {
"Tranquility": ApiBase("login.eveonline.com", "esi.evetech.net"),
"Singularity": ApiBase("sisilogin.testeveonline.com", "esi.evetech.net"),
"Serenity": ApiBase("login.evepc.163.com", "esi.evepc.163.com")
}
class GenericSsoError(Exception):
""" Exception used for generic SSO errors that aren't directly related to an API call
"""
pass
class APIException(Exception):
""" Exception for SSO related errors """
""" Exception for API related errors """
def __init__(self, url, code, json_response):
self.url = url
@@ -51,10 +51,11 @@ class APIException(Exception):
self.response = json_response
super(APIException, self).__init__(str(self))
def __str__(self):
if 'error' in self.response:
if 'error_description' in self.response:
return 'HTTP Error %s: %s' % (self.status_code,
self.response['error'])
self.response['error_description'])
elif 'message' in self.response:
return 'HTTP Error %s: %s' % (self.status_code,
self.response['message'])
@@ -64,6 +65,7 @@ class APIException(Exception):
class EsiAccess:
def __init__(self):
self.settings = EsiSettings.getInstance()
self.server_base: ApiBase = supported_servers[self.settings.get("server")]
# session request stuff
self._session = Session()
@@ -76,46 +78,45 @@ class EsiAccess:
self._session.headers.update(self._basicHeaders)
self._session.proxies = NetworkSettings.getInstance().getProxySettingsInRequestsFormat()
# Set up cached session. This is only used for SSO meta data for now, but can be expanded to actually handle
# various ESI caching (using ETag, for example) in the future
cached_session = CachedSession(
os.path.join(config.savePath, config.ESI_CACHE),
backend="sqlite",
cache_control=True, # Use Cache-Control headers for expiration, if available
expire_after=timedelta(days=1), # Otherwise expire responses after one day
stale_if_error=True, # In case of request errors, use stale cache data if possible
)
cached_session.headers.update(self._basicHeaders)
cached_session.proxies = NetworkSettings.getInstance().getProxySettingsInRequestsFormat()
meta_call = cached_session.get("https://%s/.well-known/oauth-authorization-server" % self.server_base.sso)
meta_call.raise_for_status()
self.server_meta = meta_call.json()
jwks_call = cached_session.get(self.server_meta["jwks_uri"])
jwks_call.raise_for_status()
self.jwks = jwks_call.json()
@property
def sso_url(self):
if self.settings.get("ssoMode") == EsiSsoMode.CUSTOM:
return "https://login.eveonline.com"
return "https://www.pyfa.io"
return 'https://%s/v2' % self.server_base.sso
@property
def esi_url(self):
return "https://esi.evetech.net"
@property
def oauth_verify(self):
return '%s/verify/' % self.esi_url
return 'https://%s' % self.server_base.esi
@property
def oauth_authorize(self):
return '%s/oauth/authorize' % self.sso_url
return self.server_meta["authorization_endpoint"]
@property
def oauth_token(self):
return '%s/oauth/token' % self.sso_url
return self.server_meta["token_endpoint"]
def getDynamicItem(self, typeID, itemID):
return self.get(None, EsiEndpoints.DYNAMIC_ITEM.value, type_id=typeID, item_id=itemID)
def getSkills(self, char):
return self.get(char, EsiEndpoints.CHAR_SKILLS.value, character_id=char.characterID)
def getSecStatus(self, char):
return self.get(char, EsiEndpoints.CHAR.value, character_id=char.characterID)
def getFittings(self, char):
return self.get(char, EsiEndpoints.CHAR_FITTINGS.value, character_id=char.characterID)
def postFitting(self, char, json_str):
# @todo: new fitting ID can be recovered from resp.data,
return self.post(char, EsiEndpoints.CHAR_FITTINGS.value, json_str, character_id=char.characterID)
def delFitting(self, char, fittingID):
return self.delete(char, EsiEndpoints.CHAR_DEL_FIT.value, character_id=char.characterID, fitting_id=fittingID)
@property
def client_id(self):
return self.settings.get('clientID') or config.API_CLIENT_ID
@staticmethod
def update_token(char, tokenResponse):
@@ -125,33 +126,36 @@ class EsiAccess:
if 'refresh_token' in tokenResponse:
char.refreshToken = config.cipher.encrypt(tokenResponse['refresh_token'].encode())
def getLoginURI(self, redirect=None):
def get_login_uri(self, redirect=None):
self.state = str(uuid.uuid4())
if self.settings.get("ssoMode") == EsiSsoMode.AUTO:
args = {
'state': self.state,
'pyfa_version': config.version,
'login_method': self.settings.get('loginMode'),
'client_hash': config.getClientSecret()
}
# Generate the PKCE code challenge
self.code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32))
m = hashlib.sha256()
m.update(self.code_verifier)
d = m.digest()
code_challenge = base64.urlsafe_b64encode(d).decode().replace("=", "")
if redirect is not None:
args['redirect'] = redirect
state_arg = {
'mode': self.settings.get('loginMode'),
'redirect': redirect,
'state': self.state
}
return '%s?%s' % (
self.oauth_authorize,
urlencode(args)
)
else:
return '%s?response_type=%s&redirect_uri=%s&client_id=%s%s%s' % (
self.oauth_authorize,
'code',
quote('http://localhost:6461', safe=''),
self.settings.get('clientID'),
'&scope=%s' % '+'.join(scopes) if scopes else '',
'&state=%s' % self.state
)
args = {
'response_type': 'code',
'redirect_uri': config.SSO_CALLBACK,
'client_id': self.client_id,
'scope': ' '.join(scopes),
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
'state': base64.b64encode(bytes(json.dumps(state_arg), 'utf-8'))
}
return '%s?%s' % (
self.oauth_authorize,
urlencode(args)
)
def get_oauth_header(self, token):
""" Return the Bearer Authorization header required in oauth calls
@@ -160,85 +164,87 @@ class EsiAccess:
"""
return {'Authorization': 'Bearer %s' % token}
def get_refresh_token_params(self, refreshToken):
""" Return the param object for the post() call to get the access_token
from the refresh_token
:param code: the refresh token
:return: a dict with the url, params and header
"""
if refreshToken is None:
raise AttributeError('No refresh token is defined.')
data = {
'grant_type': 'refresh_token',
'refresh_token': refreshToken,
}
if self.settings.get('ssoMode') == EsiSsoMode.AUTO:
# data is all we really need, the rest is handled automatically by pyfa.io
return {
'data': data,
'url': self.oauth_token,
}
# otherwise, we need to make the token with the client keys
return self.__make_token_request_parameters(data)
def __get_token_auth_header(self):
""" Return the Basic Authorization header required to get the tokens
:return: a dict with the headers
"""
# encode/decode for py2/py3 compatibility
auth_b64 = "%s:%s" % (self.settings.get('clientID'), self.settings.get('clientSecret'))
auth_b64 = base64.b64encode(auth_b64.encode('latin-1'))
auth_b64 = auth_b64.decode('latin-1')
return {'Authorization': 'Basic %s' % auth_b64}
def __make_token_request_parameters(self, params):
request_params = {
'headers': self.__get_token_auth_header(),
'data': params,
'url': self.oauth_token,
}
return request_params
def get_access_token_request_params(self, code):
return self.__make_token_request_parameters(
{
'grant_type': 'authorization_code',
'code': code,
}
)
def auth(self, code):
request_data = self.get_access_token_request_params(code)
res = self._session.post(**request_data)
if res.status_code != 200:
raise Exception(
request_data['url'],
res.status_code,
res.json()
)
values = {
'grant_type': 'authorization_code',
'code': code,
'client_id': self.client_id,
"code_verifier": self.code_verifier
}
res = self.token_call(values)
json_res = res.json()
return json_res
decoded_jwt = self.validate_eve_jwt(json_res['access_token'])
return json_res, decoded_jwt
def refresh(self, ssoChar):
request_data = self.get_refresh_token_params(config.cipher.decrypt(ssoChar.refreshToken).decode())
res = self._session.post(**request_data)
if res.status_code != 200:
raise APIException(
request_data['url'],
res.status_code,
res.json()
)
# todo: properly handle invalid refresh token
values = {
"grant_type": "refresh_token",
"refresh_token": config.cipher.decrypt(ssoChar.refreshToken).decode(),
"client_id": self.client_id,
}
res = self.token_call(values)
json_res = res.json()
self.update_token(ssoChar, json_res)
return json_res
def token_call(self, values):
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Host": self.server_base.sso,
}
res = self._session.post(
self.server_meta["token_endpoint"],
data=values,
headers=headers,
)
if res.status_code != 200:
raise APIException(
self.server_meta["token_endpoint"],
res.status_code,
res.json()
)
return res
def validate_eve_jwt(self, jwt_token):
"""Validate a JWT token retrieved from the EVE SSO.
Args:
jwt_token: A JWT token originating from the EVE SSO
Returns
dict: The contents of the validated JWT token if there are no
validation errors
"""
try:
jwk_sets = self.jwks["keys"]
except KeyError as e:
raise GenericSsoError("Something went wrong when retrieving the JWK set. The returned "
"payload did not have the expected key {}. \nPayload returned "
"from the SSO looks like: {}".format(e, self.jwks))
jwk_set = next((item for item in jwk_sets if item["alg"] == "RS256"))
try:
return jwt.decode(
jwt_token,
jwk_set,
algorithms=jwk_set["alg"],
issuer=[self.server_base.sso, "https://%s" % self.server_base.sso]
)
except ExpiredSignatureError as e:
raise GenericSsoError("The JWT token has expired: {}".format(str(e)))
except JWTError as e:
raise GenericSsoError("The JWT signature was invalid: {}".format(str(e)))
except JWTClaimsError as e:
raise GenericSsoError("The issuer claim was not from login.eveonline.com or "
"https://login.eveonline.com: {}".format(str(e)))
def _before_request(self, ssoChar):
self._session.headers.clear()
self._session.headers.update(self._basicHeaders)
@@ -279,3 +285,24 @@ class EsiAccess:
self._before_request(ssoChar)
endpoint = endpoint.format(**kwargs)
return self._after_request(self._session.delete("{}{}".format(self.esi_url, endpoint)))
# todo: move these off to another class which extends this one. This class should only handle the low level
# authentication and
def getDynamicItem(self, typeID, itemID):
return self.get(None, EsiEndpoints.DYNAMIC_ITEM.value, type_id=typeID, item_id=itemID)
def getSkills(self, char):
return self.get(char, EsiEndpoints.CHAR_SKILLS.value, character_id=char.characterID)
def getSecStatus(self, char):
return self.get(char, EsiEndpoints.CHAR.value, character_id=char.characterID)
def getFittings(self, char):
return self.get(char, EsiEndpoints.CHAR_FITTINGS.value, character_id=char.characterID)
def postFitting(self, char, json_str):
# @todo: new fitting ID can be recovered from resp.data,
return self.post(char, EsiEndpoints.CHAR_FITTINGS.value, json_str, character_id=char.characterID)
def delFitting(self, char, fittingID):
return self.delete(char, EsiEndpoints.CHAR_DEL_FIT.value, character_id=char.characterID, fitting_id=fittingID)

View File

@@ -4,103 +4,54 @@ import socket
import threading
from logbook import Logger
import socketserver
import json
import traceback
from service.esiAccess import APIException, GenericSsoError
pyfalog = Logger(__name__)
# noinspection PyPep8
HTML = '''
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-type" content="text/html;charset=UTF-8">
<title>pyfa Local Server</title>
<style type="text/css">
body {{ text-align: center; padding: 150px; }}
h1 {{ font-size: 40px; }}
h2 {{ font-size: 32px; }}
body {{ font: 20px Helvetica, sans-serif; color: #333; }}
#article {{ display: block; text-align: left; width: 650px; margin: 0 auto; }}
a {{ color: #dc8100; text-decoration: none; }}
a:hover {{ color: #333; text-decoration: none; }}
</style>
</head>
<body>
<!-- Layout from Short Circuit's CREST login. Shout out! https://github.com/farshield/shortcircuit -->
<div id="article">
<h1>pyfa</h1>
{0}
</div>
<script type="text/javascript">
function extractFromHash(name, hash) {{
var match = hash.match(new RegExp(name + "=([^&]+)"));
return !!match && match[1];
}}
var hash = window.location.hash;
var token = extractFromHash("access_token", hash);
var step2 = extractFromHash("step2", hash);
function doRedirect() {{
if (token){{
// implicit authentication
var redirect = window.location.origin.concat('/?', window.location.hash.substr(1), '&step=2');
window.location = redirect;
}}
else {{
// user-defined
var redirect = window.location.href + '&step=2';
window.location = redirect;
}}
}}
// do redirect if we are not already on step 2
if (window.location.href.indexOf('step=2') == -1) {{
setTimeout(doRedirect(), 1000);
}}
</script>
</body>
</html>
'''
# https://github.com/fuzzysteve/CREST-Market-Downloader/
class AuthHandler(http.server.BaseHTTPRequestHandler):
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
def do_GET(self):
if self.path == "/favicon.ico":
return
parsed_path = urllib.parse.urlparse(self.path)
parts = urllib.parse.parse_qs(parsed_path.query)
msg = ""
step2 = 'step' in parts
parts = {k: ";".join(v) for k, v in urllib.parse.parse_qs(parsed_path.query).items()}
is_success = False
try:
if step2:
self.server.callback(parts)
pyfalog.info("Successfully logged into EVE.")
msg = "If you see this message then it means you should be logged into EVE SSO. You may close this window and return to the application."
else:
# For implicit mode, we have to serve up the page which will take the hash and redirect using a querystring
pyfalog.info("Processing response from EVE Online.")
msg = "Processing response from EVE Online"
self.server.callback(parts)
pyfalog.info("Successfully logged into EVE.")
is_success = True
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
except (KeyboardInterrupt, SystemExit):
raise
except (GenericSsoError, APIException) as ex:
pyfalog.error("Error logging into EVE")
pyfalog.error(ex)
self.send_response(400)
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(str.encode(str(ex)))
except Exception as ex:
pyfalog.error("Error logging into EVE")
pyfalog.error(ex)
msg = "<h2>Error</h2>\n<p>{}</p>".format(ex.message)
finally:
self.send_response(200)
self.send_response(500)
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(str.encode(HTML.format(msg)))
self.wfile.write(str.encode(str(''.join(traceback.format_tb(ex.__traceback__)))))
if step2:
# Only stop once if we've received something in the querystring
# send error
if is_success:
self.server.stop()
def log_message(self, format, *args):

View File

@@ -374,6 +374,7 @@ class EsiSettings:
"clientID": "",
"clientSecret": "",
"timeout": 60,
"server": "Tranquility",
"exportCharges": True}
self.settings = SettingsProvider.getInstance().getSettings(