Source code for helios.core.session

"""Manager for the authorization token required to access the Helios API."""
import json
import logging
import os
import warnings

import requests

# Python 2 compatibility.
try:
    FileNotFoundError
except NameError:
    FileNotFoundError = IOError


[docs]class Session(object): """Manages API tokens for authentication. Authentication credentials can be specified using the env input parameter, environment variables, or a credentials.json file in your ~/.helios directory. See the official documentation for more authentication information. Required keys: - helios_client_id: Client ID from API key pair. - helios_client_secret: Client Secret ID from API key pair. Optional keys: - helios_api_url: Optional, URL for API endpoint. A session can be established and reused for multiple core API instances. .. code-block:: python import helios sess = helios.Session() alerts = helios.Alerts(session=sess) cameras = helios.Cameras(session=sess) If a session is not specified before hand, one will be initialized automatically. This is less efficient because each core API instance will try to initialize a session. .. code-block:: python import helios alerts = helios.Alerts() cameras = helios.Cameras() """ token_expiration_threshold = 60 # minutes _base_dir = os.path.join(os.path.expanduser('~'), '.helios') _token_dir = os.path.join(_base_dir, '.tokens') _credentials_file = os.path.join(_base_dir, 'credentials.json') _default_api_url = r'https://api.helios.earth/v1/' def __init__(self, env=None): """Initialize Helios Session. Args: env (dict): Dictionary containing 'helios_client_id', 'helios_client_secret', and optionally 'helios_api_url'. This will override any information in credentials.json and environment variables. """ # Initialize logger self._logger = logging.getLogger(__name__) # The token will be established with a call to the start_session method. self.token = None # Verify essential directories exist. self._verify_directories() # Use custom credentials. if env is not None: self._logger.info('Using custom env for session.') data = env # Read credentials from environment. elif 'helios_client_id' in os.environ and 'helios_client_secret' in os.environ: self._logger.info('Using environment variables for session.') data = os.environ.copy() # Read credentials from file. elif os.path.exists(self._credentials_file): self._logger.info('Using credentials file for session.') with open(self._credentials_file, 'r') as auth_file: data = json.load(auth_file) else: data = self._deprecation_check() if data is None: raise Exception('No credentials could be found. Be sure to ' 'set environment variables or setup a ' 'credentials file.') else: message = 'Deprecated auth file was found. ' \ 'Please migrate .helios_auth to ' \ '~/.helios/credentials.json. Refer to the ' \ 'authentication section of the documentation for ' \ 'more information.' warnings.warn(message, DeprecationWarning) self._logger.warning('DeprecationWarning: ' + message) # Extract relevant authentication information from data. self._key_id = data['helios_client_id'] self._key_secret = data['helios_client_secret'] try: if data['helios_api_url'] is not None: self.api_url = data['helios_api_url'] else: self.api_url = self._default_api_url except KeyError: self.api_url = self._default_api_url # Create token filename based on authentication ID. self._token_file = os.path.join(self._token_dir, self._key_id + '.helios_token') # Finally, start the session. self.start_session() def _deprecation_check(self): """Temporary method to check for old auth files.""" old_auth_file = os.path.join(os.path.expanduser('~'), '.helios_auth') data = None # Read credentials from environment. if 'HELIOS_KEY_ID' in os.environ and 'HELIOS_KEY_SECRET' in os.environ: self._logger.info('Using environment variables for session.') data = {'helios_client_id': os.environ['HELIOS_KEY_ID'], 'helios_client_secret': os.environ['HELIOS_KEY_SECRET'], 'helios_api_url': os.environ.get('HELIOS_API_URL')} # Read credentials from file. elif os.path.exists(old_auth_file): self._logger.info('Using credentials file for session.') with open(old_auth_file, 'r') as auth_file: temp = json.load(auth_file) data = {'helios_client_id': temp['HELIOS_KEY_ID'], 'helios_client_secret': temp['HELIOS_KEY_SECRET'], 'helios_api_url': temp.get('HELIOS_API_URL')} return data def _delete_token(self): """Delete token file.""" try: os.remove(self._token_file) except (WindowsError, FileNotFoundError): pass def _get_token(self): """ Gets a fresh token. The token will be acquired and then written to file for reuse. If the request fails over https, http will be used as a fallback. """ token_url = self.api_url + '/oauth/token' try: data = {'grant_type': 'client_credentials'} auth = (self._key_id, self._key_secret) resp = requests.post(token_url, data=data, auth=auth, verify=True) resp.raise_for_status() except requests.exceptions.HTTPError: self._logger.warning('Getting token over https failed. Falling ' 'back to http.') token_url_http = 'http' + token_url.split('https')[1] data = {'grant_type': 'client_credentials'} auth = (self._key_id, self._key_secret) resp = requests.post(token_url_http, data=data, auth=auth, verify=True) # If the token cannot be acquired, raise exception. resp.raise_for_status() token_request = resp.json() self.token = {'name': 'Authorization', 'value': 'Bearer ' + token_request['access_token']} self._write_token_file() def _read_token_file(self): """Read token from file.""" with open(self._token_file, 'r') as token_file: self.token = json.load(token_file) def _verify_directories(self): """Verify essential directories.""" if not os.path.exists(self._base_dir): os.makedirs(self._base_dir) if not os.path.exists(self._token_dir): os.makedirs(self._token_dir) def _write_token_file(self): """Write token to file.""" with open(self._token_file, 'w+') as token_file: json.dump(self.token, token_file)
[docs] def start_session(self): """ Begin Helios session. This will establish and verify a token for the session. If a token file exists the token will be read and verified. If the token file doesn't exist or the token has expired then a new token will be acquired. """ try: self._read_token_file() if not self.verify_token(): self._get_token() except (IOError, FileNotFoundError): self._logger.warning('Token file was not found. A new token will ' 'be acquired.') self._get_token()
[docs] def verify_token(self): """ Verifies if the current token is still valid. If the token is bad or if the expiration time is less than the threshold False will be returned. Returns: bool: True if current token is valid, False otherwise. """ resp = requests.get(self.api_url + '/session', headers={self.token['name']: self.token['value']}, verify=True) resp.raise_for_status() json_resp = resp.json() if not json_resp['name'] or not json_resp['expires_in']: return False # Check token expiration time. expiration = json_resp['expires_in'] / 60.0 if expiration < self.token_expiration_threshold: self._logger.warning('Token is valid, but expires in %s minutes.', expiration) return False self._logger.info('Token is valid for %d minutes.', expiration) return True