""" The ``tamn`` module is a wrapper to the TAMN web service, and provides additional functions for handling
GeoJSON LineString geometry type, and to derive boundary box coordinates of GeoJSON Geometry, and whether or not this
geometry is crossing the target body's antimeridian, North Pole and South Pole.
.. note::
``TAMN_DEFAULT_RADIUS`` and ``TAMN_DEFAULT_POLE_DISTANCE`` values might need to be adapted to the target body.
Currently yielding good results for Earth, Mars and Phobos.
"""
import json
from geojson import LineString, MultiLineString
import spiceypy as spice
import numpy as np
from shapely.geometry import shape
import requests
import geodaisy.converters as convert
from os import getenv
env_tamn_url = getenv('TAMN_URL')
TAMN_URL = env_tamn_url if env_tamn_url else 'http://localhost:3333/'
"""Default TAMN web service end-point URL.
"""
TAMN_DEFAULT_RADIUS = 50000
"""Default radius (m) used in TAMN algorithm.
"""
TAMN_DEFAULT_POLE_DISTANCE = 500000
"""Default pole distance (m) used in TAMN algorithm.
"""
[docs]def sort_frustum(frustum):
"""Sort frustum.
Args:
frustum (list): set of surface points originating from a target body center (for example:
[ [x1, y1, z1], [x2, y2, z2], [x3, y3, z3], [x4, y4, z4] ]).
Returns:
list:
"""
lons = []
z_values = []
n_vertices = len(frustum) # 4 for LINE detector
for j in range(n_vertices):
r, lon, lat = spice.reclat(frustum[j])
lons.append(lon)
z_values.append(frustum[j][2])
order = np.argsort(lons)
sorted_frustum = [frustum[i] for i in order]
if np.mean(z_values) < 0:
sorted_frustum.reverse()
return sorted_frustum
[docs]def in_frustum(vec, frustum):
"""Returns whether or not a vector is contained within a frustum.
Args:
vec (list): 3d-vector representing a surface point, expressed in cartesian coordinates.
frustum (list): set of surface points originating from a target body center.
Returns:
bool:
"""
n_vertices = len(frustum) # 4 for LINE detector
sorted_frustum = sort_frustum(frustum)
# sorted_frustum = frustum
inside = True
for j in range(n_vertices):
v1 = np.asarray(sorted_frustum[j])
v2 = np.asarray(sorted_frustum[(j + 1) % n_vertices])
vnorm = spice.vcrss(v1, v2)
vsep = spice.vsep(vnorm, vec)
if vsep >= spice.halfpi():
inside = False
return inside
[docs]def get_bbox(geometry):
"""Returns the boundary box coordinates of an input GeoJSON Geometry objects, and whether or not this geometry is
crossing the antimeridian, North Pole and South Pole.
Args:
geometry (GeoJSON Geometry): Input GeoJSON Geometry object.
Returns:
Tuple[List[float], bool, bool, bool]:
"""
bbox = None
antimeridian_crossing = None
npole_crossing = None
spole_crossing = None
if not geometry:
return bbox, antimeridian_crossing, npole_crossing, spole_crossing
# Get lon/lat coordinate tuples from input GeoJSON geometry
if geometry.type == 'LineString':
coordinates = geometry.coordinates
elif geometry.type == 'Polygon':
if len(geometry.coordinates) == 0:
print('Input Polygon does not contain any points.')
bbox = [0.0, 0.0, 0.0, 0.0]
return bbox, antimeridian_crossing, npole_crossing, spole_crossing
coordinates = geometry.coordinates[0]
else:
print(f'Invalid input GeoJSON geometry type: {geometry.type}. Only LineString or Polygon allowed.')
return bbox, antimeridian_crossing, npole_crossing, spole_crossing
# Determine whether or not input geometry is crossing the antimeridian and/or the pole(s)
#
n_points = len(coordinates) # last point is identical to first point Polygon geometry
longitudes = []
latitudes = []
for coordinate in coordinates:
longitudes.append(coordinate[0])
latitudes.append(coordinate[1])
longitudes = np.array(longitudes)
latitudes = np.array(latitudes)
# North and/or South pole crossing determination, for Polygon geometry only.
if geometry.type == 'Polygon':
frustum = []
for i in range(n_points-1): # exclude last, corresponding to first point.
frustum.append(spice.latrec(1.0, longitudes[i]*spice.rpd(), latitudes[i]*spice.rpd()))
npole_crossing = in_frustum([0.0, 0.0, 1.0], frustum)
spole_crossing = in_frustum([0.0, 0.0, -1.0], frustum)
# Antimeridian crossing determination
antimeridian_crossing = False
if npole_crossing or spole_crossing:
antimeridian_crossing = True
else:
for i in range(n_points-1):
if crossing(coordinates[i], coordinates[i+1])[0]:
antimeridian_crossing = True
break
# Derive boundary box coordinates
westernmost_longitude = np.min(longitudes)
easternmost_longitude = np.max(longitudes)
if npole_crossing or spole_crossing:
westernmost_longitude = -180.0
easternmost_longitude = 180.0
elif antimeridian_crossing:
westernmost_longitude = np.min(longitudes[np.where(longitudes > 0)])
easternmost_longitude = np.max(longitudes[np.where(longitudes < 0)])
minimum_latitude = np.min(latitudes)
maximum_latitude = np.max(latitudes)
if npole_crossing:
maximum_latitude = 90.0
if spole_crossing:
minimum_latitude = -90.0
bbox = [westernmost_longitude, minimum_latitude, easternmost_longitude, maximum_latitude]
return bbox, antimeridian_crossing, npole_crossing, spole_crossing
[docs]def crossing(p1, p2):
"""Returns whether or not two surface points, defining a line, crosses the antimeridian (+/ 180 degrees).
Args:
p1 (list): Latitudinal coordinates of first point defining a line ([lon, lat]).
p2 (list): Latitudinal coordinates of second point defining a line ([lon, lat]).
Returns:
bool:
"""
p0 = [0., 0.]
p1_x = np.cos(p1[0] * spice.rpd())
p2_x = np.cos(p2[0] * spice.rpd())
if p1_x < 0 and p2_x < 0:
p1_y = np.sin(p1[0] * spice.rpd())
p2_y = np.sin(p2[0] * spice.rpd())
if (p1_y > 0 and p2_y < 0) or (p1_y < 0 and p2_y > 0):
lat0 = 0.5 * (p1[1] + p2[1])
lon0 = p1_y / abs(p1_y) * 180.
p0 = [lon0, lat0]
return True, p0
return False, p0
[docs]def split_line(line):
"""Returns split line, which can be a collection lines.
Args:
line (list): Input line, which consists of two points latitudinal coordinates ([lon, lat]).
Returns:
List[list]:
"""
coords = line
lines = []
i0 = 0
for i in range(len(coords) - 1):
flag, p0 = crossing(coords[i], coords[i + 1])
if flag:
if i0 > 0:
line = []
line.append(p0_next)
for coord in coords[i0:i]:
line.append(coord)
else:
line = coords[i0:i]
line.append(p0)
if len(line) > 1: # exclude one-point line (workaround)
lines.append(line)
p0_next = [-p0[0], p0[1]] # first point of the next line
i0 = i + 1 # start index of next line
# add last line
if i0 > 0:
line = []
line.append(p0_next)
for coord in coords[i0:]:
line.append(coord)
lines.append(line)
else:
lines = [line]
return lines
[docs]def getSplitLineString(feature):
"""Returns split `GeoJSON Feature`_ of LineString geometry type.
Other geometry types are handled by the :func:`getSplitLineString <geogen.tamn.getSplitFeature>`
TAMN wrapper function.
Args:
feature (`GeoJSON Feature`_): Input GeoJSON Feature object of LineString geometry type.
Returns:
`GeoJSON Feature`_:
"""
if feature['geometry']['type'] != 'LineString':
return feature
coords = feature['geometry']['coordinates']
lines = split_line(coords)
feature['geometry'] = MultiLineString(lines)
return feature
[docs]def getSplitFeature(feature):
"""Returns split GeoJSON Feature geometry.
Wrapper to the TAMN web service. Feature of LineString geometry are handled independantly of TAMN by the
:func:`getSplitLineString <geogen.tamn.getSplitLineString>` function.
Args:
feature (`GeoJSON Feature`_): Input GeoJSON Feature object to split.
Returns:
`GeoJSON Feature`_:
"""
if feature['geometry'] == None:
print('Cannot split a null geometry.')
return feature
# Handle LineString geometries independently of TAMN.
#
if feature['geometry']['type'] == 'LineString':
return getSplitLineString(feature)
if len(feature['geometry']['coordinates']) == 0:
print('Cannot split a Polygon which does not contain any points.')
return feature
# Otherwise, let TAMN handle other geometry types.
#
# set TAMN web server end-point
request = TAMN_URL + 'split'
# convert input feature GeoJSON dict in WKT string
if not feature['geometry']['coordinates']:
print('No Polygon coordinates.')
return feature
wkt = convert.geojson_to_wkt(json.dumps(feature['geometry']))
data = json.dumps({"geometry": wkt, "radius": TAMN_DEFAULT_RADIUS, "pole_distance": TAMN_DEFAULT_POLE_DISTANCE})
# send split request to TAMN web service
r = requests.post(request, data=data, headers={'Content-Type': 'application/json'})
if r.status_code != 200:
print('[ERROR] POST failed - {code} {text}'.format(code=r.status_code, text=r.text))
return 0
# set new geometry
feature['geometry'] = json.loads(r.text)['geometry']
return feature
[docs]def splitGeoJSON(in_geojson_file, out_geojson_file):
"""Split geometry within an input GeoJSON file.
Args:
in_geojson_file: Input GeoJSON file path.
out_geojson_file: Output GeoJSON file path.
"""
# Load input GeoJSON file that can contain either only one Feature or a FeatureCollection.
with open(in_geojson_file) as f:
geojson_dict = json.load(f)
# If only one Feature, split and update feature geometry in geojson_dict
if geojson_dict['type'] == 'Feature':
splitFeature = getSplitFeature(geojson_dict)
geojson_dict['geometry'] = splitFeature
# If FeatureCollection, split and update each feature
elif geojson_dict['type'] == 'FeatureCollection':
splitFeatures = []
for feature in geojson_dict['features']:
splitFeatures.append(getSplitFeature(feature))
geojson_dict['features'] = splitFeatures
# If exists, update "antimeridian_split" property from false to true.
geojson_dict['coverage']['antimeridian_split'] = True
# Write geojson_dict input ouput GeoJSON file
with open(out_geojson_file, 'w') as f:
json.dump(geojson_dict, f)
[docs]def check_tamn_server():
"""Check connectivity of TAMN server application.
Returns:
Tuple[bool, int, str]:
Whether or not server application is working, server error code and message.
"""
server_ok = True
server_error = 200
server_message = ''
wkt = 'POLYGON ((179.0 0.0, -179.0 0.0, -179.0 1.0, 179.0 1.0, 179.0 0.0))'
data = json.dumps({"geometry": wkt, "radius": TAMN_DEFAULT_RADIUS, "pole_distance": TAMN_DEFAULT_POLE_DISTANCE})
try:
r = requests.post(TAMN_URL + 'split', data=data, headers={'Content-Type': 'application/json'})
if r.status_code != 200:
server_ok = False
server_error = r.status_code
server_message = r.text
except Exception:
server_ok = False
server_error = 404
server_message = 'Connection error.'
return server_ok, server_error, server_message