Source code for netflow.pose.similarity

import itertools
import numpy as np
import pandas as pd

from collections import defaultdict as ddict

from .._logging import logger


[docs]def mutual_knn_indices(d, n_neighbors=None): """ Get indices of mutual k-nearest neighbors (nn). Parameters ---------- d : `numpy.ndarray`, (m, m) Symmetric distance matrix. n_neighbors : {`int`, `None`} Number of mutual nns to include (does not include self), ``n_neighbors > 0``. (Uses ``n_neighbors + 1``, since each obs is its closest neighbor). If `None`, all neighbors are used (same as k-nns since all neighbors are mutually included). Returns ------- kmnn_indices : `defaultdict[`list`]` of the form ``{m : [up to ``n_neighbors`` mutual nns]}`` Defaultdict keyed by row index referrencing the row indices of its mutual nns out of ``n_neighbors`` nns. Note, this does not include itself in output. """ indices, distances = get_knn_indices_distances(d, n_neighbors=n_neighbors) kmnn_indices = ddict(list) for ix, nbrs in enumerate(indices): for nbr in nbrs: if nbr <= ix: continue if ix in indices[nbr]: kmnn_indices[ix].append(nbr) kmnn_indices[nbr].append(ix) return kmnn_indices
[docs]def edges_from_mutual_knn_indices(kmnn): """ Convert mutual k-nns to a list of edges. Parameters ---------- kmnn : `defaultdict` The mutual k-nn indices as returned from ``mutual_knn_indices``. Returns ------- edges : `list` The list of edges corresponding to the mutual k-nns. """ edges = list(itertools.chain(*[itertools.product([ix], [k for k in nns if k > ix]) for ix, nns in kmnn.items()])) return edges
[docs]def mutual_knn_edges(d, n_neighbors=None): """ Get edges between indices of mutual k-nearest neighbors (nn) from distance matrix. .. note:: Self is not included as one of the k-nns. Parameters ---------- d : `numpy.ndarray`, (m, m) Symmetric distance matrix. n_neighbors : {`int`, `None`} Number of mutual nns to include (does not include self), ``n_neighbors > 0``. (Uses ``n_neighbors + 1``, since each obs is its closest neighbor). If `None`, all neighbors are used (same as k-nns since all neighbors are mutually included). Returns ------- edges : `list` The list of edges corresponding to the mutual k-nns. """ indices, distances = get_knn_indices_distances(d, n_neighbors=n_neighbors) edges = [] for ix, nbrs in enumerate(indices): for nbr in nbrs: if nbr <= ix: continue if ix in indices[nbr]: edges.append((ix, nbr)) return edges
[docs]def get_knn_indices_distances(d, n_neighbors=None): """ Get indices of and distances to k-nearest neighbors Parameters ---------- d : `numpy.ndarray`, (m, m) Symmetric distance matrix. n_neighbors : {`int`, `None`} K-th nearest neighbor (or number of nearest neighbors) to use for computing ``sigmas``, ``n_neighbors > 0``. (Uses ``n_neighbors + 1``, since each obs is its closest neighbor). If `None`, all neighbors are used. Returns ------- indices : `numpy.ndarray`, (m, n_neighbors) Matrix with indices of k-nearest neighbors in each row Note, this does not include itself in output) distances : `numpy.ndarray`, (m, n_neighbors) Matrix with distance to k-nearest neighbors Note, this does not include itself in output. """ if isinstance(d, (pd.DataFrame, pd.Series)): d = d.values n_neighbors = d.shape[0] - 1 if n_neighbors is None else n_neighbors sample_range = np.arange(d.shape[0])[:, None] indices = np.argpartition(d, n_neighbors, axis=1)[:, :n_neighbors + 1] indices = indices[sample_range, np.argsort(d[sample_range, indices])] # each row has indices of k nearest neighbors sorted in order of nearest neighbor (including self) distances = d[sample_range, indices] # each row has sorted distances to k nearest neighbors (including self) # exclude the first point (self - 0th neighbor) indices = indices[:, 1:] distances = distances[:, 1:] return indices, distances
[docs]def sigma_knn_(d, n_neighbors=None, method='mean', return_nn=False): """ Determine sigma for each obs as the distance to its k-th neighbor. Parameters ---------- d : `numpy.ndarray`, (n_observations, n_observations) Symmetric distance matrix. n_neighbors : {`int`, `None`} K-th nearest neighbor (or number of nearest neighbors) to use for computing ``sigmas``, ``n_neighbors > 0``. (Uses ``n_neighbors + 1``, since each obs is it's closest neighbor). If `None`, all neighbors are used. method : {'mean', 'median', 'max'} Indicate how to compute sigma. return_nn : `bool` If `True`, also return indices and distances of ``n_neighbors`` nearest neighbors. Options: - 'mean' : mean of distance to ``n_neighbors`` nearest neighbors - 'median' : median of distance to ``n_neighbors`` nearest neighbors - 'max' : distance to ``n_neighbors``-nearest neighbor Returns ------- sigmas : `numpy.ndarray`, (n_observations, ) The distance to the k-th nearest neighbor for all rows in ``d``. Sigmas represent the kernel width representing each data point's accessible neighbors. indices : `numpy.ndarray`, (n_observations, ) Indices of nearest neighbors where each row corresponds to an observation. Returned if ``return_nn`` is `True`. distances : `numpy.ndarray`, (n_observations, ``n_neighbors + 1``) Distances to nearest neighbors where each row corresponds to an obs. Returned if ``return_nn`` is `True`. """ indices, distances = get_knn_indices_distances(d, n_neighbors=n_neighbors) if method == 'mean': sigmas = np.mean(distances, axis=1) elif method == 'median': sigmas = np.median(distances, axis=1) elif method == 'max': sigmas = distances[:, -1] / 2 else: msg = f"{method!r} not recognized for value of method, must be one of ['mean', 'median', 'max']." raise AssertionError(msg) if return_nn: return sigmas, indices, distances return sigmas
[docs]def sigma_knn(keeper, key, label=None, n_neighbors=None, method='mean', return_nn=False): """ Set sigma for each obs as the distance to its k-th neighbor from keeper. Parameters ---------- keeper : `netflow.Keeper` The keeper object that stores the symmetric distance matrix of size (n_observations, n_observations). key : `str` The label used to reference the distance matrix stored in ``keeper.distances``, of size (n_observations, n_observations). label : `str` Label used to store resulting sigmas in ``keeper.misc['sigmas_'+label]``. If ``return_nn`` is `True`, nearest neighbor indices are stored in ``keeper.misc['nn_indices_'+label]`` and nearest neighbor distances are stored in ``keeper.misc['nn_distances_'+label]``. n_neighbors : {`int`, `None`} K-th nearest neighbor (or number of nearest neighbors) to use for computing ``sigmas``, ``n_neighbors > 0``. (Uses ``n_neighbors + 1``, since each obs is it's closest neighbor). If `None`, all neighbors are used. method : {'mean', 'median', 'max'} Indicate how to compute sigma. Options: - 'mean' : mean of distance to ``n_neighbors`` nearest neighbors - 'median' : median of distance to ``n_neighbors`` nearest neighbors - 'max' : distance to ``n_neighbors``-nearest neighbor return_nn : `bool` If `True`, also return/store indices and distances of ``n_neighbors`` nearest neighbors. Returns ------- sigmas : `numpy.ndarray`, (n_observations, ) The distance to the k-th nearest neighbor for all rows in ``d``. Sigmas represent the kernel width representing each data point's accessible neighbors. indices : `numpy.ndarray`, (n_observations, ) Indices of nearest neighbors where each row corresponds to an observation. Returned if ``return_nn`` is `True`. distances : `numpy.ndarray`, (n_observations, ``n_neighbors + 1``) Distances to nearest neighbors where each row corresponds to an obs. Returned if ``return_nn`` is `True`. """ d = keeper.distances[key] sigmas, indices, distances = sigma_knn_(d.data, n_neighbors=n_neighbors, method=method, return_nn=True) if label is None: if return_nn: return sigmas, indices, distances else: return sigmas else: label = label if label == '' else '_'+label keeper.add_misc(sigmas, 'sigmas'+label) if return_nn: keeper.add_misc(indices, 'nn_indices'+label) keeper.add_misc(distances, 'nn_distances'+label) return None
# this used to be def similarity_measure_()
[docs]def _distance_to_similarity(d, n_neighbors, method, sigmas=None, knn=False, indices=None): """ Convert distance matrix to symmetric similarity measure. .. math:: K = \sqrt{2\sigma_i\sigma_j / (\sigma_i^2 + \sigma_j^2)}\exp{-(x-y)^2 / (\sigma_x^2 + \sigma_y^2)}. Parameters ---------- d : `numpy.ndarray`, (n_observations, n_observations) Symmetric distance matrix. n_neighbors : {`int`, `None`} K-th nearest neighbor (or number of nearest neighbors) to use for computing ``sigmas``, ``n_neighbors > 0``. (Uses ``n_neighbors + 1``, since each obs is it's closest neighbor). If `None`, all neighbors are used. method : {`float`, `int`, 'mean', 'median', 'max', 'precomputed'} Indicate how to compute sigma. Options: - `float` : constant float to use as sigma - `int` : constant int to use as sigma - 'mean' : mean of distance to ``n_neighbors`` nearest neighbors - 'median' : median of distance to ``n_neighbors`` nearest neighbors - 'max' : distance to ``n_neighbors``-nearest neighbor - 'precomputed' : precomputed values passed to ``sigmas`` sigmas : `numpy.ndarray`, (n_observations, ) Option to provide precomputed sigmas , ignored unless ``method='precomputed'``. knn : `bool` If `True`, restrict similarity measure to be non-zero only between ``n_neighbors`` nearest neighbors. indices : `numpy.ndarray`, (n_observations, n_neighbors) Option to provide precomputed indices of ``n_neighbors`` nearest neighbors for each obs when ``method`` = 'precomputed' and ``knn`` = `True` Returns ------- K : `numpy.ndarray`, (n_observations, n_observations) Symmetric similarity measure. """ if isinstance(method, float): sigmas = np.array([method]*d.shape[0]) elif isinstance(method, int): sigmas = np.array([float(method)]*d.shape[0]) elif isinstance(method, str): if method == 'precomputed': if sigmas is None: msg = "When `method` is 'precomputed', `sigmas` must be provided." raise AssertionError(msg) if knn and (indices is None): msg = "When `method` is 'precomputed' and `knn` is `True`, `indices` must be provided." raise AssertionError(msg) else: if knn: sigmas, indices, distances = sigma_knn_(d, n_neighbors=n_neighbors, method=method, return_nn=True) else: sigmas = sigma_knn_(d, n_neighbors=n_neighbors, method=method, return_nn=False) else: raise TypeError("Uncrecognized type for `method`, must be `float`, `int`, or `str`.") sigmas_sq = np.power(sigmas, 2) Num = 2 * np.multiply.outer(sigmas, sigmas) Den = np.add.outer(sigmas_sq, sigmas_sq) K = np.sqrt(Num / Den) * np.exp(-np.power(d, 2) / (2 * Den)) # TO DO: integral may not sum to 1 # K = np.sqrt(Num / Den) * np.exp(-np.power(d, 2) / (Den)) # TO DO: integral may not sum to 1 if knn: # restrict to nearest neighbors (symmetric) mask = np.zeros(d.shape, dtype=bool) for i, row in enumerate(indices): mask[i, row] = True for j in row: if i not in set(indices[j]): K[j, i] = K[i, j] mask[j, i] = True # set all entries that are not nearest neighbors to zero K[~mask] = 0 # remove neglibible values: mask = K > 1e-11 K[~mask] = 0 # K = (K + K.T) / 2 return K
[docs]def distance_to_similarity(keeper, key, n_neighbors, method, label=None, sigmas=None, knn=False, indices=None): """ Convert distance matrix to symmetric similarity measure. Parameters ---------- keeper : `netflow.Keeper` The keeper object that stores the symmetric distance matrix of size (n_observations, n_observations). key : `str` The label used to reference the distance matrix stored in ``keeper.distances``, of size (n_observations, n_observations). n_neighbors : {`int`, `None`} K-th nearest neighbor (or number of nearest neighbors) to use for computing ``sigmas``, ``n_neighbors > 0``. (Uses ``n_neighbors + 1``, since each obs is it's closest neighbor). If `None`, all neighbors are used. method : {`float`, 'mean', 'median', 'max', 'precomputed'} Indicate how to compute sigma. Options: - `float` : constant float to use as sigma - `int` : constant int to use as sigma - 'mean' : mean of distance to ``n_neighbors`` nearest neighbors - 'median' : median of distance to ``n_neighbors`` nearest neighbors - 'max' : distance to ``n_neighbors``-nearest neighbor - 'precomputed' : precomputed values passed to ``sigmas`` label : `str` Label used to store resulting similarity matrix of size (n_observations, n_observations) in ``keeper.similarities``. sigmas : `str` Option to provide precomputed sigmas, ignored unless ``method='precomputed'``. If provided, the precomputed sigmas are extracted from ``keeper.misc[sigmas]`` as a `numpy.ndarray` of size (n_observations, ). knn : `bool` If `True`, restrict similarity measure to be non-zero only between ``n_neighbors`` nearest neighbors. indices : {`None`, `str`} Option to provide precomputed indices of ``n_neighbors`` nearest neighbors for each obs when ``method`` = 'precomputed' and ``knn`` = `True`. If provided, the indices are extracted from ``keeper.misc[indices]`` as a `numpy.ndarray` of size (n_observations, n_neighbors). Returns ------- K : `numpy.ndarray`, (n_observations, n_observations) Symmetric similarity measure. if ``label`` is not `None`, this is stored in ``keeper.similarities[label]`` instead of being returned. """ d = keeper.distances[key] if isinstance(method, str) and method=='precomputed': if sigmas is not None: sigmas = keeper.misc[sigmas] else: raise ValueError("Must pass sigmas key when method='precomputed'.") if indices is not None: indices = keeper.misc[indices] K = _distance_to_similarity(d.data, n_neighbors, method, sigmas=sigmas, knn=knn, indices=indices) if label is None: return K else: keeper.add_similarity(K, label) return None