"""Manager for the authorization token required to access the Helios API."""
import json
import logging
import os
import tempfile
import requests
from helios import CONFIG
logger = logging.getLogger(__name__)
[docs]class Session(object):
"""Manages API tokens for authentication.
Authentication credentials can be specified using the env input parameter,
environment variables, or a credentials.json file in your ~/.helios
directory. See the official documentation for more authentication
information.
Required keys:
- helios_client_id: Client ID from API key pair.
- helios_client_secret: Client Secret ID from API key pair.
Optional keys:
- helios_api_url: Optional, URL for API endpoint.
A session can be established and reused for multiple core API instances.
.. code-block:: python
import helios
sess = helios.Session()
alerts = helios.Alerts(session=sess)
cameras = helios.Cameras(session=sess)
If a session is not specified before hand, one will be initialized
automatically. This is less efficient because each core API instance
will try to initialize a session.
.. code-block:: python
import helios
alerts = helios.Alerts()
cameras = helios.Cameras()
"""
ssl_verify = CONFIG['requests']['ssl_verify']
token_expiration_threshold = CONFIG['session']['token_expiration_threshold']
_default_api_url = r'https://api.helios.earth/v1'
_default_base_dir = os.path.join(os.path.expanduser('~'), '.helios')
def __init__(self, env=None):
"""Initialize Helios Session.
Args:
env (dict): Dictionary containing 'helios_client_id',
'helios_client_secret', and optionally 'helios_api_url'.
This will override any information in credentials.json and
environment variables.
"""
# Establish necessary files/directories.
self._base_dir = None
self._credentials_file = None
self._token_dir = None
self._verify_directories()
# The token will be established with a call to the start_session method.
self.token = None
# Use custom credentials.
if env is not None:
logger.info('Using custom env for session.')
data = env
# Read credentials from environment.
elif 'helios_client_id' in os.environ and 'helios_client_secret' in os.environ:
logger.info('Using environment variables for session.')
data = os.environ
# Read credentials from file.
elif os.path.exists(self._credentials_file):
logger.info('Using credentials file for session.')
with open(self._credentials_file, 'r') as auth_file:
data = json.load(auth_file)
else:
raise Exception('No credentials could be found. Be sure to '
'set environment variables or create a '
'credentials file.')
# Extract relevant authentication information from data.
self._key_id = data['helios_client_id']
self._key_secret = data['helios_client_secret']
self.api_url = data.get('helios_api_url') or self._default_api_url
self.api_url = self.api_url.rstrip('/')
# Create token filename based on authentication ID.
self._token_file = os.path.join(self._token_dir,
self._key_id + '.helios_token')
# Finally, start the session.
self.start_session()
def _delete_token(self):
"""Deletes token file."""
os.remove(self._token_file)
def _get_token(self):
"""
Gets a fresh token.
The token will be acquired and then written to file for reuse. If the
request fails over https, http will be used as a fallback.
"""
logger.info('Acquiring a new token.')
token_url = self.api_url + '/oauth/token'
data = {'grant_type': 'client_credentials'}
auth = (self._key_id, self._key_secret)
try:
resp = requests.post(token_url, data=data, auth=auth,
verify=self.ssl_verify)
resp.raise_for_status()
except requests.exceptions.HTTPError:
logger.warning('Getting token over https failed. Falling back to http.',
exc_info=True)
resp = requests.post(token_url.replace('https', 'http'), data=data,
auth=auth, verify=self.ssl_verify)
# If the token cannot be acquired raise exception.
try:
resp.raise_for_status()
except requests.exceptions.RequestException:
logger.exception('Failed to acquire token.')
raise
token_request = resp.json()
self.token = {'name': 'Authorization',
'value': 'Bearer ' + token_request['access_token']}
logger.info('Successfully acquired new token.')
def _read_token_file(self):
"""Reads token from file."""
with open(self._token_file, 'r') as token_file:
self.token = json.load(token_file)
def _verify_directories(self):
"""Verifies essential directories."""
# Explicitly check for write capability.
# Provides a fall back to the default temp directory.
base_dir = self._default_base_dir
write_test_file = os.path.join(base_dir, 'temp_file.tmp')
try:
if os.path.exists(base_dir):
with open(write_test_file, 'w+') as _:
pass
else:
os.makedirs(base_dir)
except (IOError, OSError):
base_dir = tempfile.gettempdir()
logger.warning('Could not write to %s. Falling back to %s',
self._default_base_dir, base_dir)
finally:
if os.path.exists(write_test_file):
os.remove(write_test_file)
# Establish paths.
self._base_dir = base_dir
self._token_dir = os.path.join(self._base_dir, '.tokens')
self._credentials_file = os.path.join(self._base_dir, 'credentials.json')
if not os.path.exists(self._base_dir):
os.makedirs(self._base_dir)
if not os.path.exists(self._token_dir):
os.makedirs(self._token_dir)
def _write_token_file(self):
"""Writes token to file."""
try:
with open(self._token_file, 'w+') as token_file:
json.dump(self.token, token_file)
except Exception:
# Prevent a bad token file from persisting after an exception.
if os.path.exists(self._token_file):
os.remove(self._token_file)
raise
[docs] def start_session(self):
"""
Begins Helios session.
This will establish and verify a token for the session. If a token
file exists the token will be read and verified. If the token file
doesn't exist or the token has expired then a new token will be
acquired.
"""
try:
self._read_token_file()
except (IOError, OSError):
logger.warning('Could not read token (%s). A new token will be acquired.',
self._token_file)
self._get_token()
else:
if not self.verify_token():
self._get_token()
self._write_token_file()
[docs] def verify_token(self):
"""
Verifies the token.
If the token is bad or if the expiration time is less than the
threshold False will be returned.
Returns:
bool: True if current token is valid, False otherwise.
"""
resp = requests.get(self.api_url + '/session',
headers={self.token['name']: self.token['value']},
verify=self.ssl_verify)
# If the token cannot be verified raise exception.
try:
resp.raise_for_status()
except requests.exceptions.RequestException:
logger.exception('Failed to verify token.')
raise
json_resp = resp.json()
if not json_resp['name'] or not json_resp['expires_in']:
return False
# Check token expiration time.
expiration = json_resp['expires_in'] / 60.0
if expiration < self.token_expiration_threshold:
logger.warning('Token expiration (%d) is less than the threshold (%d).',
expiration,
self.token_expiration_threshold)
return False
logger.info('Token is valid for %d minutes.', expiration)
return True