🔍 Code Extractor

function migrate_audit_events

Maturity: 63

Migrates orphaned AuditEvent nodes from CDocs nodes to an AuditTrail node in a Neo4j graph database by deleting old relationships and creating new ones.

File:
/tf/active/vicechatdev/CDocs/db/schema_manager.py
Lines:
632 - 687
Complexity:
moderate

Purpose

This function performs a database migration to reorganize audit event relationships in Neo4j. It finds AuditEvent nodes that are connected to CDocs nodes via HAS_EVENT relationships and reconnects them to a central AuditTrail node instead. This is useful for consolidating audit events under a single audit trail structure, likely as part of a schema refactoring or data cleanup operation. If no audit trail UID is provided, it attempts to find or create one automatically.

Source Code

def migrate_audit_events(driver: Driver, audit_trail_uid: Optional[str] = None) -> Dict:
    """
    Migrate orphaned audit events to be connected to the audit trail.
    
    Args:
        driver: Neo4j driver instance
        audit_trail_uid: Optional UID of the audit trail to connect events to.
                         If None, will attempt to find the audit trail UID first.
        
    Returns:
        Dict containing migration results
    """
    try:
        # If no audit_trail_uid provided, try to find it first
        if audit_trail_uid is None:
            audit_trail_uid = ensure_audit_trail_exists(driver)
            if not audit_trail_uid:
                return {
                    "success": False,
                    "message": "Could not find or create audit trail",
                    "events_migrated": 0
                }
        
        # Migration query to connect orphaned audit events
        migration_query = """
        MATCH (c:CDocs)-[r:HAS_EVENT]->(e:AuditEvent)
        WITH c, r, e, count(e) as count
        MATCH (t:AuditTrail {UID: $audit_trail_uid})
        DELETE r
        CREATE (t)-[:HAS_EVENT]->(e)
        RETURN count(e) as migrated
        """
        
        with driver.session() as session:
            result = session.run(migration_query, {"audit_trail_uid": audit_trail_uid})
            record = result.single()
            
            if record:
                return {
                    "success": True,
                    "message": f"Successfully migrated {record['migrated']} audit events",
                    "events_migrated": record["migrated"]
                }
            else:
                return {
                    "success": False,
                    "message": "Migration query returned no results",
                    "events_migrated": 0
                }
    except Exception as e:
        logger.error(f"Error migrating audit events: {e}")
        return {
            "success": False,
            "message": f"Error: {str(e)}",
            "events_migrated": 0
        }

Parameters

Name Type Default Kind
driver Driver - positional_or_keyword
audit_trail_uid Optional[str] None positional_or_keyword

Parameter Details

driver: A Neo4j Driver instance used to establish database connections and execute queries. This should be an active, authenticated connection to a Neo4j database containing the CDocs, AuditEvent, and AuditTrail node types.

audit_trail_uid: Optional string representing the unique identifier (UID) of the AuditTrail node to which orphaned audit events should be connected. If None, the function will call ensure_audit_trail_exists() to find or create an audit trail and obtain its UID. Expected to be a valid UID string that matches an existing AuditTrail node in the database.

Return Value

Type: Dict

Returns a dictionary with three keys: 'success' (boolean indicating whether the migration completed successfully), 'message' (string describing the outcome or error), and 'events_migrated' (integer count of how many AuditEvent nodes were successfully migrated). On success, events_migrated contains the count from the Cypher query; on failure, it is 0.

Dependencies

  • neo4j
  • logging
  • typing

Required Imports

from neo4j import Driver
from typing import Dict, Optional
import logging

Usage Example

from neo4j import GraphDatabase
from typing import Dict, Optional
import logging

# Setup logger
logger = logging.getLogger(__name__)

# Establish Neo4j connection
driver = GraphDatabase.driver(
    'bolt://localhost:7687',
    auth=('neo4j', 'password')
)

# Option 1: Migrate with specific audit trail UID
result = migrate_audit_events(driver, audit_trail_uid='audit-trail-123')
print(f"Migration result: {result['message']}")
print(f"Events migrated: {result['events_migrated']}")

# Option 2: Let function find/create audit trail automatically
result = migrate_audit_events(driver)
if result['success']:
    print(f"Successfully migrated {result['events_migrated']} events")
else:
    print(f"Migration failed: {result['message']}")

# Close driver when done
driver.close()

Best Practices

  • Always ensure the Neo4j driver is properly initialized and authenticated before calling this function
  • Consider running this migration in a transaction-safe environment or during maintenance windows to avoid conflicts
  • Verify that the audit_trail_uid exists in the database before providing it, or let the function handle it automatically
  • Check the returned dictionary's 'success' field before assuming the migration completed successfully
  • The function depends on ensure_audit_trail_exists() being available in scope when audit_trail_uid is None
  • Monitor the 'events_migrated' count to verify expected number of events were processed
  • The migration deletes existing HAS_EVENT relationships from CDocs nodes, so ensure this is the desired behavior
  • Error handling is built-in but logs to 'logger', so ensure logging is properly configured
  • This is a destructive operation that modifies graph structure - consider backing up data before running

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function migrate_audit_trail 79.1% similar

    Migrates existing audit events from a legacy format to a new AuditTrail structure in a Neo4j database, returning migration statistics.

    From: /tf/active/vicechatdev/CDocs/db/db_operations.py
  • function get_or_create_audit_trail_uid 62.8% similar

    Retrieves the UID of an existing AuditTrail node from a Neo4j database, or creates a new AuditTrail node with a generated UID if one doesn't exist.

    From: /tf/active/vicechatdev/CDocs/utils/audit_trail.py
  • function ensure_audit_trail_exists 61.2% similar

    Ensures a singleton AuditTrail node exists in a Neo4j database, creating it if necessary, and returns its unique identifier.

    From: /tf/active/vicechatdev/CDocs/db/schema_manager.py
  • function count_audit_events 60.9% similar

    Counts audit events in a Neo4j graph database with flexible filtering options including resource, user, event type, category, and date range.

    From: /tf/active/vicechatdev/CDocs/utils/audit_trail.py
  • function log_event 60.9% similar

    Creates and persists an audit trail event in a Neo4j graph database, linking it to users, resources, and an audit trail node with comprehensive event metadata.

    From: /tf/active/vicechatdev/CDocs/utils/audit_trail.py
← Back to Browse