Files
pyfa/service/esiAccess.py

307 lines
11 KiB
Python

# noinspection PyPackageRequirements
from collections import namedtuple
from logbook import Logger
import uuid
import time
import config
import base64
import secrets
import hashlib
import json
from jose import jwt
from jose.exceptions import ExpiredSignatureError, JWTError, JWTClaimsError
import os
import datetime
from service.const import EsiSsoMode, EsiEndpoints
from service.settings import EsiSettings, NetworkSettings
from datetime import timedelta
from requests_cache import CachedSession
from requests import Session
from urllib.parse import urlencode, quote
pyfalog = Logger(__name__)
class SSOError(Exception):
pass
scopes = [
'esi-skills.read_skills.v1',
'esi-fittings.read_fittings.v1',
'esi-fittings.write_fittings.v1'
]
ApiBase = namedtuple('ApiBase', ['sso', 'esi'])
supported_servers = {
"Tranquility": ApiBase("login.eveonline.com", "esi.evetech.net"),
"Singularity": ApiBase("sisilogin.testeveonline.com", "esi.evetech.net"),
"Serenity": ApiBase("login.evepc.163.com", "esi.evepc.163.com")
}
class APIException(Exception):
""" Exception for SSO related errors """
def __init__(self, url, code, json_response):
self.url = url
self.status_code = code
self.response = json_response
super(APIException, self).__init__(str(self))
def __str__(self):
if 'error' in self.response:
return 'HTTP Error %s: %s' % (self.status_code,
self.response['error'])
elif 'message' in self.response:
return 'HTTP Error %s: %s' % (self.status_code,
self.response['message'])
return 'HTTP Error %s' % self.status_code
class EsiAccess:
def __init__(self):
self.settings = EsiSettings.getInstance()
self.server_base: ApiBase = supported_servers[self.settings.get("server")]
# session request stuff
self._session = Session()
self._basicHeaders = {
'Accept': 'application/json',
'User-Agent': (
'pyfa v{}'.format(config.version)
)
}
self._session.headers.update(self._basicHeaders)
self._session.proxies = NetworkSettings.getInstance().getProxySettingsInRequestsFormat()
# Set up cached session. This is only used for SSO meta data for now, but can be expanded to actually handle
# various ESI caching (using ETag, for example) in the future
cached_session = CachedSession(
os.path.join(config.savePath, config.ESI_CACHE),
backend="sqlite",
cache_control=True, # Use Cache-Control headers for expiration, if available
expire_after=timedelta(days=1), # Otherwise expire responses after one day
stale_if_error=True, # In case of request errors, use stale cache data if possible
)
cached_session.headers.update(self._basicHeaders)
cached_session.proxies = NetworkSettings.getInstance().getProxySettingsInRequestsFormat()
meta_call = cached_session.get("https://%s/.well-known/oauth-authorization-server" % self.server_base.sso)
meta_call.raise_for_status()
self.server_meta = meta_call.json()
jwks_call = cached_session.get(self.server_meta["jwks_uri"])
jwks_call.raise_for_status()
self.jwks = jwks_call.json()
@property
def sso_url(self):
return 'https://%s/v2' % self.server_base.sso
@property
def esi_url(self):
return 'https://%s' % self.server_base.esi
@property
def oauth_authorize(self):
return self.server_meta["authorization_endpoint"]
@property
def oauth_token(self):
return self.server_meta["token_endpoint"]
@property
def client_id(self):
return self.settings.get('clientID') or config.API_CLIENT_ID
@staticmethod
def update_token(char, tokenResponse):
""" helper function to update token data from SSO response """
char.accessToken = tokenResponse['access_token']
char.accessTokenExpires = datetime.datetime.fromtimestamp(time.time() + tokenResponse['expires_in'])
if 'refresh_token' in tokenResponse:
char.refreshToken = config.cipher.encrypt(tokenResponse['refresh_token'].encode())
def get_login_uri(self, redirect=None):
self.state = str(uuid.uuid4())
# Generate the PKCE code challenge
self.code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32))
m = hashlib.sha256()
m.update(self.code_verifier)
d = m.digest()
code_challenge = base64.urlsafe_b64encode(d).decode().replace("=", "")
state_arg = {
'mode': self.settings.get('loginMode'),
'redirect': redirect,
'state': self.state
}
args = {
'response_type': 'code',
'redirect_uri': 'http://127.0.0.1:5500/callback.html',
'client_id': self.client_id,
'scope': ' '.join(scopes),
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
'state': base64.b64encode(bytes(json.dumps(state_arg), 'utf-8'))
}
return '%s?%s' % (
self.oauth_authorize,
urlencode(args)
)
def get_oauth_header(self, token):
""" Return the Bearer Authorization header required in oauth calls
:return: a dict with the authorization header
"""
return {'Authorization': 'Bearer %s' % token}
def auth(self, code):
# todo: properly handle invalid auth code, or one that has been used already
values = {
'grant_type': 'authorization_code',
'code': code,
'client_id': self.client_id,
"code_verifier": self.code_verifier
}
res = self.token_call(values)
json_res = res.json()
decoded_jwt = self.validate_eve_jwt(json_res['access_token'])
return json_res, decoded_jwt
def refresh(self, ssoChar):
# todo: properly handle invalid refresh token
values = {
"grant_type": "refresh_token",
"refresh_token": config.cipher.decrypt(ssoChar.refreshToken).decode(),
"client_id": self.client_id,
}
res = self.token_call(values)
json_res = res.json()
self.update_token(ssoChar, json_res)
return json_res
def token_call(self, values):
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Host": self.server_base.sso,
}
res = self._session.post(
self.server_meta["token_endpoint"],
data=values,
headers=headers,
)
if res.status_code != 200:
raise APIException(
self.server_meta["token_endpoint"],
res.status_code,
res.json()
)
return res
def validate_eve_jwt(self, jwt_token):
"""Validate a JWT token retrieved from the EVE SSO.
Args:
jwt_token: A JWT token originating from the EVE SSO
Returns
dict: The contents of the validated JWT token if there are no
validation errors
"""
try:
jwk_sets = self.jwks["keys"]
except KeyError as e:
raise SSOError("Something went wrong when retrieving the JWK set. The returned "
"payload did not have the expected key {}. \nPayload returned "
"from the SSO looks like: {}".format(e, self.jwks))
jwk_set = next((item for item in jwk_sets if item["alg"] == "RS256"))
try:
return jwt.decode(
jwt_token,
jwk_set,
algorithms=jwk_set["alg"],
issuer=[self.server_base.sso, "https://%s" % self.server_base.sso]
)
except ExpiredSignatureError as e:
raise SSOError("The JWT token has expired: {}").format(str(e))
except JWTError as e:
raise SSOError("The JWT signature was invalid: {}").format(str(e))
except JWTClaimsError as e:
raise SSOError("The issuer claim was not from login.eveonline.com or "
"https://login.eveonline.com: {}".format(str(e)))
def _before_request(self, ssoChar):
self._session.headers.clear()
self._session.headers.update(self._basicHeaders)
if ssoChar is None:
return
if ssoChar.is_token_expired():
pyfalog.info("Refreshing token for {}".format(ssoChar.characterName))
self.refresh(ssoChar)
if ssoChar.accessToken is not None:
self._session.headers.update(self.get_oauth_header(ssoChar.accessToken))
def _after_request(self, resp):
if "warning" in resp.headers:
pyfalog.warn("{} - {}".format(resp.headers["warning"], resp.url))
if resp.status_code >= 400:
raise APIException(
resp.url,
resp.status_code,
resp.json()
)
return resp
def get(self, ssoChar, endpoint, **kwargs):
self._before_request(ssoChar)
endpoint = endpoint.format(**kwargs)
return self._after_request(self._session.get("{}{}".format(self.esi_url, endpoint)))
def post(self, ssoChar, endpoint, json, **kwargs):
self._before_request(ssoChar)
endpoint = endpoint.format(**kwargs)
return self._after_request(self._session.post("{}{}".format(self.esi_url, endpoint), data=json))
def delete(self, ssoChar, endpoint, **kwargs):
self._before_request(ssoChar)
endpoint = endpoint.format(**kwargs)
return self._after_request(self._session.delete("{}{}".format(self.esi_url, endpoint)))
# todo: move these off to another class which extends this one. This class should only handle the low level
# authentication and
def getDynamicItem(self, typeID, itemID):
return self.get(None, EsiEndpoints.DYNAMIC_ITEM.value, type_id=typeID, item_id=itemID)
def getSkills(self, char):
return self.get(char, EsiEndpoints.CHAR_SKILLS.value, character_id=char.characterID)
def getSecStatus(self, char):
return self.get(char, EsiEndpoints.CHAR.value, character_id=char.characterID)
def getFittings(self, char):
return self.get(char, EsiEndpoints.CHAR_FITTINGS.value, character_id=char.characterID)
def postFitting(self, char, json_str):
# @todo: new fitting ID can be recovered from resp.data,
return self.post(char, EsiEndpoints.CHAR_FITTINGS.value, json_str, character_id=char.characterID)
def delFitting(self, char, fittingID):
return self.delete(char, EsiEndpoints.CHAR_DEL_FIT.value, character_id=char.characterID, fitting_id=fittingID)