"""
Summary
-------
Functions for performing inference, such as masking events, and binary classification
of events
Notes
-----
Created by Andrew Player.
"""
import os
import matplotlib.pyplot as plt
import numpy as np
from osgeo import gdal
from PIL import Image
from tensorflow.keras import models
from insar_eventnet import io
[docs]def mask(
mask_model_path: str,
pres_model_path: str,
image_path: str,
output_image_path: str = None,
tile_size: int = 0,
crop_size: int = 0,
) -> np.ndarray:
"""
Generate a mask over potential events in a wrapped insar product.
Parameters:
-----------
mask_model_path : str
The path to the model to use for generating the event-mask.
pres_model_path : str
The path to the model that predicts the presence of an event in a mask.
image_path : str
The path to the InSAR product from ASF that should be masked.
output_image_path : str
The output path for the inferred mask image
tile_size : int
The width and height of the tiles that the image will be broken into, this needs
to match the input shape of the model.
crop_size : int, Optional
If the models output shape is different than the input shape, this value needs
to be equal to the output shape of the masking model and input shape of the
presence model.
Returns:
--------
mask_pred : np.ndarray(shape=(tile_size, tile_size) or (crop_size, crop_size))
The array containing the event-mask array as predicted by the model.
presence_guess : bool
True if there is an event else False.
"""
mask_model = models.load_model(mask_model_path)
pres_model = models.load_model(pres_model_path)
image, gdal_dataset = io.get_image_array(image_path)
mask_pred, pres_mask, pres_vals = mask_with_model(
mask_model=mask_model,
pres_model=pres_model,
arr_w=image,
tile_size=tile_size,
crop_size=crop_size,
)
presence_guess = np.mean(pres_mask) > 0.0
if output_image_path is not None:
img = Image.fromarray(mask_pred)
img.save(output_image_path)
out_dataset = gdal.Open(output_image_path, gdal.GA_Update)
out_dataset.SetGeoTransform(gdal_dataset.GetGeoTransform())
out_dataset.SetProjection(gdal_dataset.GetProjection())
return mask_pred, presence_guess
[docs]def mask_with_model(
mask_model, pres_model, arr_w: np.ndarray, tile_size: int, crop_size: int = 0
) -> np.ndarray:
"""
Use a keras model prediction to mask events in a wrapped interferogram.
Parameters
-----------
model_path : str
The path to the model to use for masking.
pres_model_path : str
The path to the model that predicts the presence of an event in a mask.
arr_w : np.ndarray
The wrapped interferogram array.
tile_size : int
The width and height of the tiles that the image will be broken into, this needs
to match the input shape of the model.
crop_size : int, Optional
If the models output shape is different than the input shape, this value needs
to be equal to the output shape of the masking model and input shape of the
presence model.
Returns
--------
mask : np.ndarray(shape=(tile_size, tile_size) or (crop_size, crop_size))
The array containing the event-mask array as predicted by the model.
pres_mask : np.ndarray(shape=(tile_size, tile_size) or (crop_size, crop_size))
An array containing tiles where the tile is all 1s if there is an event else 0s.
If even a single tile has 1s that means an event has been identified.
"""
tiled_arr_w, w_rows, w_cols = models.tile(
arr_w,
(tile_size, tile_size),
x_offset=0,
y_offset=0,
even_pad=True,
crop_size=crop_size,
)
zeros = tiled_arr_w == 0
if crop_size == 0:
crop_size = tile_size
# tiled_arr_w += np.pi
# tiled_arr_w /= (2*np.pi)
# tiled_arr_w[zeros] = 0
mask_tiles = mask_model.predict(tiled_arr_w, batch_size=1)
mask_tiles[zeros] = 0
rnd = mask_tiles >= 0.5
trnc = mask_tiles < 0.5
# # rnd2 = mask_tiles >= 0.5
mask_tiles[trnc] = 0
# # mask_tiles[rnd2] = 0.5
mask_tiles[rnd] = 1
pres_vals = pres_model.predict(mask_tiles, batch_size=1)
pres_tiles = np.zeros((w_rows * w_cols, tile_size, tile_size))
for index, val in enumerate(pres_vals):
if val >= 0.75:
pres_tiles[index] = 1
mask_tiles = mask_tiles.reshape((w_rows * w_cols, tile_size, tile_size))
mask = models.tiles_to_image(mask_tiles, w_rows, w_cols, arr_w.shape)
mask[arr_w == 0] = 0
pres_mask = models.tiles_to_image(pres_tiles, w_rows, w_cols, arr_w.shape)
return mask, pres_mask, pres_vals
[docs]def plot_results(wrapped, mask, presence_mask):
_, [axs_wrapped, axs_mask, axs_presence_mask] = plt.subplots(
1, 3, sharex=True, sharey=True
)
axs_wrapped.set_title("Wrapped")
axs_mask.set_title("Segmentation Mask")
axs_presence_mask.set_title("Presence Mask")
axs_wrapped.imshow(wrapped, origin="lower", cmap="jet")
axs_mask.imshow(mask, origin="lower", cmap="jet")
axs_presence_mask.imshow(presence_mask, origin="lower", cmap="jet")
plt.show()
def _test_images_in_dir(
mask_model,
pres_model,
directory,
tile_size,
crop_size,
save_images=False,
output_dir=None,
):
"""
Helper for _test_model(). Evaluates EventNet Models over a directory of real
interferograms.
Parameters
-----------
mask_model : Keras Model
The model for masking
pres_model : Keras Model
The model for binary classification.
directory : str
A directory containing interferogram tifs.
tile_size : int
The width and height of the tiles that the image will be broken into, this needs
to match the input shape of the model.
crop_size : int, Optional
If the models output shape is different than the input shape, this value needs
to be equal to the output shape of the masking model and input shape of the
presence model.
"""
positives = 0
negatives = 0
arr_uw = 0
print("---------------------------------------")
print("tag | label | guess | confidence")
print("---------------------------------------")
for filename in os.listdir(directory):
if "unw_phase" in filename:
try:
arr_uw, dataset = io.get_image_array(os.path.join(directory, filename))
arr_w = np.angle(np.exp(1j * (arr_uw)))
except ConnectionError:
print("Failed to connect to dataset server")
except Exception as e:
print(f"Failed to load unwrapped phase image: {filename} due to {e}")
continue
elif "wrapped" in filename:
try:
arr_w, dataset = io.get_image_array(os.path.join(directory, filename))
except ConnectionError:
print("Failed to connect to dataset server")
except Exception as e:
print(f"Failed to load unwrapped phase image: {filename} due to {e}")
continue
mask, pres_mask, pres_vals = mask_with_model(
mask_model=mask_model,
pres_model=pres_model,
arr_w=arr_w,
tile_size=tile_size,
crop_size=crop_size,
)
presence_guess = np.any(np.max(pres_vals) > 0.75)
tag = filename.split("_")[-3]
label = "Positive" if "Positives" in directory else "Negative"
guess = "Positive" if presence_guess else "Negative"
print(f"{tag} | {label} | {guess} |{np.max(pres_vals): 0.8f}")
plot_results(arr_w, mask, pres_mask)
if presence_guess:
positives += 1
else:
negatives += 1
if save_images:
filename = f"{output_dir}/{tag}_mask.tif"
img = Image.fromarray(mask)
img.save(filename)
out_dataset = gdal.Open(filename, gdal.GA_Update)
out_dataset.SetGeoTransform(dataset.GetGeoTransform())
out_dataset.SetProjection(dataset.GetProjection())
out_dataset.FlushCache()
return positives, negatives
[docs]def test_model(
mask_model_path,
pres_model_path,
images_dir,
tile_size,
crop_size,
save_images=False,
output_dir=None,
):
"""
Evaluate EventNet Models over a directory of real interferograms.
Parameters
-----------
model_path : str
The path to the model to use for masking.
pres_model_path : str
The path to the model that predicts the presence of an event in a mask.
images_dir : str
A directory containing Positives and Negatives directories which have their
respective tifs.
tile_size : int
The width and height of the tiles that the image will be broken into, this needs
to match the input shape of the model.
crop_size : int, Optional
If the models output shape is different than the input shape, this value needs
to be qual to the output shape of the masking model and input shape of the
presence model.
"""
try:
mask_model = models.load_model(mask_model_path)
pres_model = models.load_model(pres_model_path)
except FileNotFoundError:
print("Models do not exist and couldn't be loaded")
except Exception as e:
print(f"Caught {type(e)}: {e}")
return
positive_dir = os.path.join(images_dir, "Positives")
negative_dir = os.path.join(images_dir, "Negatives")
true_positives, false_negatives = _test_images_in_dir(
mask_model,
pres_model,
positive_dir,
tile_size,
crop_size,
save_images,
output_dir,
)
false_positives, true_negatives = _test_images_in_dir(
mask_model,
pres_model,
negative_dir,
tile_size,
crop_size,
save_images,
output_dir,
)
total = true_positives + false_positives + true_negatives + false_negatives
accuracy = 100 * (true_positives + true_negatives) / total
print(f"Num True Positives: {true_positives}")
print(f"Num False Positives: {false_positives}")
print(f"Num True Negatives: {true_negatives}")
print(f"Num False Negatives: {false_negatives}")
print(f"Total Predictions: {total}")
print(f"Accuracy: {accuracy}%")