Source code for scppin.analyzer

"""scPPIN analyzer class for stateful module detection."""

import igraph as ig
import numpy as np
import pandas as pd
from typing import Dict, Optional, Tuple, Union, List
import warnings

# Import internal helpers
from .graph import _build_graph, _load_edges_from_file
from .pvalues import _extract_pvalues
from .module import _detect_module, _validate_pvalues
from .core.network_utils import load_ppin as _load_ppin


def _normalize_gene_name(name: str, case_sensitive: bool = False) -> str:
    """
    Normalize gene name for consistent matching.
    
    Parameters
    ----------
    name : str
        Gene name to normalize
    case_sensitive : bool, optional
        If False, convert to uppercase (default: False)
        
    Returns
    -------
    str
        Normalized gene name
    """
    name = str(name).strip()
    if not case_sensitive:
        name = name.upper()
    return name


class scPPIN:
    """
    scPPIN analyzer for detecting functional modules in protein-protein interaction networks.

    This class manages network, node weights (p-values), edge weights, and analysis
    results as object attributes, ensuring data consistency and providing a clean workflow.

    Attributes
    ----------
    network : Optional[ig.Graph]
        Protein-protein interaction network filtered to genes with weights
    node_weights : Optional[Dict[str, float]]
        Node weights (p-values from differential expression)
    edge_weights : Dict[Tuple[str, str], float]
        Edge weights dictionary
    module : Optional[ig.Graph]
        Detected functional module

    Examples
    --------
    >>> import scppin
    >>>
    >>> # Create analyzer
    >>> analyzer = scppin.scPPIN()
    >>>
    >>> # Load network with weights from CSV column
    >>> analyzer.load_network('edges.csv', weight_column='confidence')
    >>>
    >>> # Set node weights (p-values)
    >>> analyzer.set_node_weights({'TP53': 0.0001, 'MDM2': 0.001})
    >>>
    >>> # Detect module
    >>> module = analyzer.detect_module(fdr=0.01)
    """

    def __init__(
        self,
        network: Optional[ig.Graph] = None,
        node_weights: Optional[Dict[str, float]] = None,
        edge_weights: Optional[Dict[Tuple[str, str], float]] = None
    ):
        """
        Initialize scPPIN analyzer.

        Parameters
        ----------
        network : Optional[ig.Graph]
            Initial network (optional). The analyzer works on a copy, so the
            caller's graph is never renamed or filtered in place.
        node_weights : Optional[Dict[str, float]]
            Initial node weights (optional)
        edge_weights : Optional[Dict[Tuple[str, str], float]]
            Initial edge weights dictionary (optional)
        """
        # Copy the graph: vertex names are rewritten in place during
        # normalization, which must not leak into the caller's object
        # (load_network() copies for the same reason).
        self.network: Optional[ig.Graph] = (
            network.copy() if network is not None else None
        )
        self.node_weights = None  # Will be set by set_node_weights if provided
        self.edge_weights: Dict[Tuple[str, str], float] = {}
        self.module: Optional[ig.Graph] = None

        # Normalize network nodes if network provided
        if self.network is not None:
            self._normalize_network_nodes()

        # Set node weights (which will normalize and filter network)
        if node_weights is not None:
            self.set_node_weights(node_weights)

        # Set edge weights (must be after network normalization and node weight filtering)
        if edge_weights is not None:
            if self.network is None:
                warnings.warn(
                    'Edge_weights provided but no network. Edge weights will be ignored. '
                    'Load network first, then call set_edge_weights().'
                )
            else:
                self.set_edge_weights(weights=edge_weights)

    def _normalize_node_weights(self) -> None:
        """Normalize gene names in node_weights.

        Rebuilds ``self.node_weights`` as a new dict keyed by normalized gene
        names. If two keys normalize to the same name, the later entry in
        iteration order overwrites the earlier one.
        """
        if self.node_weights is None:
            return

        self.node_weights = {
            _normalize_gene_name(gene): weight
            for gene, weight in self.node_weights.items()
        }

    def _filter_network_to_node_weights(self) -> None:
        """Filter network to genes with node weights.

        Replaces ``self.network`` with the subgraph induced by vertices whose
        name is a key of ``self.node_weights``. If no vertex matches, the
        network is left unchanged and a warning is issued.
        """
        if self.network is None or self.node_weights is None:
            return

        # Nodes are already normalized, so we can use direct lookup
        genes_with_weights = set(self.node_weights)
        vertices_to_keep = [
            v.index for v in self.network.vs
            if v['name'] in genes_with_weights
        ]

        if vertices_to_keep:
            self.network = self.network.subgraph(vertices_to_keep)
        else:
            warnings.warn("No nodes in network match node_weights after normalization")

    def load_network(
        self,
        source: Union[str, List[Tuple], pd.DataFrame, ig.Graph],
        weight_column: Optional[str] = None,
        fmt: str = 'auto'
    ) -> 'scPPIN':
        """
        Load network from file, list, DataFrame, or igraph graph.

        Parameters
        ----------
        source : Union[str, List[Tuple], pd.DataFrame, ig.Graph]
            Network source:
            - String: Path to CSV/TXT/GraphML file
            - List: List of edge tuples
            - DataFrame: Edge list DataFrame
            - ig.Graph: Existing igraph graph
        weight_column : Optional[str]
            Column name in CSV/DataFrame to use as edge weights.
            If provided and source is file/DataFrame, loads weights from that column.
            Sets edge weights as 'weight' attribute on network edges.
        fmt : str, optional
            File format hint ('auto', 'csv', 'graphml', 'gml') (default: 'auto')

        Returns
        -------
        scPPIN
            self (for method chaining)

        Examples
        --------
        >>> analyzer = scppin.scPPIN()
        >>> analyzer.load_network('edges.csv')
        >>> analyzer.load_network('edges.csv', weight_column='confidence')
        >>> analyzer.load_network([('A', 'B'), ('B', 'C')])

        Note
        ----
        ``self.edge_weights`` is only rebuilt when ``weight_column`` is given,
        and ``self.module`` is not touched; both may therefore still describe
        a previously loaded network.
        """
        # Handle igraph graph directly
        if isinstance(source, ig.Graph):
            self.network = source.copy()
        # Handle GraphML/GML files
        elif isinstance(source, str) and (
            fmt in ('graphml', 'gml') or source.endswith(('.graphml', '.gml'))
        ):
            if fmt == 'auto':
                # With fmt == 'auto' this branch is only reached when the
                # extension matched, so the suffix alone decides the format.
                file_fmt = 'gml' if source.endswith('.gml') else 'graphml'
            else:
                file_fmt = fmt
            self.network = _load_ppin(source, fmt=file_fmt)
        # Handle CSV/TXT/list/DataFrame using build_graph
        else:
            # An empty weight_column is treated the same as "not given"
            self.network = _build_graph(
                source, weights=weight_column or None, directed=False
            )

        # Normalize gene names in network
        if self.network is not None:
            self._normalize_network_nodes()

        # Filter to node_weights if already set (after normalization)
        if self.node_weights:
            self._filter_network_to_node_weights()

        # Extract edge weights if weight_column was used
        if weight_column and self.network is not None:
            self._extract_edge_weights_from_network(attr_name='weight')

        return self

    def _normalize_network_nodes(self) -> None:
        """Normalize node names in network (rewrites the 'name' attribute in place)."""
        if self.network is None:
            return

        # Batch update node names
        self.network.vs['name'] = [
            _normalize_gene_name(v['name']) for v in self.network.vs
        ]

    def _extract_edge_weights_from_network(self, attr_name: str = 'weight') -> None:
        """Extract edge weights from network attributes to self.edge_weights dict.

        ``self.edge_weights`` is reset first. Edges whose attribute value is
        ``None`` are skipped; keys are ``(source_name, target_name)`` tuples.
        """
        if self.network is None:
            return

        # Nodes are already normalized, so we can use them directly
        self.edge_weights = {}
        node_names = self.network.vs['name']

        try:
            weights = self.network.es[attr_name]
            for e in self.network.es:
                weight = weights[e.index]
                if weight is not None:
                    key = (node_names[e.source], node_names[e.target])
                    self.edge_weights[key] = float(weight)
        except (KeyError, TypeError):
            # Best effort: a missing attribute or a non-numeric value stops
            # extraction and keeps whatever was collected so far.
            pass

    def set_node_weights(
        self,
        weights: Union[Dict[str, float], object],
        groupby: Optional[str] = None,
        group: Optional[str] = None
    ) -> 'scPPIN':
        """
        Set node weights (p-values) and filter network to genes with weights.

        Parameters
        ----------
        weights : Union[Dict[str, float], AnnData]
            Node weights:
            - Dict: Dictionary mapping gene names to weights (p-values)
            - AnnData: Extract from rank_genes_groups (requires groupby/group)
        groupby : Optional[str]
            Key in adata.obs for grouping labels (required if weights is AnnData)
        group : Optional[str]
            Specific group to extract (required if weights is AnnData)

        Returns
        -------
        scPPIN
            self (for method chaining)

        Raises
        ------
        ValueError
            If an AnnData-like object is given without both groupby and group
        TypeError
            If weights is neither a dict nor an AnnData-like object

        Examples
        --------
        >>> analyzer.set_node_weights({'TP53': 0.0001, 'MDM2': 0.001})
        >>> analyzer.set_node_weights(adata, groupby='louvain', group='0')

        Note
        ----
        This method automatically:
        - Normalizes gene names for matching
        - Filters network to only include genes with weights
        """
        # Handle AnnData input (duck-typed via its 'var_names' attribute)
        if hasattr(weights, 'var_names'):
            if groupby is None or group is None:
                raise ValueError("groupby and group required when weights is AnnData")
            weights_dict = _extract_pvalues(weights, groupby, group)
        elif isinstance(weights, dict):
            weights_dict = weights
        else:
            raise TypeError(f"weights must be dict or AnnData, got {type(weights)}")

        # Validate p-values early (fail fast)
        _validate_pvalues(weights_dict)

        # Store node weights
        self.node_weights = weights_dict

        # Always normalize gene names
        self._normalize_node_weights()

        # Always filter network to genes with weights
        if self.network is not None:
            self._filter_network_to_node_weights()

        return self

    def set_edge_weights(
        self,
        weights: Dict[Tuple[str, str], float],
        attr_name: str = 'weight'
    ) -> 'scPPIN':
        """
        Set edge weights from user-provided dictionary.

        Parameters
        ----------
        weights : Dict[Tuple[str, str], float]
            User-provided edge weights dictionary.
            Sets these weights on network edges.
            Only edges in network are set (automatically filtered).
        attr_name : str, optional
            Edge attribute name to store weights (default: 'weight')

        Returns
        -------
        scPPIN
            self (for method chaining)

        Raises
        ------
        ValueError
            If no network is loaded or weights is None

        Examples
        --------
        >>> # From dictionary
        >>> weights = {('TP53', 'MDM2'): 0.9, ('TP53', 'CDKN1A'): 0.8}
        >>> analyzer.set_edge_weights(weights=weights)
        """
        if self.network is None:
            raise ValueError('Network must be loaded before setting edge weights. '
                             'Call load_network() first.')

        if weights is None:
            raise ValueError('weights dictionary must be provided')

        filtered_weights = {}
        vertex_names = set(self.network.vs['name'])

        for (u, v), weight in weights.items():
            # Normalize user input to match network node names
            u_norm = _normalize_gene_name(u)
            v_norm = _normalize_gene_name(v)

            # Only proceed if both vertices exist in network
            if u_norm in vertex_names and v_norm in vertex_names:
                # error=False makes get_eid return -1 for a missing edge
                eid = self.network.get_eid(u_norm, v_norm, directed=False, error=False)
                if eid != -1:
                    self.network.es[eid][attr_name] = float(weight)
                    filtered_weights[(u_norm, v_norm)] = float(weight)

        self.edge_weights = filtered_weights

        if not filtered_weights:
            warnings.warn('No edges from weights dictionary matched network edges')

        return self

    def detect_module(
        self,
        fdr: float = 0.01,
        edge_weight_attr: Optional[str] = None,
        c0: float = 0.01,
        normalization: Optional[str] = 'minmax',
        simplify: bool = True,
        validate: bool = True,
        use_max_prize_root: bool = False
    ) -> ig.Graph:
        """
        Detect functional module using PCST optimization.

        Parameters
        ----------
        fdr : float, optional
            False discovery rate threshold (default: 0.01)
        edge_weight_attr : Optional[str], optional
            Edge attribute name for weights (default: None = uniform costs).
            If None, uses uniform edge costs matching R implementation.
        c0 : float, optional
            Minimum edge cost (default: 0.01)
        normalization : Optional[str], optional
            Normalization method for edge weights: 'minmax', 'log1p', 'power', or None
            (default: 'minmax'). If None, uses weights directly without normalization
            (assumes weights are already in [0, 1] range).
            Only used when edge_weight_attr is provided.
        simplify : bool, optional
            Simplify network (default: True)
        validate : bool, optional
            Validate network (default: True)
        use_max_prize_root : bool, optional
            If True, use the node with highest prize as root (default: False)

        Returns
        -------
        ig.Graph
            Detected functional module (also stored on ``self.module``)

        Raises
        ------
        ValueError
            If the network or the node weights have not been set

        Examples
        --------
        >>> # Default PCST
        >>> module = analyzer.detect_module(fdr=0.01)
        >>>
        >>> # PCST with edge weights
        >>> module = analyzer.detect_module(fdr=0.01, edge_weight_attr='weight')
        >>>
        >>> # PCST with max prize root (more deterministic)
        >>> module = analyzer.detect_module(
        ...     fdr=0.01, use_max_prize_root=True
        ... )
        """
        if self.network is None:
            raise ValueError('Network must be loaded. Call load_network() first.')

        if self.node_weights is None:
            raise ValueError('Node weights must be set. Call set_node_weights() first.')

        # Use internal detect_module function
        self.module = _detect_module(
            self.network,
            self.node_weights,
            fdr=fdr,
            edge_weight_attr=edge_weight_attr,
            c0=c0,
            normalization=normalization,
            simplify=simplify,
            validate=validate,
            use_max_prize_root=use_max_prize_root
        )

        return self.module

    def plot_module(self, fdr: float = 0.01, **kwargs):
        """
        Visualize detected module.

        Parameters
        ----------
        fdr : float, optional
            FDR threshold for visualization (default: 0.01)
        **kwargs
            Additional plotting arguments passed to plot_functional_module

        Returns
        -------
        matplotlib.figure.Figure
            Figure object

        Raises
        ------
        ValueError
            If no module has been detected yet
        """
        if self.module is None:
            raise ValueError("No module detected. Call detect_module() first.")

        # Imported lazily so the plotting module is only loaded when plotting
        from .visualization.plotting import _plot_functional_module
        return _plot_functional_module(self.module, fdr=fdr, **kwargs)

    def network_statistics(self, graph: Optional[ig.Graph] = None) -> Dict:
        """
        Compute comprehensive network statistics.

        Parameters
        ----------
        graph : Optional[ig.Graph], optional
            Network to analyze. If None, uses self.module if available,
            otherwise uses self.network (default: None)

        Returns
        -------
        Dict
            Dictionary with network statistics including:
            - Basic stats: num_nodes, num_edges, density, num_components
            - Degree statistics: avg_degree, max_degree, min_degree
            - Clustering: avg_clustering_coefficient
            - Path metrics: avg_shortest_path_length, diameter (if connected)
            - Centrality: avg_degree_centrality, avg_betweenness_centrality

        Raises
        ------
        ValueError
            If graph is None and neither a module nor a network is available

        Examples
        --------
        >>> analyzer.detect_module(fdr=0.01)
        >>> stats = analyzer.network_statistics()  # Statistics for module
        >>> print(f"Density: {stats['density']:.4f}")
        >>> print(f"Avg clustering: {stats['avg_clustering_coefficient']:.4f}")
        """
        from .core.network_utils import network_statistics

        if graph is None:
            # Prefer the detected module, fall back to the full network
            if self.module is not None:
                graph = self.module
            elif self.network is not None:
                graph = self.network
            else:
                raise ValueError("No network available. Load a network or detect a module first.")

        return network_statistics(graph)