Source code for geogen.config

"""The ``config`` module holds the :class:`Config <geogen.config.Config>` class and related default variables.
"""

import spiceypy as spice
import json
import os
from loguru import logger

DEFAULT_DATA_DIR = 'data'
"""Default output data directory is the data directory relative to from where GEOGEN is executed.
"""
KERNELS_PATH_SYMBOL = '$ESA_SPICE_KERNELS'
"""Symbol used in the Configuration file to specify the path of a mission metakernel file relative to path of the
ESA's SPICE Kernels Datasets (SKDs) top-level directory, which specified by the ``ESA_SPICE_KERNELS`` environment
variable.
"""

PDS_NAIF_MAPPING = {
    -41:     ['MEX', 'MARS EXPRESS', 'MARS-EXPRESS', 'MARS_EXPRESS'],
    -248:    ['VEX', 'VENUS EXPRESS', 'VENUS-EXPRESS', 'VENUS_EXPRESS'],
    -226:    ['ROSETTA', 'ROS', 'ROSETTA ORBITER'],
    -226800: ['RL', 'ROS_LANDER', 'PHILAE'],
    -143:    ['TGO', 'TRACE GAS ORBITER', 'EXOMARS TGO', 'EXOMARS 2016 TGO'],
    -238:    ['S1', 'SM1', 'SMART1', 'SMART-1'],
    -121:    ['MPO', 'BEPICOLOMBO MPO', 'MERCURY PLANETARY ORBITER'],
    -652:    ['MTM', 'BEPICOLOMBO MTM', 'MERCURY TRANSFER MODULE']
}
""" Set dictionary variable mapping PDS instrument host ID with SPICE NAIF spacecraft body name and synonyms, and
consequently defining allowed mission IDs in configuration file. For example: 'Mercury Planetary Orbiter' as instrument
host/spacecraft name in PDS label will be translated into 'MPO', which becomes an allowed mission ID in the
configuration file.
"""
# Define a instrument host/spacecraft body name/ID code pair based on PDS_NAIF_MAPPING above.
for code in PDS_NAIF_MAPPING.keys():
    for name in reversed(PDS_NAIF_MAPPING[code]):
        spice.boddef(name, code)

[docs]class Config: """Class that represents a configuration to be used for computation. It mainly allows to retrieve information from a :ref:`config_file_spec` and load SPICE kernels. Attributes: config_file (str): Configuration file path. b3f_dir (str): B3F data directory path. geojson_dir (str): GeoJSON data directory path. log_dir (str): LOG data directory path. addendum_dir (str): Addendum kernels directory path. skd_basedir (str): SPICE Kernels Datasets (SKD) top-level directory path. """ def __init__(self, config_file, data_dir=DEFAULT_DATA_DIR, addendum_dir=''): """Constructs Config object. Args: config_file (str): Configuration file path. data_dir (str, optional): Data directory path (default is :ref:`DEFAULT_DATA_DIR`). addendum_dir (str, optional): Addendum directory path (default is the package data addendum directory). """ # load config file a json object. with open(config_file, 'r') as fp: self.config_dict = json.load(fp) # set config file self.config_file = config_file # Set output B3F and GeoJSON data and log files base directories abs_data_dir = os.path.join(os.path.abspath(data_dir), '') self.b3f_dir = os.path.join(abs_data_dir, 'b3f/') self.geojson_dir = os.path.join(abs_data_dir, 'geojson/') self.log_dir = os.path.join(abs_data_dir, 'log/') # Addendum kernels are always located in the data/addendum/ directory # where the geogen package is installed. if addendum_dir: self.addendum_dir = os.path.join(os.path.abspath(addendum_dir), '') else: package_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) self.addendum_dir = package_dir + '/data/addendum/' # Set path to ESA's SKDs base directory self.skd_basedir = os.getenv('ESA_SPICE_KERNELS') assert (self.skd_basedir != ''), "ESA_SPICE_KERNELS environment variable must be defined."
[docs] def get_missions(self): """Returns all mission IDs and definitions. Returns: Tuple[List[str], List[dict]]: """ mission_ids = [] mission_dicts = self.config_dict['missions'] for mission_dict in mission_dicts: mission_ids.append(mission_dict['id']) return mission_ids, mission_dicts
[docs] def get_mission(self, mission_id): """Returns the mission definition for a given mission ID. Args: mission_id (str): Mission ID. Returns: dict: Mission definition dictionary. """ mission_dict = {} mission_ids, mission_dicts = self.get_missions() for this_mission_dict in mission_dicts: if this_mission_dict['id'] == mission_id: mission_dict = this_mission_dict return mission_dict
[docs] def get_instruments(self, mission_id): """Returns instrument IDs and definitions for a given mission ID. Args: mission_id (str): Mission ID. Returns: Tuple[List[str], List[str]]: """ instrument_ids = [] mission_dict = self.get_mission(mission_id) instrument_dicts = [] if mission_dict: instrument_dicts = mission_dict['instruments'] for this_instrument_dict in instrument_dicts: instrument_ids.append(this_instrument_dict['id']) return instrument_ids, instrument_dicts
[docs] def get_instrument(self, mission_id, instrument_id): """Returns the instrument definition for a given mission ID and instrument ID. Args: mission_id (str): Mission ID. instrument_id (str): Instrument ID. Returns: dict: Instrument definition dictionary. """ instrument_dict = {} instrument_ids, instrument_dicts = self.get_instruments(mission_id) for this_instrument_dict in instrument_dicts: if this_instrument_dict['id'] == instrument_id: instrument_dict = this_instrument_dict return instrument_dict
[docs] def get_products(self, mission_id, instrument_id): """Returns product types, definitions and default type for a given mission ID. Args: mission_id (str): Mission ID. instrument_id (str): Instrument ID. Returns: Tuple[List[str], List[dict], str]: """ product_types = [] instrument_dict = self.get_instrument(mission_id, instrument_id) product_dicts = [] default_product_type = '' if instrument_dict: product_dicts = instrument_dict['products'] default_product_type = product_dicts[0]['type'] for this_product_dict in product_dicts: product_types.append(this_product_dict['type']) return product_types, product_dicts, default_product_type
[docs] def get_product(self, mission_id, instrument_id, product_type): """Returns the product definition and default type for a given mission ID, instrument ID and product type. Args: mission_id (str): Mission ID. instrument_id (str): Instrument ID. product_type (str): Product type. Returns: Tuple[dict, str]: """ product_dict = {} product_types, product_dicts, default_product_type = self.get_products(mission_id, instrument_id) # Set input product_type to default if empty if product_type not in product_types: product_type = default_product_type logger.warning( 'Undefined or null input product_type: default <{}/{}/{}> product type definition used instead.'.format( mission_id, instrument_id, default_product_type)) for this_product_dict in product_dicts: if this_product_dict['type'] == product_type: product_dict = this_product_dict product_dict['type'] = product_type # update product type to handle null input product_type return product_dict, default_product_type
[docs] def get_product_class(self, mission_id, instrument_id, product_type): """Returns the product class name for a given mission ID, instrument ID and product type. Args: mission_id (str): Mission ID. instrument_id (str): Instrument ID. product_type (str): Product type. Returns: str: Product class name (see :ref:`instrument_product_class`). """ product_class = '' product_dict, default_product_type = self.get_product(mission_id, instrument_id, product_type) if product_dict: product_class = product_dict['py_class'] if product_dict['py_class'] != '' else 'Product' return product_class
[docs] def get_product_req_props(self, mission_id, instrument_id, product_type, pds_version='PDS3'): """Returns the product required properties for a given mission ID, instrument ID and product type. Args: mission_id (str): Mission ID. instrument_id (str): Instrument ID. product_type (str): Product type. pds_version (str, optional): PDS version, either 'PDS3' (default) or 'PDS4'. Returns: Tuple[List[str], List[str], str]: List of properties names, paths, and default product type. """ # Add product properties common to all products # if mission_id == '': pds_version = pds_version else: # get PDS version for input mission ID mission_dict = self.get_mission(mission_id) pds_version = mission_dict['pds_version'] # get required product properties for applicable PDS version req_product_props = self.config_dict['req_product_props'] # list of {"name": "", "path": ""} req_props_names = [] req_props_paths = [] for req_product_prop in req_product_props: req_props_names.append(req_product_prop['name']) req_props_paths.append(req_product_prop['path'][pds_version]) # return required properties name and path if mission or instrument ID are not provided default_product_type = '' if mission_id == '' or instrument_id == '': return req_props_names, req_props_paths, default_product_type # Append additional required properties for given instrument, instrument and product type. product_dict, default_product_type = self.get_product(mission_id, instrument_id, product_type) req_props = [] if 'req_props' in product_dict.keys(): req_props = product_dict['req_props'] for req_prop in req_props: req_props_names.append(req_prop['name']) req_props_paths.append(req_prop['path']) return req_props_names, req_props_paths, default_product_type
[docs] def get_kernels(self, mission): """Returns the list of SPICE kernels defined for a given mission ID. Args: mission (str): Mission ID. Returns: List[str]: """ spice_kernels = [] missions = self.config_dict['missions'] for this_mission in missions: if this_mission['id'] == mission: spice_kernels = this_mission['spice_kernels'] for i, spice_kernel in enumerate(spice_kernels): if KERNELS_PATH_SYMBOL in spice_kernel: spice_kernel = spice_kernel.replace(KERNELS_PATH_SYMBOL, self.skd_basedir, 1) spice_kernel = spice_kernel.replace('//', '/', 1) else: spice_kernel = self.addendum_dir + spice_kernel spice_kernel = spice_kernel.replace('//', '/', 1) spice_kernels[i] = spice_kernel return spice_kernels
[docs] def load_kernels(self, spice_kernels): """Load a list of SPICE kernels. Args: spice_kernels (List[str]): List of SPICE kernels. """ # Set initial loaded status and SKD version loaded = False skd_version = '' # set mission kernels base directory from metakernel path # Assumption: kernels directory in an SKD is always 'kernels'. Eg: # ~/workspace/git-repos/spice_kernels/mars-express/kernels # path = spice_kernels[0] # full metakernel path basename = '' while basename != 'kernels': path = os.path.dirname(path) if path == '' or path == '/': break basename = os.path.basename(path) kernels_path = path if kernels_path == '': logger.warning('Unable to derive mission SKD kernels/ path.') return loaded, skd_version # first kernel must always be a mission metakernel that is not loaded # using usual furnsh routine. for i, spice_kernel in enumerate(spice_kernels): if i == 0: # load meta-kernel spice.ldpool(spice_kernel) path_symb, found = spice.gcpool('PATH_SYMBOLS', 0, 1, 20) pool_kernels, found = spice.gcpool('KERNELS_TO_LOAD', 0, 5000, 100) skd_version, found = spice.gcpool('SKD_VERSION', 0, 1, 100) if found: skd_version = skd_version[0] for pool_kernel in pool_kernels: spice.furnsh(pool_kernel.replace('$' + path_symb[0], kernels_path)) else: # load addendum kernels spice.furnsh(spice_kernel) loaded = True return loaded, skd_version
[docs] def get_kernels_reference_time(self, body_name): # SPK (used by geogen fov command) """Get reference ephemeris time for a given SPICE body name. Used by the :ref:`geogen_cli` ``fov`` command. Args: body_name (str): SPICE body name (mission ID). Returns: float: """ reference_time = 0.0 idcode, exists = spice.bodn2c(body_name) n_kernels = spice.ktotal('SPK') for i in range(n_kernels): file, type, source, handle, found = spice.kdata(i, 'SPK', 256, 33, 256) if found: spk_file = os.path.abspath(file) time_window = spice.spkcov(spk_file, idcode) n_intervals = spice.wncard(time_window) if n_intervals > 0: et_start, et_stop = spice.wnfetd(time_window, 0) reference_time = (et_start + et_stop) / 2 return reference_time return reference_time
[docs] def get_primary_target(self, mission): """Get primary target of a given mission ID. Args: mission (str): Mission ID. Returns: str: Target name. """ missions = self.config_dict['missions'] for this_mission in missions: if this_mission['id'] == mission: if 'primary_target' in this_mission.keys(): return this_mission['primary_target'] else: print('WARNING: No primary_target defined for ' + this_mission['name'] + '.') return ''
[docs] def get_applicable_targets(self): """Return applicable target names. Returns: List[str]: """ try: applicable_targets = self.config_dict['applicable_targets'] except Exception as e: logger.warning('Could not retrieve applicable_targets in config file {}'.format(self.config_file)) applicable_targets = [] return applicable_targets
[docs] def get_target_frame(self, target_name): """Returns body-fixed reference frame defined for a given target. Args: target_name (str): Target name. Returns: str: Target body-fixed reference frame name. """ targets = self.config_dict['targets'] for this_target in targets: if this_target['name'] == target_name: if 'frame' in this_target.keys(): return this_target['frame'] return ''
[docs] def get_dsk_file(self, target_name): """Returns the SPICE DSK file defined for a given target. Args: target_name (str): Target name. Returns: str: SPICE DSK file path. """ targets = self.config_dict['targets'] for this_target in targets: if this_target['name'] == target_name: if 'dsk_file' in this_target.keys(): return self.addendum_dir + this_target['dsk_file'] return ''