🔍 Code Extractor

class TextClusterer

Maturity: 51

A class that clusters similar documents based on their embeddings using various clustering algorithms (K-means, Agglomerative, DBSCAN) and optionally generates summaries for each cluster.

File:
/tf/active/vicechatdev/chromadb-cleanup/src/clustering/text_clusterer.py
Lines:
8 - 171
Complexity:
moderate

Purpose

TextClusterer provides document clustering functionality for grouping similar documents together based on their vector embeddings. It supports multiple clustering algorithms and can optionally summarize each cluster by combining document texts and creating a representative summary document. This is useful for document organization, information retrieval, and reducing redundancy in large document collections.

Source Code

class TextClusterer:
    """Class for clustering similar documents based on their embeddings."""
    
    def __init__(self, config: Config):
        """
        Initialize the TextClusterer with configuration.
        
        Args:
            config: Configuration object
        """
        self.config = config
        self.clustering_method = config.clustering_method
        self.num_clusters = config.num_clusters
        self.summarize_clusters = not config.skip_summarization
    
    def cluster(self, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Cluster documents based on their embeddings and optionally summarize each cluster.
        
        Args:
            documents: List of document dictionaries with 'id', 'text', and 'embedding' keys
            
        Returns:
            Documents with added 'cluster' field, and optionally with cluster summaries
        """
        if not documents:
            return []
        
        # Ensure all documents have embeddings
        if 'embedding' not in documents[0]:
            raise ValueError("Documents must have embeddings for clustering")
        
        # Extract embeddings as numpy array
        embeddings = np.array([doc['embedding'] for doc in documents])
        
        # Perform clustering based on the selected method
        if self.clustering_method == "kmeans":
            clusters = self._kmeans_clustering(embeddings)
        elif self.clustering_method == "agglomerative":
            clusters = self._agglomerative_clustering(embeddings)
        elif self.clustering_method == "dbscan":
            clusters = self._dbscan_clustering(embeddings)
        else:
            raise ValueError(f"Unknown clustering method: {self.clustering_method}")
        
        # Add cluster information to documents
        for i, doc in enumerate(documents):
            doc['cluster'] = int(clusters[i])
        
        # Group documents by cluster
        cluster_groups = {}
        for i, doc in enumerate(documents):
            cluster_id = int(clusters[i])
            if cluster_id not in cluster_groups:
                cluster_groups[cluster_id] = []
            cluster_groups[cluster_id].append(doc)
        
        # If summarization is enabled, create a summary for each cluster
        if self.summarize_clusters:
            return self._summarize_clusters(documents, cluster_groups)
        else:
            return documents
    
    def _summarize_clusters(self, documents: List[Dict[str, Any]], cluster_groups: Dict[int, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
        """
        Create summaries for each cluster.
        
        Args:
            documents: Original list of documents
            cluster_groups: Dictionary mapping cluster IDs to lists of documents
            
        Returns:
            List of documents with cluster summaries included
        """
        result = []
        
        # Process each cluster
        for cluster_id, docs in cluster_groups.items():
            if len(docs) <= 1:
                # If cluster has only one document, keep it as is
                result.extend(docs)
                continue
            
            # Combine the text content from all documents in the cluster
            combined_text = "\n\n".join([doc['text'] for doc in docs])
            
            # Create a summary
            summary = create_summary(combined_text, self.config)
            
            # Select a representative embedding for the summary
            # Using the document closest to the cluster centroid would be ideal,
            # but for simplicity we'll use the first document's embedding
            rep_embedding = docs[0]['embedding']
            
            # Create a summary document
            summary_doc = {
                'id': f"cluster_{cluster_id}_summary",
                'text': summary,
                'embedding': rep_embedding,
                'cluster': cluster_id,
                'metadata': {
                    'is_summary': True,
                    'cluster_id': str(cluster_id),
                    'summarized_docs': len(docs),
                    'source_ids': [doc['id'] for doc in docs]
                }
            }
            
            # Keep only the summary document for this cluster
            result.append(summary_doc)
        
        print(f"Created summaries for {len(cluster_groups)} clusters, resulting in {len(result)} documents")
        return result
    
    def _kmeans_clustering(self, embeddings: np.ndarray) -> np.ndarray:
        """
        Perform K-means clustering.
        
        Args:
            embeddings: Document embeddings as numpy array
            
        Returns:
            Array of cluster assignments
        """
        kmeans = KMeans(
            n_clusters=min(self.num_clusters, len(embeddings)),
            random_state=42,
            n_init=10
        )
        return kmeans.fit_predict(embeddings)
    
    def _agglomerative_clustering(self, embeddings: np.ndarray) -> np.ndarray:
        """
        Perform hierarchical agglomerative clustering.
        
        Args:
            embeddings: Document embeddings as numpy array
            
        Returns:
            Array of cluster assignments
        """
        clustering = AgglomerativeClustering(
            n_clusters=min(self.num_clusters, len(embeddings)),
            affinity='cosine',  # renamed to `metric` in scikit-learn >= 1.2 (removed in 1.4)
            linkage='average'
        )
        return clustering.fit_predict(embeddings)
    
    def _dbscan_clustering(self, embeddings: np.ndarray) -> np.ndarray:
        """
        Perform DBSCAN clustering.
        
        Args:
            embeddings: Document embeddings as numpy array
            
        Returns:
            Array of cluster assignments
        """
        dbscan = DBSCAN(
            eps=0.3,
            min_samples=3,
            metric='cosine'
        )
        return dbscan.fit_predict(embeddings)

Parameters

Name Type Default Kind
config Config - positional

Parameter Details

config: A Config object containing clustering configuration parameters including clustering_method (str: 'kmeans', 'agglomerative', or 'dbscan'), num_clusters (int: desired number of clusters), and skip_summarization (bool: whether to skip cluster summarization). This config object controls all aspects of the clustering behavior.
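The real Config class lives in src.config; for illustration only, a minimal stand-in exposing just the three attributes that TextClusterer reads might look like this (hypothetical sketch, not the actual class):

```python
from dataclasses import dataclass

@dataclass
class Config:
    """Hypothetical stand-in for src.config.Config, limited to the
    attributes TextClusterer.__init__ actually reads."""
    clustering_method: str = "kmeans"  # 'kmeans', 'agglomerative', or 'dbscan'
    num_clusters: int = 5              # used by kmeans and agglomerative
    skip_summarization: bool = True    # True disables cluster summaries

# TextClusterer would then see clustering_method, num_clusters,
# and skip_summarization on this object.
cfg = Config(clustering_method="dbscan")
```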

Return Value

The constructor returns a TextClusterer instance. The main cluster() method returns a list of document dictionaries, each with an added 'cluster' field (int) indicating its cluster assignment. If summarization is enabled, each multi-document cluster is replaced by a single summary document containing the combined text of its members, metadata about the summarization, and the 'is_summary' flag set to True; single-document clusters are returned unchanged.

Class Interface

Methods

__init__(self, config: Config) -> None

Purpose: Initialize the TextClusterer with configuration settings for clustering method, number of clusters, and summarization behavior

Parameters:

  • config: Config object containing clustering_method, num_clusters, and skip_summarization attributes

Returns: None - initializes the instance

cluster(self, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]

Purpose: Main method to cluster documents based on their embeddings and optionally summarize each cluster

Parameters:

  • documents: List of document dictionaries, each must contain 'id', 'text', and 'embedding' keys. The 'embedding' should be a list or array of numerical values representing the document's vector embedding

Returns: List of document dictionaries with added 'cluster' field (int). If summarization is enabled, each multi-document cluster is collapsed into a single summary document whose 'metadata' contains 'is_summary', 'cluster_id', 'summarized_docs', and 'source_ids' fields; single-document clusters pass through unchanged

_summarize_clusters(self, documents: List[Dict[str, Any]], cluster_groups: Dict[int, List[Dict[str, Any]]]) -> List[Dict[str, Any]]

Purpose: Private method to create summaries for each cluster by combining document texts and generating representative summary documents

Parameters:

  • documents: Original list of documents with cluster assignments
  • cluster_groups: Dictionary mapping cluster IDs (int) to lists of documents belonging to that cluster

Returns: List of summary documents, one per cluster (except single-document clusters which are preserved as-is). Each summary document contains combined text, representative embedding, and metadata about the summarization

_kmeans_clustering(self, embeddings: np.ndarray) -> np.ndarray

Purpose: Private method to perform K-means clustering on document embeddings

Parameters:

  • embeddings: 2D numpy array where each row is a document embedding vector

Returns: 1D numpy array of cluster assignments (integers) for each document, with values from 0 to num_clusters-1

_agglomerative_clustering(self, embeddings: np.ndarray) -> np.ndarray

Purpose: Private method to perform hierarchical agglomerative clustering using cosine affinity and average linkage

Parameters:

  • embeddings: 2D numpy array where each row is a document embedding vector

Returns: 1D numpy array of cluster assignments (integers) for each document, with values from 0 to num_clusters-1

_dbscan_clustering(self, embeddings: np.ndarray) -> np.ndarray

Purpose: Private method to perform DBSCAN density-based clustering with cosine metric

Parameters:

  • embeddings: 2D numpy array where each row is a document embedding vector

Returns: 1D numpy array of cluster assignments (integers) for each document. Values range from 0 to n_clusters-1, with -1 indicating noise/outlier points
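Because DBSCAN labels outliers as -1, callers usually need to separate noise points from real clusters after calling cluster(). A small sketch using sklearn's DBSCAN directly, with made-up toy 2-D vectors standing in for embeddings:

```python
import numpy as np
from sklearn.cluster import DBSCAN

# Two tight groups plus one far-away outlier (toy "embeddings")
embeddings = np.array([
    [1.0, 0.0], [0.99, 0.01], [0.98, 0.02],  # group A
    [0.0, 1.0], [0.01, 0.99], [0.02, 0.98],  # group B
    [-1.0, -1.0],                            # outlier
])

# Same hyperparameters as _dbscan_clustering above
labels = DBSCAN(eps=0.3, min_samples=3, metric='cosine').fit_predict(embeddings)

# Split indices into clustered points and noise (label -1)
clustered = [i for i, lbl in enumerate(labels) if lbl != -1]
noise = [i for i, lbl in enumerate(labels) if lbl == -1]
```

The outlier ends up in `noise`; downstream code should decide whether to keep, drop, or separately summarize such documents.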

Attributes

Name Type Description Scope
config Config Configuration object containing all clustering settings instance
clustering_method str The clustering algorithm to use: 'kmeans', 'agglomerative', or 'dbscan' instance
num_clusters int The desired number of clusters for kmeans and agglomerative methods instance
summarize_clusters bool Whether to generate summaries for each cluster (inverse of skip_summarization config) instance

Dependencies

  • numpy
  • sklearn
  • typing

Required Imports

from typing import List, Dict, Any
import numpy as np
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from src.config import Config
from src.summarization.summarizer import create_summary

Usage Example

from src.config import Config
from text_clusterer import TextClusterer
import numpy as np

# Create configuration
config = Config()
config.clustering_method = 'kmeans'
config.num_clusters = 5
config.skip_summarization = False

# Initialize clusterer
clusterer = TextClusterer(config)

# Prepare documents with embeddings
documents = [
    {'id': 'doc1', 'text': 'Machine learning is fascinating', 'embedding': np.random.rand(768).tolist()},
    {'id': 'doc2', 'text': 'Deep learning uses neural networks', 'embedding': np.random.rand(768).tolist()},
    {'id': 'doc3', 'text': 'Python is a programming language', 'embedding': np.random.rand(768).tolist()}
]

# Perform clustering
clustered_docs = clusterer.cluster(documents)

# Access cluster assignments
for doc in clustered_docs:
    print(f"Document {doc['id']} is in cluster {doc['cluster']}")
    if doc.get('metadata', {}).get('is_summary'):
        print(f"This is a summary of {doc['metadata']['summarized_docs']} documents")

Best Practices

  • Ensure every document has an 'embedding' field before calling cluster(); only the first document is checked (raising ValueError if missing), so a missing embedding elsewhere in the list fails later when embeddings are stacked into a numpy array
  • For kmeans and agglomerative clustering, num_clusters is automatically capped at the number of documents via min(), so an oversized value does not raise an error but simply yields one cluster per document
  • When using DBSCAN, some documents may be assigned to cluster -1 (noise/outliers)
  • If summarization is enabled, ensure the create_summary function is properly configured and available
  • The class is stateless after initialization - you can reuse the same instance for multiple clustering operations
  • For large document collections, consider the memory requirements of storing all embeddings in a numpy array
  • The representative embedding for cluster summaries is taken from the first document; for better results, consider using the centroid
  • Single-document clusters are preserved as-is when summarization is enabled
  • Cluster IDs are integers starting from 0 (or -1 for DBSCAN noise points)
  • The cluster() method modifies the input documents by adding the 'cluster' field

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class SimilarityCleaner 63.7% similar

    A document cleaning class that identifies and removes duplicate or highly similar documents based on embedding vector similarity, keeping only representative documents from each similarity group.

    From: /tf/active/vicechatdev/chromadb-cleanup/src/cleaners/similarity_cleaner.py
  • class MyEmbeddingFunction_v3 53.4% similar

    A custom embedding function class that generates embeddings for text documents using OpenAI's embedding models, with automatic text summarization and token limit handling for large documents.

    From: /tf/active/vicechatdev/offline_docstore_multi.py
  • class MyEmbeddingFunction_v1 52.7% similar

    A custom embedding function class that generates embeddings for documents using OpenAI's API, with built-in text summarization for long documents and token management.

    From: /tf/active/vicechatdev/OneCo_hybrid_RAG copy.py
  • class DocChatEmbeddingFunction 52.6% similar

    A custom ChromaDB embedding function that generates OpenAI embeddings with automatic text summarization for documents exceeding token limits.

    From: /tf/active/vicechatdev/docchat/document_indexer.py
  • class MyEmbeddingFunction_v2 52.2% similar

    A custom embedding function class that generates embeddings for text documents using OpenAI's embedding models, with automatic text summarization and token management for large documents.

    From: /tf/active/vicechatdev/offline_docstore_multi_vice.py