Source code for netflow.methods.classes

"""
Classes to compute distances.
"""

from pathlib import Path

from collections import defaultdict
from functools import partial
from itertools import combinations
import matplotlib.pyplot as plt
import multiprocessing as mp
import networkx as nx
import numpy as np
import pandas as pd
import scipy.spatial as ss
import scipy.stats as sc_stats
from tqdm import tqdm

# from .._logging import logger
from .._logging import _gen_logger, set_verbose
from ..checks import (check_connected_graph, check_graph_no_self_loops,
                      check_matrix_no_nan, check_matrix_nonnegative)
from .metrics import pairwise_observation_euc_distances, pairwise_observation_wass_distances

import netflow.utils as utl

logger = _gen_logger(__name__)

class InfoNet:
    """
    A class to compute information flow on a network and correlations between network modules.

    Parameters
    ----------
    keeper : `netflow.Keeper`
        The keeper object that stores the data of size (n_features, n_observations).
    graph_key : `str`
        The key to the graph in the graph keeper that should be used.
        (The graph does not have to include all features in the data.)
    layer : `str`
        The key to the data in the data keeper that should be used.
    """
    def __init__(self, keeper, graph_key, layer, verbose=None):
        if verbose is not None:
            set_verbose(logger, verbose)

        self.data = keeper.data[layer]
        check_matrix_no_nan(self.data.data)
        logger.debug(f"Loaded data: {self.data.data.shape}")

        if graph_key is not None:
            G = keeper.graphs[graph_key].copy()
            check_connected_graph(G)
            check_graph_no_self_loops(G)
            logger.debug(f"Loaded graph: {G}")

            if not nx.get_node_attributes(G, name='name'):
                # Note: nodes need to line up with the order of the features in the data.
                nx.set_node_attributes(G, {k: k for k in G}, name='name')
            nx.relabel_nodes(G, dict(zip(self.data.feature_labels, range(self.data.num_features))),
                             copy=False)
            self.G = G

            self.name2node = {v: k for k, v in nx.get_node_attributes(self.G, "name").items()}
            self.features = [self.name2node[i] for i in self.data.feature_labels if i in self.name2node]
            assert all(self.data.feature_labels[i] == self.G.nodes[i]['name'] for i in self.G), \
                "Features in the graph are not ordered according to the data features."
            assert max(list(self.G)) == len(self.features) - 1, \
                "Features in the graph are not consecutively ordered."
        else:
            self.name2node = dict(zip(self.data.feature_labels, range(len(self.data.feature_labels))))
            self.features = self.data.feature_labels

        self.keeper = keeper
        self.observations = self.data.observation_labels
        self.meta = {}  # can be used to reference data to be used for subsequent analysis
        self.outdir = keeper.outdir
        if self.outdir is not None:
            if self.outdir.is_dir():
                # store list of names of saved files
                self.filenames = [k.name for k in self.outdir.iterdir()]
            else:
                logger.msg(f"Creating directory {self.outdir}.")
                self.outdir.mkdir()
                self.filenames = []
        else:
            # keep the attribute defined even when nothing is saved to disk
            self.filenames = []
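    # A minimal construction sketch (hypothetical setup): assumes a populated
    # `netflow.Keeper` with a data layer 'expr' of shape
    # (n_features, n_observations) and a connected graph stored under 'ppi'
    # whose node names match the data's feature labels.
    #
    #     >>> keeper = ...  # a netflow.Keeper populated elsewhere
    #     >>> inet = InfoNet(keeper, graph_key='ppi', layer='expr', verbose=True)
    #     >>> inet.features[:3]  # integer node ids aligned with the data rows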
    def invariant_measures(self, label='IM'):
        """
        Compute the invariant measure for each observation using the data as node weights.

        Parameters
        ----------
        label : `str`
            Label of the key used to store the invariant measures in the data keeper.

        Returns
        -------
        Makes the invariant measures of size (n_features, n_observations)
        available in ``keeper.data[label]``.
        """
        raise NotImplementedError("Not yet implemented")
        # intended implementation (unreachable until implemented):
        check_matrix_nonnegative(self.data.data)
        IM = utl.invariant_measure(self.data.data, G=self.G)
        self.keeper.add_data(IM, label)
    def spearmanr(self, data, **kwargs):
        """
        Calculate Spearman correlation coefficients with associated p-values using `scipy.stats.spearmanr`.

        Parameters
        ----------
        data : `numpy.ndarray`, (n_observations, n_features)
            2-D array containing multiple variables and observations.
        **kwargs : `dict`
            Optional key-word arguments passed to `scipy.stats.spearmanr`.

        Returns
        -------
        R : `pandas.DataFrame`
            Spearman correlation matrix. The correlation matrix is square with
            length equal to the total number of variables (columns or rows).
        p : `pandas.DataFrame`
            The p-values for a hypothesis test whose null hypothesis is that two
            sets of data are uncorrelated. See the documentation for
            `scipy.stats.spearmanr` for alternative hypotheses. ``p`` has the
            same shape as ``R``.
        """
        R, p = utl.spearmanr_(data, **kwargs)
        return R, p
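    # Hedged example: `spearmanr` is a thin wrapper around `utl.spearmanr_`;
    # the sketch below assumes a small observations-by-features DataFrame.
    #
    #     >>> rng = np.random.default_rng(0)
    #     >>> df = pd.DataFrame(rng.normal(size=(20, 3)), columns=list("abc"))
    #     >>> R, p = inet.spearmanr(df)
    #     >>> R.shape == p.shape == (3, 3)
    #     True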
    def kendall_tau(self, data, **kwargs):
        """
        Calculate Kendall's tau correlation coefficients with associated p-values using `scipy.stats.kendalltau`.

        Parameters
        ----------
        data : `numpy.ndarray`, (n_observations, n_features)
            2-D array containing multiple variables and observations.
        **kwargs : `dict`
            Optional key-word arguments passed to `scipy.stats.kendalltau`.

        Returns
        -------
        R : `pandas.DataFrame`
            Kendall's tau correlation matrix. The correlation matrix is square
            with length equal to the total number of variables (columns or rows).
        p : `pandas.DataFrame`
            The p-values for a hypothesis test whose null hypothesis is that two
            sets of data are uncorrelated. See the documentation for
            `scipy.stats.kendalltau` for alternative hypotheses. ``p`` has the
            same shape as ``R``.
        """
        R, p = utl.kendall_tau_(data, **kwargs)
        return R, p
    def stack_triu(self, df, name=None):
        """
        Stack the upper-triangular entries of the dataframe above the diagonal.

        Note, this is useful for symmetric dataframes like correlations or distances.

        Parameters
        ----------
        df : `pandas.DataFrame`
            Dataframe to stack. Note, upper-triangular entries are taken from
            ``df`` as provided, with no check that the rows and columns are symmetric.
        name : `str`, optional
            Name of the pandas Series output ``df_stacked``.

        Returns
        -------
        df_stacked : `pandas.Series`
            The stacked upper-triangular entries above the diagonal of the dataframe.
        """
        df_stacked = utl.stack_triu_(df, name=name)
        return df_stacked
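    # Hedged example: stacking the strict upper triangle of a symmetric 3x3
    # distance-like DataFrame yields one entry per unordered pair.
    #
    #     >>> df = pd.DataFrame([[0., 1., 2.], [1., 0., 3.], [2., 3., 0.]],
    #     ...                   index=list("xyz"), columns=list("xyz"))
    #     >>> inet.stack_triu(df, name="dist")
    #     x  y    1.0
    #     x  z    2.0
    #     y  z    3.0
    #     Name: dist, dtype: float64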
    def stack_triu_where(self, df, condition, name=None):
        """
        Stack the upper-triangular entries of the dataframe above the diagonal where the condition is `True`.

        Note, this is useful for symmetric dataframes like correlations or distances.

        Parameters
        ----------
        df : `pandas.DataFrame`
            Dataframe to stack. Note, upper-triangular entries are taken from
            ``df`` as provided, with no check that the rows and columns are symmetric.
        condition : `pandas.DataFrame`
            Boolean dataframe with the same size and order of rows and columns
            as ``df``, indicating the values (where `True`) to include in the
            stacked output.
        name : `str`, optional
            Name of the pandas Series output ``df_stacked``.

        Returns
        -------
        df_stacked : `pandas.Series`
            The stacked upper-triangular entries above the diagonal of the
            dataframe, where ``condition`` is `True`.
        """
        df_stacked = utl.stack_triu_where_(df, condition, name=name)
        return df_stacked
    def dispersion(self, data, axis=0):
        """
        Data dispersion computed as the absolute value of the variance-to-mean ratio,
        where the variance and mean are computed over the requested axis.

        Parameters
        ----------
        data : `pandas.DataFrame`
            Data used to compute the dispersion.
        axis : {0, 1}
            Axis along which the variance and mean are computed. Options:

            - 0 : for each column, apply the function to the values over the index
            - 1 : for each index, apply the function to the values over the columns

        Returns
        -------
        vmr : `pandas.Series`
            Variance-to-mean ratio (vmr) quantifying the dispersion.
        """
        raise NotImplementedError
        # intended implementation (unreachable until implemented):
        vmr = utl.dispersion_(data, axis=axis)
        return vmr
    def value_counter(self, values):
        """
        Return a dictionary with the number of times each value appears.

        Parameters
        ----------
        values : iterable
            List of values.

        Returns
        -------
        counter : `defaultdict` [value, `int`]
            Dictionary of the form {value : count} with the number of times
            each value appears in the iterable.
        """
        counter = defaultdict(int)
        for k in values:
            counter[k] += 1
        return counter
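    # Hedged example of `value_counter` on a short list:
    #
    #     >>> inet.value_counter(["a", "b", "a"])
    #     defaultdict(<class 'int'>, {'a': 2, 'b': 1})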
    def weighted_observation_network(self, observation, weight='weight', data=None, **kwargs):
        """
        Return the graph with nodes weighted by the observation's feature values.

        Parameters
        ----------
        observation : `str`
            Name of the observation to use.
        weight : `str`
            Name of the node attribute that the observation feature is saved to
            in the returned graph, default = 'weight'.
        data : {`None`, `str`, `pandas.DataFrame`}
            Specify which data the observation weights should be taken from. Options:

            - `None` : The original data is used.
            - `str` : ``data`` is expected to be a key in ``self.meta`` and the
              observation weights are taken from the data in the corresponding dict-value.
            - `pandas.DataFrame` : The dataframe is used to select the observation
              weights, where the observation is expected to be one of the columns
              and the rows are expected to be labelled by the node indices,
              :math:`0, 1, ..., n-1` where :math:`n` is the number of nodes in the graph.
        **kwargs : `dict`, optional
            Arguments passed to computing the edge weights.

        Returns
        -------
        G : `networkx.Graph`
            The observation-specific node-weighted graph.
        """
        raise NotImplementedError
        # intended implementation (unreachable until implemented):
        G = self.G.copy()
        if data is None:
            s_data = self.data[observation].to_dict()
        elif isinstance(data, str):
            s_data = self.meta[data][observation].to_dict()
        else:  # pandas DataFrame
            s_data = data[observation].to_dict()
        nx.set_node_attributes(G, s_data, name=weight)
        utl.compute_edge_weights(G, n_weight=weight, e_weight=weight, **kwargs)
        return G
    def compute_graph_distances(self, G=None, weight='weight'):
        """
        Compute graph distances.

        Parameters
        ----------
        G : `networkx.Graph`
            The graph, with nodes assumed to be labeled consecutively from
            :math:`0, 1, ..., n-1` where :math:`n` is the number of nodes.
            If `None`, ``self.G`` is used.
        weight : `str`, optional
            Edge attribute of weights used for computing the weighted hop
            distance. If `None`, compute the unweighted distance. That is,
            rather than minimizing the sum of weights over all paths between
            every two nodes, minimize the number of edges.

        Returns
        -------
        dist : `numpy.ndarray`, (n, n)
            A matrix of node-pairwise graph distances between the :math:`n` nodes.
        """
        G = self.G if G is None else G
        dist = utl.compute_graph_distances(G, weight=weight)
        return dist
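    # Hedged example: with ``weight=None`` the result is the unweighted
    # shortest-path (hop) matrix; a 4-node path graph has 3 hops end-to-end.
    #
    #     >>> dist = inet.compute_graph_distances(G=nx.path_graph(4), weight=None)
    #     >>> int(dist[0, 3])
    #     3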
    def neighborhood(self, node, include_self=False):
        """
        Return the neighborhood of the node.

        Parameters
        ----------
        node : `int`
            Node of interest.
        include_self : `bool`
            If `True`, include ``node`` in its neighborhood.

        Returns
        -------
        neighborhood : `list`
            List of nodes in the neighborhood of ``node``.
        """
        neighborhood = list(self.G.neighbors(node))
        if include_self:
            neighborhood = neighborhood + [node]
        return neighborhood
    def neighborhood_profiles(self, node, include_self=False):
        """
        Return the profiles on the neighborhood of the node.

        Parameters
        ----------
        node : `int`
            Node in the graph to compute the neighborhood on.
        include_self : `bool`
            If `True`, add the node to its neighborhood, which results in
            computing the normalized profile over the neighborhood. If `False`,
            the node is not included in the neighborhood, which results in
            computing the transition distribution over the neighborhood.

        Returns
        -------
        neighborhood : `list`
            List of nodes in the neighborhood of ``node``.
        sub_profiles : `numpy.ndarray`, (len(neighborhood), n_observations)
            The profiles over the node neighborhood.
        """
        neighborhood = self.neighborhood(node, include_self=include_self)
        sub_profiles = self.data.data[neighborhood, :].copy()
        return neighborhood, sub_profiles
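    # Hedged sketch: ``sub_profiles`` is the data matrix restricted to the
    # neighborhood rows, i.e. of shape (len(neighborhood), n_observations).
    #
    #     >>> nbhd, sub = inet.neighborhood_profiles(0, include_self=True)
    #     >>> sub.shape == (len(nbhd), len(inet.observations))
    #     True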
    def pairwise_observation_neighborhood_wass_distance(self, node, include_self=False, graph_distances=None,
                                                        proc=mp.cpu_count(), chunksize=None,
                                                        measure_cutoff=1e-6, solvr=None):
        """
        Compute observation-pairwise Wasserstein distances between the profiles over the node neighborhood.

        Parameters
        ----------
        node : `int`
            Node in the graph to compute the neighborhood on.
        include_self : `bool`
            If `True`, add the node to its neighborhood, which results in
            computing the normalized profile over the neighborhood. If `False`,
            the node is not included in the neighborhood, which results in
            computing the transition distribution over the neighborhood.
        graph_distances : `numpy.ndarray`, (n, n)
            A matrix of node-pairwise graph distances between the :math:`n`
            nodes (ordered from :math:`0, 1, ..., n-1`). If `None`, use the hop distance.
        proc : `int`
            Number of processors used for multiprocessing (default value = cpu_count()).
        chunksize : `int`
            Chunksize to allocate for multiprocessing.
        measure_cutoff : `float`
            Threshold for treating values in profiles as zero, default = 1e-6.
        solvr : `str`
            Solver to pass to the POT library for computing the Wasserstein distance.

        Returns
        -------
        wd : `pandas.Series`
            Wasserstein distances between pairwise observations.
        """
        nbhd, profiles = self.neighborhood_profiles(node, include_self=include_self)
        if graph_distances is None:
            logger.debug("Computing graph hop distances.")
            graph_distances = self.compute_graph_distances(weight=None)
        graph_distances = graph_distances[np.ix_(nbhd, nbhd)]
        wd = pairwise_observation_wass_distances(
            pd.DataFrame(data=profiles, columns=self.observations),  # (len(nbhd), n_observations)
            graph_distances, proc=proc, chunksize=chunksize,
            measure_cutoff=measure_cutoff, solvr=solvr, flag=f"node {node}")
        return wd
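    # Hedged usage sketch: Wasserstein distances over node 0's 1-hop
    # neighborhood, reusing precomputed hop distances so they are not
    # recomputed for every node.
    #
    #     >>> dhop = inet.compute_graph_distances(weight=None)
    #     >>> wd = inet.pairwise_observation_neighborhood_wass_distance(
    #     ...     0, include_self=False, graph_distances=dhop)
    #     >>> wd.index  # observation pairs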
    def pairwise_observation_neighborhood_euc_distance(self, node, include_self=False, metric='euclidean',
                                                       normalize=False, **kwargs):
        """
        Compute observation-pairwise Euclidean distances between the profiles over the node neighborhood.

        Parameters
        ----------
        node : `int`
            Node in the graph to compute the neighborhood on.
        include_self : `bool`
            If `True`, include the node in the neighborhood. If `False`, the
            node is not included in the neighborhood.
        metric : `str`
            The metric used to compute the distance, passed to `scipy.spatial.distance.cdist`.
        normalize : `bool`
            If `True`, normalize the neighborhood profiles to sum to 1.
        **kwargs : `dict`
            Extra arguments to ``metric``, passed to `scipy.spatial.distance.cdist`.

        Returns
        -------
        ed : `pandas.Series`
            Euclidean distances between pairwise observations.
        """
        nbhd, profiles = self.neighborhood_profiles(node, include_self=include_self)
        if normalize:
            profiles = profiles / profiles.sum(axis=0)
        ed = pairwise_observation_euc_distances(pd.DataFrame(data=profiles, columns=self.observations),
                                                metric=metric, **kwargs)
        return ed
    def anisotropic_laplacian_matrix(self, observation=None, use_spectral_gap=False, data=None):
        """
        Return the transpose of the anisotropic random-walk Laplacian matrix.

        Parameters
        ----------
        observation : {`None`, `str`}
            If provided, use the observation-weighted graph to construct the
            Laplacian. Otherwise, if `None`, the Laplacian is constructed from
            the unweighted graph, treated with uniform weights equal to 1.
        use_spectral_gap : `bool`
            Option to use the spectral gap.
        data : {`None`, `str`, `pandas.DataFrame`}
            Specify which data the observation profile should be taken from.
            Note, this is ignored if ``observation`` is `None`. Options:

            - `None` : The original data is used.
            - `str` : ``data`` is expected to be a key in ``self.meta`` and the
              observation profile is taken from the data in the corresponding dict-value.
            - `pandas.DataFrame` : The dataframe is used to select the observation
              profile, where columns are observations and the rows are expected
              to be labelled by the node indices, :math:`0, 1, ..., n-1` where
              :math:`n` is the number of nodes in the graph.

        Returns
        -------
        laplacian : `numpy.ndarray`
            Transpose of the random-walk graph Laplacian matrix.
        """
        if observation is None:
            G = self.G.copy()
            # set_node_attributes broadcasts a scalar value to all nodes
            nx.set_node_attributes(G, 1., name='weight')
        else:
            G = self.weighted_observation_network(observation, weight='weight', data=data)
        laplacian = utl.construct_anisotropic_laplacian_matrix(G, 'weight', use_spectral_gap=use_spectral_gap)
        return laplacian
    def diffuse_profile(self, observation, times=None, t_min=-1.5, t_max=2.5, n_t=10, log_time=True,
                        laplacian=None, do_save=True):
        """
        Diffuse a profile from the original data.

        .. todo:: get the Laplacian for the observation, or the hop distance, for all.

        Parameters
        ----------
        observation : `str`
            Observation profile to use.
        times : {`None`, array}
            Array of times to evaluate the diffusion simulation. Note, if given,
            ``t_min``, ``t_max`` and ``n_t`` are ignored.
        t_min : `float`
            First time point to evaluate the diffusion simulation.
            Note, ``t_min`` is ignored if ``times`` is not `None`.
        t_max : `float`
            Last time point to evaluate the diffusion simulation. Note, ``t_max``
            must be greater than ``t_min``, i.e., ``t_max`` > ``t_min``.
            Note, ``t_max`` is ignored if ``times`` is not `None`.
        n_t : `int`
            Number of time points to generate. Note, ``n_t`` is ignored if
            ``times`` is not `None`.
        log_time : `bool`
            If `True`, return ``n_t`` numbers spaced evenly on a log scale, where
            the time sequence starts at ``10 ** t_min``, ends with ``10 ** t_max``,
            and the sequence of times is of the form ``10 ** t`` where ``t`` is the
            ``n_t`` evenly spaced points between (and including) ``t_min`` and ``t_max``.
            For example,
            ``_get_times(t_min=1, t_max=3, n_t=3, log_time=True) = array([10., 100., 1000.])``.
            If `False`, return ``n_t`` numbers evenly spaced on a linear scale,
            where the sequence starts at ``t_min`` and ends with ``t_max``.
            For example,
            ``_get_times(t_min=1, t_max=3, n_t=3, log_time=False) = array([1., 2., 3.])``.
        laplacian : `numpy.ndarray`, (n, n)
            The transpose of the graph Laplacian matrix, where the :math:`n` rows
            and columns are ordered by the rows in ``object.profiles``.
        do_save : `bool`
            If `True`, save the diffused profile to
            ``self.outdir`` / 'diffused_profiles' / 'diffused_profile_{``observation``}.csv'.

        Returns
        -------
        profiles : `pandas.DataFrame`
            Diffused profiles where each row is a time and each column is a feature name.
        """
        if do_save:
            if not (self.outdir / 'diffused_profiles').is_dir():
                (self.outdir / 'diffused_profiles').mkdir()

        times = utl.get_times(times=times, t_min=t_min, t_max=t_max, n_t=n_t, log_time=log_time)
        times_with_zero = np.insert(times, 0, 0.0)

        profile = self.data[observation].loc[list(range(len(self.G)))].values
        profiles = np.inf * np.ones([len(times_with_zero), len(self.G)])
        profiles[0] = profile
        for time_index in tqdm(range(len(times)), desc="Diffusing profile", colour="green", leave=False):
            logger.debug("---------------------------------")
            logger.debug("Step %s / %s", str(time_index), str(len(times)))
            logger.debug("Computing diffusion time 10^{:.1f}".format(np.log10(times[time_index])))
            logger.debug("Computing measures")
            profile = utl.heat_kernel(profile, laplacian,
                                      times_with_zero[time_index + 1] - times_with_zero[time_index])
            profiles[time_index + 1] = profile

        profiles = pd.DataFrame(data=profiles, index=times_with_zero,
                                columns=[self.G.nodes[k]['name'] for k in range(len(self.G))])
        if do_save:
            profiles.to_csv(self.outdir / 'diffused_profiles' / 'diffused_profile_{}.csv'.format(observation),
                            header=True, index=True)
        return profiles
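    # Hedged example of the time grid alone (no diffusion run): with
    # ``log_time=True`` the evaluation times are ``10 ** t`` for ``n_t`` evenly
    # spaced exponents ``t`` in [t_min, t_max], matching ``utl.get_times``.
    #
    #     >>> utl.get_times(times=None, t_min=1, t_max=3, n_t=3, log_time=True)
    #     array([  10.,  100., 1000.])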
    def diffuse_multiple_profiles(self, observations=None, times=None, t_min=-1.5, t_max=2.5, n_t=10,
                                  log_time=True, laplacian=None, use_spectral_gap=False,
                                  do_plot=False, **plot_kwargs):
        """
        Diffuse multiple observation profiles from the original data.

        Parameters
        ----------
        observations : {`None`, `list`}
            Observations to iterate over. If `None`, use all observations in the data.
        times : {`None`, array}
            Array of times to evaluate the diffusion simulation. Note, if given,
            ``t_min``, ``t_max`` and ``n_t`` are ignored.
        t_min : `float`
            First time point to evaluate the diffusion simulation.
            Note, ``t_min`` is ignored if ``times`` is not `None`.
        t_max : `float`
            Last time point to evaluate the diffusion simulation. Note, ``t_max``
            must be greater than ``t_min``, i.e., ``t_max`` > ``t_min``.
            Note, ``t_max`` is ignored if ``times`` is not `None`.
        n_t : `int`
            Number of time points to generate. Note, ``n_t`` is ignored if
            ``times`` is not `None`.
        log_time : `bool`
            If `True`, return ``n_t`` numbers spaced evenly on a log scale, where
            the time sequence starts at ``10 ** t_min``, ends with ``10 ** t_max``,
            and the sequence of times is of the form ``10 ** t`` where ``t`` is the
            ``n_t`` evenly spaced points between (and including) ``t_min`` and ``t_max``.
            If `False`, return ``n_t`` numbers evenly spaced on a linear scale,
            where the sequence starts at ``t_min`` and ends with ``t_max``.
        laplacian : `numpy.ndarray`, (n, n)
            The transpose of the graph Laplacian matrix, where the :math:`n` rows
            and columns are ordered by the rows in ``object.profiles``. If `None`,
            the observation-specific Laplacian is used.
        use_spectral_gap : `bool`
            Option to use the spectral gap. Note, this is ignored if ``laplacian``
            is provided and not `None`.
        do_plot : `bool`
            If `True`, plot the diffused profiles for each observation.
        **plot_kwargs : `dict`
            Key-word arguments passed to ``plot_profiles`` (should not include ``title``).

        Notes
        -----
        Side-effects:

        - Saves a pandas DataFrame of the diffused profiles, where each row is a
          time and each column is a feature name, for each observation, to the file
          ``self.outdir`` / 'diffused_profiles' / 'diffused_profile_{``observation``}.csv'.
        - If ``do_plot`` is `True`, plots the diffused profile for each observation.
        """
        if observations is None:
            observations = list(self.observations)
        for observation in tqdm(observations, desc='Diffusing observation profiles'):
            if laplacian is None:
                s_laplacian = self.anisotropic_laplacian_matrix(observation=observation,
                                                                use_spectral_gap=use_spectral_gap)
            else:
                s_laplacian = laplacian
            profiles = self.diffuse_profile(observation, times=times, t_min=t_min, t_max=t_max,
                                            n_t=n_t, log_time=log_time, laplacian=s_laplacian,
                                            do_save=True)
            if do_plot:
                ax = self.plot_profiles(profiles, title=observation + ": diffused profile", **plot_kwargs)
    def load_diffused_profile(self, observation):
        """
        Load the diffused profile of an observation.

        Parameters
        ----------
        observation : `str`
            Observation profile to load.

        Returns
        -------
        diffused_profiles : `pandas.DataFrame`
            Diffused profiles where each row is a time and each column is a feature name.
        """
        diffused_profiles = pd.read_csv(
            self.outdir / 'diffused_profiles' / 'diffused_profile_{}.csv'.format(observation),
            header=0, index_col=0)
        return diffused_profiles
    def load_diffused_timepoint_profile(self, time, observations=None):
        """
        Load the diffused profiles of the observations at a given time point.

        Parameters
        ----------
        time : `float`
            Timepoint in the diffusion simulation to select.
        observations : {`None`, `list`}
            List of observations to iterate over. If `None`, all observations
            in ``self.data`` are used.

        Returns
        -------
        diffused_profiles : `pandas.DataFrame`
            Diffused profiles at ``time``, for all ``observations``, where rows
            are nodes and columns are observations.
        """
        if observations is None:
            observations = list(self.observations)
        diffused_profiles = []
        for observation in tqdm(observations, desc='Loading diffused profiles at specified time'):
            profile = self.load_diffused_profile(observation)
            profile = profile.loc[time]
            profile.name = observation
            diffused_profiles.append(profile)
        diffused_profiles = pd.concat(diffused_profiles, axis=1)
        return diffused_profiles
    def plot_profiles(self, profiles, ylog=False, ax=None, figsize=(5.3, 4), title="",
                      lw=1.3, marker_size=2, **plot_kwargs):
        """ Plot profiles, with rows as times and columns as features. """
        if ax is None:
            fig, ax = plt.subplots(1, 1, figsize=figsize)
        else:
            fig = ax.get_figure()

        times = profiles.index.tolist()[:]
        if times[0] < 1e-6:
            times[0] = 1e-6

        for profile in profiles.values.T:
            color = "olive" if all(profile > 0) else "tan"
            ax.plot(np.log10(times), profile, '-o', c=color, lw=lw, ms=marker_size, **plot_kwargs)

        if ylog:
            # the x-axis is already log10-transformed above; ``ylog`` scales the y-axis
            ax.set_yscale("symlog")

        ax.axhline(0, ls="--", c="k")
        ax.axis([np.log10(times[0]), np.log10(times[-1]),
                 np.min(profiles.values), np.max(profiles.values)])
        ax.set_xlabel(r"$log_{10}(t)$")
        ax.set_ylabel("diffused value")
        ax.set_title(title)
        return ax
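    # Hedged usage sketch: plot a diffused-profile table (rows = times,
    # columns = features) loaded from an earlier `diffuse_profile` run;
    # the observation name 'obs1' is hypothetical.
    #
    #     >>> profiles = inet.load_diffused_profile('obs1')
    #     >>> ax = inet.plot_profiles(profiles, title='obs1: diffused profile')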
    def multiple_pairwise_observation_neighborhood_wass_distance(self, nodes=None, include_self=False,
                                                                 graph_distances=None,
                                                                 label='pw_obs_nbhd_wass_dist',
                                                                 desc='Computing pairwise 1-hop distances',
                                                                 profiles_desc='t0',
                                                                 proc=mp.cpu_count(), chunksize=None,
                                                                 measure_cutoff=1e-6, solvr=None):
        """
        Compute observation-pairwise Wasserstein distances between the profiles over the node neighborhood, for all nodes.

        Parameters
        ----------
        nodes : {`None`, `list` [`str`]}
            List of nodes to compute neighborhood distances on. If `None`, all
            nodes with at least 2 neighbors are used.
        include_self : `bool`
            If `True`, add the node to its neighborhood, which results in
            computing the normalized profile over the neighborhood. If `False`,
            the node is not included in the neighborhood, which results in
            computing the transition distribution over the neighborhood.
        graph_distances : `numpy.ndarray`, (n, n)
            A matrix of node-pairwise graph distances between the :math:`n`
            nodes (ordered from :math:`0, 1, ..., n-1`). If `None`, use the hop distance.
        label : `str`
            Label under which the resulting Wasserstein distances are saved in ``keeper.misc``.
        desc : `str`
            Description for the progress bar.
        profiles_desc : `str`, default = 't0'
            Description of the profiles used in the name of the file to store results.
        proc : `int`
            Number of processors used for multiprocessing (default value = cpu_count()).
        chunksize : `int`
            Chunksize to allocate for multiprocessing.
        measure_cutoff : `float`
            Threshold for treating values in profiles as zero, default = 1e-6.
        solvr : `str`
            Solver to pass to the POT library for computing the Wasserstein distance.

        Returns
        -------
        wds : `pandas.DataFrame`
            Wasserstein distances between pairwise observations, where rows are
            observation-pairs and columns are node names. This is saved in
            ``keeper.misc`` with the key ``label``.

        Notes
        -----
        If ``object.outdir`` is not `None`, the Wasserstein distances are saved
        to file every 10 iterations. Before starting the computation, check if
        the file exists. If so, load it and remove already-computed nodes from
        the iteration. Wasserstein distances are computed for the remaining
        nodes and combined with the previously computed and saved results
        before saving and returning the combined results.

        Only nodes with at least 2 neighbors are included, as leaf nodes will
        all have the same Wasserstein distance and do not provide any further
        information.

        To do: specify whether the nodes in the input are ids or node names,
        and check that loaded data has the correct type (int or str) for nodes.
        """
        if nodes is None:
            nodes = [k for k in self.G if len(list(self.G.neighbors(k))) > 1]
        else:
            nodes = [self.name2node[k] for k in nodes if self.G.degree(self.name2node[k]) > 1]

        if self.outdir is None:
            fname = None
            wds_prior = None
        else:
            fname = f"{label}.csv"
            self.filenames.append(fname)
            if (self.outdir / fname).is_file():
                wds_prior = pd.read_csv(self.outdir / fname, header=0, index_col=(0, 1))
                logger.msg(f"Loaded saved observation pairwise 1-hop neighborhood Wasserstein distances from {label} profiles of size {wds_prior.shape}.")
                n_orig = len(nodes)
                nodes = [k for k in nodes if self.G.nodes[k]['name'] not in wds_prior.columns]
                n_update = len(nodes)
                if n_update < n_orig:
                    if n_update == 0:
                        self.keeper.add_misc(wds_prior, label)
                        return None
                    logger.msg(f"Computing observation pairwise 1-hop neighborhood Wasserstein distances from {label} profiles on {n_update}/{n_orig} nodes.")
            else:
                wds_prior = None

        wds = []
        for ix, node in tqdm(enumerate(nodes), desc=desc, colour='yellow', total=len(nodes)):
            tmp = self.pairwise_observation_neighborhood_wass_distance(node, include_self=include_self,
                                                                       graph_distances=graph_distances,
                                                                       proc=proc, chunksize=chunksize,
                                                                       measure_cutoff=measure_cutoff,
                                                                       solvr=solvr)
            tmp.name = node
            wds.append(tmp)
            if (ix % 10 == 0) and (self.outdir is not None):
                wds_tmp = pd.concat(wds, axis=1)
                wds_tmp = wds_tmp.rename(columns=nx.get_node_attributes(self.G, 'name'))
                if wds_prior is not None:
                    wds_prior = pd.concat([wds_prior, wds_tmp], axis=1)
                else:
                    wds_prior = wds_tmp
                wds_prior.to_csv(str(self.outdir / fname), header=True, index=True)
                wds = []

        if wds:
            wds = pd.concat(wds, axis=1)
            wds = wds.rename(columns=nx.get_node_attributes(self.G, 'name'))
        else:
            wds = None
        if wds_prior is not None:
            wds = pd.concat([wds_prior, wds], axis=1)

        if self.outdir is not None:
            wds.to_csv(str(self.outdir / fname), header=True, index=True)
            logger.msg(f"Observation pairwise 1-hop neighborhood Wasserstein distances on {label} saved to {str(fname)}.")

        self.keeper.add_misc(wds, label)
        return None
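    # Hedged usage sketch: compute and store the 1-hop Wasserstein distances
    # for every node with at least 2 neighbors; the result lands in the
    # keeper's misc store under ``label`` rather than being returned
    # (key lookup on ``keeper.misc`` is assumed here).
    #
    #     >>> inet.multiple_pairwise_observation_neighborhood_wass_distance(
    #     ...     label='pw_obs_nbhd_wass_dist')
    #     >>> wds = inet.keeper.misc['pw_obs_nbhd_wass_dist']  # rows: obs pairs, cols: node names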
    def multiple_pairwise_observation_neighborhood_euc_distance(self, nodes=None, include_self=False,
                                                                label='pw_obs_nbhd_euc_dist',
                                                                desc='Computing pairwise 1-hop distances',
                                                                profiles_desc='t0',
                                                                metric='euclidean', normalize=False, **kwargs):
        """
        Compute observation-pairwise Euclidean distances between the profiles over the node neighborhood, for all nodes.

        Parameters
        ----------
        nodes : {`None`, `list` [`int`]}
            List of nodes to compute neighborhood distances on. If `None`, all
            nodes with at least 2 neighbors are used.
        include_self : `bool`
            If `True`, add the node to its neighborhood, which results in
            computing the normalized profile over the neighborhood. If `False`,
            the node is not included in the neighborhood, which results in
            computing the transition distribution over the neighborhood.
        label : `str`
            Label under which the resulting Euclidean distances are saved in ``keeper.misc``.
        desc : `str`
            Description for the progress bar.
        profiles_desc : `str`, default = 't0'
            Description of the profiles used in the name of the file to store results.
        metric : `str`
            The metric used to compute the distance, passed to `scipy.spatial.distance.cdist`.
        normalize : `bool`
            If `True`, normalize the neighborhood profiles to sum to 1.
        **kwargs : `dict`
            Extra arguments to ``metric``, passed to `scipy.spatial.distance.cdist`.

        Returns
        -------
        eds : `pandas.DataFrame`
            Euclidean distances between pairwise observations, where rows are
            observation-pairs and columns are node names. This is saved in
            ``keeper.misc`` with the key ``label``.

        Notes
        -----
        If ``object.outdir`` is not `None`, the Euclidean distances are saved to
        file. Before starting the computation, check if the file exists. If so,
        load it and remove already-computed nodes from the iteration. Euclidean
        distances are computed for the remaining nodes and combined with the
        previously computed and saved results before saving and returning the
        combined results.
        """
        if nodes is None:
            nodes = [k for k in self.G if len(list(self.G.neighbors(k))) > 1]
        else:
            nodes = [self.name2node[k] for k in nodes if self.G.degree(self.name2node[k]) > 1]

        if self.outdir is None:
            fname = None
            eds_prior = None
        else:
            fname = f"{label}.csv"
            self.filenames.append(fname)
            if (self.outdir / fname).is_file():
                eds_prior = pd.read_csv(self.outdir / fname, header=0, index_col=(0, 1))
                logger.msg(f"Loaded saved observation pairwise 1-hop neighborhood Euclidean distances from {label} profiles of size {eds_prior.shape}.")
                n_orig = len(nodes)
                nodes = [k for k in nodes if self.G.nodes[k]['name'] not in eds_prior.columns]
                n_update = len(nodes)
                if n_update < n_orig:
                    if n_update == 0:
                        self.keeper.add_misc(eds_prior, label)
                        return None
                    logger.msg(f"Computing observation pairwise 1-hop neighborhood Euclidean distances from {label} profiles on {n_update}/{n_orig} nodes.")
            else:
                eds_prior = None

        eds = []
        for node in tqdm(nodes, desc=desc, colour='yellow', total=len(nodes)):
            tmp = self.pairwise_observation_neighborhood_euc_distance(node, include_self=include_self,
                                                                      metric=metric, normalize=normalize,
                                                                      **kwargs)
            tmp.name = node
            eds.append(tmp)

        if eds:
            eds = pd.concat(eds, axis=1)
            eds = eds.rename(columns=nx.get_node_attributes(self.G, 'name'))
        else:
            eds = None
        if eds_prior is not None:
            eds = pd.concat([eds_prior, eds], axis=1)

        if self.outdir is not None:
            eds.to_csv(self.outdir / fname, header=True, index=True)
            logger.msg(f"Observation pairwise 1-hop neighborhood Euclidean distances on {label} saved to {str(fname)}.")

        self.keeper.add_misc(eds, label)
        return None
    def pairwise_observation_profile_wass_distance(self, features=None, graph_distances=None,
                                                   label='wass_dist_observation_pairwise_profiles_t0',
                                                   desc='Computing pairwise profile Wasserstein distances',
                                                   proc=mp.cpu_count(), chunksize=None,
                                                   measure_cutoff=1e-6, solvr=None):
        """
        Compute observation-pairwise Wasserstein distances between the profiles over the selected features.

        Parameters
        ----------
        features : {`None`, `list` [`str`]}
            List of features to compute profile distances on. If `None`, all features are used.
        graph_distances : `numpy.ndarray`, (n, n)
            A matrix of node-pairwise graph distances between the :math:`n`
            nodes (ordered from :math:`0, 1, ..., n-1`). If `None`, use the hop distance.
        label : `str`
            Label under which the resulting Wasserstein distances are saved in
            ``keeper.distances``, and the name of the file to store the stacked results.
        desc : `str`
            Description for the progress bar.
        proc : `int`
            Number of processors used for multiprocessing (default value = cpu_count()).
        chunksize : `int`
            Chunksize to allocate for multiprocessing.
        measure_cutoff : `float`
            Threshold for treating values in profiles as zero, default = 1e-6.
        solvr : `str`
            Solver to pass to the POT library for computing the Wasserstein distance.

        Returns
        -------
        wds : `pandas.Series`
            Wasserstein distances between pairwise profiles, indexed by
            observation-pairs. This is saved in ``keeper.distances`` with the key ``label``.

        Notes
        -----
        If ``object.outdir`` is not `None`, the Wasserstein distances are saved
        to file. Before starting the computation, check if the file exists. If
        so, and it covers all observation pairs, load it and store it in the
        keeper instead of recomputing.

        To do: specify whether the nodes in the input are ids or node names,
        and check that loaded data has the correct type (int or str) for nodes.
        """
        if features is None:
            features = list(self.G)
        else:
            features = [self.name2node[k] for k in features]
        pw_obs = list(combinations(self.observations, 2))
        logger.info(f">>> computing {len(pw_obs)} pw-obs distances")

        if self.outdir is None:
            fname = None
            wds_prior = None
        else:
            fname = f"{label}.csv"
            self.filenames.append(fname)
            if (self.outdir / fname).is_file():
                wds_prior = pd.read_csv(self.outdir / fname, header=0, index_col=(0, 1))
                logger.msg(f"Loaded saved observation pairwise profile Wasserstein distances from {label} profiles of size {wds_prior.shape}.")
                if wds_prior.shape[0] == len(pw_obs):
                    # all pairs already computed: store the stacked distances and return
                    if isinstance(wds_prior, pd.DataFrame):
                        wds_prior = wds_prior[wds_prior.columns[0]]
                    self.keeper.add_stacked_distance(wds_prior, label)
                    return None
            else:
                wds_prior = None

        profiles = self.data.subset(features=features)
        if graph_distances is None:
            logger.msg("Computing graph hop distances.")
            # TO DO: should not compute all graph distances here
            graph_distances = self.compute_graph_distances(weight=None)
        graph_distances = graph_distances[np.ix_(features, features)]

        wds = pairwise_observation_wass_distances(profiles, graph_distances, proc=proc,
                                                  pairwise_obs_list=pw_obs, chunksize=chunksize,
                                                  measure_cutoff=measure_cutoff, solvr=solvr,
                                                  flag="pairwise-profiles")
        wds.name = 'WD'
        if wds_prior is not None:
            wds = pd.concat([wds_prior, wds], axis=0)

        # only save if new wds were computed:
        if self.outdir is not None:
            wds.to_csv(str(self.outdir / fname), header=True, index=True)
            logger.msg(f"Observation pairwise profile Wasserstein distances saved to {str(fname)}.")

        # ensure wds is a Series and not a DataFrame
        if isinstance(wds, pd.DataFrame):
            wds = wds[wds.columns[0]]
        self.keeper.add_stacked_distance(wds, label)
        return None
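    # Hedged usage sketch: whole-profile Wasserstein distances between all
    # observation pairs, stored via ``keeper.add_stacked_distance`` under
    # ``label`` (the exact retrieval accessor is keeper-specific).
    #
    #     >>> inet.pairwise_observation_profile_wass_distance(
    #     ...     label='wass_dist_observation_pairwise_profiles_t0')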
    def pairwise_observation_profile_euc_distance(self, features=None,
                                                  label='euc_dist_observation_pairwise_profiles_t0',
                                                  desc='Computing pairwise profile Euclidean distances',
                                                  metric='euclidean', normalize=False, **kwargs):
        """
        Compute observation-pairwise Euclidean distances between the profiles over the selected features.

        Parameters
        ----------
        features : {`None`, `list` [`str`]}
            List of features to compute profile distances on. If `None`, all features are used.
        label : `str`
            Label under which the resulting Euclidean distances are saved in
            ``keeper.distances``, and the name of the file to store the stacked results.
        desc : `str`
            Description for the progress bar.
        metric : `str`
            The metric used to compute the distance, passed to `scipy.spatial.distance.cdist`.
        normalize : `bool`
            If `True`, normalize the profiles to sum to 1.
        **kwargs : `dict`
            Extra arguments to ``metric``, passed to `scipy.spatial.distance.cdist`.

        Returns
        -------
        eds : `pandas.Series`
            Euclidean distances between pairwise observations, indexed by
            observation-pairs. This is saved in ``keeper.distances`` with the key ``label``.

        Notes
        -----
        If ``object.outdir`` is not `None`, check whether a file of previously
        computed distances exists. If so, and it covers all observation pairs,
        load it and store it in the keeper instead of recomputing.
        """
        if features is None:
            features = list(self.features)
        else:
            features = [self.name2node[k] for k in features]
        pw_obs = list(combinations(self.observations, 2))

        if self.outdir is None:
            fname = None
            eds_prior = None
        else:
            if normalize:
                fname = f"{label}_self_normalized.csv"
            else:
                fname = f"{label}.csv"
            self.filenames.append(fname)
            if (self.outdir / fname).is_file():
                eds_prior = pd.read_csv(self.outdir / fname, header=0, index_col=(0, 1))
                logger.msg(f"Loaded saved observation pairwise profile Euclidean distances from {label} profiles of size {eds_prior.shape}.")
                n_orig = len(pw_obs)
                if eds_prior.shape[0] == n_orig:
                    # all pairs already computed: store the stacked distances and return
                    if isinstance(eds_prior, pd.DataFrame):
                        eds_prior = eds_prior[eds_prior.columns[0]]
                    self.keeper.add_stacked_distance(eds_prior, label)
                    return None
            else:
                eds_prior = None

        profiles = self.data.subset(features=features)
        if normalize:
            profiles = profiles / profiles.sum(axis=0)
        eds = pairwise_observation_euc_distances(profiles, metric=metric, **kwargs)
        eds.name = 'ED'

        # ensure eds is a Series and not a DataFrame
        if isinstance(eds, pd.DataFrame):
            eds = eds[eds.columns[0]]
        self.keeper.add_stacked_distance(eds, label)
        return None
# if __name__ == '__main__':
#     G = nx.karate_club_graph()
#     data = pd.DataFrame(data=np.abs(np.random.randn(len(G), 9)) + 0.001,
#                         index=list(G),
#                         columns=[f"x{k}" for k in range(9)])
#     logger.msg(f"Data is size {data.shape[0]} x {data.shape[1]}.")
#     inet = InfoNet(G, data, './')
#     g = inet.weighted_observation_network('x2', weight='weight')
#     print(nx.get_node_attributes(g, 'weight'))
#     dhop = inet.compute_graph_distances(weight=None)
#     print(dhop.shape)
#
#     wds, eds = [], []
#     for gn in [k for k in G if len(list(G.neighbors(k))) > 1]:
#         tmp = inet.pairwise_observation_neighborhood_wass_distance(gn, include_self=False,
#                                                                    graph_distances=dhop,
#                                                                    measure_cutoff=1e-6)
#         tmp.name = gn
#         wds.append(tmp)
#         tmp = inet.pairwise_observation_neighborhood_euc_distance(gn, include_self=False, metric='euclidean')
#         tmp.name = gn
#         eds.append(tmp)
#     wds = pd.concat(wds, axis=1)
#     eds = pd.concat(eds, axis=1)
#     print("ranges: ", wds.max().max(), wds.min().min(), eds.max().max(), eds.min().min())
#     print(wds.head())
#
#     wds_a = inet.multiple_pairwise_observation_neighborhood_wass_distance(
#         data=None, graph_distances=dhop, desc='Computing pairwise 1-hop distances',
#         profiles_desc='t0', proc=mp.cpu_count(), chunksize=None,
#         measure_cutoff=1e-6, solvr=None)
#     print("And automated wds: ")
#     print("ranges: ", wds_a.max().max(), wds_a.min().min())
#     print(wds_a.head())
#
#     # compute feature distance
#     import seaborn as sns
#     import netflow.pseudotime as nfp
#     import netflow.utils as nfu
#     print("Computing feature distances:")
#     wf = pd.Series(data=nfp.norm_features(wds, method='L2'),
#                    index=wds.index.copy())
#     print(wf.shape, wf, sep='\n')
#     A_wf = nfu.unstack_triu_(wf, index=data.columns.tolist())
#     print(A_wf)
#     sns.heatmap(A_wf)
#     plt.show()