Source code for sccellfie.expression.smoothing

import numpy as np
import scipy.sparse as sp

from tqdm import tqdm


def get_smoothing_matrix(adata, mode, neighbors_key='neighbors'):
    """
    Calculates the smoothing matrix S based on the nearest neighbor graph in adata.obsp.

    Parameters
    ----------
    adata : AnnData object
        Annotated data matrix containing the nearest neighbor graph.

    mode : str
        The mode for calculating the smoothing matrix. Can be either 'adjacency' or 'connectivity'.

    neighbors_key : str, optional (default: 'neighbors')
        The key in adata.uns where the information about the pre-run KNN analysis was stored.
        This key points to a dictionary containing the 'connectivities_key', 'distances_key',
        and 'params' from the analysis.

    Returns
    -------
    S : numpy.ndarray or scipy.sparse.csr_matrix
        The smoothing matrix S such that S @ X smoothes the signal X over neighbors.
        If the input matrix A is sparse, S will be returned as a scipy.sparse.csr_matrix.
        Otherwise, S will be a numpy.ndarray.

    Raises
    ------
    ValueError
        If an unknown mode is provided.
    """
    if mode == 'adjacency':
        distances_key = adata.uns[neighbors_key]['distances_key']
        A = (adata.obsp[distances_key] > 0).astype(int)
    elif mode == 'connectivity':
        connectivities_key = adata.uns[neighbors_key]['connectivities_key']
        A = adata.obsp[connectivities_key]
    else:
        raise ValueError(f'unknown mode {mode}')

    # Normalize the smoothing matrix
    if not sp.issparse(A):
        norm_vec = A.sum(axis=1)
        norm_vec[norm_vec == 0] = 1  # Avoid division by zero
        S = A / norm_vec[:, np.newaxis]
    else:
        # Memory-efficient sparse normalization
        A = A.tocsr()
        norm_vec = np.array(A.sum(axis=1)).flatten()
        norm_vec[norm_vec == 0] = 1  # Avoid division by zero

        # Use sparse diagonal matrix for row-wise scaling
        D_inv = sp.diags(1.0 / norm_vec, format='csr')
        S = D_inv @ A

    return S


[docs] def smooth_expression_knn(adata, key_added='smoothed_X', neighbors_key='neighbors', mode='connectivity', alpha=0.33, n_chunks=None, chunk_size=None, use_raw=False, disable_pbar=False): """ Smooths expression values based on KNNs of single cells using Scanpy. Parameters ---------- adata : AnnData object Annotated data matrix containing the expression data and nearest neighbor graph. key_added : str, optional (default: 'smoothed_X') The key in adata.layers where the smoothed expression matrix will be stored. neighbors_key : str, optional (default: 'neighbors') The key in adata.uns where the information about the pre-run KNN analysis was stored. This key points to a dictionary containing the 'connectivities_key', 'distances_key', and 'params' from the analysis. mode : str, optional (default: 'connectivity') The mode for calculating the smoothing matrix. Can be either 'adjacency' or 'connectivity'. alpha : float, optional (default: 0.33) The weight or fraction of the smoothed expression to use in the final expression matrix. The final expression matrix is computed as (1 - alpha) * X + alpha * (S @ X), where X is the original expression matrix and S is the smoothed matrix. n_chunks : int, optional (default: None) The number of chunks to split the cells into for processing. If not provided, chunk_size is used. chunk_size : int, optional (default: None) The size of each chunk of cells to process. If not provided, n_chunks is used. use_raw: bool, optional (default: False) Whether to use the raw data stored in adata.raw.X. disable_pbar: bool, optional (default: False) Whether to disable the progress bar. Returns ------- None The smoothed expression matrix is stored in adata.layers[key_added]. Notes ----- This function smoothes the expression values of single cells based on their K-nearest neighbors (KNNs) using the Scanpy package. The smoothing is performed by calculating a smoothing matrix S based on the nearest neighbor graph and then computing the smoothed expression as (1 - alpha) * X + alpha * (S @ X), where X is the original expression matrix. The smoothing is performed in chunks to reduce memory usage. The number of chunks or the chunk size can be specified using the n_chunks or chunk_size parameters, respectively. The smoothed expression matrix is stored in adata.layers[key_added]. """ # Get the connectivities matrix connectivities = adata.obsp[adata.uns[neighbors_key]['connectivities_key']] # Determine the number of chunks and chunk size n_cells = adata.n_obs if n_chunks is None and chunk_size is None: n_chunks = 1 # Default number of chunks if n_chunks is not None: chunk_size = int(np.ceil(n_cells / n_chunks)) else: n_chunks = int(np.ceil(n_cells / chunk_size)) # Initialize the smoothed expression matrix if use_raw: X = adata.raw.X else: X = adata.X if isinstance(X, sp.coo_matrix): X = X.tocsr() # Collect smoothed chunks for memory efficiency smoothed_chunks = [] # Iterate over chunks of cells for i in tqdm(range(n_chunks), disable=disable_pbar, desc='Smoothing Expression'): start_idx = i * chunk_size end_idx = min((i + 1) * chunk_size, n_cells) # Get the connectivities for the current chunk chunk_connectivities = connectivities[start_idx:end_idx, :] # Find the unique neighbor indices for the current chunk neighbor_indices = np.unique(chunk_connectivities.nonzero()[1]) # Create a set of cell indices including the chunk and its neighbors chunk_and_neighbors = np.union1d(np.arange(start_idx, end_idx), neighbor_indices) # Subset the adata based on the chunk and neighbor indices subset_adata = adata[chunk_and_neighbors, :] # Get the smoothing matrix for the current chunk and its neighbors smoothing_mat = get_smoothing_matrix(subset_adata, mode, neighbors_key=neighbors_key) # Get the expression data for the current chunk and its neighbors chunk_expression = X[chunk_and_neighbors, :] # Compute the expression data purely based on cell neighbors chunk_smoothed = smoothing_mat @ chunk_expression # Extract the smoothed expression for the cells in the current chunk chunk_indices = np.arange(start_idx, end_idx) subset_mapping = dict(zip(chunk_and_neighbors, range(len(chunk_and_neighbors)))) smoothed_chunk_indices = [subset_mapping[i] for i in chunk_indices] # Smooth by alpha if sp.issparse(X): X_chunk = X[chunk_indices, :] smoothed_result = (1. - alpha) * X_chunk + alpha * chunk_smoothed[smoothed_chunk_indices, :] else: smoothed_result = (1. - alpha) * X[chunk_indices, :] + alpha * chunk_smoothed[smoothed_chunk_indices, :] smoothed_chunks.append(smoothed_result) # Store the smoothed expression matrix in adata.layers if sp.issparse(X): smoothed_matrix = sp.vstack(smoothed_chunks, format='csr') else: smoothed_matrix = np.vstack(smoothed_chunks) adata.layers[key_added] = smoothed_matrix