Source code for geogen.tamn

""" The ``tamn`` module is a wrapper to the TAMN web service, and provides additional functions for handling
GeoJSON LineString geometry type, and to derive boundary box coordinates of GeoJSON Geometry, and whether or not this
geometry is crossing the target body's antimeridian, North Pole and South Pole.

.. note::

    ``TAMN_DEFAULT_RADIUS`` and ``TAMN_DEFAULT_POLE_DISTANCE`` values might need to be adapted to the target body.
    Currently yielding good results for Earth, Mars and Phobos.
"""

import json
from geojson import LineString, MultiLineString
import spiceypy as spice
import numpy as np
from shapely.geometry import shape
import requests
import geodaisy.converters as convert
from os import getenv

env_tamn_url = getenv('TAMN_URL')
TAMN_URL = env_tamn_url if env_tamn_url else 'http://localhost:3333/'
"""Default TAMN web service end-point URL.
"""

TAMN_DEFAULT_RADIUS = 50000
"""Default radius (m) used in TAMN algorithm.
"""

TAMN_DEFAULT_POLE_DISTANCE = 500000
"""Default pole distance (m) used in TAMN algorithm.
"""
[docs]def sort_frustum(frustum): """Sort frustum. Args: frustum (list): set of surface points originating from a target body center (for example: [ [x1, y1, z1], [x2, y2, z2], [x3, y3, z3], [x4, y4, z4] ]). Returns: list: """ lons = [] z_values = [] n_vertices = len(frustum) # 4 for LINE detector for j in range(n_vertices): r, lon, lat = spice.reclat(frustum[j]) lons.append(lon) z_values.append(frustum[j][2]) order = np.argsort(lons) sorted_frustum = [frustum[i] for i in order] if np.mean(z_values) < 0: sorted_frustum.reverse() return sorted_frustum
[docs]def in_frustum(vec, frustum): """Returns whether or not a vector is contained within a frustum. Args: vec (list): 3d-vector representing a surface point, expressed in cartesian coordinates. frustum (list): set of surface points originating from a target body center. Returns: bool: """ n_vertices = len(frustum) # 4 for LINE detector sorted_frustum = sort_frustum(frustum) # sorted_frustum = frustum inside = True for j in range(n_vertices): v1 = np.asarray(sorted_frustum[j]) v2 = np.asarray(sorted_frustum[(j + 1) % n_vertices]) vnorm = spice.vcrss(v1, v2) vsep = spice.vsep(vnorm, vec) if vsep >= spice.halfpi(): inside = False return inside
[docs]def get_bbox(geometry): """Returns the boundary box coordinates of an input GeoJSON Geometry objects, and whether or not this geometry is crossing the antimeridian, North Pole and South Pole. Args: geometry (GeoJSON Geometry): Input GeoJSON Geometry object. Returns: Tuple[List[float], bool, bool, bool]: """ bbox = None antimeridian_crossing = None npole_crossing = None spole_crossing = None if not geometry: return bbox, antimeridian_crossing, npole_crossing, spole_crossing # Get lon/lat coordinate tuples from input GeoJSON geometry if geometry.type == 'LineString': coordinates = geometry.coordinates elif geometry.type == 'Polygon': if len(geometry.coordinates) == 0: print('Input Polygon does not contain any points.') bbox = [0.0, 0.0, 0.0, 0.0] return bbox, antimeridian_crossing, npole_crossing, spole_crossing coordinates = geometry.coordinates[0] else: print(f'Invalid input GeoJSON geometry type: {geometry.type}. Only LineString or Polygon allowed.') return bbox, antimeridian_crossing, npole_crossing, spole_crossing # Determine whether or not input geometry is crossing the antimeridian and/or the pole(s) # n_points = len(coordinates) # last point is identical to first point Polygon geometry longitudes = [] latitudes = [] for coordinate in coordinates: longitudes.append(coordinate[0]) latitudes.append(coordinate[1]) longitudes = np.array(longitudes) latitudes = np.array(latitudes) # North and/or South pole crossing determination, for Polygon geometry only. if geometry.type == 'Polygon': frustum = [] for i in range(n_points-1): # exclude last, corresponding to first point. frustum.append(spice.latrec(1.0, longitudes[i]*spice.rpd(), latitudes[i]*spice.rpd())) npole_crossing = in_frustum([0.0, 0.0, 1.0], frustum) spole_crossing = in_frustum([0.0, 0.0, -1.0], frustum) # Antimeridian crossing determination antimeridian_crossing = False if npole_crossing or spole_crossing: antimeridian_crossing = True else: for i in range(n_points-1): if crossing(coordinates[i], coordinates[i+1])[0]: antimeridian_crossing = True break # Derive boundary box coordinates westernmost_longitude = np.min(longitudes) easternmost_longitude = np.max(longitudes) if npole_crossing or spole_crossing: westernmost_longitude = -180.0 easternmost_longitude = 180.0 elif antimeridian_crossing: westernmost_longitude = np.min(longitudes[np.where(longitudes > 0)]) easternmost_longitude = np.max(longitudes[np.where(longitudes < 0)]) minimum_latitude = np.min(latitudes) maximum_latitude = np.max(latitudes) if npole_crossing: maximum_latitude = 90.0 if spole_crossing: minimum_latitude = -90.0 bbox = [westernmost_longitude, minimum_latitude, easternmost_longitude, maximum_latitude] return bbox, antimeridian_crossing, npole_crossing, spole_crossing
[docs]def crossing(p1, p2): """Returns whether or not two surface points, defining a line, crosses the antimeridian (+/ 180 degrees). Args: p1 (list): Latitudinal coordinates of first point defining a line ([lon, lat]). p2 (list): Latitudinal coordinates of second point defining a line ([lon, lat]). Returns: bool: """ p0 = [0., 0.] p1_x = np.cos(p1[0] * spice.rpd()) p2_x = np.cos(p2[0] * spice.rpd()) if p1_x < 0 and p2_x < 0: p1_y = np.sin(p1[0] * spice.rpd()) p2_y = np.sin(p2[0] * spice.rpd()) if (p1_y > 0 and p2_y < 0) or (p1_y < 0 and p2_y > 0): lat0 = 0.5 * (p1[1] + p2[1]) lon0 = p1_y / abs(p1_y) * 180. p0 = [lon0, lat0] return True, p0 return False, p0
[docs]def split_line(line): """Returns split line, which can be a collection lines. Args: line (list): Input line, which consists of two points latitudinal coordinates ([lon, lat]). Returns: List[list]: """ coords = line lines = [] i0 = 0 for i in range(len(coords) - 1): flag, p0 = crossing(coords[i], coords[i + 1]) if flag: if i0 > 0: line = [] line.append(p0_next) for coord in coords[i0:i]: line.append(coord) else: line = coords[i0:i] line.append(p0) if len(line) > 1: # exclude one-point line (workaround) lines.append(line) p0_next = [-p0[0], p0[1]] # first point of the next line i0 = i + 1 # start index of next line # add last line if i0 > 0: line = [] line.append(p0_next) for coord in coords[i0:]: line.append(coord) lines.append(line) else: lines = [line] return lines
[docs]def getSplitLineString(feature): """Returns split `GeoJSON Feature`_ of LineString geometry type. Other geometry types are handled by the :func:`getSplitLineString <geogen.tamn.getSplitFeature>` TAMN wrapper function. Args: feature (`GeoJSON Feature`_): Input GeoJSON Feature object of LineString geometry type. Returns: `GeoJSON Feature`_: """ if feature['geometry']['type'] != 'LineString': return feature coords = feature['geometry']['coordinates'] lines = split_line(coords) feature['geometry'] = MultiLineString(lines) return feature
[docs]def getSplitFeature(feature): """Returns split GeoJSON Feature geometry. Wrapper to the TAMN web service. Feature of LineString geometry are handled independantly of TAMN by the :func:`getSplitLineString <geogen.tamn.getSplitLineString>` function. Args: feature (`GeoJSON Feature`_): Input GeoJSON Feature object to split. Returns: `GeoJSON Feature`_: """ if feature['geometry'] == None: print('Cannot split a null geometry.') return feature # Handle LineString geometries independently of TAMN. # if feature['geometry']['type'] == 'LineString': return getSplitLineString(feature) if len(feature['geometry']['coordinates']) == 0: print('Cannot split a Polygon which does not contain any points.') return feature # Otherwise, let TAMN handle other geometry types. # # set TAMN web server end-point request = TAMN_URL + 'split' # convert input feature GeoJSON dict in WKT string if not feature['geometry']['coordinates']: print('No Polygon coordinates.') return feature wkt = convert.geojson_to_wkt(json.dumps(feature['geometry'])) data = json.dumps({"geometry": wkt, "radius": TAMN_DEFAULT_RADIUS, "pole_distance": TAMN_DEFAULT_POLE_DISTANCE}) # send split request to TAMN web service r = requests.post(request, data=data, headers={'Content-Type': 'application/json'}) if r.status_code != 200: print('[ERROR] POST failed - {code} {text}'.format(code=r.status_code, text=r.text)) return 0 # set new geometry feature['geometry'] = json.loads(r.text)['geometry'] return feature
[docs]def splitGeoJSON(in_geojson_file, out_geojson_file): """Split geometry within an input GeoJSON file. Args: in_geojson_file: Input GeoJSON file path. out_geojson_file: Output GeoJSON file path. """ # Load input GeoJSON file that can contain either only one Feature or a FeatureCollection. with open(in_geojson_file) as f: geojson_dict = json.load(f) # If only one Feature, split and update feature geometry in geojson_dict if geojson_dict['type'] == 'Feature': splitFeature = getSplitFeature(geojson_dict) geojson_dict['geometry'] = splitFeature # If FeatureCollection, split and update each feature elif geojson_dict['type'] == 'FeatureCollection': splitFeatures = [] for feature in geojson_dict['features']: splitFeatures.append(getSplitFeature(feature)) geojson_dict['features'] = splitFeatures # If exists, update "antimeridian_split" property from false to true. geojson_dict['coverage']['antimeridian_split'] = True # Write geojson_dict input ouput GeoJSON file with open(out_geojson_file, 'w') as f: json.dump(geojson_dict, f)
[docs]def check_tamn_server(): """Check connectivity of TAMN server application. Returns: Tuple[bool, int, str]: Whether or not server application is working, server error code and message. """ server_ok = True server_error = 200 server_message = '' wkt = 'POLYGON ((179.0 0.0, -179.0 0.0, -179.0 1.0, 179.0 1.0, 179.0 0.0))' data = json.dumps({"geometry": wkt, "radius": TAMN_DEFAULT_RADIUS, "pole_distance": TAMN_DEFAULT_POLE_DISTANCE}) try: r = requests.post(TAMN_URL + 'split', data=data, headers={'Content-Type': 'application/json'}) if r.status_code != 200: server_ok = False server_error = r.status_code server_message = r.text except Exception: server_ok = False server_error = 404 server_message = 'Connection error.' return server_ok, server_error, server_message