Source code for geodepy.ntv2reader

"""
GeodePy - Python Geodesy Package
ntv2reader Module

Adapted from Jaimie Dodd's ntv2reader.py for AUSGeoid ntv2 files (2018-11-12).
Expanded (2021-03-21) for the more general case to include horizontal transformation grids
"""

import struct
from datetime import datetime as dt
import numpy as np


[docs] class NTv2Grid(object): def __init__( self, num_orec, num_srec, num_file, gs_type, version, system_f, system_t, major_f, minor_f, major_t, minor_t, file_path, ): ''' NTv2 Grid Parameters :param num_orec: Number of header identifiers :param num_srec: Number of sub-header idents :param num_file: Number of subgrids in file :param gs_type: grid shift type :param version: grid file version :param system_f: reference system :param system_t: reference system :param major_f: semi major of from system :param minor_f: semi minor of from system :param major_t: semi major of to system :param minor_t: semi minor of to system :param file_path: full path to ntv2 gsb file ''' self.num_orec = num_orec # Number of header identifiers self.num_srec = num_srec # Number of sub-header idents self.num_file = num_file # Number of subgrids in file self.gs_type = gs_type # grid shift type self.version = version # grid file version self.system_f = system_f # reference system self.system_t = system_t # reference system self.major_f = major_f # semi major of from system self.minor_f = minor_f # semi minor of from system self.major_t = major_t # semi major of to system self.minor_t = minor_t # semi minor of to system self.subgrids = {} self.file_path = file_path
[docs] class SubGrid(object): def __init__( self, sub_name, parent, created, updated, s_lat, n_lat, e_long, w_long, lat_inc, long_inc, gs_count, ): ''' Sub Grid Parameters :param sub_name: subgrid name :param parent: parent name :param created: date created :param updated: date modified :param s_lat: south latitude extents :param n_lat: north latitude extents :param e_long: east longitude extents :param w_long: west longitude extents :param lat_inc: latitude increment :param long_inc: longitude increment :param gs_count: total nodes in subgrid ''' self.sub_name = sub_name # subgrid name self.parent = parent # parent name self.created = created # date created self.updated = updated # date modified self.s_lat = s_lat # south latitude extents self.n_lat = n_lat # north latitude extents self.e_long = e_long # east longitude extents self.w_long = w_long # west longitude extents self.lat_inc = lat_inc # latitude increment self.long_inc = long_inc # longitude increment self.gs_count = gs_count # total nodes in subgrid
[docs] def ntv2_bilinear(self, lat, lon, num_cols, row, col, f, start_byte): """ Function to perform bicubic interpolation of the four ntv2 fields at a point of interest. :param lat: latitude of the point of interest (arc-seconds) :param lon: longitude of the point of interest (negative arc-seconds) :param num_cols: number of columns in grid :param row: row number of point to the bottom right of the point of interest :param col: column number of point to the bottom right of the point of interest :param f: variable for open file NTv2 being read as binary :param start_byte: start index of subgrid :return: Four field tuple of interpolation results at point of interest. """ # | | # --o-----o-- # |4 |3 # | *P | # --o-----o-- # |2 |1 # # o - Node position. # Determine position in the file of the four surrounding nodes pos1 = row * num_cols + col pos2 = pos1 + 1 pos3 = pos1 + num_cols pos4 = pos3 + 1 # Navigate to start of subgrid f.seek(start_byte, 1) # Navigate to start of posA node f.seek(16 * pos1, 1) # Read in values for nodes 1 and 2 node_1 = read_node(f) node_2 = read_node(f) # Navigate to beginning of node 3 f.seek(16 * (pos3 - pos2 - 1), 1) # Read in values for nodes 3 and 4 node_3 = read_node(f) node_4 = read_node(f) # Determine latitude and longitude of node 1 lat1 = self.s_lat + row * self.lat_inc long1 = self.e_long + col * self.long_inc # Determine interpolation scale factors x = (lon - long1) / self.long_inc y = (lat - lat1) / self.lat_inc field_1 = round( bilinear_interpolation(node_1[0], node_2[0], node_3[0], node_4[0], x, y), 6 ) field_2 = round( bilinear_interpolation(node_1[1], node_2[1], node_3[1], node_4[1], x, y), 6 ) field_3 = round( bilinear_interpolation(node_1[2], node_2[2], node_3[2], node_4[2], x, y), 6 ) field_4 = round( bilinear_interpolation(node_1[3], node_2[3], node_3[3], node_4[3], x, y), 6 ) return field_1, field_2, field_3, field_4
[docs] def ntv2_bicubic(self, lat, lon, num_cols, row, col, f, start_byte): """ Function to perform bicubic interpolation of the four ntv2 fields at a point of interest. :param lat: latitude of the point of interest (arc-seconds) :param lon: longitude of the point of interest (negative arc-seconds) :param num_cols: number of columns in grid :param row: row number of point to the bottom right of the point of interest :param col: column number of point to the bottom right of the point of interest :param f: variable for open file NTv2 being read as binary :param start_byte: start index of subgrid :return: Four field tuple of interpolation results at point of interest. """ # | | | | # --o-----o-----o-----o-- # |11 |12 |13 |14 # | | | | # --o-----o-----o-----o-- # |10 |3 |4 |15 # | | *P | | # --o-----o-----o-----o-- # |9 |2 |1 |16 # | | | | # --o-----o-----o-----o-- # |8 |7 |6 |5 # # o - Node position. # Determine position in the file of the sixteen surrounding nodes pos1 = row * num_cols + col pos2 = pos1 + 1 pos3 = pos2 + num_cols pos4 = pos3 - 1 pos5 = pos4 - 2 * num_cols - 1 pos6 = pos5 + 1 pos7 = pos6 + 1 pos8 = pos7 + 1 pos9 = pos8 + num_cols pos10 = pos9 + num_cols pos11 = pos10 + num_cols pos12 = pos11 - 1 pos13 = pos12 - 1 pos14 = pos13 - 1 pos15 = pos14 - num_cols pos16 = pos15 - num_cols # Navigate to start of subgrid f.seek(start_byte, 1) # Navigate to start of pos1 node f.seek(16 * pos5, 1) # Read in values for nodes 5-8 node_5 = read_node(f) node_6 = read_node(f) node_7 = read_node(f) node_8 = read_node(f) # Navigate to start of pos16 node f.seek(16 * (pos16 - pos8 - 1), 1) # Read in values for nodes 16, 1, 2, and 9 node_16 = read_node(f) node_1 = read_node(f) node_2 = read_node(f) node_9 = read_node(f) # Navigate to start of pos15 node f.seek(16 * (pos15 - pos9 - 1), 1) # Read in values for nodes 15, 3, 4 and 10 node_15 = read_node(f) node_4 = read_node(f) node_3 = read_node(f) node_10 = read_node(f) # Navigate to start of pos14 node f.seek(16 * (pos14 - pos10 - 1), 1) # Read in values for nodes 11, 12, 13 and 14 node_14 = read_node(f) node_13 = read_node(f) node_12 = read_node(f) node_11 = read_node(f) # Determine latitude and longitude of node 1 lat1 = self.s_lat + row * self.lat_inc long1 = self.e_long + col * self.long_inc # Determine interpolation scale factors x = round((lon - long1) / self.long_inc, 6) y = round((lat - lat1) / self.lat_inc, 6) # Call bicubic interpolation function to interpolate ntv2 fields. field_1 = round( bicubic_interpolation( node_1[0], node_2[0], node_3[0], node_4[0], node_5[0], node_6[0], node_7[0], node_8[0], node_9[0], node_10[0], node_11[0], node_12[0], node_13[0], node_14[0], node_15[0], node_16[0], x, y, ), 6, ) field_2 = round( bicubic_interpolation( node_1[1], node_2[1], node_3[1], node_4[1], node_5[1], node_6[1], node_7[1], node_8[1], node_9[1], node_10[1], node_11[1], node_12[1], node_13[1], node_14[1], node_15[1], node_16[1], x, y, ), 6, ) field_3 = round( bicubic_interpolation( node_1[2], node_2[2], node_3[2], node_4[2], node_5[2], node_6[2], node_7[2], node_8[2], node_9[2], node_10[2], node_11[2], node_12[2], node_13[2], node_14[2], node_15[2], node_16[2], x, y, ), 6, ) field_4 = round( bicubic_interpolation( node_1[3], node_2[3], node_3[3], node_4[3], node_5[3], node_6[3], node_7[3], node_8[3], node_9[3], node_10[3], node_11[3], node_12[3], node_13[3], node_14[3], node_15[3], node_16[3], x, y, ), 6, ) return field_1, field_2, field_3, field_4
[docs] def read_node(f): """ Function to read in values of nodes :param f: variable for open file NTv2 being read as binary :return: Tuple containing the four ntv2 fields at the grid node. """ # field_1: shift lat / geoid separation (N) byte = f.read(4) field_1 = struct.unpack("f", byte)[0] # field 2: shift lon / deflection in prime meridian value (Xi) byte = f.read(4) field_2 = struct.unpack("f", byte)[0] # field 3: reliability lat / deflection in prime vertical value (Eta) byte = f.read(4) field_3 = struct.unpack("f", byte)[0] # field 4: reliability lon / NA byte = f.read(4) field_4 = struct.unpack("f", byte)[0] return field_1, field_2, field_3, field_4
[docs] def bilinear_interpolation(n1, n2, n3, n4, x, y): """ Bilinear interpolation of value for point of interest (P). :param n1: value at node 1 :param n2: value at node 2 :param n3: value at node 3 :param n4: value at node 4 :param x: interpolation scale factor for x axis :param y: interpolation scale factor for y axis :return: Value at node P """ a0 = n1 a1 = n2 - n1 a2 = n3 - n1 a3 = n1 + n4 - n2 - n3 p = a0 + (a1 * x) + (a2 * y) + (a3 * x * y) return p
[docs] def bicubic_interpolation( n1, n2, n3, n4, n5, n6, n7, n8, n9, n10, n11, n12, n13, n14, n15, n16, x, y ): """ Bicubic interpolation of value for point of interest (P). :param n1: value at node 1 :param n2: value at node 2 :param n3: value at node 3 :param n4: value at node 4 :param n5: value at node 5 :param n6: value at node 6 :param n7: value at node 7 :param n8: value at node 8 :param n9: value at node 9 :param n10: value at node 10 :param n11: value at node 11 :param n12: value at node 12 :param n13: value at node 13 :param n14: value at node 14 :param n15: value at node 16 :param n16: value at node 17 :param x: interpolation scale factor for x axis :param y: interpolation scale factor for y axis :return: Value at node P """ # Define the inverse of the coefficient matrix cinv = np.array( [ [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0], [-3, 0, 0, 3, 0, 0, 0, 0, -2, 0, 0, -1, 0, 0, 0, 0], [2, 0, 0, -2, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0], [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0], [0, 0, 0, 0, -3, 0, 0, 3, 0, 0, 0, 0, -2, 0, 0, -1], [0, 0, 0, 0, 2, 0, 0, -2, 0, 0, 0, 0, 1, 0, 0, 1], [-3, 3, 0, 0, -2, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, -3, 3, 0, 0, -2, -1, 0, 0], [9, -9, 9, -9, 6, 3, -3, -6, 6, -6, -3, 3, 4, 2, 1, 2], [-6, 6, -6, 6, -4, -2, 2, 4, -3, 3, 3, -3, -2, -1, -1, -2], [2, -2, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 2, -2, 0, 0, 1, 1, 0, 0], [-6, 6, -6, 6, -3, -3, 3, 3, -4, 4, 2, -2, -2, -2, -1, -1], [4, -4, 4, -4, 2, 2, -2, -2, 2, -2, -2, 2, 1, 1, 1, 1], ] ) # Define x parameters # Function values x1 = n1 x2 = n2 x3 = n3 x4 = n4 # X Derivatives x5 = (n2 - n16) / 2 x6 = (n9 - n1) / 2 x7 = (n10 - n4) / 2 x8 = (n3 - n15) / 2 # Y Derivatives x9 = (n4 - n6) / 2 x10 = (n3 - n7) / 2 x11 = (n12 - n2) / 2 x12 = (n13 - n1) / 2 # Cross Derivatives (XY) x13 = (n3 - n7 - n15 + n5) / 4 x14 = (n10 - n8 - n4 + n6) / 4 x15 = (n11 - n9 - n13 + n1) / 4 x16 = (n12 - n2 - n14 + n16) / 4 # Create array from x parameters xarr = np.array( [x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12, x13, x14, x15, x16] ) # Multiply the inverse of the coefficient matrix by the array of x values to give array of alpha values alpha = np.matmul(cinv, xarr) # Calculate value at the point of interest n_p = 0 for i in range(0, 4): for j in range(0, 4): n_p = n_p + alpha[i * 4 + j] * x**i * y**j return n_p
[docs] def read_ntv2_file(ntv2_gsb_file): """ Function to read an ntv2 gsb file and create grid & subgrid objects :param ntv2_gsb_file: full path to ntv2 file :return: Ntv2 grid object """ with open(ntv2_gsb_file, "rb") as f: # NUM_OREC f.seek(8, 1) byte = f.read(4) num_orec = int.from_bytes(byte, byteorder="little") f.seek(4, 1) # NUM_SREC f.seek(8, 1) byte = f.read(4) num_srec = int.from_bytes(byte, byteorder="little") f.seek(4, 1) # NUM_FILE f.seek(8, 1) byte = f.read(4) num_file = int.from_bytes(byte, byteorder="little") f.seek(4, 1) # GS_TYPE f.seek(8, 1) byte = f.read(8) gs_type = byte.decode("utf8").strip("\x00").strip() # VERSION f.seek(8, 1) byte = f.read(8) version = byte.decode("utf8").strip("\x00").strip() # SYSTEM_F f.seek(8, 1) byte = f.read(8) system_f = byte.decode("utf8").strip("\x00").strip() # SYSTEM_T f.seek(8, 1) byte = f.read(8) system_t = byte.decode("utf8").strip("\x00").strip() # MAJOR_F f.seek(8, 1) byte = f.read(8) major_f = struct.unpack("d", byte)[0] # MINOR_F f.seek(8, 1) byte = f.read(8) minor_f = struct.unpack("d", byte)[0] # MAJOR_T f.seek(8, 1) byte = f.read(8) major_t = struct.unpack("d", byte)[0] # MINOR_T f.seek(8, 1) byte = f.read(8) minor_t = struct.unpack("d", byte)[0] grid = NTv2Grid( num_orec=num_orec, num_srec=num_srec, num_file=num_file, gs_type=gs_type, version=version, system_f=system_f, system_t=system_t, major_f=major_f, minor_f=minor_f, major_t=major_t, minor_t=minor_t, file_path=ntv2_gsb_file, ) # read subgrids for i in range(0, grid.num_file): # SUB_NAME f.seek(8, 1) byte = f.read(8) sub_name = byte.decode("utf").strip("\x00").strip() # PARENT f.seek(8, 1) byte = f.read(8) parent = byte.decode("utf").strip("\x00").strip() # CREATED f.seek(8, 1) byte = f.read(8) created = dt.strptime( byte.decode("utf").strip("\x00").strip(), "%d%m%Y" ).strftime("%d/%m/%Y") # UPDATED f.seek(8, 1) byte = f.read(8) updated = dt.strptime( byte.decode("utf").strip("\x00").strip(), "%d%m%Y" ).strftime("%d/%m/%Y") # S_LAT f.seek(8, 1) byte = f.read(8) s_lat = round(struct.unpack("d", byte)[0], 3) # N_LAT f.seek(8, 1) byte = f.read(8) n_lat = round(struct.unpack("d", byte)[0], 3) # E_LONG f.seek(8, 1) byte = f.read(8) e_long = round(struct.unpack("d", byte)[0], 3) # W_LONG f.seek(8, 1) byte = f.read(8) w_long = round(struct.unpack("d", byte)[0], 3) # LAT_INC f.seek(8, 1) byte = f.read(8) lat_inc = round(struct.unpack("d", byte)[0], 6) # LONG_INC f.seek(8, 1) byte = f.read(8) long_inc = round(struct.unpack("d", byte)[0], 6) # GS_COUNT f.seek(8, 1) byte = f.read(4) gs_count = int.from_bytes(byte, byteorder="little") f.seek(4, 1) # skip ahead to next subgrid f.seek(16 * gs_count, 1) grid.subgrids[sub_name] = SubGrid( sub_name=sub_name, parent=parent, created=created, updated=updated, s_lat=s_lat, n_lat=n_lat, e_long=e_long, w_long=w_long, lat_inc=lat_inc, long_inc=long_inc, gs_count=gs_count, ) return grid
[docs] def interpolate_ntv2(grid_object, lat, lon, method="bicubic"): """ Function to interpolate Ntv2Grid objects :param grid_object: Ntv2Grid object :param lat: latitude (decimal degrees) :param lon: longitude (decimal degrees) :param method: interpolation strategy, bicubic or bilinear :return: Tuple of four ntv2 fields """ interpolation_methods = {"bicubic", "bilinear"} if method not in interpolation_methods: raise ValueError(f'interpolation method "{method}" not supported') # convert decimal degrees to arc-seconds lat *= 3600 lon *= -3600 # determine subgrid for point of interest in_subgrids = set() for sg in grid_object.subgrids.values(): if sg.s_lat <= lat < sg.n_lat and sg.e_long <= lon < sg.w_long: in_subgrids.add(sg.sub_name) # return null fields if no subgrid found, else choose subgrid with finest resolution if len(in_subgrids) == 0: return None, None, None, None else: inc = None in_grid = None for sg in in_subgrids: if not inc: inc = grid_object.subgrids[sg].lat_inc in_grid = grid_object.subgrids[sg] else: if grid_object.subgrids[sg].lat_inc < inc: inc = grid_object.subgrids[sg].lat_inc in_grid = grid_object.subgrids[sg] # Determine number of columns in grid, and row and column of node to bottom right of # point of interest, then call relevant interpolation method function # determine number of columns num_cols = 1 + int((in_grid.w_long - in_grid.e_long) / in_grid.long_inc) # determine row and col numbers of node below right of point row = int((lat - in_grid.s_lat) / in_grid.lat_inc) col = int((lon - in_grid.e_long) / in_grid.long_inc) # locate data in gsb_file skip_bytes = 176 # grid header length with open(grid_object.file_path, "rb") as f: for sg in grid_object.subgrids.values(): skip_bytes += 176 # subgrid header length if sg.sub_name == in_grid.sub_name: if method == "bilinear": results = in_grid.ntv2_bilinear( lat, lon, num_cols, row, col, f, skip_bytes ) elif method == "bicubic": results = in_grid.ntv2_bicubic( lat, lon, num_cols, row, col, f, skip_bytes ) else: skip_bytes += sg.gs_count * 16 return results