Source code for helios.core.session

"""Manager for the authorization token required to access the Helios API."""
import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path

import requests

import helios

logger = logging.getLogger(__name__)

_DEFAULT_API_URL = 'https://api.helios.earth/v1'


[docs]class Authentication: """ Authentication API. Args: client_id (str): API key ID. client_secret (str): API key secret. api_url (str): Override the default API URL. """ def __init__(self, client_id, client_secret, api_url=None): self.client_id = client_id self.client_secret = client_secret self.api_url = api_url or _DEFAULT_API_URL # To be safe. self.api_url = self.api_url.rstrip('/')
[docs] def get_access_token(self): """ Gets a new access token. Returns: Token: Helios API access token. """ logger.info('Acquiring a new token.') token_url = self.api_url + '/oauth/token' data = { 'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': self.client_secret, } try: resp = requests.post(token_url, json=data) resp.raise_for_status() except Exception: logger.exception('Failed to acquire token.') raise json_resp = resp.json() logger.info('Successfully acquired a new token.') return Token(**json_resp)
[docs] def get_current_user(self, token): """ Return a compact list of current token attributes, such as the token expiration date. Args: token (Token): A Helios Token. Returns: dict: Token attributes. Raises: ValueError: If current token is invalid or expired. """ url = self.api_url + '/session' try: resp = requests.get(url, headers=token.auth_header) resp.raise_for_status() except Exception: logger.exception('Failed to get token expiration time.') raise json_resp = resp.json() if json_resp['name'] is None: _msg = 'Token appears to be invalid or expired.' logger.error(_msg) raise ValueError(_msg) return json_resp
[docs]class Token: """ Helios token. Args: access_token (str): Access token. expires_in (int): Seconds until expiration. WARNING: this value can be misleading. If reading from a token file the value will not be current. kwargs: Any other optional token attributes. """ def __init__(self, access_token, expires_in=None, **kwargs): self.access_token = access_token self._expires_in = expires_in for k, v in kwargs.items(): setattr(self, k, v) @property def auth_header(self): """Authentication header for API requests.""" try: return {'Authorization': 'Bearer ' + self.access_token} except KeyError: logger.error('A token must be acquired to establish an auth header.') raise @property def expired(self): """True if token is expired, False otherwise.""" now = datetime.now() if now >= self.expiration: return True else: return False @property def expiration(self): """The expiration time.""" now = datetime.now() delta = timedelta(seconds=self._expires_in) return now + delta
[docs] def to_json(self): """ Serializes token to JSON. Returns: str: JSON serialized string. """ output_dict = { k: v for k, v in vars(self).items() if not k.startswith(('_', '__')) } return json.dumps(output_dict, skipkeys=True)
[docs]class HeliosSession: """ Manages configuration and authentication details for the Helios APIs. The session will handle acquiring and verifying tokens as well as various developer options. Args: client_id (str, optional): API key ID. client_secret (str, optional): API key secret. api_url (str, optional): Override the default API URL. base_directory (str, optional): Override the base directory for storage of tokens and credentials.json file. max_threads (int, optional): The maximum number of threads allowed for batch calls. token_expiration_threshold (int, optional): The number of minutes to allow before token expiration. E.g. if the token will expire in 29 minutes and the threshold is set to 30, a new token will be acquired because expiration time is below the threshold. ssl_verify (bool, optional): Use SSL verification for all HTTP requests. Defaults to True. """ def __init__( self, client_id=None, client_secret=None, api_url=None, base_directory=None, max_threads=64, token_expiration_threshold=60, ssl_verify=True, ): self.client_id = client_id self.client_secret = client_secret self.api_url = api_url self.max_threads = max_threads self.ssl_verify = ssl_verify self._token_expiration_threshold = token_expiration_threshold self._base_directory = base_directory # Handle optional parameter settings. if self._base_directory is None: self._base_directory = Path.home() / '.helios' else: self._base_directory = Path(self._base_directory) if self.api_url is None: self.api_url = _DEFAULT_API_URL else: self.api_url = self.api_url.rstrip('/') if None in (self.client_id, self.client_secret): self._get_credentials() # Create token filename based on authentication ID. self._token_dir = self._base_directory / '.tokens' self._token_file = self._token_dir / (self.client_id + '.helios_token') # Create an instances of the auth API. self.auth_api = Authentication( self.client_id, self.client_secret, api_url=self.api_url ) # Establish self.token. try: self.read_token() except (IOError, OSError): logger.warning( 'Could not read token file (%s). A new token will be acquired.', self._token_file, ) self.get_new_token() except (ValueError, KeyError, TypeError): logger.warning( 'Token was invalid or expired. A new token will be ' 'acquired.' ) self.get_new_token() @property def auth_header(self): """Authentication header for API requests.""" self.verify_token() return self.token.auth_header def _get_credentials(self): """Handles the various methods for referencing API credentials.""" credentials_file = self._base_directory / 'credentials.json' if 'helios_client_id' in os.environ and 'helios_client_secret' in os.environ: logger.info('Credentials found in environment variables.') data = os.environ elif credentials_file.exists(): logger.info('Credentials found in file: %s', str(credentials_file)) with open(credentials_file, mode='r', encoding='utf-8') as f: data = json.load(f) else: raise Exception( 'No credentials could be found. Be sure to ' 'set environment variables or create a ' 'credentials file.' ) self.client_id = data['helios_client_id'] self.client_secret = data['helios_client_secret'] if self.api_url is None: self.api_url = data.get('helios_api_url')
[docs] def read_token(self): """Reads token from file.""" logger.info('Reading token from file: %s', self._token_file) with open(self._token_file, mode='r', encoding='utf-8') as f: contents = json.loads(f.read()) token = Token(**contents) # Must get the real expires_in value when reading from an old file. real_expiration = self.auth_api.get_current_user(token)['expires_in'] token._expires_in = real_expiration self.token = token
[docs] def write_token(self): """Writes token to file.""" logger.info('Writing token to file: %s', self._token_file) if not self._token_file.parent.exists(): self._token_file.parent.mkdir(parents=True) try: with open(self._token_file, mode='w', encoding='utf-8') as f: f.write(self.token.to_json()) except Exception: # Prevent a bad token file from persisting after an exception. if self._token_file.exists(): self._token_file.unlink() raise
[docs] def verify_token(self): """ Verifies that token has not expired. If the token has expired a new token will be acquired. """ if self.token.expired: logger.info('Token has expired.') self.get_new_token()
[docs] def get_new_token(self): """Gets a new token for the current Helios session.""" logger.info('Getting a new token.') self.token = self.auth_api.get_access_token() self.write_token()
[docs] def client(self, name): """ Gets a core API instance using the current HeliosSession. Args: name (str): Name of API. Returns: Core API instance. """ self.verify_token() return helios.__apis__[name.lower()](session=self)