"""The ``config`` module holds the :class:`Config <geogen.config.Config>` class and related default variables.
"""
import spiceypy as spice
import json
import os
from loguru import logger
DEFAULT_DATA_DIR = 'data'
"""Default output data directory is the data directory relative to from where GEOGEN is executed.
"""
KERNELS_PATH_SYMBOL = '$ESA_SPICE_KERNELS'
"""Symbol used in the Configuration file to specify the path of a mission metakernel file relative to path of the
ESA's SPICE Kernels Datasets (SKDs) top-level directory, which specified by the ``ESA_SPICE_KERNELS`` environment
variable.
"""
PDS_NAIF_MAPPING = {
-41: ['MEX', 'MARS EXPRESS', 'MARS-EXPRESS', 'MARS_EXPRESS'],
-248: ['VEX', 'VENUS EXPRESS', 'VENUS-EXPRESS', 'VENUS_EXPRESS'],
-226: ['ROSETTA', 'ROS', 'ROSETTA ORBITER'],
-226800: ['RL', 'ROS_LANDER', 'PHILAE'],
-143: ['TGO', 'TRACE GAS ORBITER', 'EXOMARS TGO', 'EXOMARS 2016 TGO'],
-238: ['S1', 'SM1', 'SMART1', 'SMART-1'],
-121: ['MPO', 'BEPICOLOMBO MPO', 'MERCURY PLANETARY ORBITER'],
-652: ['MTM', 'BEPICOLOMBO MTM', 'MERCURY TRANSFER MODULE']
}
""" Set dictionary variable mapping PDS instrument host ID with SPICE NAIF spacecraft body name and synonyms, and
consequently defining allowed mission IDs in configuration file. For example: 'Mercury Planetary Orbiter' as instrument
host/spacecraft name in PDS label will be translated into 'MPO', which becomes an allowed mission ID in the
configuration file.
"""
# Define a instrument host/spacecraft body name/ID code pair based on PDS_NAIF_MAPPING above.
for code in PDS_NAIF_MAPPING.keys():
for name in reversed(PDS_NAIF_MAPPING[code]):
spice.boddef(name, code)
[docs]class Config:
"""Class that represents a configuration to be used for computation.
It mainly allows to retrieve information from a :ref:`config_file_spec` and load SPICE kernels.
Attributes:
config_file (str): Configuration file path.
b3f_dir (str): B3F data directory path.
geojson_dir (str): GeoJSON data directory path.
log_dir (str): LOG data directory path.
addendum_dir (str): Addendum kernels directory path.
skd_basedir (str): SPICE Kernels Datasets (SKD) top-level directory path.
"""
def __init__(self, config_file, data_dir=DEFAULT_DATA_DIR, addendum_dir=''):
"""Constructs Config object.
Args:
config_file (str): Configuration file path.
data_dir (str, optional): Data directory path (default is :ref:`DEFAULT_DATA_DIR`).
addendum_dir (str, optional): Addendum directory path (default is the package data addendum directory).
"""
# load config file a json object.
with open(config_file, 'r') as fp:
self.config_dict = json.load(fp)
# set config file
self.config_file = config_file
# Set output B3F and GeoJSON data and log files base directories
abs_data_dir = os.path.join(os.path.abspath(data_dir), '')
self.b3f_dir = os.path.join(abs_data_dir, 'b3f/')
self.geojson_dir = os.path.join(abs_data_dir, 'geojson/')
self.log_dir = os.path.join(abs_data_dir, 'log/')
# Addendum kernels are always located in the data/addendum/ directory
# where the geogen package is installed.
if addendum_dir:
self.addendum_dir = os.path.join(os.path.abspath(addendum_dir), '')
else:
package_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.addendum_dir = package_dir + '/data/addendum/'
# Set path to ESA's SKDs base directory
self.skd_basedir = os.getenv('ESA_SPICE_KERNELS')
assert (self.skd_basedir != ''), "ESA_SPICE_KERNELS environment variable must be defined."
[docs] def get_missions(self):
"""Returns all mission IDs and definitions.
Returns:
Tuple[List[str], List[dict]]:
"""
mission_ids = []
mission_dicts = self.config_dict['missions']
for mission_dict in mission_dicts:
mission_ids.append(mission_dict['id'])
return mission_ids, mission_dicts
[docs] def get_mission(self, mission_id):
"""Returns the mission definition for a given mission ID.
Args:
mission_id (str): Mission ID.
Returns:
dict: Mission definition dictionary.
"""
mission_dict = {}
mission_ids, mission_dicts = self.get_missions()
for this_mission_dict in mission_dicts:
if this_mission_dict['id'] == mission_id:
mission_dict = this_mission_dict
return mission_dict
[docs] def get_instruments(self, mission_id):
"""Returns instrument IDs and definitions for a given mission ID.
Args:
mission_id (str): Mission ID.
Returns:
Tuple[List[str], List[str]]:
"""
instrument_ids = []
mission_dict = self.get_mission(mission_id)
instrument_dicts = []
if mission_dict:
instrument_dicts = mission_dict['instruments']
for this_instrument_dict in instrument_dicts:
instrument_ids.append(this_instrument_dict['id'])
return instrument_ids, instrument_dicts
[docs] def get_instrument(self, mission_id, instrument_id):
"""Returns the instrument definition for a given mission ID and instrument ID.
Args:
mission_id (str): Mission ID.
instrument_id (str): Instrument ID.
Returns:
dict: Instrument definition dictionary.
"""
instrument_dict = {}
instrument_ids, instrument_dicts = self.get_instruments(mission_id)
for this_instrument_dict in instrument_dicts:
if this_instrument_dict['id'] == instrument_id:
instrument_dict = this_instrument_dict
return instrument_dict
[docs] def get_products(self, mission_id, instrument_id):
"""Returns product types, definitions and default type for a given mission ID.
Args:
mission_id (str): Mission ID.
instrument_id (str): Instrument ID.
Returns:
Tuple[List[str], List[dict], str]:
"""
product_types = []
instrument_dict = self.get_instrument(mission_id, instrument_id)
product_dicts = []
default_product_type = ''
if instrument_dict:
product_dicts = instrument_dict['products']
default_product_type = product_dicts[0]['type']
for this_product_dict in product_dicts:
product_types.append(this_product_dict['type'])
return product_types, product_dicts, default_product_type
[docs] def get_product(self, mission_id, instrument_id, product_type):
"""Returns the product definition and default type for a given mission ID, instrument ID and product type.
Args:
mission_id (str): Mission ID.
instrument_id (str): Instrument ID.
product_type (str): Product type.
Returns:
Tuple[dict, str]:
"""
product_dict = {}
product_types, product_dicts, default_product_type = self.get_products(mission_id, instrument_id)
# Set input product_type to default if empty
if product_type not in product_types:
product_type = default_product_type
logger.warning(
'Undefined or null input product_type: default <{}/{}/{}> product type definition used instead.'.format(
mission_id, instrument_id, default_product_type))
for this_product_dict in product_dicts:
if this_product_dict['type'] == product_type:
product_dict = this_product_dict
product_dict['type'] = product_type # update product type to handle null input product_type
return product_dict, default_product_type
[docs] def get_product_class(self, mission_id, instrument_id, product_type):
"""Returns the product class name for a given mission ID, instrument ID and product type.
Args:
mission_id (str): Mission ID.
instrument_id (str): Instrument ID.
product_type (str): Product type.
Returns:
str: Product class name (see :ref:`instrument_product_class`).
"""
product_class = ''
product_dict, default_product_type = self.get_product(mission_id, instrument_id, product_type)
if product_dict:
product_class = product_dict['py_class'] if product_dict['py_class'] != '' else 'Product'
return product_class
[docs] def get_product_req_props(self, mission_id, instrument_id, product_type, pds_version='PDS3'):
"""Returns the product required properties for a given mission ID, instrument ID and product type.
Args:
mission_id (str): Mission ID.
instrument_id (str): Instrument ID.
product_type (str): Product type.
pds_version (str, optional): PDS version, either 'PDS3' (default) or 'PDS4'.
Returns:
Tuple[List[str], List[str], str]: List of properties names, paths, and default product type.
"""
# Add product properties common to all products
#
if mission_id == '':
pds_version = pds_version
else:
# get PDS version for input mission ID
mission_dict = self.get_mission(mission_id)
pds_version = mission_dict['pds_version']
# get required product properties for applicable PDS version
req_product_props = self.config_dict['req_product_props'] # list of {"name": "", "path": ""}
req_props_names = []
req_props_paths = []
for req_product_prop in req_product_props:
req_props_names.append(req_product_prop['name'])
req_props_paths.append(req_product_prop['path'][pds_version])
# return required properties name and path if mission or instrument ID are not provided
default_product_type = ''
if mission_id == '' or instrument_id == '':
return req_props_names, req_props_paths, default_product_type
# Append additional required properties for given instrument, instrument and product type.
product_dict, default_product_type = self.get_product(mission_id, instrument_id, product_type)
req_props = []
if 'req_props' in product_dict.keys():
req_props = product_dict['req_props']
for req_prop in req_props:
req_props_names.append(req_prop['name'])
req_props_paths.append(req_prop['path'])
return req_props_names, req_props_paths, default_product_type
[docs] def get_kernels(self, mission):
"""Returns the list of SPICE kernels defined for a given mission ID.
Args:
mission (str): Mission ID.
Returns:
List[str]:
"""
spice_kernels = []
missions = self.config_dict['missions']
for this_mission in missions:
if this_mission['id'] == mission:
spice_kernels = this_mission['spice_kernels']
for i, spice_kernel in enumerate(spice_kernels):
if KERNELS_PATH_SYMBOL in spice_kernel:
spice_kernel = spice_kernel.replace(KERNELS_PATH_SYMBOL, self.skd_basedir, 1)
spice_kernel = spice_kernel.replace('//', '/', 1)
else:
spice_kernel = self.addendum_dir + spice_kernel
spice_kernel = spice_kernel.replace('//', '/', 1)
spice_kernels[i] = spice_kernel
return spice_kernels
[docs] def load_kernels(self, spice_kernels):
"""Load a list of SPICE kernels.
Args:
spice_kernels (List[str]): List of SPICE kernels.
"""
# Set initial loaded status and SKD version
loaded = False
skd_version = ''
# set mission kernels base directory from metakernel path
# Assumption: kernels directory in an SKD is always 'kernels'. Eg:
# ~/workspace/git-repos/spice_kernels/mars-express/kernels
#
path = spice_kernels[0] # full metakernel path
basename = ''
while basename != 'kernels':
path = os.path.dirname(path)
if path == '' or path == '/':
break
basename = os.path.basename(path)
kernels_path = path
if kernels_path == '':
logger.warning('Unable to derive mission SKD kernels/ path.')
return loaded, skd_version
# first kernel must always be a mission metakernel that is not loaded
# using usual furnsh routine.
for i, spice_kernel in enumerate(spice_kernels):
if i == 0: # load meta-kernel
spice.ldpool(spice_kernel)
path_symb, found = spice.gcpool('PATH_SYMBOLS', 0, 1, 20)
pool_kernels, found = spice.gcpool('KERNELS_TO_LOAD', 0, 5000, 100)
skd_version, found = spice.gcpool('SKD_VERSION', 0, 1, 100)
if found: skd_version = skd_version[0]
for pool_kernel in pool_kernels:
spice.furnsh(pool_kernel.replace('$' + path_symb[0], kernels_path))
else: # load addendum kernels
spice.furnsh(spice_kernel)
loaded = True
return loaded, skd_version
[docs] def get_kernels_reference_time(self, body_name): # SPK (used by geogen fov command)
"""Get reference ephemeris time for a given SPICE body name.
Used by the :ref:`geogen_cli` ``fov`` command.
Args:
body_name (str): SPICE body name (mission ID).
Returns:
float:
"""
reference_time = 0.0
idcode, exists = spice.bodn2c(body_name)
n_kernels = spice.ktotal('SPK')
for i in range(n_kernels):
file, type, source, handle, found = spice.kdata(i, 'SPK', 256, 33, 256)
if found:
spk_file = os.path.abspath(file)
time_window = spice.spkcov(spk_file, idcode)
n_intervals = spice.wncard(time_window)
if n_intervals > 0:
et_start, et_stop = spice.wnfetd(time_window, 0)
reference_time = (et_start + et_stop) / 2
return reference_time
return reference_time
[docs] def get_primary_target(self, mission):
"""Get primary target of a given mission ID.
Args:
mission (str): Mission ID.
Returns:
str: Target name.
"""
missions = self.config_dict['missions']
for this_mission in missions:
if this_mission['id'] == mission:
if 'primary_target' in this_mission.keys():
return this_mission['primary_target']
else:
print('WARNING: No primary_target defined for ' + this_mission['name'] + '.')
return ''
[docs] def get_applicable_targets(self):
"""Return applicable target names.
Returns:
List[str]:
"""
try:
applicable_targets = self.config_dict['applicable_targets']
except Exception as e:
logger.warning('Could not retrieve applicable_targets in config file {}'.format(self.config_file))
applicable_targets = []
return applicable_targets
[docs] def get_target_frame(self, target_name):
"""Returns body-fixed reference frame defined for a given target.
Args:
target_name (str): Target name.
Returns:
str: Target body-fixed reference frame name.
"""
targets = self.config_dict['targets']
for this_target in targets:
if this_target['name'] == target_name:
if 'frame' in this_target.keys():
return this_target['frame']
return ''
[docs] def get_dsk_file(self, target_name):
"""Returns the SPICE DSK file defined for a given target.
Args:
target_name (str): Target name.
Returns:
str: SPICE DSK file path.
"""
targets = self.config_dict['targets']
for this_target in targets:
if this_target['name'] == target_name:
if 'dsk_file' in this_target.keys():
return self.addendum_dir + this_target['dsk_file']
return ''