"""
Summary
-------
Functions for simulated-deformation interferogram generation for training datasets.
References
----------
Functions taken from https://github.com/matthew-gaddes/SyInterferoPy.
-----
Created by Andrew Player.
"""
import random
from time import perf_counter
import numpy as np
[docs]class Okada:
"""
Class that models surface deformation.
Based off of:
https://github.com/matthew-gaddes/SyInterferoPy
and
Okada, Surface deformation due to shear and tensile faults in a half-space,
Bulletin of the Seismological Society of America (1985): 1135-1154
"""
def __init__(self, source, source_xy_m, tile_size, **kwargs):
np.seterr(divide="ignore")
self.source_type = source
self.source_x = source_xy_m[0]
self.source_y = source_xy_m[1]
self.tile_size = tile_size
self.params = kwargs
self.gen_coordiantes()
self.los_vector = np.array([[0.38213591], [-0.08150437], [0.92050485]])
# Lames Constants and Poisson Ratio
self.lames_mu = 2.3e10 # μ
self.lames_lambda = 2.3e10 # λ
self.nu = self.lames_lambda / (
2 * (self.lames_lambda + self.lames_mu)
) # Poisson Ratio: ν
self.length = kwargs["length"]
self.strike = np.deg2rad(kwargs["strike"])
# δ
self.dip = np.deg2rad(kwargs["dip"])
# Parameters Depending on Type of Event
if source == "quake":
self.opening = 0
self.slip = kwargs["slip"]
self.rake = np.deg2rad(kwargs["rake"])
self.width = kwargs["bottom_depth"] - kwargs["top_depth"]
self.depth = np.mean((kwargs["bottom_depth"], kwargs["top_depth"]))
elif source == "dyke":
self.opening = kwargs["opening"]
self.slip = 0
self.rake = np.deg2rad(0)
self.width = kwargs["bottom_depth"] - kwargs["top_depth"]
self.depth = np.mean((kwargs["bottom_depth"], kwargs["top_depth"]))
elif source == "sill":
self.opening = kwargs["opening"]
self.slip = 0
self.rake = np.deg2rad(0)
self.depth = kwargs["depth"]
self.width = kwargs["width"]
else:
raise Exception(
f"'Source' must be either 'quake', 'dyke', or 'sill', but is set to {source}. Exiting."
)
# Components in North and East Directions
self.east = (
self.grid_x
- self.source_x
+ np.cos(self.strike) * np.cos(self.dip) * (self.width / 2)
)
self.north = (
self.grid_y
- self.source_y
- np.sin(self.strike) * np.cos(self.dip) * (self.width / 2)
)
# ξ is okada_x
self.okada_x = (
np.cos(self.strike) * self.north
+ np.sin(self.strike) * self.east
+ (self.length / 2)
)
self.okada_y = (
np.sin(self.strike) * self.north
- np.cos(self.strike) * self.east
+ (np.cos(self.dip) * self.width)
)
self.d = self.depth + np.sin(self.dip) * (self.width / 2)
self.q = (self.okada_y * np.sin(self.dip)) - (self.d * np.cos(self.dip))
# η
self.eta = (self.okada_y * np.cos(self.dip)) + (self.d * np.sin(self.dip))
self.compute_I_components()
self.U1 = np.cos(self.rake) * self.slip
self.U2 = np.sin(self.rake) * self.slip
self.U3 = self.opening
self.compute_displacement()
[docs] def gen_coordiantes(self):
x_axis, y_axis = np.meshgrid( # Coordinate Axes
np.arange(0, self.tile_size)
* 90, # ((45990 * (self.tile_size / 512)) / (self.tile_size - 1)), # 90m/pixel in x direction
np.arange(0, self.tile_size)
* 90, # ((45990 * (self.tile_size / 512)) / (self.tile_size - 1)) # 90m/pixel in y direction
)
y_axis = np.flipud(y_axis)
ij_bases = np.vstack( # Coordinate System Basis i and j
(np.ravel(x_axis)[np.newaxis], np.ravel(y_axis)[np.newaxis])
)
ijk_bases = np.vstack( # Coordinate System Basis i, j, and k
(ij_bases, np.zeros((1, ij_bases.shape[1])))
)
self.x_axis_shape = x_axis.shape
self.y_axis_shape = y_axis.shape
self.grid_x = ijk_bases[0,]
self.grid_y = ijk_bases[1, :]
[docs] def chinnery(self, f):
"""
Method of combining the different components of displacement.
"""
return (
f(0, 0) - f(self.width, 0) - f(0, self.length) + f(self.width, self.length)
)
[docs] def compute_displacement(self):
"""
Compute the displacements in all directions from all of the sources and combine them.
"""
if self.source_type == "quake":
U_1 = (self.U1 / (2 * np.pi)) * self.chinnery(self.strike_slip_displacement)
U_2 = (self.U2 / (2 * np.pi)) * self.chinnery(self.dip_slip_displacement)
else:
U_1 = 0
U_2 = 0
if self.source_type == "sill" or self.source_type == "dyke":
U_3 = (self.U3 / (2 * np.pi)) * self.chinnery(self.tensile_displacement)
else:
U_3 = 0
okada_array = -U_1 - U_2 + U_3
displacement_array = np.zeros((3, self.tile_size * self.tile_size))
displacement_array[0] = (
np.sin(self.strike) * okada_array[0] - np.cos(self.strike) * okada_array[1]
)
displacement_array[1] = (
np.cos(self.strike) * okada_array[0] + np.sin(self.strike) * okada_array[1]
)
displacement_array[2] = okada_array[2]
self.displacement = displacement_array
shapes = (self.x_axis_shape[0], self.x_axis_shape[1])
x_grid = (
np.reshape(
self.displacement[0,],
shapes,
)
* self.los_vector[0, 0]
)
y_grid = (
np.reshape(
self.displacement[1,],
shapes,
)
* self.los_vector[1, 0]
)
z_grid = (
np.reshape(
self.displacement[2,],
shapes,
)
* self.los_vector[2, 0]
)
self.los_displacement = x_grid + y_grid + z_grid
[docs] def update_params_WL(self, W, L):
xi = self.okada_x - L
eta = self.eta - W
self.R = np.sqrt(np.square(xi) + np.square(eta) + np.square(self.q))
self.X = np.sqrt(np.square(xi) + np.square(self.q))
self.y_tilda = (eta * np.cos(self.dip)) + (self.q * np.sin(self.dip))
self.d_tilda = (eta * np.sin(self.dip)) - (self.q * np.cos(self.dip))
return xi, eta
[docs] def compute_I_1(self, W, L):
xi = self.okada_x - L
if W != 0 and L != 0:
I_5 = self.I_5_WL
elif W != 0:
I_5 = self.I_5_W
elif L != 0:
I_5 = self.I_5_L
else:
I_5 = self.I_5
if np.cos(self.dip) > 10e-8:
return (1 - 2 * self.nu) * (
-xi / (np.cos(self.dip) * (self.R + self.d_tilda))
) - np.tan(self.dip) * I_5
else:
return -((1 - 2 * self.nu) / 2) * (
(xi * self.q) / np.square((self.R + self.d_tilda))
)
[docs] def compute_I_2(self, W, L):
eta = self.eta - W
if W != 0 and L != 0:
I_3 = self.I_3_WL
elif W != 0:
I_3 = self.I_3_W
elif L != 0:
I_3 = self.I_3_L
else:
I_3 = self.I_3
return (1 - 2 * self.nu) * -np.log(self.R + eta) - I_3
[docs] def compute_I_3(self, W, L):
eta = self.eta - W
if W != 0 and L != 0:
I_4 = self.I_4_WL
elif W != 0:
I_4 = self.I_4_W
elif L != 0:
I_4 = self.I_4_L
else:
I_4 = self.I_4
if np.cos(self.dip) > 10e-8:
return (1 - 2 * self.nu) * (
(1 / (np.cos(self.dip))) * (self.y_tilda / (self.R + self.d_tilda))
- np.log(self.R + eta)
) + np.tan(self.dip) * I_4
else:
return ((1 - 2 * self.nu) / 2) * (
(eta / (self.R + self.d_tilda))
+ ((self.y_tilda * self.q) / np.square(self.R + self.d_tilda))
- np.log(self.R + eta)
)
[docs] def compute_I_4(self, W, L):
eta = self.eta - W
if np.cos(self.dip) > 10e-8:
return (
(1 - 2 * self.nu)
* (1 / np.cos(self.dip))
* (
np.log(self.R + self.d_tilda)
- np.sin(self.dip) * np.log(self.R + eta)
)
)
else:
return -(1 - 2 * self.nu) * (self.q / (self.R + self.d_tilda))
[docs] def compute_I_5(self, W, L):
xi = self.okada_x - L
eta = self.eta - W
if np.cos(self.dip) > 10e-8:
a = eta * (self.X + (self.q * np.cos(self.dip))) + (
self.X * (self.R + self.X) * np.sin(self.dip)
)
b = xi * (self.R + self.X) * np.cos(self.dip)
return (1 - 2 * self.nu) * 2 * np.arctan(a / b) / np.cos(self.dip)
else:
return -(1 - 2 * self.nu) * (
(xi * np.sin(self.dip)) / (self.R + self.d_tilda)
)
[docs] def compute_I_components(self):
"""
Precompute all necessary I components to avoid doing it more than once.
"""
self.update_params_WL(0, 0)
self.I_5 = self.compute_I_5(0, 0)
self.I_4 = self.compute_I_4(0, 0)
self.I_3 = self.compute_I_3(0, 0)
self.I_2 = self.compute_I_2(0, 0)
self.I_1 = self.compute_I_1(0, 0)
self.update_params_WL(self.width, 0)
self.I_5_W = self.compute_I_5(self.width, 0)
self.I_4_W = self.compute_I_4(self.width, 0)
self.I_3_W = self.compute_I_3(self.width, 0)
self.I_2_W = self.compute_I_2(self.width, 0)
self.I_1_W = self.compute_I_1(self.width, 0)
self.update_params_WL(0, self.length)
self.I_5_L = self.compute_I_5(0, self.length)
self.I_4_L = self.compute_I_4(0, self.length)
self.I_3_L = self.compute_I_3(0, self.length)
self.I_2_L = self.compute_I_2(0, self.length)
self.I_1_L = self.compute_I_1(0, self.length)
self.update_params_WL(self.width, self.length)
self.I_5_WL = self.compute_I_5(self.width, self.length)
self.I_4_WL = self.compute_I_4(self.width, self.length)
self.I_3_WL = self.compute_I_3(self.width, self.length)
self.I_2_WL = self.compute_I_2(self.width, self.length)
self.I_1_WL = self.compute_I_1(self.width, self.length)
[docs] def get_Is(self, W, L):
"""
Get the I components corresponding to W and L
"""
if W != 0 and L != 0:
I_1 = self.I_1_WL
I_2 = self.I_2_WL
I_3 = self.I_3_WL
I_4 = self.I_4_WL
I_5 = self.I_5_WL
elif W != 0:
I_1 = self.I_1_W
I_2 = self.I_2_W
I_3 = self.I_3_W
I_4 = self.I_4_W
I_5 = self.I_5_W
elif L != 0:
I_1 = self.I_1_L
I_2 = self.I_2_L
I_3 = self.I_3_L
I_4 = self.I_4_L
I_5 = self.I_5_L
else:
I_1 = self.I_1
I_2 = self.I_2
I_3 = self.I_3
I_4 = self.I_4
I_5 = self.I_5
return I_1, I_2, I_3, I_4, I_5
[docs] def strike_slip_displacement(self, W, L):
xi, eta = self.update_params_WL(W, L)
I_1, I_2, _, I_4, _ = self.get_Is(W, L)
q_re = self.q / (self.R + eta)
q_rre = (1 / self.R) * q_re
arctan = np.arctan((xi * eta) / (self.q * self.R))
x_direction = (xi * q_rre) + arctan + (I_1 * np.sin(self.dip))
y_direction = (
(self.y_tilda * q_rre)
+ (np.cos(self.dip) * q_re)
+ (I_2 * np.sin(self.dip))
)
z_direction = (
(self.d_tilda * q_rre)
+ (np.sin(self.dip) * q_re)
+ (I_4 * np.sin(self.dip))
)
displacements = np.asarray([x_direction, y_direction, z_direction])
return displacements
[docs] def dip_slip_displacement(self, W, L):
xi, eta = self.update_params_WL(W, L)
I_1, _, I_3, _, I_5 = self.get_Is(W, L)
sc_dip = np.sin(self.dip) * np.cos(self.dip)
q_rrx = self.q / (self.R * (self.R + xi))
arctan = np.arctan((xi * eta) / (self.q * self.R))
x_direction = (self.q / self.R) - I_3 * sc_dip
y_direction = (
(self.y_tilda * q_rrx) + (np.cos(self.dip) * arctan) - (I_1 * sc_dip)
)
z_direction = (
(self.d_tilda * q_rrx) + (np.sin(self.dip) * arctan) - (I_5 * sc_dip)
)
displacements = np.asarray([x_direction, y_direction, z_direction])
return displacements
[docs] def tensile_displacement(self, W, L):
xi, eta = self.update_params_WL(W, L)
I_1, _, I_3, _, I_5 = self.get_Is(W, L)
s_d = np.square(np.sin(self.dip))
q_rre = self.q / (self.R * (self.R + eta))
q_rrx = self.q / (self.R * (self.R + xi))
arctan = np.arctan((xi * eta) / (self.q * self.R))
q_rre_arctan = (xi * q_rre) - arctan
x_direction = (self.q * q_rre) - (I_3 * s_d)
y_direction = (
-(self.d_tilda * q_rrx) - (np.sin(self.dip) * q_rre_arctan) - (I_1 * s_d)
)
z_direction = (
(self.y_tilda * q_rrx) + (np.cos(self.dip) * q_rre_arctan) - (I_5 * s_d)
)
displacements = np.asarray([x_direction, y_direction, z_direction])
return displacements
[docs]def atmosphere_turb(n_atms, lons_mg, lats_mg, mean_m=0.02, difference=True):
"""
A function to create synthetic turbulent atmospheres based on the methods in Lohman Simons 2005, or using Andy Hooper and Lin Shen's fft method.
Note that due to memory issues, when using the covariance (Lohman) method, largers ones are made by interpolateing smaller ones.
Can return atmsopheres for an individual acquisition, or as the difference of two (as per an interferogram). Units are in metres.
Parameters
----------
n_atms : int
number of atmospheres to generate
lons_mg : rank 2 array
longitudes of the bottom left corner of each pixel.
lats_mg : rank 2 array
latitudes of the bottom left corner of each pixel.
method : string
'fft' or 'cov'. Cov for the Lohmans Simons (sp?) method, fft for Andy
Hooper/Lin Shen's fft method (which is much faster). Currently no way to set
length scale using fft method.
mean_m : float
average max or min value of atmospheres that are created. e.g. if 3 atmospheres
have max values of 0.02m, 0.03m, and 0.04m, their mean would be 0.03cm.
water_mask : rank 2 array
If supplied, this is applied to the atmospheres generated, convering them to masked arrays.
difference : boolean
If difference, two atmospheres are generated and subtracted from each other to
make a single atmosphere.
verbose : boolean
Controls info printed to screen when running.
cov_Lc : float
length scale of correlation, in metres. If smaller, noise is patchier, and if
larger, smoother.
cov_interpolate_threshold : int
if n_pixs is greater than this, images will be generated at size so that the
total number of pixels doesn't exceed this. e.g. if set to 1e4 (10000, the
default) and images are 120*120, they will be generated at 100*100 then
upsampled to 120*120.
Returns
-------
ph_turbs : r3 array
n_atms x n_pixs x n_pixs, UNITS ARE M. Note that if a water_mask is provided, this is applied and a masked array is returned.
"""
def lon_lat_to_ijk(lons_mg, lats_mg):
"""
Given a meshgrid of the lons and lats of the lower left corner of each pixel,
find their distances (in metres) from the lower left corner.
Parameters
-------
lons_mg : rank 2 array
longitudes of the lower left of each pixel.
lats_mg : rank 2 array
latitudes of the lower left of each pixel.
Returns
-------
ijk : rank 2 array
3x lots. The distance of each pixel from the lower left corner of the image in metres.
pixel_spacing : dict
size of each pixel (ie also the spacing between them) in 'x' and 'y' direction.
"""
from geopy import distance
ny, nx = lons_mg.shape
pixel_spacing = {}
pixel_spacing["x"] = distance.distance(
(lats_mg[-1, 0], lons_mg[-1, 0]), (lats_mg[-1, 0], lons_mg[-1, 1])
).meters
pixel_spacing["y"] = distance.distance(
(lats_mg[-1, 0], lons_mg[-1, 0]), (lats_mg[-2, 0], lons_mg[-1, 0])
).meters
X, Y = np.meshgrid(
pixel_spacing["x"] * np.arange(0, nx), pixel_spacing["y"] * np.arange(0, ny)
)
Y = np.flipud(Y)
ij = np.vstack((np.ravel(X)[np.newaxis], np.ravel(Y)[np.newaxis]))
ijk = np.vstack((ij, np.zeros((1, ij.shape[1]))))
return ijk, pixel_spacing
def generate_correlated_noise_fft(nx, ny, std_long, sp):
"""A function to create synthetic turbulent troposphere delay using an FFT approach.
The power of the turbulence is tuned by the weather model at the longer wavelengths.
Parameters
-------
nx : int
width of troposphere
Ny : int
length of troposphere
std_long : float
standard deviation of the weather model at the longer wavelengths. Default = ?
sp : int
pixel spacing in km
Returns
-------
APS : float
2D array, Ny * nx, units are m.
"""
np.seterr(divide="ignore")
cut_off_freq = 1 / 50
x, y = np.arange(0, int(nx / 2)), np.arange(0, int(ny / 2))
freq_x = np.divide(x, nx * sp)
freq_y = np.divide(y, ny * sp)
Y, X = np.meshgrid(freq_x, freq_y)
freq = np.sqrt((X * X + Y * Y) / 2)
ix = freq < 2 / 3
log_power = np.log10(freq) * -11 / 3
log_power[ix] = np.log10(freq[ix]) * -8 / 3 - np.log10(2 / 3)
ix = freq < cut_off_freq
bin_power = np.power(10, log_power)
bin_power[ix] = 0
APS_power = np.zeros((ny, nx))
APS_power[0 : int(ny / 2), 0 : int(nx / 2)] = bin_power
APS_power[0 : int(ny / 2), int(np.ceil(nx / 2)) :] = np.fliplr(bin_power)
APS_power[int(np.ceil(ny / 2)) :, 0 : int(nx / 2)] = np.flipud(bin_power)
APS_power[int(np.ceil(ny / 2)) :, int(np.ceil(nx / 2)) :] = np.fliplr(
np.flipud(bin_power)
)
APS_filt = np.sqrt(APS_power)
x = np.random.randn(ny, nx)
y_tmp = np.fft.fft2(x)
y_tmp2 = np.multiply(y_tmp, APS_filt)
y = np.fft.ifft2(y_tmp2)
APS = np.real(y)
APS = APS / np.std(APS) * std_long
APS = APS * 0.01
return APS
def rescale_atmosphere(atm, atm_mean=0.02, atm_sigma=0.005):
"""
a function to rescale a 2d atmosphere with any scale to a mean centered
one with a min and max value drawn from a normal distribution.
Parameters
----------
atm : rank 2 array
a single atmosphere.
atm_mean : float
average max or min value of atmospheres that are created, in metres. e.g. if 3 atmospheres have max values of 0.02m, 0.03m, and 0.04m, their mean would be 0.03m
atm_sigma : float
standard deviation of Gaussian distribution used to generate atmosphere strengths.
Returns
-------
atm : rank 2 array
a single atmosphere, rescaled to have a maximum signal of around that set by mean_m
"""
atm -= np.mean(atm)
atm_strength = (atm_sigma * np.random.randn(1)) + atm_mean
if np.abs(np.min(atm)) > np.abs(np.max(atm)):
atm *= atm_strength / np.abs(np.min(atm))
else:
atm *= atm_strength / np.max(atm)
return atm
ny, nx = lons_mg.shape
ph_turbs = np.zeros((n_atms, ny, nx))
_, pixel_spacing = lon_lat_to_ijk(lons_mg, lats_mg)
for i in range(n_atms):
ph_turbs[i, :, :] = generate_correlated_noise_fft(
nx,
ny,
std_long=1,
sp=0.0009 * np.mean((pixel_spacing["x"], pixel_spacing["y"])),
)
ph_turbs_m = np.zeros(ph_turbs.shape)
for atm_n, atm in enumerate(ph_turbs):
ph_turbs_m[atm_n,] = rescale_atmosphere(atm, mean_m)
ph_turbs_m = ph_turbs_m[:, : lons_mg.shape[0], : lons_mg.shape[1]]
return ph_turbs_m
[docs]def gen_fake_topo(size: int = 512, alt_scale_min: int = 0, alt_scale_max: int = 500):
"""
Generate fake topography (a dem in meters) for generating simulated atmospheric topographic error.
Parameters
-----------
size : int
The size n for the (n, n) dimension array that is returned.
alt_scale_min : int
The minimum altitude scaling value for the generated perlin noise.
alt_scale_max : int
The maximum altitude scaling value for the generated perlin noise.
Returns
--------
dem : np.ndarray
The array that is meant to be used as a simulated dem with values in meters.
"""
from insar_eventnet.synthetic_interferogram import _generate_perlin
dem = np.zeros((size, size))
dem = _generate_perlin(dem.shape[0]) * alt_scale_max
neg_indices = dem < np.max(dem) / 1.75
dem[neg_indices] = 0
non_zero_indices = dem > 0
dem[non_zero_indices] = dem[non_zero_indices] - np.min(dem[non_zero_indices])
return dem
[docs]def atm_topo_simulate(
dem_m: np.ndarray,
strength_mean: float = 56.0,
strength_var: float = 2.0,
):
"""`
Generate simulated topographic atmospheric error.
Parameters
-----------
dem_m : np.ndarray
An array containing either real dem values (in meters) or simulated ones.
strength_mean : float
The mean strength/magnitude of the error.
strength_var : float
The maximum variation from strength_mean of the magnitude of the error.
difference : bool
Whether the error should come from the difference of 2 aquisitions or just 1.
Returns
--------
ph_turb : np.ndarray
The array containing the turbulent atmospheric error.
"""
import numpy as np
import numpy.ma as ma
# Sentinel-1 Wavelength in meters
s1_lambda = 0.0547
dem_km = 0.01 * dem_m
ph_topo_aq1 = (
strength_mean + strength_var * np.random.randn(1)
) * dem_km # this is the delay for one acquisition
ph_topo_aq2 = (
strength_mean + strength_var * np.random.randn(1)
) * dem_km # and for another
ph_topo_m = (
(ph_topo_aq1 - ph_topo_aq2) / (4 * np.pi)
) * s1_lambda # interferogram is the difference, converted from rad to meters
if np.max(ph_topo_m) < 0:
ph_topo_m -= np.max(ph_topo_m)
else:
ph_topo_m -= np.min(ph_topo_m)
ph_topo_m = ma.array(ph_topo_m, mask=ma.getmask(dem_m))
ph_topo_m -= ma.mean(ph_topo_m)
return ph_topo_m
[docs]def aps_simulate(size: int = 512):
"""
Generate simulated turbulent atmospheric error.
Parameters
-----------
size : int
The size n for the (n, n) dimension array that is returned.
Returns
--------
ph_turb : np.ndarray
The array containing the turbulent atmospheric error.
"""
pixel_size_degs = 1 / 3600
scaled_size = pixel_size_degs * size
lons = np.arange(0.0, 0.0 + scaled_size, pixel_size_degs)
lats = np.arange(0.0, 0.0 + scaled_size, pixel_size_degs)
lons_mg = np.repeat(lons[np.newaxis, :], len(lats), axis=0)
lats_mg = np.repeat(lats[::-1, np.newaxis], len(lons), axis=1)
ph_turb = atmosphere_turb(1, lons_mg, lats_mg, mean_m=0.02)
return ph_turb[0,]
[docs]def coherence_mask_simulate(size: int = 512, threshold: float = 0.3):
"""
Generate simulated incoherence to be masked out.
Parameters
-----------
size : int
The size n for the (n, n) dimension array that is returned.
threshold : float
The maximum value of coherence to be masked to zeros.
Returns
--------
mask_coh : np.ndarray
The masked coherence array.
"""
pixel_size_degs = 1 / 3600
lons = np.arange(0.0, 0.0 + (pixel_size_degs * size), pixel_size_degs)
lats = np.arange(0.0, 0.0 + (pixel_size_degs * size), pixel_size_degs)
lons_mg = np.repeat(lons[np.newaxis, :], len(lats), axis=0)
lats_mg = np.repeat(lats[::-1, np.newaxis], len(lons), axis=1)
mask_coh_values = atmosphere_turb(1, lons_mg, lats_mg, mean_m=0.01)
mask_coh_values = (mask_coh_values - np.min(mask_coh_values)) / np.max(
mask_coh_values - np.min(mask_coh_values)
)
mask_coh = np.where(
mask_coh_values > threshold, np.ones(lons_mg.shape), np.zeros(lons_mg.shape)
)
return mask_coh
[docs]def gen_gaussian_noise(
seed: int = 0,
tile_size: int = 512,
noise_level: float = 90 * np.pi,
threshold: float = 90 * np.pi / 4,
):
if seed != 0:
np.random.seed(seed)
noise_grids = np.random.uniform(
-noise_level, noise_level, size=(2, tile_size, tile_size)
)
inconsistancy = np.abs(noise_grids[1]) >= threshold
noise_grids[1][inconsistancy] = 0
index = noise_grids[1] == 0
noise_grids[0][index] = 0
return noise_grids[0]
[docs]def gen_sim_noise(
seed: int = 0,
tile_size: int = 512,
gaussian_only: bool = False,
atmosphere_scalar: float = 90 * np.pi,
):
"""
Generate a wrapped interferogram along with an event-mask simulating a noisy interferogram with no deformation.
Parameters
-----------
seed : int, Optional
A seed for the random functions. For the same seed, with all other values the same
as well, the interferogram generation will have the same results. If left at 0,
the results will be different every time.
tile_size : int, Optional
The desired dimensional size of the interferogram pairs. This should match
the input shape of the model.
log : bool, Optional
If true, the function will log various relevant values in the console.
atmosphere_scalar : float, Optional
Scale factor for the intensity of atmospheric noise.
Returns
--------
masked_grid : np.ndarray(shape=(tile_size, tile_size))
An array representing a mask over the simulated deformation which simulates masking an event.
wrapped_grid : np.ndarray(shape=(tile_size, tile_size)
The wrapped interferogram.
presence : [1] or [0]
[1] if the image contains an event else [0]
"""
if seed != 0:
np.random.seed(seed)
presence = np.asarray([0])
if gaussian_only:
threshold = 90 * np.pi
noise_grid_full = gen_gaussian_noise(seed, tile_size, threshold=threshold)
noise_grid_inconsistent = gen_gaussian_noise(
seed, tile_size, threshold=(threshold * np.random.random() / 2)
)
threshold = np.random.random() / 2
coherence_mask = coherence_mask_simulate(tile_size, threshold=threshold)
coh_indices = coherence_mask[0, 0:tile_size, 0:tile_size] == 0
noise_grid = noise_grid_full
noise_grid[coh_indices] = noise_grid_inconsistent[coh_indices]
wrapped_grid = np.angle(np.exp(1j * (noise_grid)))
masked_grid = np.zeros((tile_size, tile_size))
phase = masked_grid
else:
simulated_topography = gen_fake_topo(
size=tile_size, alt_scale_min=50, alt_scale_max=100
)
turb_phase = aps_simulate(tile_size) * atmosphere_scalar
topo_phase = (
atm_topo_simulate(simulated_topography) * atmosphere_scalar
) # aps_simulate(tile_size) * atmosphere_scalar +
noise_grid_full = gen_gaussian_noise(seed, tile_size, threshold=90 * np.pi)
coherence_mask = coherence_mask_simulate(tile_size, threshold=0.3)
coh_indices = coherence_mask[0, 0:tile_size, 0:tile_size] == 0
phase = turb_phase + topo_phase
phase[coh_indices] = noise_grid_full[coh_indices]
wrapped_grid = np.angle(np.exp(1j * (phase)))
masked_grid = np.zeros((tile_size, tile_size))
# zeros = wrapped_grid == 0
# wrapped_grid += np.pi
# wrapped_grid /= (2 * np.pi)
# wrapped_grid[zeros] = 0
return phase, masked_grid, wrapped_grid, presence
[docs]def gen_simulated_time_series(
n_interferograms: int = 32,
tile_size: int = 512,
seed: int = 0,
atmosphere_scalar: float = 90 * np.pi,
noise_only: bool = False,
):
"""
Generate a time-series of interferograms with simulated deformation. Correlated by a common dem.
Parameters
-----------
seed : int, Optional
A seed for the random functions. For the same seed, with all other values the same
as well, the interferogram generation will have the same results. If left at 0,
the results will be different every time.
tile_size : int, Optional
The desired dimensional size of the interferogram pairs. This should match
the input shape of the model.
log : bool, Optional
If true, the function will log various relevant values in the console.
atmosphere_scalar : float, Optional
Scale factor for the intensity of atmospheric noise.
Returns
--------
masked_grid : np.ndarray(shape=(tile_size, tile_size))
An array representing a mask over the simulated deformation which simulates masking an event.
wrapped_grid : np.ndarray(shape=(tile_size, tile_size)
The wrapped interferogram.
presence : [1] or [0]
[1] if the image contains an event else [0]
"""
if seed != 0:
np.random.seed(seed)
simulated_topography = gen_fake_topo(size=tile_size, alt_scale_max=100)
event_type = "quake"
axes_max = (tile_size) * 90
random_nums = np.random.rand(13)
scalar = 100 * np.pi
mask = np.zeros((tile_size, tile_size))
if not noise_only:
axes_max = (tile_size) * 90
source_x = axes_max // ((random_nums[0] * 10) + 1)
source_y = axes_max // ((random_nums[1] * 10) + 1)
length = 1000 + 3000 * random_nums[2]
top_depth = 6000 + 5000 * random_nums[3]
depth = 2000 + 2000 * random_nums[3]
kwargs = {
"strike": 180 * random_nums[6],
"dip": [45, 90][int(random_nums[7] < 0.5)],
"length": length,
"rake": [-90, -90][int(random_nums[7] < 0.5)],
"slip": 3,
"top_depth": top_depth,
"bottom_depth": top_depth + (top_depth * 2 + 10000 * random_nums[8]),
"width": depth / 4,
"depth": depth,
"opening": 5,
}
if event_type == "dyke":
kwargs["dip"] = 90
Event = Okada(event_type, (source_x, source_y), tile_size=tile_size, **kwargs)
los_displacement = Event.los_displacement
phase = scalar * los_displacement
mask[np.abs(phase) > np.pi] = 1
else:
los_displacement = np.zeros((tile_size, tile_size))
phases = np.zeros((n_interferograms, 2, tile_size, tile_size))
inflection = (
random.random() * 0.6 + 0.2
) # set the inflection point to a point between 0.2 and 0.8
inflection_rate = (
random.random() * 2 + 2
) # set the inflection rate to be between 2 and 4
inflection_point = n_interferograms * inflection
step_size = (
inflection_point + (n_interferograms - inflection_point) * inflection_rate
)
displacement = los_displacement / step_size
for i in range(n_interferograms):
topo_phase = np.abs(
atm_topo_simulate(simulated_topography) * atmosphere_scalar * 0.15 * np.pi
)
turb_phase = aps_simulate(tile_size) * atmosphere_scalar * 0.3
displacement_step = displacement * i
if i > inflection_point:
displacement_step = displacement_step + displacement * inflection_rate * (
i - inflection_point
)
phase_step = scalar * displacement_step * i + topo_phase + turb_phase
wrapped_phase_step = np.angle(np.exp(1j * (phase_step)))
phase_step = (phase_step + np.abs(np.min(phase_step))) / np.max(
(phase_step + np.abs(np.min(phase_step)))
) # Normalize
phases[i] = [phase_step, wrapped_phase_step]
return phases, mask