function count_audit_events
Counts audit events in a Neo4j graph database with flexible filtering options including resource, user, event type, category, and date range.
File: /tf/active/vicechatdev/CDocs/utils/audit_trail.py
Lines: 519 - 591
Complexity: moderate
Purpose
This function queries a Neo4j database to count audit events matching specified criteria. It builds a dynamic Cypher query based on provided filters, allowing for flexible audit trail analysis. The function is designed to work with an AuditTrail-AuditEvent graph structure where events can be linked to resources and users. It's useful for generating audit statistics, compliance reporting, and monitoring system activity.
Source Code
def count_audit_events(resource_uid: Optional[str] = None,
                       event_types: Optional[List[str]] = None,
                       event_category: Optional[str] = None,
                       user_uid: Optional[str] = None,
                       start_date: Optional[datetime] = None,
                       end_date: Optional[datetime] = None) -> int:
    """
    Count audit events with optional filtering.

    Args:
        resource_uid: Optional UID of resource to count events for
        event_types: Optional list of event types to filter by
        event_category: Optional event category to filter by
        user_uid: Optional UID of user to count events for
        start_date: Optional start date for events
        end_date: Optional end date for events

    Returns:
        Number of matching audit events
    """
    try:
        # Start building query - now using AuditTrail
        query = "MATCH (t:AuditTrail)-[:HAS_EVENT]->(e:AuditEvent) "
        conditions = []
        params = {}

        # Add resource filter if provided
        if resource_uid:
            query += "MATCH (e)-[:REFERS_TO]->(r {UID: $resource_uid}) "
            params['resource_uid'] = resource_uid

        # Add user filter if provided
        if user_uid:
            query += "MATCH (e)-[:PERFORMED_BY]->(u:User {UID: $user_uid}) "
            params['user_uid'] = user_uid

        # Add event type filter if provided
        if event_types:
            conditions.append("e.eventType IN $event_types")
            params['event_types'] = event_types

        # Add event category filter if provided
        if event_category:
            conditions.append("e.eventCategory = $event_category")
            params['event_category'] = event_category

        # Add date range filters if provided
        if start_date:
            conditions.append("e.timestamp >= $start_date")
            params['start_date'] = start_date
        if end_date:
            conditions.append("e.timestamp <= $end_date")
            params['end_date'] = end_date

        # Add conditions to query if any
        if conditions:
            query += "WHERE " + " AND ".join(conditions) + " "

        # Add return
        query += "RETURN count(e) as count"

        # Execute query
        result = db.run_query(query, params)
        if result and 'count' in result[0]:
            return result[0]['count']
        return 0
    except Exception as e:
        logger.error(f"Error counting audit events: {e}")
        return 0
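To see what the function sends to Neo4j for a given set of filters, the query assembly above can be mirrored in a standalone helper. The following is a minimal sketch, not part of audit_trail.py: build_count_query is a hypothetical name, and it only builds the query text and parameter dictionary without touching a database.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple


def build_count_query(resource_uid: Optional[str] = None,
                      event_types: Optional[List[str]] = None,
                      event_category: Optional[str] = None,
                      user_uid: Optional[str] = None,
                      start_date: Optional[datetime] = None,
                      end_date: Optional[datetime] = None
                      ) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical helper mirroring the query assembly in count_audit_events."""
    query = "MATCH (t:AuditTrail)-[:HAS_EVENT]->(e:AuditEvent) "
    conditions: List[str] = []
    params: Dict[str, Any] = {}

    # Relationship filters become extra MATCH clauses
    if resource_uid:
        query += "MATCH (e)-[:REFERS_TO]->(r {UID: $resource_uid}) "
        params["resource_uid"] = resource_uid
    if user_uid:
        query += "MATCH (e)-[:PERFORMED_BY]->(u:User {UID: $user_uid}) "
        params["user_uid"] = user_uid

    # Property filters become WHERE conditions joined with AND
    if event_types:
        conditions.append("e.eventType IN $event_types")
        params["event_types"] = event_types
    if event_category:
        conditions.append("e.eventCategory = $event_category")
        params["event_category"] = event_category
    if start_date:
        conditions.append("e.timestamp >= $start_date")
        params["start_date"] = start_date
    if end_date:
        conditions.append("e.timestamp <= $end_date")
        params["end_date"] = end_date

    if conditions:
        query += "WHERE " + " AND ".join(conditions) + " "
    query += "RETURN count(e) as count"
    return query, params


query, params = build_count_query(resource_uid="res_12345",
                                  event_types=["UPDATE", "DELETE"])
print(query)
print(params)
```

Filter values appear only in the parameter dictionary; the query text contains just the $name placeholders. Because each filter is checked for truthiness, an empty string or empty list is treated the same as None and adds no clause.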
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| resource_uid | Optional[str] | None | positional_or_keyword |
| event_types | Optional[List[str]] | None | positional_or_keyword |
| event_category | Optional[str] | None | positional_or_keyword |
| user_uid | Optional[str] | None | positional_or_keyword |
| start_date | Optional[datetime] | None | positional_or_keyword |
| end_date | Optional[datetime] | None | positional_or_keyword |
Parameter Details
resource_uid: Optional unique identifier (string) of a resource to filter events. When provided, only counts events that refer to this specific resource via the REFERS_TO relationship.
event_types: Optional list of event type strings to filter by. When provided, only counts events whose eventType property matches one of the values in this list. Examples might include ['CREATE', 'UPDATE', 'DELETE'].
event_category: Optional string representing an event category to filter by. When provided, only counts events whose eventCategory property matches this exact value. Examples might include 'SECURITY', 'DATA_ACCESS', 'CONFIGURATION'.
user_uid: Optional unique identifier (string) of a user to filter events. When provided, only counts events performed by this specific user via the PERFORMED_BY relationship.
start_date: Optional datetime object representing the earliest timestamp for events to count. When provided, only counts events with timestamp >= this value.
end_date: Optional datetime object representing the latest timestamp for events to count. When provided, only counts events with timestamp <= this value.
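Both date bounds are inclusive and are compared against the full timestamp, so a value such as datetime(2024, 12, 31) means midnight at the start of that day and leaves out events recorded later on 31 December. The sketch below shows one way to cover whole calendar days. It assumes event timestamps are stored in a form comparable to Python datetime values; day_range is a hypothetical helper, not part of CDocs.

```python
from datetime import date, datetime, time
from typing import Tuple


def day_range(first_day: date, last_day: date) -> Tuple[datetime, datetime]:
    """Hypothetical helper: datetimes spanning first_day 00:00 to the end of last_day."""
    start = datetime.combine(first_day, time.min)  # 00:00:00
    end = datetime.combine(last_day, time.max)     # 23:59:59.999999
    return start, end


start, end = day_range(date(2024, 1, 1), date(2024, 12, 31))
print(start)
print(end)
```

The resulting pair can be passed as start_date and end_date to count every event in the 2024 calendar year.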
Return Value
Type: int
Returns an integer representing the count of audit events matching all provided filter criteria. Returns 0 if no matching events are found or if an error occurs during query execution. The count represents the number of AuditEvent nodes that satisfy all specified conditions.
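Because 0 covers both "no matches" and "query failed", callers that need to tell the two apart may prefer a variant that reports failure separately. The following is a minimal sketch of that idea, not the CDocs implementation: count_or_none and the stand-in query runners are hypothetical, and it assumes the runner raises an exception on failure and returns a list of dict-like rows, as the result handling in the source suggests.

```python
from typing import Any, Callable, Dict, List, Optional

# Assumed shape of a query runner such as db.run_query: (query, params) -> rows
QueryRunner = Callable[[str, Dict[str, Any]], List[Dict[str, Any]]]


def count_or_none(run_query: QueryRunner, query: str,
                  params: Dict[str, Any]) -> Optional[int]:
    """Hypothetical variant: None signals an error, 0 means no matching events."""
    try:
        result = run_query(query, params)
    except Exception:
        return None
    if result and "count" in result[0]:
        return result[0]["count"]
    return 0


def fake_ok(query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Stand-in runner that behaves like a successful query
    return [{"count": 3}]


def fake_fail(query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Stand-in runner that behaves like a failed query
    raise RuntimeError("database unavailable")


print(count_or_none(fake_ok, "RETURN 3 as count", {}))    # 3
print(count_or_none(fake_fail, "RETURN 3 as count", {}))  # None
```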
Dependencies
- logging
- uuid
- typing
- datetime
- json
- CDocs.db
- CDocs.config
- CDocs.db.schema_manager
- CDocs.models.user_extensions
Required Imports
from typing import Optional, List
from datetime import datetime
import logging
from CDocs import db
Usage Example
from datetime import datetime, timedelta
# Import path inferred from the file location shown above
from CDocs.utils.audit_trail import count_audit_events

# Count all audit events
total_events = count_audit_events()
print(f"Total audit events: {total_events}")

# Count events for a specific resource
resource_events = count_audit_events(resource_uid="res_12345")
print(f"Events for resource: {resource_events}")

# Count events by user within date range
start = datetime.now() - timedelta(days=7)
end = datetime.now()
user_events = count_audit_events(
    user_uid="user_67890",
    start_date=start,
    end_date=end
)
print(f"User events in last 7 days: {user_events}")

# Count specific event types for a resource
security_events = count_audit_events(
    resource_uid="res_12345",
    event_types=["LOGIN", "LOGOUT", "PERMISSION_CHANGE"],
    event_category="SECURITY"
)
print(f"Security events: {security_events}")

# Complex filter combining multiple criteria
filtered_count = count_audit_events(
    resource_uid="res_12345",
    user_uid="user_67890",
    event_types=["UPDATE", "DELETE"],
    event_category="DATA_MODIFICATION",
    start_date=datetime(2024, 1, 1),
    # End of day, so events on 31 December are included
    end_date=datetime(2024, 12, 31, 23, 59, 59)
)
print(f"Filtered events: {filtered_count}")
Best Practices
- Always handle the return value of 0, which could indicate either no matching events or an error condition
- Use date range filters (start_date and end_date) when querying large audit trails to improve performance
- Check logs for error messages if unexpected 0 counts are returned, as errors are caught and logged
- Ensure the Neo4j database schema includes the required node labels (AuditTrail, AuditEvent, User) and relationship types (HAS_EVENT, REFERS_TO, PERFORMED_BY)
- Always pass event_types as a list, even for a single type; the filter is applied with the Cypher IN operator, which expects a list
- Consider indexing the timestamp, eventType, and eventCategory properties on AuditEvent nodes for better query performance
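- Filter values are passed to Neo4j as query parameters ($resource_uid, $user_uid, and so on) rather than interpolated into the Cypher text, so they cannot change the query structure; validating UIDs is still worthwhile to catch malformed input early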
- Use timezone-aware datetime objects for start_date and end_date to avoid timezone-related counting discrepancies
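As an illustration of the timezone advice above, the following sketch builds a timezone-aware UTC window for the last N days. It assumes event timestamps are stored in, or comparable with, UTC, which this page does not confirm; last_days_utc is a hypothetical helper, not part of CDocs.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def last_days_utc(days: int,
                  now: Optional[datetime] = None) -> Tuple[datetime, datetime]:
    """Hypothetical helper: (start, end) as timezone-aware UTC datetimes."""
    end = now if now is not None else datetime.now(timezone.utc)
    if end.tzinfo is None:
        # Reject naive datetimes rather than guessing their timezone
        raise ValueError("now must be timezone-aware")
    end = end.astimezone(timezone.utc)
    return end - timedelta(days=days), end


start, end = last_days_utc(7)
print(start.isoformat(), end.isoformat())
```

The returned pair can be passed as start_date and end_date in place of the naive datetime.now() values.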
Similar Components
AI-powered semantic similarity - components with related functionality:
- function get_audit_events (90.1% similar)
- function log_event (69.0% similar)
- function get_audit_trail (67.2% similar)
- function get_user_activity (65.9% similar)
- function migrate_audit_events (60.9% similar)