Skip to content

Commit

Permalink
Merge pull request #33 from fcomitani/to_be_merged
Browse files Browse the repository at this point in the history
simpsom clean up
  • Loading branch information
fcomitani committed Jul 12, 2023
2 parents abd67e9 + a025e57 commit bec2400
Show file tree
Hide file tree
Showing 163 changed files with 238 additions and 229 deletions.
11 changes: 0 additions & 11 deletions .travis.yml

This file was deleted.

103 changes: 67 additions & 36 deletions simpsom/distances.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
from types import ModuleType
from typing import Optional

import numpy as np
from loguru import logger
Expand All @@ -18,33 +19,63 @@ def __init__(self, xp: ModuleType = None) -> None:

self.xp = xp

def euclidean_distance(self, x: np.ndarray, w: np.ndarray) -> float:
"""Calculate the L2 distance between two arrays.
def _euclidean_squared_distance_part(self, x: np.ndarray, w: np.ndarray,
w_flat_sq: Optional[np.ndarray] = None) -> float:
""" Calculate the partial squared L2 distance.
Args:
x (array): first array.
w (array): second array.
Returns:
(float): the euclidean distance between two
provided arrays
(float): the partial L2 squared distance between two
provided arrays
"""

w_flat = w.reshape(-1, w.shape[2])
if w_flat_sq is None:
w_flat_sq = self.xp.power(w_flat, 2).sum(axis=1, keepdims=True)
cross_term = self.xp.dot(x, w_flat.T)
return -2 * cross_term + w_flat_sq.T

def _euclidean_squared_distance(self, x: np.ndarray, w: np.ndarray,
w_flat_sq: Optional[np.ndarray] = None) -> float:
"""Calculate the full squared L2 distance.
Args:
x (array): first array.
w (array): second array.
Returns:
(float): the full L2 squared distance between two
provided arrays
"""
x_sq = self.xp.power(x, 2).sum(axis=1, keepdims=True)
return self._euclidean_squared_distance_part(x, w, w_flat_sq) + x_sq

w_flat = w.reshape(-1, w.shape[2])
w_flat_sq = self.xp.power(w_flat, 2).sum(axis=1, keepdims=True)
def euclidean_distance(self, x: np.ndarray, w: np.ndarray, w_flat_sq: np.ndarray) -> float:
"""Calculate the L2 distance between two arrays.
result = x_sq + w_flat_sq.T - 2 * self.xp.dot(x, w_flat.T)
Args:
x(array): first array.
w(array): second array.
return self.xp.nan_to_num(self.xp.sqrt(result))
Returns:
(float): the euclidean distance between two
provided arrays
"""
return self.xp.nan_to_num(
self.xp.sqrt(
self._euclidean_squared_distance(x, w, w_flat_sq)
)
)

def cosine_distance(self, x: np.ndarray, w: np.ndarray) -> float:
def cosine_distance(self, x: np.ndarray, w: np.ndarray, w_flat_sq: np.ndarray) -> float:
"""Calculate the cosine distance between two arrays.
Args:
x (array): first array.
w (array): second array.
x(array): first array.
w(array): second array.
Returns:
(float): the cosine distance between two
Expand All @@ -54,22 +85,21 @@ def cosine_distance(self, x: np.ndarray, w: np.ndarray) -> float:
x_sq = self.xp.power(x, 2).sum(axis=1, keepdims=True)

w_flat = w.reshape(-1, w.shape[2])
w_flat_sq = self.xp.power(w_flat, 2).sum(axis=1, keepdims=True)

similarity = self.xp.nan_to_num(
self.xp.dot(x, w_flat.T) / self.xp.sqrt(x_sq * w_flat_sq.T))
similarity = self.xp.nan_to_num(self.xp.dot(
x, w_flat.T) / self.xp.sqrt(x_sq * w_flat_sq.T))

return 1 - similarity

def manhattan_distance(self, x: np.ndarray, w: np.ndarray) -> float:
"""Calculate Manhattan distance between two arrays.
Args:
x (array): first array.
w (array): second array.
x(array): first array.
w(array): second array.
Returns:
(float): the manhattan distance
(float): the manhattan distance
between two provided arrays.
"""

Expand All @@ -92,53 +122,54 @@ def manhattan_distance(self, x: np.ndarray, w: np.ndarray) -> float:

else:
d = self.xp.linalg.norm(
x[:, self.xp.newaxis, self.xp.newaxis, :] - w[self.xp.newaxis, :, :, :],
x[:, self.xp.newaxis, self.xp.newaxis, :] -
w[self.xp.newaxis, :, :, :],
ord=1,
axis=3
)

return d.reshape(x.shape[0], w.shape[0] * w.shape[1])

def batchpairdist(self, x: np.ndarray, w: np.ndarray, metric: str) -> np.ndarray:
def batchpairdist(self, x: np.ndarray, w: np.ndarray, sq: np.ndarray, metric: str) -> np.ndarray:
""" Calculates distances between points in batches. Two array-like objects
must be provided, distances will be calculated between all points in the
must be provided, distances will be calculated between all points in the
first array and all those in the second array.
Args:
a (array): first array.
b (array): second array.
metric (string): distance metric.
Accepted metrics are euclidean, manhattan, and cosine (default "euclidean").
a(array): first array.
b(array): second array.
metric(string): distance metric.
Accepted metrics are euclidean, manhattan, and cosine(default "euclidean").
Returns:
d (array or list): the calculated distances.
d(array or list): the calculated distances.
"""

if metric == "euclidean":
return self.euclidean_distance(x, w)
return self.euclidean_distance(x, w, sq)

elif metric == "cosine":
return self.cosine_distance(x, w)
return self.cosine_distance(x, w, sq)

elif metric == "manhattan":
return self.manhattan_distance(x, w)

logger.error("Available metrics are: " + \
logger.error("Available metrics are: " +
"\"euclidean\", \"cosine\" and \"manhattan\"")
sys.exit(1)

def pairdist(self, a: np.ndarray, b: np.ndarray, metric: str) -> np.ndarray:
""" Calculates distances between points. Two array-like objects
must be provided, distances will be calculated between all points in the
must be provided, distances will be calculated between all points in the
first array and all those in the second array.
Args:
a (array): first array.
b (array): second array.
metric (string): distance metric.
Accepted metrics are euclidean, manhattan, and cosine (default "euclidean").
a(array): first array.
b(array): second array.
metric(string): distance metric.
Accepted metrics are euclidean, manhattan, and cosine(default "euclidean").
Returns:
d (array or list): the calculated distances.
d(array or list): the calculated distances.
"""

if metric == "euclidean":
Expand All @@ -151,9 +182,9 @@ def pairdist(self, a: np.ndarray, b: np.ndarray, metric: str) -> np.ndarray:
(b / self.xp.linalg.norm(b, axis=1)[:, None]).T)

elif metric == "manhattan":
func = lambda x, y: self.xp.sum(self.xp.abs(x.T - y), axis=-1)
def func(x, y): return self.xp.sum(self.xp.abs(x.T - y), axis=-1)
return self.xp.stack([func(a[i], b) for i in range(a.shape[0])])

logger.error("Available metrics are: " + \
logger.error("Available metrics are: " +
"\"euclidean\", \"cosine\" and \"manhattan\"")
sys.exit(1)
33 changes: 17 additions & 16 deletions simpsom/neighborhoods.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@


class Neighborhoods:
""" Container class with functions to calculate neihgborhoods. """
""" Container class with functions to calculate neighborhoods. """

def __init__(self, xp: ModuleType = None) -> None:
def __init__(self, xp: ModuleType, xx: np.ndarray, yy: np.ndarray, pbc_func: Union[Callable, None]) -> None:
""" Instantiate the Neighborhoods class.
Args:
xp (numpy or cupy): the numeric library to use
to calculate distances.
xx (array): x coordinates in the grid mesh.
yy (array): y coordinates in the grid mesh.
pbc_function (Callable): function to extend a distance
function to account for pbc, as defined in polygons
"""

self.xp = xp
self.xx = xx
self.yy = yy
self.pbc_func = pbc_func

def gaussian(self, c: np.ndarray, n: np.ndarray,
denominator: float) -> np.ndarray:
Expand Down Expand Up @@ -60,31 +67,25 @@ def bubble(self, c: np.ndarray, n: np.ndarray,

return self.xp.abs(n - c) < threshold

def neighborhood_caller(self, center: Tuple[np.ndarray], sigma: float,
xx: np.ndarray, yy: np.ndarray,
neigh_func: str, pbc_func: Union[Callable, None] = None) -> np.ndarray:
"""Returns a neighborhood selection on any 2d topology.
def neighborhood_caller(self, neigh_func: str, center: Tuple[np.ndarray], sigma: float) -> np.ndarray:
""" Returns a neighborhood selection on any 2d topology.
Args:
center (Tuple[np.ndarray]): index of the center point along the xx yy grid.
sigma (float): standard deviation/size coefficient.
xx (array): x coordinates in the grid mesh.
yy (array): y coordinates in the grid mesh.
neigh_func (str): neighborhood specific distance function name
(choose among 'gaussian', 'mexican_hat' or 'bubble')
pbc_function (Callable): function to extend a distance
function to account for pbc, as defined in polygons
Returns:
(array): the resulting neighborhood matrix.
"""

d = 2 * sigma ** 2

nx = xx[self.xp.newaxis, :, :]
ny = yy[self.xp.newaxis, :, :]
cx = xx.T[center][:, self.xp.newaxis, self.xp.newaxis]
cy = yy.T[center][:, self.xp.newaxis, self.xp.newaxis]
nx = self.xx[self.xp.newaxis, :, :]
ny = self.yy[self.xp.newaxis, :, :]
cx = self.xx.T[center][:, self.xp.newaxis, self.xp.newaxis]
cy = self.yy.T[center][:, self.xp.newaxis, self.xp.newaxis]

if neigh_func == 'gaussian':
shape_fun = lambda x, y: self.gaussian(x, y, denominator=d)
Expand All @@ -97,8 +98,8 @@ def neighborhood_caller(self, center: Tuple[np.ndarray], sigma: float,
"Choose among 'gaussian', 'mexican_hat' or 'bubble'.")
raise ValueError

if pbc_func is not None:
px, py = pbc_func((cx, cy), (nx, ny), (nx.shape[2], nx.shape[1]), shape_fun, self.xp)
if self.pbc_func is not None:
px, py = self.pbc_func((cx, cy), (nx, ny), (nx.shape[2], nx.shape[1]), shape_fun, self.xp)
else:
px = shape_fun(cx, nx)
py = shape_fun(cy, ny)
Expand Down
Loading

0 comments on commit bec2400

Please sign in to comment.