function migrate_audit_events_v1

Maturity: 63

Reattaches AuditEvent nodes that hang off CDocs nodes to an AuditTrail node in a Neo4j graph database by deleting the old HAS_EVENT relationships and creating new ones.

File: /tf/active/vicechatdev/CDocs single class/db/schema_manager.py
Lines: 683 - 739
Complexity: moderate

Purpose

This function performs a database migration that reorganizes audit event relationships in Neo4j. It finds every AuditEvent node connected to a CDocs node via a HAS_EVENT relationship, deletes that relationship, and creates a new HAS_EVENT relationship from the AuditTrail node identified by audit_trail_uid. This consolidates audit events under a single audit trail, likely as part of a schema refactoring or data cleanup operation. If no audit trail UID is provided, the function calls ensure_audit_trail_exists() to find or create one; if that call returns nothing, the function returns a failure result without running the migration query.
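Before running the migration, it can help to see how many relationships it would touch. The following is a minimal sketch of a read-only preview, not part of schema_manager.py: the helper name count_pending_audit_events and the PREVIEW_QUERY constant are hypothetical, and the driver is typed loosely so that any object exposing the Neo4j session() interface works.

```python
from typing import Any

# Hypothetical read-only preview query (not in the source file): counts the
# CDocs-[:HAS_EVENT]->AuditEvent relationships the migration would rewire.
PREVIEW_QUERY = """
MATCH (c:CDocs)-[r:HAS_EVENT]->(e:AuditEvent)
RETURN count(r) AS pending
"""


def count_pending_audit_events(driver: Any) -> int:
    """Return how many HAS_EVENT relationships still start at a CDocs node."""
    with driver.session() as session:
        record = session.run(PREVIEW_QUERY).single()
        # An aggregate query normally yields one row; fall back to 0 defensively.
        return record["pending"] if record is not None else 0
```

Comparing this count with the 'events_migrated' value returned afterwards gives a simple sanity check.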

Source Code

def migrate_audit_events(driver: Driver, audit_trail_uid: Optional[str] = None) -> Dict:
    """
    Migrate orphaned audit events to be connected to the audit trail.
    
    Args:
        driver: Neo4j driver instance
        audit_trail_uid: Optional UID of the audit trail to connect events to.
                         If None, will attempt to find the audit trail UID first.
        
    Returns:
        Dict containing migration results
    """
    try:
        # If no audit_trail_uid provided, try to find it first
        if audit_trail_uid is None:
            audit_trail_uid = ensure_audit_trail_exists(driver)
            if not audit_trail_uid:
                return {
                    "success": False,
                    "message": "Could not find or create audit trail",
                    "events_migrated": 0
                }
        
        # Migration query: rewire each CDocs-[:HAS_EVENT]->AuditEvent
        # relationship onto the AuditTrail node
        migration_query = """
        MATCH (c:CDocs)-[r:HAS_EVENT]->(e:AuditEvent)
        MATCH (t:AuditTrail {UID: $audit_trail_uid})
        DELETE r
        CREATE (t)-[:HAS_EVENT]->(e)
        RETURN count(e) as migrated
        """
        
        with driver.session() as session:
            result = session.run(migration_query, {"audit_trail_uid": audit_trail_uid})
            record = result.single()
            
            if record:
                return {
                    "success": True,
                    "message": f"Successfully migrated {record['migrated']} audit events",
                    "events_migrated": record["migrated"]
                }
            else:
                return {
                    "success": False,
                    "message": "Migration query returned no results",
                    "events_migrated": 0
                }
    except Exception as e:
        logger.error(f"Error migrating audit events: {e}")
        logger.error(traceback.format_exc())
        return {
            "success": False,
            "message": f"Error: {str(e)}",
            "events_migrated": 0
        }

Parameters

Name             Type           Default  Kind
driver           Driver         -        positional_or_keyword
audit_trail_uid  Optional[str]  None     positional_or_keyword

Parameter Details

driver: A Neo4j Driver instance used to establish database connections and execute queries. This should be an active, authenticated connection to a Neo4j database containing the CDocs, AuditEvent, and AuditTrail node types.

audit_trail_uid: Optional string representing the unique identifier (UID) of the AuditTrail node to which orphaned audit events should be connected. If None, the function will call ensure_audit_trail_exists() to find or create an audit trail and obtain its UID. Should match the UID property of an existing AuditTrail node in the database.
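When a UID is passed explicitly, the query shown in the source only matches an AuditTrail node with that exact UID; if none exists, nothing is rewired and the aggregate count comes back as 0. A small pre-check can catch a mistyped UID early. This is a sketch under assumptions: audit_trail_exists and TRAIL_EXISTS_QUERY are hypothetical names, not part of the documented module.

```python
from typing import Any

# Hypothetical lookup query: counts AuditTrail nodes carrying the given UID.
TRAIL_EXISTS_QUERY = """
MATCH (t:AuditTrail {UID: $audit_trail_uid})
RETURN count(t) AS matches
"""


def audit_trail_exists(driver: Any, audit_trail_uid: str) -> bool:
    """Return True if at least one AuditTrail node has the given UID."""
    with driver.session() as session:
        record = session.run(
            TRAIL_EXISTS_QUERY, {"audit_trail_uid": audit_trail_uid}
        ).single()
        return record is not None and record["matches"] > 0
```

Calling audit_trail_exists(driver, uid) before migrate_audit_events(driver, audit_trail_uid=uid) makes a wrong UID visible instead of silent.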

Return Value

Type: Dict

Returns a dictionary with three keys: 'success' (boolean indicating whether the migration query ran without error), 'message' (string describing the outcome or error), and 'events_migrated' (integer count of AuditEvent relationships rewired). On success, events_migrated holds the count returned by the query; this can be 0 when no CDocs node has a HAS_EVENT relationship or when no AuditTrail node matches the UID. On failure, it is always 0.
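Because errors are reported through the returned dictionary rather than raised, callers that prefer exceptions need to check it themselves. The sketch below shows one way to do that; MigrationError and require_migrated are hypothetical names introduced for illustration, and the only assumption about the input is the three documented keys.

```python
from typing import Dict


class MigrationError(RuntimeError):
    """Hypothetical exception for a failed or incomplete migration."""


def require_migrated(result: Dict, expected_min: int = 0) -> int:
    """Return the migrated count, raising if the result signals a problem."""
    if not result.get("success"):
        raise MigrationError(result.get("message", "unknown error"))
    migrated = int(result.get("events_migrated", 0))
    if migrated < expected_min:
        raise MigrationError(
            f"expected at least {expected_min} events, migrated {migrated}"
        )
    return migrated
```

For example, require_migrated(migrate_audit_events(driver), expected_min=1) turns a silent zero-event run into an error.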

Dependencies

  • neo4j
  • logging
  • uuid
  • traceback
  • typing
  • datetime
  • CDocs

Required Imports

from neo4j import Driver
from typing import Dict, Optional
import logging
import traceback

Conditional/Optional Imports

These imports are only needed under specific conditions:

from CDocs import ensure_audit_trail_exists

Condition: Required when audit_trail_uid parameter is None, as the function calls ensure_audit_trail_exists() to obtain the UID

from CDocs import logger

Condition: Required for error logging functionality within the exception handler


Usage Example

from neo4j import GraphDatabase

# Assuming migrate_audit_events is in scope, and that ensure_audit_trail_exists
# and logger are available from CDocs
# from CDocs import ensure_audit_trail_exists, logger

# Setup Neo4j connection
uri = "bolt://localhost:7687"
username = "neo4j"
password = "password"
driver = GraphDatabase.driver(uri, auth=(username, password))

try:
    # Option 1: Let function find/create audit trail
    result = migrate_audit_events(driver)
    print(f"Migration result: {result['message']}")
    print(f"Events migrated: {result['events_migrated']}")
    
    # Option 2: Provide specific audit trail UID
    specific_uid = "audit-trail-123"
    result = migrate_audit_events(driver, audit_trail_uid=specific_uid)
    
    if result['success']:
        print(f"Successfully migrated {result['events_migrated']} events")
    else:
        print(f"Migration failed: {result['message']}")
finally:
    driver.close()

Best Practices

  • Always close the Neo4j driver after use to prevent resource leaks
  • Ensure the Neo4j database has appropriate indexes on UID properties for performance
  • Run this migration during low-traffic periods, since it deletes and creates relationships in the database
  • Back up the database, and test the migration on a backup or development copy before running it in production
  • Check the returned 'events_migrated' count to verify that the expected number of events was migrated
  • Re-running the function is safe in the common case: after a successful run no CDocs-to-AuditEvent HAS_EVENT relationships remain, so a second run matches nothing. The query uses CREATE rather than MERGE, however, so an event that is already linked to the AuditTrail node and still linked to a CDocs node would gain a second relationship
  • Monitor the logs: exceptions are caught and logged rather than re-raised, so failures surface only through the returned 'success' flag and the log output
  • Ensure the ensure_audit_trail_exists function is available when audit_trail_uid is None
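On the point about re-running the migration: the source query uses CREATE, which adds a relationship unconditionally. A variant based on MERGE would only create the AuditTrail relationship when it is missing. The following is a sketch of that alternative, not the implementation in schema_manager.py; MERGE_MIGRATION_QUERY and migrate_audit_events_merge are hypothetical names, and error handling is left out for brevity.

```python
from typing import Any, Dict

# Hypothetical variant of the migration query. MERGE matches an existing
# (t)-[:HAS_EVENT]->(e) relationship or creates it, so no duplicates arise.
MERGE_MIGRATION_QUERY = """
MATCH (t:AuditTrail {UID: $audit_trail_uid})
MATCH (c:CDocs)-[r:HAS_EVENT]->(e:AuditEvent)
MERGE (t)-[:HAS_EVENT]->(e)
DELETE r
RETURN count(e) AS migrated
"""


def migrate_audit_events_merge(driver: Any, audit_trail_uid: str) -> Dict:
    """Rewire audit events with MERGE; returns the documented result shape."""
    with driver.session() as session:
        record = session.run(
            MERGE_MIGRATION_QUERY, {"audit_trail_uid": audit_trail_uid}
        ).single()
    migrated = record["migrated"] if record is not None else 0
    return {
        "success": True,
        "message": f"Rewired {migrated} HAS_EVENT relationships",
        "events_migrated": migrated,
    }
```

The count here reflects the CDocs relationships removed, which may exceed the number of relationships newly created on the AuditTrail node.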

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function migrate_audit_events 98.9% similar

    Migrates orphaned AuditEvent nodes from CDocs nodes to an AuditTrail node in a Neo4j graph database by deleting old relationships and creating new ones.

    From: /tf/active/vicechatdev/CDocs/db/schema_manager.py
  • function migrate_audit_trail_v1 81.0% similar

    Migrates existing audit events from a legacy format to a new AuditTrail structure in a Neo4j database, returning migration statistics.

    From: /tf/active/vicechatdev/CDocs single class/db/db_operations.py
  • function migrate_audit_trail 79.3% similar

    Migrates existing audit events from a legacy format to a new AuditTrail structure in a Neo4j database, returning migration statistics.

    From: /tf/active/vicechatdev/CDocs/db/db_operations.py
  • function ensure_audit_trail_exists 64.0% similar

    Ensures a singleton AuditTrail node exists in a Neo4j database, creating it if absent, and returns its unique identifier.

    From: /tf/active/vicechatdev/CDocs single class/db/schema_manager.py
  • function get_or_create_audit_trail_uid 63.9% similar

    Retrieves the UID of an existing AuditTrail node from a Neo4j database, or creates a new AuditTrail node with a generated UID if one doesn't exist.

    From: /tf/active/vicechatdev/CDocs/utils/audit_trail.py