"""
Helios Observations API.
Methods are meant to represent the core functionality in the developer
documentation. Some may have additional functionality for convenience.
"""
import logging
import os
from collections import namedtuple, defaultdict
from io import BytesIO
from queue import Queue
import requests
from PIL import Image
from helios.core.mixins import SDKCore, IndexMixin, ShowMixin
from helios.core.structure import ImageRecord
from helios.utilities import logging_utils, parsing_utils
logger = logging.getLogger(__name__)
[docs]class Observations(ShowMixin, IndexMixin, SDKCore):
"""
The Observations API provides ground-truth data generated by the Helios
analytics.
"""
_core_api = 'observations'
def __init__(self, session=None):
"""
Initialize Observations instance.
Args:
session (helios.HeliosSession): An instance of the
Session. Defaults to None. If unused a session will be
created for you.
"""
super().__init__(session=session)
[docs] def index(self, **kwargs):
"""
Get observations matching the provided spatial, text, or
metadata filters.
The maximum skip value is 4000. If this is reached, truncated results
will be returned. You will need to refine your query to avoid this.
.. _observations_index_documentation: https://helios.earth/developers/api/observations/#index
Args:
**kwargs: Any keyword arguments found in the
observations_index_documentation_.
Returns:
tuple: A tuple containing:
feature_collection (:class:`ObservationsFeatureCollection <helios.observations_api.ObservationsFeatureCollection>`):
Observations feature collection.
failed (list of :class:`Record <helios.core.structure.Record>`):
Failed API call records.
"""
succeeded, failed = super().index(**kwargs)
content = []
for record in succeeded:
for feature in record.content['features']:
content.append(ObservationsFeature(feature))
return ObservationsFeatureCollection(content), failed
[docs] @logging_utils.log_entrance_exit
def preview(self, observation_ids, out_dir=None, return_image_data=False):
"""
Get preview images from observations.
Args:
observation_ids (str or list of strs): list of observation IDs.
out_dir (optional, str): Directory to write images to. Defaults to
None.
return_image_data (optional, bool): If True images will be
available as PIL images in the returned ImageRecords.
Defaults to False.
Returns:
tuple: A tuple containing:
images (list of :class:`ImageRecord <helios.core.structure.ImageRecord>`):
All received images.
failed (list of :class:`ImageRecord <helios.core.structure.ImageRecord>`):
Failed API calls.
"""
# Make sure directory exists.
if out_dir is not None:
if not os.path.exists(out_dir):
os.makedirs(out_dir)
succeeded, failed = self._batch_process(
self._preview_worker,
observation_ids,
out_dir=out_dir,
return_image_data=return_image_data,
)
return succeeded, failed
def _preview_worker(
self,
observation_id,
out_dir=None,
return_image_data=None,
_session=None,
_success_queue=None,
_failure_queue=None,
):
"""
Handles preview calls.
Args:
observation_id (str): Observation ID.
out_dir (str, optional): Optionally write data to a directory.
return_image_data (bool, optional): Optionally load image data
into PIL and include in returned data.
_session (requests.Session): Session instance.
_success_queue (Queue): Queue for successful calls.
_failure_queue (Queue): Queue for unsuccessful calls.
"""
call_params = locals()
url = '{}/{}/{}/preview'.format(
self._base_api_url, self._core_api, observation_id
)
try:
resp = _session.get(url, verify=self._ssl_verify)
resp.raise_for_status()
except Exception as e:
logger.exception('Failed to GET %s', url)
_failure_queue.put(ImageRecord(url=url, parameters=call_params, error=e))
return
image_content = resp.content
# Parse key from url.
parsed_url = parsing_utils.parse_url(str(resp.url))
_, image_name = os.path.split(parsed_url.path)
# Write image to file.
if out_dir:
out_file = os.path.join(out_dir, image_name)
with open(out_file, 'wb') as f:
f.write(image_content)
else:
out_file = None
# Read and return image data.
if return_image_data:
# Read image from response.
try:
img_data = Image.open(BytesIO(image_content))
except Exception as e:
_failure_queue.put(
ImageRecord(url=url, parameters=call_params, error=e)
)
return
else:
img_data = None
_success_queue.put(
ImageRecord(
url=url,
parameters=call_params,
name=image_name,
content=img_data,
output_file=out_file,
)
)
[docs] def show(self, observation_ids):
"""
Get attributes for observations.
Args:
observation_ids (str or list of strs): Helios observation ID(s).
Returns:
tuple: A tuple containing:
feature_collection (:class:`ObservationsFeatureCollection <helios.observations_api.ObservationsFeatureCollection>`):
Observations feature collection.
failed (list of :class:`Record <helios.core.structure.Record>`):
Failed API call records.
"""
succeeded, failed = super().show(observation_ids)
content = []
for record in succeeded:
content.append(ObservationsFeature(record.content))
return ObservationsFeatureCollection(content), failed
[docs]class ObservationsFeature:
"""
Individual Observation GeoJSON feature.
Attributes:
city (str): 'city' value for the feature.
country (str): 'country' value for the feature.
description (str): 'description' value for the feature.
id (str): 'id' value for the feature.
json (dict): Raw JSON feature.
prev_id (str): 'prev_id' value for the feature.
region (str): 'region' value for the feature.
sensors (dict): 'sensors' value for the feature.
state (str): 'state' value for the feature.
time (str): 'time' value for the feature.
"""
def __init__(self, feature):
self.json = feature
@property
def city(self):
return self.json['properties'].get('city')
@property
def country(self):
return self.json['properties'].get('country')
@property
def description(self):
return self.json['properties'].get('description')
@property
def id(self):
return self.json.get('id')
@property
def prev_id(self):
return self.json['properties'].get('prev_id')
@property
def region(self):
return self.json['properties'].get('region')
@property
def sensors(self):
return self.json['properties'].get('sensors')
@property
def state(self):
return self.json['properties'].get('state')
@property
def time(self):
return self.json['properties'].get('time')
[docs]class ObservationsFeatureCollection:
"""
Collection of GeoJSON features obtained via the Observations API.
Convenience properties are available to extract values from every feature.
Attributes:
features (list of :class:`ObservationsFeature <helios.core.structure.ObservationsFeature>`):
All features returned from a query.
"""
def __init__(self, features):
self.features = features
@property
def city(self):
"""'city' values for every feature."""
return [x.city for x in self.features]
@property
def country(self):
"""'country' values for every feature."""
return [x.country for x in self.features]
@property
def description(self):
"""'description' values for every feature."""
return [x.description for x in self.features]
@property
def id(self):
"""'id' values for every feature."""
return [x.id for x in self.features]
@property
def json(self):
"""Raw 'json' for every feature."""
return [x.json for x in self.features]
@property
def prev_id(self):
"""'prev_id' values for every feature."""
return [x.prev_id for x in self.features]
@property
def region(self):
"""'region' values for every feature."""
return [x.region for x in self.features]
@property
def sensors(self):
"""'sensors' values for every feature."""
return [x.sensors for x in self.features]
@property
def state(self):
"""'state' values for every feature."""
return [x.state for x in self.features]
@property
def time(self):
"""'time' values for every feature."""
return [x.time for x in self.features]
@property
def observations(self):
"""
Observation data from the sensor block of each feature.
Data will be returned as a dictionary with a key for each sensor.
Observation data for each sensor is a named tuple ease-of-use.
Each named tuple contains the sensor, time, data, prev, id, and prev_id.
"""
Observation = namedtuple(
'Observation', ['sensor', 'time', 'data', 'prev', 'id', 'prev_id']
)
data = defaultdict(list)
for feature in self.features:
for sensor, sensor_data in feature.sensors.items():
data[sensor].append(
Observation(
sensor,
feature.time,
sensor_data.get('data', -1),
sensor_data.get('prev', -1),
feature.id,
feature.prev_id,
)
)
return dict(data)