function migrate_audit_events
Migrates orphaned AuditEvent nodes from CDocs nodes to an AuditTrail node in a Neo4j graph database by deleting old relationships and creating new ones.
/tf/active/vicechatdev/CDocs/db/schema_manager.py
632 - 687
moderate
Purpose
This function performs a database migration to reorganize audit event relationships in Neo4j. It finds AuditEvent nodes that are connected to CDocs nodes via HAS_EVENT relationships and reconnects them to a central AuditTrail node instead. This is useful for consolidating audit events under a single audit trail structure, likely as part of a schema refactoring or data cleanup operation. If no audit trail UID is provided, it attempts to find or create one automatically.
Source Code
def migrate_audit_events(driver: Driver, audit_trail_uid: Optional[str] = None) -> Dict:
"""
Migrate orphaned audit events to be connected to the audit trail.
Args:
driver: Neo4j driver instance
audit_trail_uid: Optional UID of the audit trail to connect events to.
If None, will attempt to find the audit trail UID first.
Returns:
Dict containing migration results
"""
try:
# If no audit_trail_uid provided, try to find it first
if audit_trail_uid is None:
audit_trail_uid = ensure_audit_trail_exists(driver)
if not audit_trail_uid:
return {
"success": False,
"message": "Could not find or create audit trail",
"events_migrated": 0
}
# Migration query to connect orphaned audit events
migration_query = """
MATCH (c:CDocs)-[r:HAS_EVENT]->(e:AuditEvent)
WITH c, r, e, count(e) as count
MATCH (t:AuditTrail {UID: $audit_trail_uid})
DELETE r
CREATE (t)-[:HAS_EVENT]->(e)
RETURN count(e) as migrated
"""
with driver.session() as session:
result = session.run(migration_query, {"audit_trail_uid": audit_trail_uid})
record = result.single()
if record:
return {
"success": True,
"message": f"Successfully migrated {record['migrated']} audit events",
"events_migrated": record["migrated"]
}
else:
return {
"success": False,
"message": "Migration query returned no results",
"events_migrated": 0
}
except Exception as e:
logger.error(f"Error migrating audit events: {e}")
return {
"success": False,
"message": f"Error: {str(e)}",
"events_migrated": 0
}
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
driver |
Driver | - | positional_or_keyword |
audit_trail_uid |
Optional[str] | None | positional_or_keyword |
Parameter Details
driver: A Neo4j Driver instance used to establish database connections and execute queries. This should be an active, authenticated connection to a Neo4j database containing the CDocs, AuditEvent, and AuditTrail node types.
audit_trail_uid: Optional string representing the unique identifier (UID) of the AuditTrail node to which orphaned audit events should be connected. If None, the function will call ensure_audit_trail_exists() to find or create an audit trail and obtain its UID. Expected to be a valid UID string that matches an existing AuditTrail node in the database.
Return Value
Type: Dict
Returns a dictionary with three keys: 'success' (boolean indicating whether the migration completed successfully), 'message' (string describing the outcome or error), and 'events_migrated' (integer count of how many AuditEvent nodes were successfully migrated). On success, events_migrated contains the count from the Cypher query; on failure, it is 0.
Dependencies
neo4jloggingtyping
Required Imports
from neo4j import Driver
from typing import Dict, Optional
import logging
Usage Example
from neo4j import GraphDatabase
from typing import Dict, Optional
import logging
# Setup logger
logger = logging.getLogger(__name__)
# Establish Neo4j connection
driver = GraphDatabase.driver(
'bolt://localhost:7687',
auth=('neo4j', 'password')
)
# Option 1: Migrate with specific audit trail UID
result = migrate_audit_events(driver, audit_trail_uid='audit-trail-123')
print(f"Migration result: {result['message']}")
print(f"Events migrated: {result['events_migrated']}")
# Option 2: Let function find/create audit trail automatically
result = migrate_audit_events(driver)
if result['success']:
print(f"Successfully migrated {result['events_migrated']} events")
else:
print(f"Migration failed: {result['message']}")
# Close driver when done
driver.close()
Best Practices
- Always ensure the Neo4j driver is properly initialized and authenticated before calling this function
- Consider running this migration in a transaction-safe environment or during maintenance windows to avoid conflicts
- Verify that the audit_trail_uid exists in the database before providing it, or let the function handle it automatically
- Check the returned dictionary's 'success' field before assuming the migration completed successfully
- The function depends on ensure_audit_trail_exists() being available in scope when audit_trail_uid is None
- Monitor the 'events_migrated' count to verify expected number of events were processed
- The migration deletes existing HAS_EVENT relationships from CDocs nodes, so ensure this is the desired behavior
- Error handling is built-in but logs to 'logger', so ensure logging is properly configured
- This is a destructive operation that modifies graph structure - consider backing up data before running
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
function migrate_audit_trail 79.1% similar
-
function get_or_create_audit_trail_uid 62.8% similar
-
function ensure_audit_trail_exists 61.2% similar
-
function count_audit_events 60.9% similar
-
function log_event 60.9% similar