function migrate_audit_events_v1
Moves AuditEvent nodes in a Neo4j graph database from CDocs nodes to an AuditTrail node by deleting each CDocs HAS_EVENT relationship and creating a HAS_EVENT relationship from the AuditTrail node instead.
/tf/active/vicechatdev/CDocs single class/db/schema_manager.py
683 - 739
moderate
Purpose
This function performs a database migration to reorganize audit event relationships in Neo4j. It finds AuditEvent nodes that are connected to CDocs nodes via HAS_EVENT relationships and reconnects them to a central AuditTrail node instead. This is useful for consolidating audit events under a single audit trail structure, likely as part of a schema refactoring or data cleanup operation. If no audit trail UID is provided, it attempts to find or create one automatically.
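The rewiring described above can be pictured with a small in-memory model. This is only a sketch of the idea, not how the function works internally: the Edge tuple, the rewire_events helper, and the HAS_TRAIL relationship name are hypothetical and exist purely for illustration.

```python
from typing import List, Tuple

# Toy edge: (source_label, source_id, relationship_type, target_id).
# This is an illustrative structure, not a Neo4j type.
Edge = Tuple[str, str, str, str]


def rewire_events(edges: List[Edge], trail_id: str) -> List[Edge]:
    """Replace each CDocs-[:HAS_EVENT]->event edge with AuditTrail-[:HAS_EVENT]->event."""
    rewired: List[Edge] = []
    for source_label, source_id, rel_type, target_id in edges:
        if source_label == "CDocs" and rel_type == "HAS_EVENT":
            # The old edge is dropped and the event is attached to the trail instead
            rewired.append(("AuditTrail", trail_id, "HAS_EVENT", target_id))
        else:
            # Every other edge is left untouched
            rewired.append((source_label, source_id, rel_type, target_id))
    return rewired


edges = [
    ("CDocs", "cdocs-1", "HAS_EVENT", "event-1"),
    ("CDocs", "cdocs-1", "HAS_EVENT", "event-2"),
    ("CDocs", "cdocs-1", "HAS_TRAIL", "trail-1"),  # hypothetical, unrelated edge
]
migrated = rewire_events(edges, "trail-1")
```

After the call, both event edges start at the AuditTrail entry and the unrelated edge is unchanged.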
Source Code
def migrate_audit_events(driver: Driver, audit_trail_uid: Optional[str] = None) -> Dict:
    """
    Migrate orphaned audit events to be connected to the audit trail.

    Args:
        driver: Neo4j driver instance
        audit_trail_uid: Optional UID of the audit trail to connect events to.
                         If None, will attempt to find the audit trail UID first.

    Returns:
        Dict containing migration results
    """
    try:
        # If no audit_trail_uid provided, try to find it first
        if audit_trail_uid is None:
            audit_trail_uid = ensure_audit_trail_exists(driver)
            if not audit_trail_uid:
                return {
                    "success": False,
                    "message": "Could not find or create audit trail",
                    "events_migrated": 0
                }

        # Migration query to connect orphaned audit events
        migration_query = """
        MATCH (c:CDocs)-[r:HAS_EVENT]->(e:AuditEvent)
        WITH c, r, e, count(e) as count
        MATCH (t:AuditTrail {UID: $audit_trail_uid})
        DELETE r
        CREATE (t)-[:HAS_EVENT]->(e)
        RETURN count(e) as migrated
        """

        with driver.session() as session:
            result = session.run(migration_query, {"audit_trail_uid": audit_trail_uid})
            record = result.single()

            if record:
                return {
                    "success": True,
                    "message": f"Successfully migrated {record['migrated']} audit events",
                    "events_migrated": record["migrated"]
                }
            else:
                return {
                    "success": False,
                    "message": "Migration query returned no results",
                    "events_migrated": 0
                }

    except Exception as e:
        logger.error(f"Error migrating audit events: {e}")
        logger.error(traceback.format_exc())
        return {
            "success": False,
            "message": f"Error: {str(e)}",
            "events_migrated": 0
        }
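The query in the source uses CREATE and counts matched rows. One possible variant, shown here only as a sketch and not as the project's implementation, uses MERGE so that an event reachable through several CDocs relationships ends up with a single relationship to the trail, and counts distinct events. The names MERGE_MIGRATION_QUERY and run_merge_migration are hypothetical; the driver argument is assumed to behave like a neo4j Driver (session(), run(), single()).

```python
from typing import Any

# Assumed alternative to the original query: MERGE avoids duplicate
# (t)-[:HAS_EVENT]->(e) relationships, DISTINCT avoids double counting.
MERGE_MIGRATION_QUERY = """
MATCH (t:AuditTrail {UID: $audit_trail_uid})
MATCH (c:CDocs)-[r:HAS_EVENT]->(e:AuditEvent)
DELETE r
MERGE (t)-[:HAS_EVENT]->(e)
RETURN count(DISTINCT e) AS migrated
"""


def run_merge_migration(driver: Any, audit_trail_uid: str) -> int:
    """Run the MERGE-based variant and return the number of distinct events moved."""
    with driver.session() as session:
        result = session.run(MERGE_MIGRATION_QUERY, {"audit_trail_uid": audit_trail_uid})
        record = result.single()
        # The aggregation normally yields one row; fall back to 0 defensively
        return record["migrated"] if record else 0
```

Whether MERGE is appropriate depends on whether multiple HAS_EVENT relationships between the trail and one event are ever intended; that is not stated in the source.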
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| driver | Driver | - | positional_or_keyword |
| audit_trail_uid | Optional[str] | None | positional_or_keyword |
Parameter Details
driver: A Neo4j Driver instance used to establish database connections and execute queries. This should be an active, authenticated connection to a Neo4j database containing the CDocs, AuditEvent, and AuditTrail node types.
audit_trail_uid: Optional string representing the unique identifier (UID) of the AuditTrail node to which orphaned audit events should be connected. If None, the function will call ensure_audit_trail_exists() to find or create an audit trail and obtain its UID. Should match the UID property of an existing AuditTrail node in the database.
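Because an explicitly passed audit_trail_uid should match an existing AuditTrail node, a caller may want to check that first. The following is a minimal sketch under that assumption; audit_trail_exists is a hypothetical helper, not part of the documented module, and the driver is assumed to behave like a neo4j Driver.

```python
from typing import Any


def audit_trail_exists(driver: Any, audit_trail_uid: str) -> bool:
    """Return True if at least one AuditTrail node carries the given UID."""
    query = "MATCH (t:AuditTrail {UID: $uid}) RETURN count(t) AS n"
    with driver.session() as session:
        record = session.run(query, {"uid": audit_trail_uid}).single()
        return bool(record and record["n"] > 0)
```

A caller could skip the migration, or fall back to passing None, when this check returns False.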
Return Value
Type: Dict
Returns a dictionary with three keys: 'success' (boolean indicating whether migration completed successfully), 'message' (string describing the outcome or error), and 'events_migrated' (integer count of how many AuditEvent nodes were successfully migrated). On success, events_migrated contains the count; on failure, it will be 0.
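The three keys can be turned into a short status line as sketched below. The helper name summarize_migration is hypothetical. The zero-count branch reflects an assumption worth stating: a successful result with a count of 0 may mean there was nothing to migrate, or that no AuditTrail node matched the UID.

```python
from typing import Dict


def summarize_migration(result: Dict) -> str:
    """Turn the result dictionary into a one-line status string."""
    if not result.get("success", False):
        return f"FAILED: {result.get('message', 'unknown error')}"
    migrated = result.get("events_migrated", 0)
    if migrated == 0:
        # Success with zero events is ambiguous, so say so explicitly
        return "OK: nothing migrated (no pending events or no matching AuditTrail)"
    return f"OK: migrated {migrated} audit events"
```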
Dependencies
- neo4j
- logging
- uuid
- traceback
- typing
- datetime
- CDocs
Required Imports
from neo4j import Driver
from typing import Dict, Optional
import logging
import traceback
Conditional/Optional Imports
These imports are only needed under specific conditions:
from CDocs import ensure_audit_trail_exists
Condition: Required when audit_trail_uid parameter is None, as the function calls ensure_audit_trail_exists() to obtain the UID
Required (conditional)
from CDocs import logger
Condition: Required for error logging functionality within the exception handler
Required (conditional)
Usage Example
from neo4j import GraphDatabase
from typing import Dict, Optional
import logging
import traceback

# Assuming ensure_audit_trail_exists and logger are available from CDocs
# from CDocs import ensure_audit_trail_exists, logger

# Setup Neo4j connection
uri = "bolt://localhost:7687"
username = "neo4j"
password = "password"
driver = GraphDatabase.driver(uri, auth=(username, password))

try:
    # Option 1: Let function find/create audit trail
    result = migrate_audit_events(driver)
    print(f"Migration result: {result['message']}")
    print(f"Events migrated: {result['events_migrated']}")

    # Option 2: Provide specific audit trail UID
    specific_uid = "audit-trail-123"
    result = migrate_audit_events(driver, audit_trail_uid=specific_uid)

    if result['success']:
        print(f"Successfully migrated {result['events_migrated']} events")
    else:
        print(f"Migration failed: {result['message']}")
finally:
    driver.close()
Best Practices
- Always close the Neo4j driver connection after use to prevent resource leaks
- Ensure the Neo4j database has appropriate indexes on UID properties for performance
- Run this migration during low-traffic periods as it modifies relationships in the database
- Test the migration on a backup or development database before running in production
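- Check the returned 'events_migrated' count against the expected number of events. A result with 'success' set to True and a count of 0 can mean either that nothing needed migrating or that no AuditTrail node matched the given UID, because the query then matches no rows while its final count() aggregation still returns a single row
- Re-running the function after a successful migration does not duplicate relationships, because no CDocs-to-AuditEvent HAS_EVENT relationships remain to match. Within a single run, however, the query uses CREATE rather than MERGE, so an AuditEvent attached to more than one CDocs node receives one new relationship per matched row
- Monitor the logs for errors: exceptions are caught and logged rather than re-raised, so failures surface only through the log and the returned dictionary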
- Ensure the ensure_audit_trail_exists function is available when audit_trail_uid is None
- Consider backing up the database before running this migration operation
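To make the 'events_migrated' check in the list above concrete, a read-only count can be taken before the migration and compared with the result afterwards. This is a sketch only; count_pending_events is a hypothetical helper, and the driver is assumed to behave like a neo4j Driver.

```python
from typing import Any


def count_pending_events(driver: Any) -> int:
    """Count CDocs-to-AuditEvent HAS_EVENT relationships without modifying anything."""
    query = "MATCH (:CDocs)-[r:HAS_EVENT]->(:AuditEvent) RETURN count(r) AS pending"
    with driver.session() as session:
        record = session.run(query).single()
        return record["pending"] if record else 0
```

Because the original query returns a row count, this relationship count is the figure it should be compared against; running the helper again after the migration should report zero.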
Similar Components
AI-powered semantic similarity - components with related functionality:
- function migrate_audit_events 98.9% similar
- function migrate_audit_trail_v1 81.0% similar
- function migrate_audit_trail 79.3% similar
- function ensure_audit_trail_exists 64.0% similar
- function get_or_create_audit_trail_uid 63.9% similar