Source code for sccellfie.communication.colocalization_scoring
import numpy as np
from scipy.spatial import distance
from scipy.stats import pearsonr
from sccellfie.expression.aggregation import AGG_FUNC
[docs]
def compute_local_colocalization_scores(adata, var1, var2, neighbors_radius, method='pairwise_concordance', spatial_key='X_spatial',
min_neighbors=3, threshold1=None, threshold2=None, score_key=None, inplace=True):
"""
Computes local colocalization scores between two variables for each spatial spot.
Parameters
----------
adata : AnnData
AnnData object containing expression data and spatial coordinates.
var1 : str
Name of first variable to analyze.
var2 : str
Name of second variable to analyze.
neighbors_radius : float
Radius for assigning a neighborhood of a spot
(neighbors within this radius are considered, and the sport is the center).
method : str, optional (default: 'pairwise_concordance')
Method to compute colocalization:
- 'correlation': Local Pearson correlation between var1 and var2 across spot & neighbors.
- 'concordance': Compute the fraction of spots where both genes are expressed above their thresholds.
- 'pairwise_concordance': Compute the fraction of spot pairs in the neighborhood where var1 and var2 are expressed above their thresholds in sport 1 and 2, respectively.
- 'cosine': Local cosine similarity between var1 and var2 across spot & neighbors.
- 'weighted_gmean': Local weighted geometric mean across spot & neighbors (weighted by distance).
- 'regularized_weighted_gmean': Local regularized and weighted geometric mean across spot & neighbors (weighted by distance).
spatial_key : str, optional (default: 'spatial')
Key in adata.obsm containing spatial coordinates
min_neighbors : int, optional (default: 3)
Minimum number of neighbors required for computing score. If less neighbors are found, score is NaN.
threshold1 : float, optional (default: None)
Threshold for var1. If None, the mean of var1 is used.
threshold2 : float, optional (default: None)
Threshold for var2. If None, the mean of var2 is used.
score_key : str, optional (default: None)
Key to store the computed colocalization scores in adata.obs. If None, a default key is used.
inplace : bool, optional (default: True)
If True, the computed scores are added to adata.obs. Otherwise, the scores are returned as a numpy array.
Returns
-------
numpy.ndarray
Array of colocalization scores for each spot
"""
# Extract spatial coordinates and expression values
coords = adata.obsm[spatial_key]
expr1 = adata[:, var1].X.toarray().flatten()
expr2 = adata[:, var2].X.toarray().flatten()
# Normalize expression values to [0,1] range
expr1_norm = (expr1 - np.min(expr1)) / (np.max(expr1) - np.min(expr1))
expr2_norm = (expr2 - np.min(expr2)) / (np.max(expr2) - np.min(expr2))
# Compute pairwise distances between spots
dist_matrix = distance.pdist(coords)
dist_matrix = distance.squareform(dist_matrix)
# Initialize scores array
scores = np.zeros(len(adata)) * np.nan
if method == 'correlation':
# Local Pearson correlation within radius for each spot
for i in range(len(adata)):
neighbors = np.where(dist_matrix[i] <= neighbors_radius)[0]
if len(neighbors) >= min_neighbors:
score, _ = pearsonr(expr1[neighbors], expr2[neighbors])
scores[i] = score
elif method == 'concordance':
if threshold1 is None:
threshold1 = AGG_FUNC['mean'](expr1, axis=None)
if threshold2 is None:
threshold2 = AGG_FUNC['mean'](expr2, axis=None)
# Compute local concordance of expression patterns
for i in range(len(adata)):
neighbors = np.where(dist_matrix[i] <= neighbors_radius)[0] # This should already include itself
if len(neighbors) >= min_neighbors:
# Compare if both genes are similarly high in neighborhood
concordant = np.sum(
((expr1[neighbors] > threshold1) & (expr2[neighbors] > threshold2))
)
scores[i] = concordant / len(neighbors)
elif method == 'pairwise_concordance':
if threshold1 is None:
threshold1 = AGG_FUNC['mean'](expr1, axis=None)
if threshold2 is None:
threshold2 = AGG_FUNC['mean'](expr2, axis=None)
# Compute pairwise concordance for each spot's neighborhood
for i in range(len(adata)):
neighbors = np.where(dist_matrix[i] <= neighbors_radius)[0]
if len(neighbors) >= min_neighbors:
concordant_pairs = 0
total_pairs = 0
# Compare each pair of spots in the neighborhood
for idx1 in neighbors:
for idx2 in neighbors:
# Check if var1 in first spot and var2 in second spot exceed thresholds
if expr1[idx1] > threshold1 and expr2[idx2] > threshold2:
concordant_pairs += 1
total_pairs += 1
scores[i] = concordant_pairs / total_pairs
elif method == 'cosine':
# Compute product of normalized intensities in neighborhood
for i in range(len(adata)):
neighbors = np.where(dist_matrix[i] <= neighbors_radius)[0]
if len(neighbors) >= min_neighbors:
# Weight by distance
weights = 1 / (1 + dist_matrix[i][neighbors])
weights = weights / np.sum(weights)
# Compute weighted product of normalized intensities
local_score = np.sum(
weights * expr1_norm[neighbors] * expr2_norm[neighbors]
) / (np.sqrt(np.sum(weights * expr1_norm[neighbors] * expr1_norm[neighbors])) * np.sqrt(
np.sum(weights * expr2_norm[neighbors] * expr2_norm[neighbors])))
scores[i] = local_score
elif method in ['weighted_gmean', 'regularized_weighted_gmean']:
# Compute product of normalized intensities in neighborhood
for i in range(len(adata)):
neighbors = np.where(dist_matrix[i] <= neighbors_radius)[0]
if len(neighbors) >= min_neighbors:
# Weight by distance
weights = 1 / (1 + dist_matrix[i][neighbors])
weights = weights / np.sum(weights)
# Compute weighted product of normalized intensities
local_score = np.sum(
weights * np.sqrt(expr1_norm[neighbors] * expr2_norm[neighbors])
)
scores[i] = local_score
if method == 'regularized_weighted_gmean':
scores = scores / (scores + AGG_FUNC['mean'](scores, axis=None))
else:
raise ValueError(f"Unknown method: {method}")
# Add scores to adata
if inplace:
if score_key is None:
score_key = f'{var1}^{var2}'
adata.obs[score_key] = scores
else:
return scores