Merge pull request #2590 from huangzheng2016/master

Update the SSO Login for Serenity and Singularity server's player
This commit is contained in:
Anton Vorobyov
2024-02-26 23:35:31 +04:00
committed by GitHub
12 changed files with 239 additions and 125 deletions

View File

@@ -9,6 +9,7 @@ import hashlib
from eos.const import FittingSlot
from cryptography.fernet import Fernet
from collections import namedtuple
pyfalog = Logger(__name__)
@@ -44,9 +45,16 @@ experimentalFeatures = None
version = None
language = None
API_CLIENT_ID = '095d8cd841ac40b581330919b49fe746'
ApiServer = namedtuple('ApiBase', ['name', 'sso', 'esi', 'client_id', 'callback', 'supports_auto_login'])
supported_servers = {
"Tranquility": ApiServer("Tranquility", "login.eveonline.com", "esi.evetech.net", '095d8cd841ac40b581330919b49fe746', 'https://pyfa-org.github.io/Pyfa/callback', True),
# No point having SISI: https://developers.eveonline.com/blog/article/removing-datasource-singularity
# "Singularity": ApiServer("Singularity", "sisilogin.testeveonline.com", "esi.evetech.net", 'b9c3cc79448f449ab17f3aebd018842e', 'https://pyfa-org.github.io/Pyfa/callback'),
"Serenity": ApiServer("Serenity", "login.evepc.163.com", "ali-esi.evepc.163.com", 'bc90aa496a404724a93f41b4f4e97761', 'https://ali-esi.evepc.163.com/ui/oauth2-redirect.html', False)
}
SSO_LOGOFF_SERENITY='https://login.evepc.163.com/account/logoff'
ESI_CACHE = 'esi_cache'
SSO_CALLBACK = 'https://pyfa-org.github.io/Pyfa/callback'
LOGLEVEL_MAP = {
"critical": CRITICAL,

View File

@@ -0,0 +1,19 @@
"""
Migration 28
- adds baseItemID and mutaplasmidID to modules table
"""
import sqlalchemy
def upgrade(saveddata_engine):
try:
saveddata_engine.execute("SELECT server FROM ssoCharacter LIMIT 1")
except sqlalchemy.exc.DatabaseError:
saveddata_engine.execute("ALTER TABLE ssoCharacter ADD COLUMN server VARCHAR;")
saveddata_engine.execute("UPDATE ssoCharacter SET server = 'Tranquility';")
# update all characters to TQ

View File

@@ -44,6 +44,7 @@ sso_table = Table("ssoCharacter", saveddata_meta,
Column("client", String, nullable=False),
Column("characterID", Integer, nullable=False),
Column("characterName", String, nullable=False),
Column("server", String, nullable=False),
Column("refreshToken", String, nullable=False),
Column("accessToken", String, nullable=False),
Column("accessTokenExpires", DateTime, nullable=False),

View File

@@ -493,9 +493,12 @@ def getSsoCharacters(clientHash, eager=None):
@cachedQuery(SsoCharacter, 1, "lookfor", "clientHash")
def getSsoCharacter(lookfor, clientHash, eager=None):
def getSsoCharacter(lookfor, clientHash, server=None, eager=None):
filter = SsoCharacter.client == clientHash
if server is not None:
filter = and_(filter, SsoCharacter.server == server)
if isinstance(lookfor, int):
filter = and_(filter, SsoCharacter.ID == lookfor)
elif isinstance(lookfor, str):

View File

@@ -25,10 +25,11 @@ import time
class SsoCharacter:
def __init__(self, charID, name, client, accessToken=None, refreshToken=None):
def __init__(self, charID, name, client, server, accessToken=None, refreshToken=None):
self.characterID = charID
self.characterName = name
self.client = client
self.server = server
self.accessToken = accessToken
self.refreshToken = refreshToken
self.accessTokenExpires = None
@@ -37,6 +38,9 @@ class SsoCharacter:
def init(self):
pass
@property
def characterDisplay(self):
return "{} [{}]".format(self.characterName, self.server)
def is_token_expired(self):
if self.accessTokenExpires is None:
return True

View File

@@ -1,9 +1,11 @@
# noinspection PyPackageRequirements
import wx
import config
import gui.mainFrame
from gui.bitmap_loader import BitmapLoader
from gui.preferenceView import PreferenceView
from service.esi import Esi
from service.settings import EsiSettings
# noinspection PyPackageRequirements
@@ -41,38 +43,68 @@ class PFEsiPref(PreferenceView):
"due to 'Signature has expired' error")))
mainSizer.Add(self.enforceJwtExpiration, 0, wx.ALL | wx.EXPAND, 5)
self.ssoServer = wx.CheckBox(panel, wx.ID_ANY, _t("Auto-login (starts local server)"), wx.DefaultPosition,
wx.DefaultSize,
0)
self.ssoServer.SetToolTip(wx.ToolTip(_t("This allows the EVE SSO to callback to your local pyfa instance and complete the authentication process without manual intervention.")))
mainSizer.Add(self.ssoServer, 0, wx.ALL | wx.EXPAND, 5)
rbSizer = wx.BoxSizer(wx.HORIZONTAL)
self.rbMode = wx.RadioBox(panel, -1, _t("Login Authentication Method"), wx.DefaultPosition, wx.DefaultSize,
[_t('Local Server'), _t('Manual')], 1, wx.RA_SPECIFY_COLS)
self.rbMode.SetItemToolTip(0, _t("This option starts a local webserver that EVE SSO Server will call back to"
" with information about the character login."))
self.rbMode.SetItemToolTip(1, _t("This option prompts users to copy and paste information to allow for"
" character login. Use this if having issues with the local server."))
self.rbMode.SetSelection(self.settings.get('loginMode'))
self.enforceJwtExpiration.SetValue(self.settings.get("enforceJwtExpiration" or True))
self.enforceJwtExpiration.SetValue(self.settings.get("enforceJwtExpiration") or True)
self.ssoServer.SetValue(True if self.settings.get("loginMode") == 0 else False)
rbSizer.Add(self.rbMode, 1, wx.TOP | wx.RIGHT, 5)
mainSizer.Add(rbSizer, 0, wx.ALL | wx.EXPAND, 0)
self.rbMode.Bind(wx.EVT_RADIOBOX, self.OnModeChange)
esiSizer = wx.BoxSizer(wx.HORIZONTAL)
self.esiServer = wx.StaticText(panel, wx.ID_ANY, _t("Default SSO Server:"), wx.DefaultPosition, wx.DefaultSize, 0)
self.esiServer.Wrap(-1)
esiSizer.Add(self.esiServer, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 5)
self.esiServer.SetToolTip(wx.ToolTip(_t('The source you choose will be used on connection.')))
self.chESIserver = wx.Choice(panel, choices=list(self.settings.keys()))
self.chESIserver.SetStringSelection(self.settings.get("server"))
esiSizer.Add(self.chESIserver, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 10)
mainSizer.Add(esiSizer, 0, wx.TOP | wx.RIGHT, 10)
self.chESIserver.Bind(wx.EVT_CHOICE, self.OnServerChange)
self.enforceJwtExpiration.Bind(wx.EVT_CHECKBOX, self.OnEnforceChange)
mainSizer.Add(rbSizer, 1, wx.ALL | wx.EXPAND, 0)
self.ssoServer.Bind(wx.EVT_CHECKBOX, self.OnModeChange)
panel.SetSizer(mainSizer)
panel.Layout()
def OnTimeoutChange(self, event):
self.settings.set('timeout', event.GetEventObject().GetValue())
event.Skip()
def OnModeChange(self, event):
self.settings.set('loginMode', event.GetInt())
self.settings.set('loginMode', 0 if self.ssoServer.GetValue() else 1)
event.Skip()
def OnEnforceChange(self, event):
self.settings.set('enforceJwtExpiration', self.enforceJwtExpiration.GetValue())
event.Skip()
def OnServerChange(self, event):
# pass
source = self.chESIserver.GetString(self.chESIserver.GetSelection())
esiService = Esi.getInstance()
# init servers
esiService.init(config.supported_servers[source])
self.settings.set("server", source)
event.Skip()
def getImage(self):
return BitmapLoader.getBitmap("eve", "gui")
PFEsiPref.register()
PFEsiPref.register()

View File

@@ -864,7 +864,7 @@ class APIView(wx.Panel):
noneID = self.charChoice.Append(_t("None"), None)
for char in ssoChars:
currId = self.charChoice.Append(char.characterName, char.ID)
currId = self.charChoice.Append(char.characterDisplay, char.ID)
if sso is not None and char.ID == sso.ID:
self.charChoice.SetSelection(currId)

View File

@@ -96,7 +96,7 @@ class EveFittings(AuxiliaryFrame):
self.charChoice.Clear()
for char in chars:
self.charChoice.Append(char.characterName, char.ID)
self.charChoice.Append(char.characterDisplay, char.ID)
if len(chars) > 0:
self.charChoice.SetSelection(0)
@@ -227,21 +227,6 @@ class EveFittings(AuxiliaryFrame):
self.fitView.update([])
class ESIServerExceptionHandler:
def __init__(self, parentWindow, ex):
pyfalog.error(ex)
with wx.MessageDialog(
parentWindow,
_t("There was an issue starting up the localized server, try setting "
"Login Authentication Method to Manual by going to Preferences -> EVE SS0 -> "
"Login Authentication Method. If this doesn't fix the problem please file an "
"issue on Github."),
_t("Add Character Error"),
wx.OK | wx.ICON_ERROR
) as dlg:
dlg.ShowModal()
class ESIExceptionHandler:
# todo: make this a generate excetpion handler for all calls
def __init__(self, ex):
@@ -348,7 +333,7 @@ class ExportToEve(AuxiliaryFrame):
self.charChoice.Clear()
for char in chars:
self.charChoice.Append(char.characterName, char.ID)
self.charChoice.Append(char.characterDisplay, char.ID)
if len(chars) > 0:
self.charChoice.SetSelection(0)
@@ -434,6 +419,7 @@ class SsoCharacterMgmt(AuxiliaryFrame):
self.lcCharacters.InsertColumn(0, heading=_t('Character'))
self.lcCharacters.InsertColumn(1, heading=_t('Character ID'))
self.lcCharacters.InsertColumn(2, heading=_t('Server'))
self.popCharList()
@@ -496,9 +482,11 @@ class SsoCharacterMgmt(AuxiliaryFrame):
self.lcCharacters.InsertItem(index, char.characterName)
self.lcCharacters.SetItem(index, 1, str(char.characterID))
self.lcCharacters.SetItemData(index, char.ID)
self.lcCharacters.SetItem(index, 2, char.server or "<unknown>")
self.lcCharacters.SetColumnWidth(0, wx.LIST_AUTOSIZE)
self.lcCharacters.SetColumnWidth(1, wx.LIST_AUTOSIZE)
self.lcCharacters.SetColumnWidth(2, wx.LIST_AUTOSIZE)
def addChar(self, event):
try:
@@ -506,8 +494,6 @@ class SsoCharacterMgmt(AuxiliaryFrame):
sEsi.login()
except (KeyboardInterrupt, SystemExit):
raise
except Exception as ex:
ESIServerExceptionHandler(self, ex)
def delChar(self, event):
item = self.lcCharacters.GetFirstSelected()

View File

@@ -2,30 +2,48 @@ import wx
import gui.mainFrame
import webbrowser
import gui.globalEvents as GE
import config
import time
from service.settings import EsiSettings
_t = wx.GetTranslation
class SsoLogin(wx.Dialog):
def __init__(self):
mainFrame = gui.mainFrame.MainFrame.getInstance()
def __init__(self, server: config.ApiServer, start_local_server=True):
self.mainFrame = gui.mainFrame.MainFrame.getInstance()
from service.esi import Esi
super().__init__(
mainFrame, id=wx.ID_ANY, title=_t("SSO Login"), style=wx.DEFAULT_DIALOG_STYLE,
self.mainFrame, id=wx.ID_ANY, title=_t("SSO Login"), style=wx.DEFAULT_DIALOG_STYLE,
size=wx.Size(450, 240) if "wxGTK" in wx.PlatformInfo else wx.Size(400, 240))
bSizer1 = wx.BoxSizer(wx.VERTICAL)
text = wx.StaticText(self, wx.ID_ANY, _t("Copy and paste the block of text provided by pyfa.io"))
bSizer1.Add(text, 0, wx.ALL | wx.EXPAND, 10)
if start_local_server:
text = wx.StaticText(self, wx.ID_ANY, _t("Waiting for character login through EVE Single Sign-On."))
bSizer1.Add(text, 0, wx.ALL | wx.EXPAND, 10)
bSizer1.Add(wx.StaticLine(self, wx.ID_ANY), 0, wx.EXPAND, 15)
text = wx.StaticText(self, wx.ID_ANY, _t("If auto-login fails, copy and paste the token provided by pyfa.io"))
bSizer1.Add(text, 0, wx.ALL | wx.EXPAND, 10)
elif server.name == "Serenity":
text = wx.StaticText(self, wx.ID_ANY, _t("Please copy and paste the url when your authorization is completed"))
bSizer1.Add(text, 0, wx.ALL | wx.EXPAND, 10)
else:
text = wx.StaticText(self, wx.ID_ANY, _t("Please copy and paste the token provided by pyfa.io"))
bSizer1.Add(text, 0, wx.ALL | wx.EXPAND, 10)
self.ssoInfoCtrl = wx.TextCtrl(self, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, (-1, -1), style=wx.TE_MULTILINE)
self.ssoInfoCtrl.SetFont(wx.Font(8, wx.FONTFAMILY_TELETYPE, wx.NORMAL, wx.NORMAL))
self.ssoInfoCtrl.Layout()
self.ssoInfoCtrl.Bind(wx.EVT_TEXT, self.OnTextEnter)
bSizer1.Add(self.ssoInfoCtrl, 1, wx.LEFT | wx.RIGHT | wx.EXPAND, 10)
self.Esisettings = EsiSettings.getInstance()
bSizer3 = wx.BoxSizer(wx.VERTICAL)
bSizer3.Add(wx.StaticLine(self, wx.ID_ANY), 0, wx.BOTTOM | wx.EXPAND, 10)
@@ -34,51 +52,43 @@ class SsoLogin(wx.Dialog):
self.SetSizer(bSizer1)
self.Center()
from service.esi import Esi
self.sEsi = Esi.getInstance()
uri = self.sEsi.get_login_uri(None)
webbrowser.open(uri)
class SsoLoginServer(wx.Dialog):
def __init__(self, port):
self.mainFrame = gui.mainFrame.MainFrame.getInstance()
super().__init__(self.mainFrame, id=wx.ID_ANY, title=_t("SSO Login"), size=(-1, -1), style=wx.DEFAULT_DIALOG_STYLE)
from service.esi import Esi
self.sEsi = Esi.getInstance()
serverAddr = self.sEsi.startServer(port)
serverAddr = self.sEsi.startServer(0) if start_local_server else None
uri = self.sEsi.get_login_uri(serverAddr)
bSizer1 = wx.BoxSizer(wx.VERTICAL)
self.mainFrame.Bind(GE.EVT_SSO_LOGIN, self.OnLogin)
self.Bind(wx.EVT_WINDOW_DESTROY, self.OnDestroy)
if server.name == "Serenity":
webbrowser.open(config.SSO_LOGOFF_SERENITY)
time.sleep(1)
text = wx.StaticText(self, wx.ID_ANY, _t("Waiting for character login through EVE Single Sign-On."))
bSizer1.Add(text, 0, wx.ALL | wx.EXPAND, 10)
bSizer3 = wx.BoxSizer(wx.VERTICAL)
bSizer3.Add(wx.StaticLine(self, wx.ID_ANY), 0, wx.BOTTOM | wx.EXPAND, 10)
bSizer3.Add(self.CreateStdDialogButtonSizer(wx.CANCEL), 0, wx.EXPAND)
bSizer1.Add(bSizer3, 0, wx.BOTTOM | wx.RIGHT | wx.LEFT | wx.EXPAND, 10)
self.SetSizer(bSizer1)
self.Fit()
self.Center()
self.okBtn = self.FindWindow(wx.ID_OK)
self.okBtn.Enable(False)
# Ensure we clean up once they hit the "OK" button
self.okBtn.Bind(wx.EVT_BUTTON, self.OnDestroy)
webbrowser.open(uri)
self.mainFrame.Bind(GE.EVT_SSO_LOGIN, self.OnLogin)
# Ensure we clean up if ESC is pressed
self.Bind(wx.EVT_WINDOW_DESTROY, self.OnDestroy)
def OnTextEnter(self, event):
t = event.String.strip()
if t == "":
self.okBtn.Enable(False)
else:
self.okBtn.Enable(True)
event.Skip()
def OnLogin(self, event):
self.EndModal(wx.ID_OK)
# This would normally happen if it was logged in via server auto-login. In this case, the modal is done, we effectively want to cancel out
self.EndModal(wx.ID_CANCEL)
event.Skip()
def OnDestroy(self, event):
# Clean up by unbinding some events and stopping the server
self.mainFrame.Unbind(GE.EVT_SSO_LOGIN, handler=self.OnLogin)
if self:
self.Unbind(wx.EVT_WINDOW_DESTROY, handler=self.OnDestroy)
self.sEsi.stopServer()
event.Skip()

View File

@@ -6,14 +6,14 @@ import time
import base64
import json
import config
import webbrowser
import re
import eos.db
from service.const import EsiLoginMethod, EsiSsoMode
from eos.saveddata.ssocharacter import SsoCharacter
from service.esiAccess import APIException, GenericSsoError
import gui.globalEvents as GE
from gui.ssoLogin import SsoLogin, SsoLoginServer
from gui.ssoLogin import SsoLogin
from service.server import StoppableHTTPServer, AuthHandler
from service.settings import EsiSettings
from service.esiAccess import EsiAccess
@@ -22,6 +22,7 @@ import gui.mainFrame
from requests import Session
pyfalog = Logger(__name__)
_t = wx.GetTranslation
class Esi(EsiAccess):
@@ -69,8 +70,8 @@ class Esi(EsiAccess):
chars = eos.db.getSsoCharacters(config.getClientSecret())
return chars
def getSsoCharacter(self, id):
char = eos.db.getSsoCharacter(id, config.getClientSecret())
def getSsoCharacter(self, id, server=None):
char = eos.db.getSsoCharacter(id, config.getClientSecret(), server)
eos.db.commit()
return char
@@ -101,15 +102,36 @@ class Esi(EsiAccess):
self.fittings_deleted.add(fittingID)
def login(self):
# always start the local server if user is using client details. Otherwise, start only if they choose to do so.
if self.settings.get('loginMode') == EsiLoginMethod.SERVER:
with gui.ssoLogin.SsoLoginServer(0) as dlg:
dlg.ShowModal()
else:
with gui.ssoLogin.SsoLogin() as dlg:
if dlg.ShowModal() == wx.ID_OK:
message = json.loads(base64.b64decode(dlg.ssoInfoCtrl.Value.strip()))
self.handleLogin(message)
start_server = self.settings.get('loginMode') == EsiLoginMethod.SERVER and self.server_base.supports_auto_login
with gui.ssoLogin.SsoLogin(self.server_base, start_server) as dlg:
if dlg.ShowModal() == wx.ID_OK:
from gui.esiFittings import ESIExceptionHandler
try:
if self.server_name == "Serenity":
s = re.search(r'(?<=code=)[a-zA-Z0-9\-_]*', dlg.ssoInfoCtrl.Value.strip())
if s:
# skip state verification and go directly through the auth code processing
self.handleLogin(s.group(0))
else:
pass
# todo: throw error
else:
self.handleServerRequest(json.loads(base64.b64decode(dlg.ssoInfoCtrl.Value.strip())))
except GenericSsoError as ex:
pyfalog.error(ex)
with wx.MessageDialog(
self.mainFrame,
str(ex),
_t("SSO Error"),
wx.OK | wx.ICON_ERROR
) as dlg:
dlg.ShowModal()
except APIException as ex:
pyfalog.error(ex)
ESIExceptionHandler(ex)
pass
def stopServer(self):
pyfalog.debug("Stopping Server")
@@ -127,24 +149,26 @@ class Esi(EsiAccess):
self.httpd = StoppableHTTPServer(('localhost', port), AuthHandler)
port = self.httpd.socket.getsockname()[1]
self.serverThread = threading.Thread(target=self.httpd.serve, args=(self.handleServerLogin,))
self.serverThread = threading.Thread(target=self.httpd.serve, args=(self.handleServerRequest,))
self.serverThread.name = "SsoCallbackServer"
self.serverThread.daemon = True
self.serverThread.start()
return 'http://localhost:{}'.format(port)
def handleLogin(self, message):
auth_response, data = self.auth(message['code'])
def handleLogin(self, code):
auth_response, data = self.auth(code)
currentCharacter = self.getSsoCharacter(data['name'])
currentCharacter = self.getSsoCharacter(data['name'], self.server_base.name)
sub_split = data["sub"].split(":")
if (len(sub_split) != 3):
if len(sub_split) != 3:
raise GenericSsoError("JWT sub does not contain the expected data. Contents: %s" % data["sub"])
cid = sub_split[-1]
if currentCharacter is None:
currentCharacter = SsoCharacter(cid, data['name'], config.getClientSecret())
currentCharacter = SsoCharacter(cid, data['name'], config.getClientSecret(), self.server_base.name)
Esi.update_token(currentCharacter, auth_response)
@@ -153,7 +177,7 @@ class Esi(EsiAccess):
# get (endpoint, char, data?)
def handleServerLogin(self, message):
def handleServerRequest(self, message):
if not message:
raise GenericSsoError("Could not parse out querystring parameters.")
@@ -169,4 +193,4 @@ class Esi(EsiAccess):
pyfalog.debug("Handling SSO login with: {0}", message)
self.handleLogin(message)
self.handleLogin(message['code'])

View File

@@ -1,6 +1,7 @@
# noinspection PyPackageRequirements
from collections import namedtuple
import requests
from logbook import Logger
import uuid
import time
@@ -30,13 +31,6 @@ scopes = [
'esi-fittings.write_fittings.v1'
]
ApiBase = namedtuple('ApiBase', ['sso', 'esi'])
supported_servers = {
"Tranquility": ApiBase("login.eveonline.com", "esi.evetech.net"),
"Singularity": ApiBase("sisilogin.testeveonline.com", "esi.evetech.net"),
"Serenity": ApiBase("login.evepc.163.com", "esi.evepc.163.com")
}
class GenericSsoError(Exception):
""" Exception used for generic SSO errors that aren't directly related to an API call
"""
@@ -63,10 +57,11 @@ class APIException(Exception):
class EsiAccess:
server_meta = {}
def __init__(self):
self.settings = EsiSettings.getInstance()
self.server_base: ApiBase = supported_servers[self.settings.get("server")]
self.default_server_name = self.settings.get('server')
self.default_server_base = config.supported_servers[self.default_server_name]
# session request stuff
self._session = Session()
self._basicHeaders = {
@@ -78,23 +73,38 @@ class EsiAccess:
self._session.headers.update(self._basicHeaders)
self._session.proxies = NetworkSettings.getInstance().getProxySettingsInRequestsFormat()
self.mem_cached_session = {}
# Set up cached session. This is only used for SSO meta data for now, but can be expanded to actually handle
# various ESI caching (using ETag, for example) in the future
cached_session = CachedSession(
self.cached_session = CachedSession(
os.path.join(config.savePath, config.ESI_CACHE),
backend="sqlite",
cache_control=True, # Use Cache-Control headers for expiration, if available
expire_after=timedelta(days=1), # Otherwise expire responses after one day
stale_if_error=True, # In case of request errors, use stale cache data if possible
)
cached_session.headers.update(self._basicHeaders)
cached_session.proxies = NetworkSettings.getInstance().getProxySettingsInRequestsFormat()
self.cached_session.headers.update(self._basicHeaders)
self.cached_session.proxies = NetworkSettings.getInstance().getProxySettingsInRequestsFormat()
self.init(self.default_server_base)
def init(self, server_base):
self.server_base: config.ApiServer = server_base
self.server_name = self.server_base.name
try:
meta_call = self.cached_session.get("https://%s/.well-known/oauth-authorization-server" % self.server_base.sso)
except:
# The http data of expire_after in evepc.163.com is -1
meta_call = requests.get("https://%s/.well-known/oauth-authorization-server" % self.server_base.sso)
meta_call = cached_session.get("https://%s/.well-known/oauth-authorization-server" % self.server_base.sso)
meta_call.raise_for_status()
self.server_meta = meta_call.json()
jwks_call = cached_session.get(self.server_meta["jwks_uri"])
try:
jwks_call = self.cached_session.get(self.server_meta["jwks_uri"])
except:
jwks_call = requests.get(self.server_meta["jwks_uri"])
jwks_call.raise_for_status()
self.jwks = jwks_call.json()
@@ -116,7 +126,7 @@ class EsiAccess:
@property
def client_id(self):
return self.settings.get('clientID') or config.API_CLIENT_ID
return self.settings.get('clientID') or self.server_base.client_id
@staticmethod
def update_token(char, tokenResponse):
@@ -142,16 +152,25 @@ class EsiAccess:
'state': self.state
}
args = {
'response_type': 'code',
'redirect_uri': config.SSO_CALLBACK,
'client_id': self.client_id,
'scope': ' '.join(scopes),
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
'state': base64.b64encode(bytes(json.dumps(state_arg), 'utf-8'))
}
if(self.server_name=="Serenity"):
args = {
'response_type': 'code',
'redirect_uri': self.server_base.callback,
'client_id': self.client_id,
'scope': ' '.join(scopes),
'state': 'hilltech',
'device_id': 'eims'
}
else:
args = {
'response_type': 'code',
'redirect_uri': self.server_base.callback,
'client_id': self.client_id,
'scope': ' '.join(scopes),
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
'state': base64.b64encode(bytes(json.dumps(state_arg), 'utf-8'))
}
return '%s?%s' % (
self.oauth_authorize,
urlencode(args)
@@ -252,6 +271,11 @@ class EsiAccess:
"https://login.eveonline.com: {}".format(str(e)))
def _before_request(self, ssoChar):
if ssoChar:
self.init(config.supported_servers[ssoChar.server])
else:
self.init(self.default_server_base)
self._session.headers.clear()
self._session.headers.update(self._basicHeaders)
if ssoChar is None:
@@ -280,17 +304,17 @@ class EsiAccess:
def get(self, ssoChar, endpoint, **kwargs):
self._before_request(ssoChar)
endpoint = endpoint.format(**kwargs)
return self._after_request(self._session.get("{}{}".format(self.esi_url, endpoint)))
return self._after_request(self._session.get("{}{}?datasource={}".format(self.esi_url, endpoint, self.server_name.lower())))
def post(self, ssoChar, endpoint, json, **kwargs):
self._before_request(ssoChar)
endpoint = endpoint.format(**kwargs)
return self._after_request(self._session.post("{}{}".format(self.esi_url, endpoint), data=json))
return self._after_request(self._session.post("{}{}?datasource={}".format(self.esi_url, endpoint, self.server_name.lower()), data=json))
def delete(self, ssoChar, endpoint, **kwargs):
self._before_request(ssoChar)
endpoint = endpoint.format(**kwargs)
return self._after_request(self._session.delete("{}{}".format(self.esi_url, endpoint)))
return self._after_request(self._session.delete("{}{}?datasource={}".format(self.esi_url, endpoint, self.server_name.lower())))
# todo: move these off to another class which extends this one. This class should only handle the low level
# authentication and

View File

@@ -392,6 +392,9 @@ class EsiSettings:
def set(self, type, value):
self.settings[type] = value
def keys(self):
return config.supported_servers.keys()
class StatViewSettings:
_instance = None