Source code for sccellfie.external.tensor

import pandas as pd
import numpy as np
from tqdm import tqdm
from sccellfie.expression.aggregation import agg_expression_cells


def sccellfie_to_tensor(preprocessed_db, sample_key, celltype_key, score_type='metabolic_tasks',
                        min_cells_per_group=1, agg_func='trimean', layer=None, gene_symbols=None,
                        top_percent=10, exclude_zeros=False, use_raw=False, threshold=None,
                        order_labels=None, sort_elements=True, context_order=None,
                        fill_value=np.nan, verbose=True
                        ):
    """
    Converts scCellFie scores to a format compatible with cell2cell's
    PreBuiltTensor constructor.

    This function builds a 3D tensor with dimensions:
    [Contexts/Samples, Cell Types, Metabolic Features]

    Parameters
    ----------
    preprocessed_db : dict
        Output from run_sccellfie_pipeline containing 'adata' with
        metabolic_tasks and/or reactions attributes.

    sample_key : str
        Column name in adata.obs for grouping by samples/contexts.

    celltype_key : str
        Column name in adata.obs for cell type annotations.

    score_type : str, optional (default: 'metabolic_tasks')
        Which scCellFie scores to use. Options: 'metabolic_tasks', 'reactions'.

    min_cells_per_group : int, optional (default: 1)
        Minimum number of cells required per group (sample x celltype)
        to be included in analysis.

    agg_func : str, optional (default: 'trimean')
        Aggregation function to apply within cell groups. Options: 'mean',
        'median', '25p', '75p', 'trimean', 'topmean', 'fraction_above'.

    layer : str, optional (default: None)
        Layer name to use for aggregation. If None, uses the main .X matrix.

    gene_symbols : str or list, optional (default: None)
        Specific features to include in analysis. Names that are not present
        in the score matrix are ignored. If None, all features are used.

    top_percent : float, optional (default: 10)
        Percentage of top values for 'topmean' aggregation (0-100).

    exclude_zeros : bool, optional (default: False)
        Whether to exclude zeros when aggregating values.

    use_raw : bool, optional (default: False)
        Whether to use raw data for aggregation.

    threshold : float, optional (default: None)
        Expression threshold for 'fraction_above' aggregation.

    order_labels : list, optional (default: None)
        Labels for each dimension of the tensor. If None, the labels are
        ['Contexts', 'Cell Types', <title-cased score_type>], e.g.
        'Metabolic Tasks' or 'Reactions' for the third dimension.

    sort_elements : bool, optional (default: True)
        Whether to alphabetically sort elements in each dimension. When False,
        contexts, cell types and features keep their order of first appearance.

    context_order : list, optional (default: None)
        Custom order for contexts. If provided, contexts won't be sorted.
        It must contain every sample name exactly once.

    fill_value : float, optional (default: numpy.nan)
        Value to fill when a feature or cell type is missing in a context.
        None is treated as numpy.nan.

    verbose : bool, optional (default: True)
        Whether to print information about the analysis.

    Returns
    -------
    prebuilt_tensor_args : dict
        A dictionary containing all arguments needed for PreBuiltTensor constructor:
        - 'tensor': numpy array with shape (n_contexts, n_celltypes, n_features)
        - 'order_names': list of lists with names for each dimension
        - 'order_labels': list of dimension labels
        - 'mask': integer array with 1 where a value is present and 0 where it
          is missing; None when fill_value is not NaN
        - 'loc_nans': integer array with 1 at the locations of NaN values

    Raises
    ------
    ValueError
        If 'adata' is missing from preprocessed_db, the requested score
        attribute is missing, score_type is unknown, sample_key/celltype_key
        are not columns of adata.obs, none of gene_symbols is found, or
        context_order is not a permutation of the sample names.

    Notes
    -----
    This function aggregates single-cell metabolic scores into cell type-level
    summaries across different contexts (samples, conditions, timepoints, etc.)
    and creates a tensor suitable for tensor decomposition analysis.

    A context in which no cell type reaches min_cells_per_group, or whose
    aggregation fails, is kept in the tensor and left entirely at fill_value.

    Examples
    --------
    >>> # Convert scCellFie metabolic tasks to tensor format
    >>> tensor_args = sccellfie_to_tensor(
    ...     preprocessed_db,
    ...     sample_key='condition',
    ...     celltype_key='cell_type',
    ...     score_type='metabolic_tasks',
    ...     agg_func='mean'
    ... )
    >>>
    >>> # Create PreBuiltTensor
    >>> from cell2cell.tensor import PreBuiltTensor
    >>> tensor = PreBuiltTensor(**tensor_args)
    """
    # Extract appropriate scCellFie data
    if 'adata' not in preprocessed_db:
        raise ValueError("preprocessed_db must contain 'adata' key")
    adata = preprocessed_db['adata']

    # Select the appropriate scCellFie results
    if score_type == 'metabolic_tasks':
        if not hasattr(adata, 'metabolic_tasks'):
            raise ValueError("AnnData object must have 'metabolic_tasks' attribute. "
                             "Run scCellFie pipeline first.")
        score_adata = adata.metabolic_tasks
        feature_type = 'metabolic_task'
    elif score_type == 'reactions':
        if not hasattr(adata, 'reactions'):
            raise ValueError("AnnData object must have 'reactions' attribute. "
                             "Run scCellFie pipeline first.")
        score_adata = adata.reactions
        feature_type = 'reaction'
    else:
        raise ValueError("score_type must be either 'metabolic_tasks' or 'reactions'")

    if verbose:
        print(f"Using {score_type} with {score_adata.shape[1]} features and {score_adata.shape[0]} cells")

    # Validate required keys
    if sample_key not in score_adata.obs.columns:
        raise ValueError(f"'{sample_key}' not found in adata.obs")
    if celltype_key not in score_adata.obs.columns:
        raise ValueError(f"'{celltype_key}' not found in adata.obs")

    # None would break the NaN checks below; it can only sensibly mean "missing".
    if fill_value is None:
        fill_value = np.nan

    # Get unique elements and determine order
    samples = score_adata.obs[sample_key].unique()
    all_features = score_adata.var_names.tolist()

    # Apply gene_symbols filter if specified
    if gene_symbols is not None:
        if isinstance(gene_symbols, str):
            gene_symbols = [gene_symbols]
        known_features = set(all_features)
        features = [f for f in gene_symbols if f in known_features]
        if not features:
            raise ValueError("None of the specified gene_symbols found in the data")
        # Forward only the names that exist, so unknown names cannot make the
        # aggregation of every context fail.
        # NOTE(review): assumes agg_expression_cells accepts any subset of var_names - confirm.
        agg_features = list(features)
    else:
        features = all_features
        agg_features = None

    # Determine context order
    if context_order is None:
        contexts = sorted(samples) if sort_elements else list(samples)
    else:
        # Explicit exceptions instead of assert: asserts vanish under `python -O`,
        # and the previous checks let duplicated entries through.
        contexts = list(context_order)
        requested = set(contexts)
        if len(requested) != len(contexts):
            raise ValueError("Each sample must be in context_order exactly once")
        if requested != set(samples):
            raise ValueError("context_order must contain all sample names")

    # Aggregate data for each context and determine all cell types
    if verbose:
        print(f"Aggregating {feature_type} scores using '{agg_func}' method...")

    context_data = {}
    # dict used as an insertion-ordered set, so the cell type order is
    # reproducible when sort_elements is False.
    seen_celltypes = {}

    for context in tqdm(contexts, desc='Processing contexts', disable=not verbose):
        # Subset data for this context
        context_mask = score_adata.obs[sample_key] == context
        context_adata = score_adata[context_mask, :].copy()

        # Check cell type counts
        celltype_counts = context_adata.obs[celltype_key].value_counts()
        valid_celltypes = celltype_counts[celltype_counts >= min_cells_per_group].index.tolist()

        if not valid_celltypes:
            if verbose:
                print(f"Warning: No cell types with >= {min_cells_per_group} cells in context {context}")
            continue

        # Filter to valid cell types
        valid_mask = context_adata.obs[celltype_key].isin(valid_celltypes)
        context_adata = context_adata[valid_mask, :].copy()

        # Aggregate using scCellFie function. Failures are reported and the
        # context is skipped (best effort): its slice stays at fill_value.
        try:
            agg_data = agg_expression_cells(
                adata=context_adata,
                groupby=celltype_key,
                layer=layer,
                gene_symbols=agg_features,
                agg_func=agg_func,
                top_percent=top_percent,
                exclude_zeros=exclude_zeros,
                use_raw=use_raw,
                threshold=threshold
            )
        except Exception as e:
            print(f"Error aggregating context {context}: {e}")
            continue

        context_data[context] = agg_data
        seen_celltypes.update(dict.fromkeys(agg_data.index.tolist()))

    # Determine final element orders
    celltypes = sorted(seen_celltypes) if sort_elements else list(seen_celltypes)
    if sort_elements:
        features = sorted(features)

    if verbose:
        print("Building tensor with dimensions:")
        print(f" Contexts: {len(contexts)}")
        print(f" Cell Types: {len(celltypes)}")
        print(f" Features: {len(features)}")

    # Build 3D tensor: [contexts, celltypes, features]
    tensor_shape = (len(contexts), len(celltypes), len(features))
    tensor = np.full(tensor_shape, fill_value, dtype=float)

    for ctx_idx, context in enumerate(tqdm(contexts, desc='Building tensor', disable=not verbose)):
        agg_data = context_data.get(context)
        if agg_data is None:
            continue  # This context will remain filled with fill_value

        # Align the aggregated table to the global (celltypes x features) grid in
        # one step; labels absent from this context come back as NaN.
        aligned = agg_data.reindex(index=celltypes, columns=features).to_numpy(dtype=float)
        # Both absent labels and NaN scores become fill_value.
        tensor[ctx_idx] = np.where(np.isnan(aligned), fill_value, aligned)

    # Create mask and locate NaNs
    if np.isnan(fill_value):
        missing = np.isnan(tensor)
        mask = (~missing).astype(int)
        loc_nans = missing.astype(int)
        n_present = np.count_nonzero(~missing)
    else:
        mask = None
        loc_nans = np.zeros(tensor.shape, dtype=int)
        n_present = np.count_nonzero(tensor != fill_value)

    # Default order labels
    if order_labels is None:
        order_labels = ['Contexts', 'Cell Types', f'{score_type.replace("_", " ").title()}']

    # Prepare PreBuiltTensor arguments
    prebuilt_tensor_args = {
        'tensor': tensor,
        'order_names': [contexts, celltypes, features],
        'order_labels': order_labels,
        'mask': mask,
        'loc_nans': loc_nans,
    }

    if verbose:
        print("\nTensor built successfully!")
        print(f"Shape: {tensor.shape}")
        # Counts entries that differ from fill_value, i.e. non-missing entries
        # (the previous label, "Non-zero elements", was misleading).
        print(f"Non-missing elements: {n_present}")
        print(f"Fill value: {fill_value}")

    return prebuilt_tensor_args