
function api_index_folder

Maturity: 50

Flask API endpoint that initiates a background task to index documents in a specified folder, tracking progress and returning a task ID for status monitoring.

File: /tf/active/vicechatdev/docchat/app.py
Lines: 1301 - 1400
Complexity: complex

Purpose

This endpoint provides an asynchronous document indexing service that processes the files in a folder without blocking the HTTP request. It reads folder_path from the JSON body (defaulting to config.DOCUMENT_FOLDER), starts a background thread that indexes the folder recursively through the module-level document_indexer, records progress in the shared active_tasks dictionary (guarded by task_lock), and returns a task_id that clients use to monitor indexing status. It supports forced re-indexing via force_reindex and reports how many documents were newly indexed, re-indexed, skipped, or failed.
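
The pattern described above can be sketched in isolation, without Flask or the indexer. This is a minimal illustration rather than the project's implementation: start_task, work, and items are hypothetical names, and the sketch registers the task entry before the thread starts (the documented source registers it inside the thread), so a status lookup made immediately after the call always finds the task.

```python
import threading
import uuid

task_lock = threading.Lock()
active_tasks = {}


def start_task(work, items):
    """Run work(item) for each item in a background thread.

    Returns (task_id, thread). Hypothetical helper for illustration only.
    """
    task_id = str(uuid.uuid4())
    # Register before the thread starts so an immediate lookup succeeds.
    with task_lock:
        active_tasks[task_id] = {
            "status": "processing",
            "processed": 0,
            "total": len(items),
        }

    def run():
        try:
            for count, item in enumerate(items, start=1):
                work(item)
                with task_lock:
                    active_tasks[task_id]["processed"] = count
            final_status = "completed"
        except Exception:
            final_status = "failed"
        with task_lock:
            active_tasks[task_id]["status"] = final_status

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return task_id, thread
```

Returning the thread alongside the ID is a convenience for callers that want to join it; an HTTP handler would return only the ID.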

Source Code

def api_index_folder():
    """Start background indexing task for a folder"""
    try:
        data = request.json
        folder_path = data.get('folder_path', config.DOCUMENT_FOLDER)
        force_reindex = data.get('force_reindex', False)
        
        folder = Path(folder_path)
        
        if not folder.exists() or not folder.is_dir():
            return jsonify({'error': 'Invalid folder path'}), 400
        
        if not document_indexer:
            return jsonify({'error': 'Document indexer not initialized'}), 500
        
        # Create task ID
        task_id = str(uuid_module.uuid4())
        
        # Start background indexing
        def index_task():
            with task_lock:
                active_tasks[task_id] = {
                    'status': 'processing',
                    'progress': 'Starting indexing...',
                    'current_file': '',
                    'processed': 0,
                    'total': 0,
                    'results': None
                }
            
            try:
                logger.info(f"Background indexing started for: {folder} (task_id={task_id})")
                
                # Index folder with progress callback
                def progress_callback(current, total, filename):
                    with task_lock:
                        if task_id in active_tasks:
                            active_tasks[task_id]['processed'] = current
                            active_tasks[task_id]['total'] = total
                            active_tasks[task_id]['current_file'] = filename
                            active_tasks[task_id]['progress'] = f"Processing {current}/{total}: {filename}"
                
                results = document_indexer.index_folder(
                    folder, 
                    recursive=True, 
                    force_reindex=force_reindex,
                    progress_callback=progress_callback
                )
                
                # Calculate new documents
                new_docs = results['success'] - results['reindexed']
                
                # Build message
                message_parts = []
                if new_docs > 0:
                    message_parts.append(f"{new_docs} new document{'s' if new_docs != 1 else ''}")
                if results['reindexed'] > 0:
                    message_parts.append(f"{results['reindexed']} re-indexed")
                if results['skipped'] > 0:
                    message_parts.append(f"{results['skipped']} skipped (up-to-date)")
                if results['failed'] > 0:
                    message_parts.append(f"{results['failed']} failed")
                
                message = "Indexed: " + ", ".join(message_parts) if message_parts else "No documents to index"
                
                with task_lock:
                    if task_id in active_tasks:
                        active_tasks[task_id]['status'] = 'completed'
                        active_tasks[task_id]['progress'] = message
                        active_tasks[task_id]['results'] = {
                            'message': message,
                            'total': results['total'],
                            'new': new_docs,
                            'reindexed': results['reindexed'],
                            'skipped': results['skipped'],
                            'failed': results['failed']
                        }
                
                logger.info(f"Background indexing completed: {message}")
                
            except Exception as e:
                logger.error(f"Error in background indexing: {e}")
                with task_lock:
                    if task_id in active_tasks:
                        active_tasks[task_id]['status'] = 'failed'
                        active_tasks[task_id]['progress'] = f"Error: {str(e)}"
        
        # Start thread
        thread = threading.Thread(target=index_task, daemon=True)
        thread.start()
        
        return jsonify({
            'task_id': task_id,
            'message': 'Indexing started in background',
            'status': 'processing'
        })
        
    except Exception as e:
        logger.error(f"Error starting indexing task: {e}")
        return jsonify({'error': str(e)}), 500
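
The folder check in the source above only verifies that the path exists and is a directory, so any directory readable by the server process is accepted. One possible way to confine folder_path to an allowed root is sketched below. The helper name resolve_within and the base_dir parameter are assumptions made for illustration; neither appears in the source.

```python
from pathlib import Path


def resolve_within(base_dir, requested):
    """Return the resolved path if it lies inside base_dir, else raise ValueError.

    Hypothetical helper: base_dir is the only directory tree clients may index.
    """
    base = Path(base_dir).resolve()
    # Joining with an absolute 'requested' discards base, so the check below
    # also rejects absolute paths that point elsewhere.
    candidate = (base / requested).resolve()
    if candidate != base and base not in candidate.parents:
        raise ValueError(f"{requested!r} is outside the allowed directory")
    return candidate
```

Resolving both paths before comparing them means that ".." segments and symlinks are taken into account rather than compared as raw strings.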

Return Value

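Returns a JSON response. On success (HTTP 200, Flask's default, since the code sets no explicit status): {'task_id': str (UUID4), 'message': 'Indexing started in background', 'status': 'processing'}. On error (400): {'error': 'Invalid folder path'} when folder_path does not exist or is not a directory. On error (500): {'error': 'Document indexer not initialized'} when document_indexer is unset, or {'error': str(e)} for any other exception raised while handling the request. The response never contains indexing results; the task_id is used to poll for task status and results.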

Dependencies

  • flask
  • uuid
  • pathlib
  • logging
  • threading
  • werkzeug

Required Imports

from flask import Flask, request, jsonify
import uuid as uuid_module
from pathlib import Path
import logging
import threading
from threading import Lock

Usage Example

# Server setup
from flask import Flask, request, jsonify
import uuid as uuid_module
from pathlib import Path
import threading
from threading import Lock
import logging
import config
from document_indexer import DocumentIndexer

app = Flask(__name__)
logger = logging.getLogger(__name__)
task_lock = Lock()
active_tasks = {}
document_indexer = DocumentIndexer()

@app.route('/api/index-folder', methods=['POST'])
def api_index_folder():
    # ... function code ...
    pass

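# Client usage (run in a separate process while the server is listening)
import requests

# Start indexing task
response = requests.post(
    'http://localhost:5000/api/index-folder',
    json={'folder_path': '/path/to/documents', 'force_reindex': False},
    timeout=10,  # avoid hanging indefinitely if the server is unreachable
)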

if response.status_code == 200:
    task_id = response.json()['task_id']
    print(f"Indexing started with task_id: {task_id}")
    # Poll for status using task_id
else:
    print(f"Error: {response.json()['error']}")
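
The "Poll for status using task_id" step could look roughly like the sketch below. Related status endpoints are listed under Similar Components, but their URLs and response shapes are not documented here, so the sketch takes a hypothetical fetch_status callable supplied by the caller; it should return the task's status dictionary, or None when the task is not known yet. The status values 'processing', 'completed', and 'failed' are the ones set in the source above. Because the task entry is created inside the background thread, a poll that arrives right after the response may not find the task, so None is treated as retryable.

```python
import time
from typing import Callable, Optional


def wait_for_task(
    fetch_status: Callable[[str], Optional[dict]],
    task_id: str,
    interval: float = 1.0,
    timeout: float = 600.0,
) -> dict:
    """Poll until the task is 'completed' or 'failed', or raise TimeoutError.

    fetch_status is a hypothetical caller-supplied function; it is not part
    of the documented endpoint.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status(task_id)
        # None means the background thread has not registered the task yet.
        if status is not None and status.get("status") in ("completed", "failed"):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} did not finish within {timeout} s")
        time.sleep(interval)
```

Injecting fetch_status keeps the helper independent of any particular URL; in practice it would wrap an HTTP GET against whichever status endpoint the application exposes.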

Best Practices

  • Initialize document_indexer before the Flask app starts accepting requests; until then the endpoint returns a 500 error
  • Remove completed and failed entries from active_tasks after a retention period; nothing in this function deletes them, so the dictionary grows for the life of the process
  • Hold task_lock for every read and write of active_tasks to prevent race conditions
  • Implement a separate endpoint that returns the task status for the returned task_id
  • Consider adding a timeout mechanism to handle long-running or stuck indexing operations
  • The thread is created with daemon=True, so it is killed when the main process exits, which interrupts any indexing run still in progress
  • Validate folder_path in production, for example by restricting it to an allowed base directory, to prevent directory traversal attacks
  • Limit the number of concurrent indexing tasks; each request starts a new thread, so unbounded requests can exhaust resources
  • The progress_callback function updates the task entry as indexing proceeds, so clients that poll the status see current progress
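
The cleanup recommended above could be implemented along the following lines. This is a sketch under one stated assumption: each finished entry carries a 'finished_at' timestamp (a time.time() value). The documented source does not record one, so it would have to be added where the status is set to 'completed' or 'failed'. The function name prune_finished_tasks is likewise hypothetical.

```python
import threading
import time

task_lock = threading.Lock()
active_tasks = {}


def prune_finished_tasks(max_age_seconds, now=None):
    """Delete finished tasks older than max_age_seconds; return the removed IDs.

    Assumes finished entries have a 'finished_at' timestamp, which the
    documented source does not set.
    """
    now = time.time() if now is None else now
    removed = []
    with task_lock:
        # Iterate over a copy so entries can be deleted during the loop.
        for task_id, task in list(active_tasks.items()):
            finished = task.get("status") in ("completed", "failed")
            finished_at = task.get("finished_at")
            if finished and finished_at is not None and now - finished_at > max_age_seconds:
                del active_tasks[task_id]
                removed.append(task_id)
    return removed
```

Tasks still in the 'processing' state are never removed, so a client polling a running task is unaffected. The function could be called periodically from a timer thread or at the start of each indexing request.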

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function index_all_documents 86.4% similar

    Flask route handler that initiates background indexing of all documents in the system, creating a task ID for tracking progress and returning immediately while indexing continues asynchronously.

    From: /tf/active/vicechatdev/docchat/blueprint.py
  • function api_index_progress 80.2% similar

    Flask API endpoint that retrieves the current progress status of an asynchronous indexing task by its task ID.

    From: /tf/active/vicechatdev/docchat/app.py
  • function api_task_status 69.7% similar

    Flask API endpoint that retrieves and returns the status of asynchronous tasks (chat or indexing operations) by task ID.

    From: /tf/active/vicechatdev/docchat/app.py
  • function get_task_status 68.3% similar

    Flask API endpoint that retrieves the current status of a background task by its task ID from an in-memory active_tasks dictionary.

    From: /tf/active/vicechatdev/docchat/blueprint.py
  • function api_task_status_v1 66.2% similar

    Flask API endpoint that retrieves and returns the status of a background task, with user authorization checks.

    From: /tf/active/vicechatdev/vice_ai/app.py