class DataAnalysisService
Service class for managing data analysis operations within document sections, integrating with SmartStat components for statistical analysis, dataset processing, and visualization generation.
/tf/active/vicechatdev/vice_ai/data_analysis_service.py
34 - 482
complex
Purpose
DataAnalysisService provides a comprehensive framework for handling data analysis workflows in document sections. It manages analysis sessions, processes dataset uploads (CSV/Excel), executes statistical analysis requests through an integrated statistical agent, generates visualizations, and maintains conversation history. The service creates and manages analysis artifacts (plots, datasets, reports) in organized directories and supports both SmartStat-powered analysis and fallback responses when SmartStat is unavailable. It integrates with a hybrid RAG engine for enhanced statistical context and supports exporting analysis results for document generation.
Source Code
class DataAnalysisService:
"""Service for handling data analysis operations within document sections"""
def __init__(self, config, hybrid_rag_engine=None):
    """Set up configuration, artifact directories, session storage and SmartStat.

    Args:
        config: Settings object queried through ``config.get``; the
            ``ANALYSIS_FOLDER`` entry selects the artifact root (default
            ``analysis_data``). When no SmartStat config object exists it is
            wrapped in a small attribute-style adapter and handed to the
            SmartStat components instead.
        hybrid_rag_engine: Optional engine kept on the instance for use while
            processing analysis requests.

    If the SmartStat modules cannot be imported, ``statistical_agent``,
    ``data_processor`` and ``script_executor`` are all set to ``None``.
    """
    import sys

    self.config = config
    self.hybrid_rag_engine = hybrid_rag_engine

    # Initialize SmartStat config if available
    self.smartstat_config = SmartStatConfig() if SMARTSTAT_AVAILABLE else None

    # Create directories for analysis artifacts
    self.analysis_dir = Path(config.get('ANALYSIS_FOLDER', 'analysis_data'))
    self.plots_dir = self.analysis_dir / 'plots'
    self.data_dir = self.analysis_dir / 'datasets'
    self.reports_dir = self.analysis_dir / 'reports'
    for directory in (self.analysis_dir, self.plots_dir, self.data_dir, self.reports_dir):
        directory.mkdir(parents=True, exist_ok=True)

    # Store sessions in memory for now (could be extended to database)
    self.sessions = {}

    # Import SmartStat components if available
    try:
        smartstat_path = str(Path(__file__).parent.parent / 'smartstat_full')

        # Temporarily put smartstat_full first on sys.path so its modules win
        # over same-named modules elsewhere.
        original_path = sys.path.copy()
        if smartstat_path not in sys.path:
            sys.path.insert(0, smartstat_path)
        try:
            from statistical_agent import StatisticalAgent
            from data_processor import DataProcessor
            from script_executor import ScriptExecutor
        finally:
            # Restore sys.path on failure as well as success; otherwise a
            # failed import would leave the extra entry in place for the
            # rest of the process.
            sys.path = original_path

        # Use SmartStat config if available, otherwise create a simple config
        if self.smartstat_config:
            smartstat_config = self.smartstat_config
        else:
            class SimpleConfig:
                """Expose the entries of a mapping as attributes plus ``get``."""

                def __init__(self, config_dict):
                    self.__dict__.update(config_dict)

                def get(self, key, default=None):
                    return getattr(self, key, default)

            smartstat_config = SimpleConfig(config)

        # Initialize SmartStat components
        self.statistical_agent = StatisticalAgent(smartstat_config)
        self.data_processor = DataProcessor(smartstat_config)
        self.script_executor = ScriptExecutor(smartstat_config)
        logger.info("SmartStat components initialized successfully")
    except ImportError as e:
        logger.warning(f"Could not import SmartStat components: {e}")
        self.statistical_agent = None
        self.data_processor = None
        self.script_executor = None
def create_analysis_session(self, section_id: str, document_id: str,
                            user_id: str, title: str) -> DataAnalysisSession:
    """Register a fresh analysis session for a document section and return it.

    The session receives a random UUID4 string as its id, starts with
    ``AnalysisStatus.PENDING`` and is stored in ``self.sessions`` under that id.
    """
    new_id = str(uuid.uuid4())
    session_fields = dict(
        session_id=new_id,
        section_id=section_id,
        document_id=document_id,
        user_id=user_id,
        title=title,
        status=AnalysisStatus.PENDING,
    )
    created = DataAnalysisSession(**session_fields)

    self.sessions[new_id] = created
    logger.info(f"Created analysis session {new_id} for section {section_id}")
    return created
def get_analysis_session(self, session_id: str) -> Optional[DataAnalysisSession]:
    """Look up a stored session by id; ``None`` means the id is unknown."""
    try:
        return self.sessions[session_id]
    except KeyError:
        return None
def get_session_by_section(self, section_id: str) -> Optional[DataAnalysisSession]:
    """Return the first stored session whose ``section_id`` matches, else ``None``."""
    matching = (
        candidate
        for candidate in self.sessions.values()
        if candidate.section_id == section_id
    )
    return next(matching, None)
def upload_dataset(self, session_id: str, file_path: str,
                   original_filename: str) -> Dict[str, Any]:
    """Load an uploaded CSV/Excel file into the session and summarise it.

    The file is read with pandas, re-saved as ``<session_id>_dataset.csv`` in
    ``self.data_dir``, attached to the session as its data source, and a
    summary message is appended to the session conversation.

    Args:
        session_id: Id of an existing analysis session.
        file_path: Location of the uploaded file on disk.
        original_filename: Client-side file name; its extension (matched
            case-insensitively) selects the reader: ``.csv`` or
            ``.xlsx``/``.xls``.

    Returns:
        Dict with ``success``, ``dataset_summary`` and ``message`` keys.

    Raises:
        ValueError: If the session is unknown or the extension is unsupported.
        Exception: Any error raised while reading or saving the data is
            re-raised after the session status is set to FAILED.
    """
    session = self.get_analysis_session(session_id)
    if not session:
        raise ValueError(f"Session {session_id} not found")

    try:
        # Read the dataset. Compare the extension in lower case so that
        # names such as "DATA.CSV" or "Report.XLSX" are accepted too.
        filename_lower = original_filename.lower()
        if filename_lower.endswith('.csv'):
            df = pd.read_csv(file_path)
        elif filename_lower.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file_path)
        else:
            raise ValueError("Unsupported file format. Please upload CSV or Excel files.")

        # Save processed dataset
        dataset_path = self.data_dir / f"{session_id}_dataset.csv"
        df.to_csv(dataset_path, index=False)

        # Create data source
        session.data_source = DataSource(
            source_type=DataSourceType.FILE_UPLOAD,
            file_path=str(dataset_path),
            parameters={'original_filename': original_filename}
        )
        session.status = AnalysisStatus.DATA_LOADED
        session.updated_at = datetime.now()

        # Generate data summary
        summary = {
            'rows': len(df),
            'columns': len(df.columns),
            'column_names': df.columns.tolist(),
            'dtypes': df.dtypes.astype(str).to_dict(),
            'missing_values': df.isnull().sum().to_dict(),
            'summary_stats': df.describe().to_dict()
        }

        # Column labels may be non-strings (e.g. numeric or date headers in a
        # spreadsheet); convert before joining so the preview cannot raise.
        column_preview = ', '.join(str(name) for name in summary['column_names'][:5])
        preview_suffix = '...' if len(summary['column_names']) > 5 else ''

        # Add initial message about dataset
        session.add_message(
            role='assistant',
            content=f"Dataset uploaded successfully!\n\n"
                    f"**Dataset Summary:**\n"
                    f"- Rows: {summary['rows']:,}\n"
                    f"- Columns: {summary['columns']}\n"
                    f"- Column names: {column_preview}{preview_suffix}\n\n"
                    f"What type of analysis would you like to perform?",
            analysis_data={'dataset_summary': summary}
        )

        return {
            'success': True,
            'dataset_summary': summary,
            'message': 'Dataset uploaded and processed successfully'
        }
    except Exception as e:
        session.status = AnalysisStatus.FAILED
        logger.error(f"Error uploading dataset for session {session_id}: {e}")
        # Bare raise keeps the original traceback intact.
        raise
async def process_analysis_request(self, session_id: str, user_message: str) -> Dict[str, Any]:
    """Answer a user's analysis request and record the exchange on the session.

    The user message is appended to the session first. Without a statistical
    agent a fixed "engine not available" reply is returned. Otherwise the
    session dataset (if any) is loaded, optional RAG context is fetched, a
    prompt is built, and the reply comes from the agent's
    ``generate_analysis_response`` when it has one, or from
    ``_generate_fallback_response`` when it does not. Plots are produced only
    when a dataset exists and the message mentions plot/chart/graph/visualize.

    Args:
        session_id: Id of an existing analysis session.
        user_message: The user's request text.

    Returns:
        Dict with ``response``, ``plots`` and ``data`` keys. Failures inside
        the processing stage are reported through ``response`` rather than
        raised.

    Raises:
        ValueError: If the session id is unknown.

    NOTE(review): the plot paths are passed to ``add_message`` but this method
    never appends them to ``session.generated_plots``. Confirm that attribute
    is filled somewhere else, since export and deletion read it.
    """
    session = self.get_analysis_session(session_id)
    if not session:
        raise ValueError(f"Session {session_id} not found")

    # Record the request before doing any work
    session.add_message('user', user_message)

    if not self.statistical_agent:
        # SmartStat is missing: reply with a fixed notice
        unavailable_notice = "Statistical analysis engine is not available. Please ensure SmartStat components are properly installed."
        session.add_message('assistant', unavailable_notice)
        return {'response': unavailable_notice, 'plots': [], 'data': {}}

    try:
        session.status = AnalysisStatus.PROCESSING

        # Pull the stored dataset back in, when one has been uploaded
        source = session.data_source
        df = pd.read_csv(source.file_path) if source and source.file_path else None

        # Optional extra context from the RAG engine
        context = ""
        rag_engine = self.hybrid_rag_engine
        if rag_engine:
            rag_collections = (
                ['statistics', 'data_science'] if hasattr(rag_engine, 'collections') else []
            )
            rag_result = await rag_engine.process_query(
                query=f"statistical analysis {user_message}",
                collections=rag_collections
            )
            if rag_result.get('response'):
                context = f"Statistical Context: {rag_result['response']}\n\n"

        # Build the prompt; its wording depends on whether data is present
        if df is None:
            analysis_prompt = f"""
{context}
User Request: {user_message}
The user hasn't uploaded a dataset yet. Please provide guidance on what type of data they need
and what analysis can be performed once they upload it.
"""
        else:
            analysis_prompt = f"""
{context}
Dataset Information:
- Shape: {df.shape}
- Columns: {', '.join(df.columns.tolist())}
- Data types: {df.dtypes.to_dict()}
User Request: {user_message}
Please provide specific statistical analysis recommendations and code to address this request.
Focus on actionable insights and appropriate statistical methods.
"""

        # Prefer the agent's own generator; otherwise use the local fallback
        if hasattr(self.statistical_agent, 'generate_analysis_response'):
            response = await self.statistical_agent.generate_analysis_response(analysis_prompt)
        else:
            response = self._generate_fallback_response(user_message, df)

        # Plots are generated only for data-backed requests that ask for them
        plots = []
        analysis_data = {}
        if df is not None:
            lowered_request = user_message.lower()
            plot_keywords = ('plot', 'chart', 'graph', 'visualize')
            if any(keyword in lowered_request for keyword in plot_keywords):
                plots = await self._generate_basic_plots(session_id, df, user_message)

        session.status = AnalysisStatus.COMPLETED
        session.add_message('assistant', response, {'plots': plots, 'data': analysis_data})
        return {'response': response, 'plots': plots, 'data': analysis_data}
    except Exception as e:
        session.status = AnalysisStatus.FAILED
        error_msg = f"Error processing analysis request: {str(e)}"
        session.add_message('assistant', error_msg)
        logger.error(f"Analysis error for session {session_id}: {e}")
        return {'response': error_msg, 'plots': [], 'data': {}}
def _generate_fallback_response(self, user_message: str, df: pd.DataFrame = None) -> str:
"""Generate a basic fallback response when statistical agent is not available"""
if df is None:
return """To get started with data analysis, please upload your dataset (CSV or Excel format).
Once uploaded, I can help you with:
- Descriptive statistics and data exploration
- Correlation analysis
- Basic visualizations (histograms, scatter plots, box plots)
- Statistical tests (t-tests, ANOVA, chi-square)
- Regression analysis
What type of analysis are you interested in?"""
# Basic analysis based on keywords
message_lower = user_message.lower()
if any(word in message_lower for word in ['summary', 'describe', 'overview']):
return f"""**Dataset Summary:**
**Shape:** {df.shape[0]} rows × {df.shape[1]} columns
**Columns:** {', '.join(df.columns.tolist())}
**Numerical Summary:**
{df.describe().to_string()}
**Missing Values:**
{df.isnull().sum().to_string()}
Would you like me to create some visualizations or perform specific statistical tests?"""
elif any(word in message_lower for word in ['correlation', 'correlate']):
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 1:
corr = df[numeric_cols].corr()
return f"""**Correlation Analysis:**
{corr.to_string()}
The correlation matrix shows relationships between numerical variables. Values closer to 1 or -1 indicate stronger relationships.
Would you like me to create a correlation heatmap?"""
else:
return "Not enough numerical columns for correlation analysis. Please upload data with multiple numerical variables."
else:
return f"""I can help you analyze your dataset ({df.shape[0]} rows × {df.shape[1]} columns).
**Available analyses:**
- Descriptive statistics
- Correlation analysis
- Data visualization
- Statistical tests
- Regression analysis
Please specify what type of analysis you'd like to perform, or ask about specific variables: {', '.join(df.columns[:5])}{'...' if len(df.columns) > 5 else ''}"""
async def _generate_basic_plots(self, session_id: str, df: pd.DataFrame, request: str) -> List[str]:
    """Render default plots for the numeric columns of ``df`` as PNG files.

    Produces a 2x2 grid of histograms for up to the first four numeric
    columns and, when there is more than one numeric column, a correlation
    heatmap. Files are written to ``self.plots_dir`` and named after the
    session id.

    Args:
        session_id: Used as the file-name prefix of the generated images.
        df: Dataset to visualise.
        request: The user's request text; not used by the current logic.

    Returns:
        Paths (as strings) of the images that were written. Errors are logged
        and whatever was produced before the failure is returned.
    """
    import matplotlib.pyplot as plt
    import seaborn as sns

    plt.style.use('default')
    plots = []

    try:
        numeric_cols = df.select_dtypes(include=[np.number]).columns

        if len(numeric_cols) > 0:
            # Distribution plot: one histogram per numeric column (max four)
            fig, axes = plt.subplots(2, 2, figsize=(12, 10))
            try:
                axes = axes.flatten()
                for ax, col in zip(axes, numeric_cols[:4]):
                    ax.hist(df[col].dropna(), bins=30, alpha=0.7)
                    ax.set_title(f'Distribution of {col}')
                    ax.set_xlabel(col)
                    ax.set_ylabel('Frequency')

                # Hide empty subplots
                for ax in axes[len(numeric_cols):]:
                    ax.set_visible(False)

                fig.tight_layout()
                plot_path = self.plots_dir / f"{session_id}_distributions.png"
                fig.savefig(plot_path, dpi=150, bbox_inches='tight')
            finally:
                # Close the figure even if drawing or saving failed, so
                # pyplot does not keep it alive.
                plt.close(fig)
            plots.append(str(plot_path))

        # Correlation heatmap if multiple numeric columns
        if len(numeric_cols) > 1:
            fig = plt.figure(figsize=(10, 8))
            try:
                ax = fig.add_subplot(111)
                corr = df[numeric_cols].corr()
                sns.heatmap(corr, annot=True, cmap='coolwarm', center=0, square=True, ax=ax)
                ax.set_title('Correlation Matrix')
                fig.tight_layout()
                plot_path = self.plots_dir / f"{session_id}_correlation.png"
                fig.savefig(plot_path, dpi=150, bbox_inches='tight')
            finally:
                plt.close(fig)
            plots.append(str(plot_path))
    except Exception as e:
        logger.error(f"Error generating plots for session {session_id}: {e}")

    return plots
def get_session_content_for_export(self, session_id: str) -> Dict[str, Any]:
    """Render a session as HTML fragments for document export.

    Args:
        session_id: Id of the session to export.

    Returns:
        ``{}`` when the session is unknown; otherwise a dict with ``title``
        (the raw title), ``conversation``, ``plots`` (base64-embedded PNG
        ``<img>`` tags for plot files that exist), ``conclusions`` and
        ``full_content`` (all fragments wrapped in one section ``<div>``).

    Message roles, message text and the title placed inside ``<h2>`` are
    HTML-escaped, because chat text comes straight from users and would
    otherwise be emitted as live markup.

    NOTE(review): ``session.conclusions`` is still inserted verbatim, as it
    may intentionally carry formatted HTML. Confirm, and escape it too if it
    is plain user text.
    """
    import base64
    import html

    session = self.get_analysis_session(session_id)
    if not session:
        return {}

    # Format conversation
    conversation_parts = []
    for message in session.messages:
        role = message['role']
        role_class = "user-message" if role == 'user' else "assistant-message"
        role_label = html.escape(str(role).title())
        content_text = html.escape(str(message['content']))
        conversation_parts.append(f"""
<div class="{role_class}">
<strong>{role_label}:</strong>
<p>{content_text}</p>
</div>
""")
    conversation_html = "".join(conversation_parts)

    # Include plots
    plots_html = ""
    if session.generated_plots:
        plot_parts = ["<h3>Generated Visualizations</h3>\n"]
        for plot_path in session.generated_plots:
            if os.path.exists(plot_path):
                # Convert to base64 for embedding
                with open(plot_path, 'rb') as f:
                    plot_data = base64.b64encode(f.read()).decode()
                plot_parts.append(
                    f'<img src="data:image/png;base64,{plot_data}" style="max-width: 100%; margin: 10px 0;" />\n'
                )
        plots_html = "".join(plot_parts)

    # Format conclusions
    conclusions_html = ""
    if session.conclusions:
        conclusions_html = f"""
<h3>Analysis Conclusions</h3>
<div class="conclusions">
{session.conclusions}
</div>
"""

    safe_title = html.escape(str(session.title))
    return {
        'title': session.title,
        'conversation': conversation_html,
        'plots': plots_html,
        'conclusions': conclusions_html,
        'full_content': f"""
<div class="data-analysis-section">
<h2>{safe_title}</h2>
<div class="analysis-conversation">
{conversation_html}
</div>
{plots_html}
{conclusions_html}
</div>
"""
    }
def update_session_conclusions(self, session_id: str, conclusions: str) -> bool:
    """Store new conclusions text on a session and refresh its timestamp.

    Returns ``True`` when the session exists and was updated, ``False`` when
    no session is registered under ``session_id``.
    """
    target = self.get_analysis_session(session_id)
    if target:
        target.conclusions = conclusions
        target.updated_at = datetime.now()
        return True
    return False
def delete_analysis_session(self, session_id: str) -> bool:
    """Remove a session together with its plot and dataset files.

    Returns ``False`` when the session is unknown or when any cleanup step
    raises (the error is logged); ``True`` once the files are removed and the
    session is dropped from ``self.sessions``.
    """
    session = self.get_analysis_session(session_id)
    if not session:
        return False

    try:
        # Plot images first, skipping any that are already gone
        for artifact in session.generated_plots:
            if os.path.exists(artifact):
                os.remove(artifact)

        # Then the stored dataset copy, if the session has one
        source = session.data_source
        if source and source.file_path and os.path.exists(source.file_path):
            os.remove(source.file_path)

        # Finally forget the session itself
        self.sessions.pop(session_id)
        logger.info(f"Deleted analysis session {session_id}")
        return True
    except Exception as e:
        logger.error(f"Error deleting session {session_id}: {e}")
        return False
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
- | - |
Parameter Details
config: Configuration dictionary or object containing settings for the service. Expected keys include 'ANALYSIS_FOLDER' (path for storing analysis artifacts). The config is also passed to SmartStat components if available.
hybrid_rag_engine: Optional hybrid RAG (Retrieval-Augmented Generation) engine instance for providing enhanced statistical context during analysis. If provided, it will be queried for relevant statistical methods and data science information to improve analysis responses. Can be None if RAG functionality is not needed.
Return Value
Instantiation returns a DataAnalysisService object. Key method returns: create_analysis_session() returns DataAnalysisSession object; upload_dataset() returns dict with 'success', 'dataset_summary', and 'message' keys; process_analysis_request() returns dict with 'response' (str), 'plots' (list of paths), and 'data' (dict) keys; get_session_content_for_export() returns dict with 'title', 'conversation', 'plots', 'conclusions', and 'full_content' HTML strings.
Class Interface
Methods
__init__(self, config, hybrid_rag_engine=None)
Purpose: Initialize the DataAnalysisService with configuration and optional RAG engine, set up directories, and load SmartStat components if available
Parameters:
config: Configuration dictionary with settings like ANALYSIS_FOLDER
hybrid_rag_engine: Optional RAG engine for enhanced statistical context
Returns: None (constructor)
create_analysis_session(self, section_id: str, document_id: str, user_id: str, title: str) -> DataAnalysisSession
Purpose: Create a new data analysis session for a document section with unique session ID
Parameters:
section_id: ID of the document section this analysis belongs to
document_id: ID of the parent document
user_id: ID of the user creating the session
title: Title/name for this analysis session
Returns: DataAnalysisSession object with generated session_id and PENDING status
get_analysis_session(self, session_id: str) -> Optional[DataAnalysisSession]
Purpose: Retrieve an existing analysis session by its session ID
Parameters:
session_id: Unique identifier of the session to retrieve
Returns: DataAnalysisSession object if found, None otherwise
get_session_by_section(self, section_id: str) -> Optional[DataAnalysisSession]
Purpose: Find and return the analysis session associated with a specific document section
Parameters:
section_id: ID of the document section to search for
Returns: DataAnalysisSession object if found, None otherwise
upload_dataset(self, session_id: str, file_path: str, original_filename: str) -> Dict[str, Any]
Purpose: Upload and process a CSV or Excel dataset for analysis, generating summary statistics and updating session status
Parameters:
session_id: ID of the session to upload data to
file_path: Path to the uploaded file on disk
original_filename: Original filename with extension (.csv, .xlsx, .xls)
Returns: Dictionary with 'success' (bool), 'dataset_summary' (dict with rows, columns, dtypes, missing_values, summary_stats), and 'message' (str)
async process_analysis_request(self, session_id: str, user_message: str) -> Dict[str, Any]
Purpose: Process a user's analysis request using the statistical agent, generate visualizations if requested, and return analysis results
Parameters:
session_id: ID of the session to process the request for
user_message: User's natural language analysis request
Returns: Dictionary with 'response' (str analysis text), 'plots' (list of plot file paths), and 'data' (dict of analysis results)
_generate_fallback_response(self, user_message: str, df: pd.DataFrame = None) -> str
Purpose: Generate a basic analysis response when the statistical agent does not provide a generate_analysis_response method, returning upload guidance or simple statistics computed directly from the DataFrame
Parameters:
user_message: User's analysis request
df: Optional pandas DataFrame to analyze
Returns: String containing fallback analysis response or guidance
async _generate_basic_plots(self, session_id: str, df: pd.DataFrame, request: str) -> List[str]
Purpose: Generate basic visualizations (histograms, correlation heatmap) for the dataset and save as PNG files
Parameters:
session_id: ID of the session, used for naming plot files
df: pandas DataFrame to visualize
request: User's request (accepted but not used by the current implementation)
Returns: List of file paths to generated plot PNG files
get_session_content_for_export(self, session_id: str) -> Dict[str, Any]
Purpose: Format analysis session content as HTML for document export, including conversation, plots, and conclusions
Parameters:
session_id: ID of the session to export
Returns: Dictionary with 'title', 'conversation' (HTML), 'plots' (HTML with base64 images), 'conclusions' (HTML), and 'full_content' (complete HTML)
update_session_conclusions(self, session_id: str, conclusions: str) -> bool
Purpose: Update the conclusions text for an analysis session
Parameters:
session_id: ID of the session to update
conclusions: Conclusions text to store
Returns: True if successful, False if session not found
delete_analysis_session(self, session_id: str) -> bool
Purpose: Delete an analysis session and clean up all associated files (plots, datasets)
Parameters:
session_id: ID of the session to delete
Returns: True if successful, False if session not found or error occurred
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| config | dict or config object | Configuration settings for the service | instance |
| hybrid_rag_engine | Optional[object] | RAG engine instance for enhanced statistical context, can be None | instance |
| smartstat_config | Optional[SmartStatConfig] | SmartStat configuration object if SmartStat is available, None otherwise | instance |
| analysis_dir | Path | Root directory for storing all analysis artifacts | instance |
| plots_dir | Path | Directory for storing generated plot images | instance |
| data_dir | Path | Directory for storing processed datasets | instance |
| reports_dir | Path | Directory for storing analysis reports | instance |
| sessions | Dict[str, DataAnalysisSession] | In-memory storage of active analysis sessions, keyed by session_id | instance |
| statistical_agent | Optional[StatisticalAgent] | SmartStat statistical agent for processing analysis requests, None if unavailable | instance |
| data_processor | Optional[DataProcessor] | SmartStat data processor component, None if unavailable | instance |
| script_executor | Optional[ScriptExecutor] | SmartStat script executor component, None if unavailable | instance |
Dependencies
uuid, json, logging, os, shutil, tempfile, pandas, numpy, datetime, typing, pathlib, matplotlib, seaborn, sys, base64
Required Imports
import uuid
import json
import logging
import os
import shutil
import tempfile
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
from pathlib import Path
from models import DataAnalysisSession, AnalysisResult, AnalysisStatus, DataSource, DataSourceType, AnalysisConfiguration, AnalysisType
import matplotlib.pyplot as plt
import seaborn as sns
import base64
Conditional/Optional Imports
These imports are only needed under specific conditions:
from smartstat_config import Config as SmartStatConfig
Condition: only if SmartStat is available (SMARTSTAT_AVAILABLE flag is True)
Optional
from statistical_agent import StatisticalAgent
Condition: only if SmartStat components are installed in smartstat_full directory
Optional
from data_processor import DataProcessor
Condition: only if SmartStat components are installed in smartstat_full directory
Optional
from script_executor import ScriptExecutor
Condition: only if SmartStat components are installed in smartstat_full directory
Optional
Usage Example
# End-to-end walkthrough: create a session, load data, ask a question, export, clean up
import asyncio

# Build the service; artifacts are stored under 'my_analysis_data'
config = {'ANALYSIS_FOLDER': 'my_analysis_data'}
service = DataAnalysisService(config, hybrid_rag_engine=None)

# Open a session tied to one document section
session = service.create_analysis_session(
    section_id='section_123',
    document_id='doc_456',
    user_id='user_789',
    title='Sales Data Analysis',
)
session_id = session.session_id

# Load the dataset and report how many rows were read
result = service.upload_dataset(
    session_id=session_id,
    file_path='/path/to/data.csv',
    original_filename='sales_data.csv',
)
row_count = result['dataset_summary']['rows']
print(f"Dataset loaded: {row_count} rows")

# process_analysis_request is a coroutine, so drive it with asyncio.run
request_coro = service.process_analysis_request(
    session_id=session_id,
    user_message='Show me correlation between sales and marketing spend',
)
response = asyncio.run(request_coro)
print(response['response'])
print(f"Generated {len(response['plots'])} plots")

# Record the final conclusions on the session
service.update_session_conclusions(
    session_id=session_id,
    conclusions='Strong positive correlation found between variables',
)

# Render the session as HTML for the document
export_data = service.get_session_content_for_export(session_id)
print(export_data['full_content'])

# Remove the session and its files
service.delete_analysis_session(session_id)
Best Practices
- Always create an analysis session before uploading datasets or processing requests
- Check if session exists using get_analysis_session() before performing operations
- Handle the case where SmartStat components are not available (statistical_agent will be None)
- Use async/await when calling process_analysis_request() as it's an async method
- Clean up sessions using delete_analysis_session() when done to free disk space
- Ensure write permissions exist for the analysis directory before instantiation
- Monitor session status through session.status to track analysis progress
- The service stores sessions in memory; consider implementing database persistence for production
- Dataset files are saved to disk; manage disk space for large datasets
- Generated plots are saved as PNG files; paths are returned for embedding in documents
- The service gracefully degrades to fallback responses when SmartStat is unavailable
- Use get_session_by_section() to retrieve sessions associated with specific document sections
- Call update_session_conclusions() to add final analysis summaries before export
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class DataSectionService 77.7% similar
- function test_data_analysis_service 70.7% similar
- class DataAnalysisSession_v1 69.4% similar
- class DataSection 69.4% similar
- class SmartStatSession 68.9% similar