🔍 Code Extractor

function count_audit_events

Maturity: 71

Counts audit events in a Neo4j graph database with flexible filtering options including resource, user, event type, category, and date range.

File:
/tf/active/vicechatdev/CDocs/utils/audit_trail.py
Lines:
519 - 591
Complexity:
moderate

Purpose

This function queries a Neo4j database to count audit events matching specified criteria. It builds a dynamic Cypher query based on provided filters, allowing for flexible audit trail analysis. The function is designed to work with an AuditTrail-AuditEvent graph structure where events can be linked to resources and users. It's useful for generating audit statistics, compliance reporting, and monitoring system activity.

Source Code

def count_audit_events(resource_uid: Optional[str] = None,
                      event_types: Optional[List[str]] = None,
                      event_category: Optional[str] = None,
                      user_uid: Optional[str] = None,
                      start_date: Optional[datetime] = None,
                      end_date: Optional[datetime] = None) -> int:
    """
    Count audit events with optional filtering.
    
    Args:
        resource_uid: Optional UID of resource to count events for
        event_types: Optional list of event types to filter by
        event_category: Optional event category to filter by
        user_uid: Optional UID of user to count events for
        start_date: Optional start date for events
        end_date: Optional end date for events
        
    Returns:
        Number of matching audit events
    """
    try:
        # Start building query - now using AuditTrail
        query = "MATCH (t:AuditTrail)-[:HAS_EVENT]->(e:AuditEvent) "
        conditions = []
        params = {}
        
        # Add resource filter if provided
        if resource_uid:
            query += "MATCH (e)-[:REFERS_TO]->(r {UID: $resource_uid}) "
            params['resource_uid'] = resource_uid
            
        # Add user filter if provided
        if user_uid:
            query += "MATCH (e)-[:PERFORMED_BY]->(u:User {UID: $user_uid}) "
            params['user_uid'] = user_uid
            
        # Add event type filter if provided
        if event_types:
            conditions.append("e.eventType IN $event_types")
            params['event_types'] = event_types
            
        # Add event category filter if provided
        if event_category:
            conditions.append("e.eventCategory = $event_category")
            params['event_category'] = event_category
            
        # Add date range filters if provided
        if start_date:
            conditions.append("e.timestamp >= $start_date")
            params['start_date'] = start_date
            
        if end_date:
            conditions.append("e.timestamp <= $end_date")
            params['end_date'] = end_date
            
        # Add conditions to query if any
        if conditions:
            query += "WHERE " + " AND ".join(conditions) + " "
            
        # Add return
        query += "RETURN count(e) as count"
        
        # Execute query
        result = db.run_query(query, params)
        
        if result and 'count' in result[0]:
            return result[0]['count']
            
        return 0
        
    except Exception as e:
        logger.error(f"Error counting audit events: {e}")
        return 0

Parameters

Name Type Default Kind
resource_uid Optional[str] None positional_or_keyword
event_types Optional[List[str]] None positional_or_keyword
event_category Optional[str] None positional_or_keyword
user_uid Optional[str] None positional_or_keyword
start_date Optional[datetime] None positional_or_keyword
end_date Optional[datetime] None positional_or_keyword

Parameter Details

resource_uid: Optional unique identifier (string) of a resource to filter events. When provided, only counts events that refer to this specific resource via the REFERS_TO relationship.

event_types: Optional list of event type strings to filter by. When provided, only counts events whose eventType property matches one of the values in this list. Examples might include ['CREATE', 'UPDATE', 'DELETE'].

event_category: Optional string representing an event category to filter by. When provided, only counts events whose eventCategory property matches this exact value. Examples might include 'SECURITY', 'DATA_ACCESS', 'CONFIGURATION'.

user_uid: Optional unique identifier (string) of a user to filter events. When provided, only counts events performed by this specific user via the PERFORMED_BY relationship.

start_date: Optional datetime object representing the earliest timestamp for events to count. When provided, only counts events with timestamp >= this value.

end_date: Optional datetime object representing the latest timestamp for events to count. When provided, only counts events with timestamp <= this value.

Return Value

Type: int

Returns an integer representing the count of audit events matching all provided filter criteria. Returns 0 if no matching events are found or if an error occurs during query execution. The count represents the number of AuditEvent nodes that satisfy all specified conditions.

Dependencies

  • logging
  • uuid
  • typing
  • datetime
  • json
  • CDocs.db
  • CDocs.config
  • CDocs.db.schema_manager
  • CDocs.models.user_extensions

Required Imports

from typing import Optional, List
from datetime import datetime
import logging
from CDocs import db

Usage Example

from datetime import datetime, timedelta
from typing import Optional, List
from CDocs import db

# Count all audit events
total_events = count_audit_events()
print(f"Total audit events: {total_events}")

# Count events for a specific resource
resource_events = count_audit_events(resource_uid="res_12345")
print(f"Events for resource: {resource_events}")

# Count events by user within date range
start = datetime.now() - timedelta(days=7)
end = datetime.now()
user_events = count_audit_events(
    user_uid="user_67890",
    start_date=start,
    end_date=end
)
print(f"User events in last 7 days: {user_events}")

# Count specific event types for a resource
security_events = count_audit_events(
    resource_uid="res_12345",
    event_types=["LOGIN", "LOGOUT", "PERMISSION_CHANGE"],
    event_category="SECURITY"
)
print(f"Security events: {security_events}")

# Complex filter combining multiple criteria
filtered_count = count_audit_events(
    resource_uid="res_12345",
    user_uid="user_67890",
    event_types=["UPDATE", "DELETE"],
    event_category="DATA_MODIFICATION",
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 12, 31)
)
print(f"Filtered events: {filtered_count}")

Best Practices

  • Always handle the return value of 0, which could indicate either no matching events or an error condition
  • Use date range filters (start_date and end_date) when querying large audit trails to improve performance
  • Check logs for error messages if unexpected 0 counts are returned, as errors are caught and logged
  • Ensure the Neo4j database schema includes the required node labels (AuditTrail, AuditEvent, User) and relationship types (HAS_EVENT, REFERS_TO, PERFORMED_BY)
  • When filtering by event_types, use a list even for a single type to maintain consistency
  • Consider indexing the timestamp, eventType, and eventCategory properties on AuditEvent nodes for better query performance
  • The function builds dynamic Cypher queries, so ensure all UIDs passed are validated to prevent injection issues
  • Use timezone-aware datetime objects for start_date and end_date to avoid timezone-related counting discrepancies

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function get_audit_events 90.1% similar

    Retrieves audit events from a Neo4j graph database with flexible filtering options including resource, user, event type, category, and date range filters, with pagination support.

    From: /tf/active/vicechatdev/CDocs/utils/audit_trail.py
  • function log_event 69.0% similar

    Creates and persists an audit trail event in a Neo4j graph database, linking it to users, resources, and an audit trail node with comprehensive event metadata.

    From: /tf/active/vicechatdev/CDocs/utils/audit_trail.py
  • function get_audit_trail 67.2% similar

    Retrieves audit trail events from a Neo4j database with optional filtering by date range, action type, and user, returning a list of audit event dictionaries.

    From: /tf/active/vicechatdev/CDocs/controllers/admin_controller.py
  • function get_user_activity 65.9% similar

    Retrieves user activity logs from a Neo4j graph database by querying AuditEvent nodes with flexible filtering options for date ranges, users, and action types.

    From: /tf/active/vicechatdev/CDocs/controllers/admin_controller.py
  • function migrate_audit_events 60.9% similar

    Migrates orphaned AuditEvent nodes from CDocs nodes to an AuditTrail node in a Neo4j graph database by deleting old relationships and creating new ones.

    From: /tf/active/vicechatdev/CDocs/db/schema_manager.py
← Back to Browse