class EInkFileHandler
A file system event handler that monitors a directory for new files and automatically processes them asynchronously when detected.
/tf/active/vicechatdev/e-ink-llm/processor.py
29 - 59
moderate
Purpose
EInkFileHandler extends FileSystemEventHandler to watch for file creation events in a monitored directory. When a new supported file is created (excluding response files), it triggers asynchronous processing after a delay to ensure the file is fully written. It maintains state to prevent duplicate processing of the same file and integrates with an InputProcessor instance to handle the actual file processing logic.
Source Code
class EInkFileHandler(FileSystemEventHandler):
"""File system event handler for processing new files"""
def __init__(self, processor_instance):
self.processor = processor_instance
self.processing_files = set() # Track files currently being processed
def on_created(self, event):
"""Handle new file creation"""
if not event.is_directory:
file_path = Path(event.src_path)
# Check if it's a supported file type and not already being processed
if (InputProcessor.is_supported_file(file_path) and
file_path not in self.processing_files and
not file_path.name.startswith('RESPONSE_')): # Don't process our own output files
print(f"📁 New file detected: {file_path.name}")
# Add a small delay to ensure file is fully written
asyncio.create_task(self._delayed_process(file_path))
async def _delayed_process(self, file_path: Path):
"""Process file with a small delay to ensure it's fully written"""
await asyncio.sleep(2) # Wait 2 seconds for file to be fully written
if file_path.exists() and file_path not in self.processing_files:
self.processing_files.add(file_path)
try:
await self.processor.process_file(file_path)
finally:
self.processing_files.discard(file_path)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
FileSystemEventHandler | - |
Parameter Details
processor_instance: An instance of InputProcessor (or compatible processor) that provides the process_file() coroutine method and is_supported_file() static method. This processor handles the actual file processing logic once a file is detected and validated.
Return Value
The __init__ method returns an instance of EInkFileHandler. The on_created method returns None (implicit). The _delayed_process method is an async coroutine that returns None after processing completes or if the file is already being processed.
Class Interface
Methods
__init__(self, processor_instance)
Purpose: Initialize the file handler with a processor instance and set up tracking for files being processed
Parameters:
processor_instance: An InputProcessor or compatible object that provides process_file() async method and is_supported_file() static method
Returns: None (constructor)
on_created(self, event)
Purpose: Callback method triggered by watchdog when a new file is created in the monitored directory. Validates the file and schedules async processing if appropriate.
Parameters:
event: FileSystemEvent object from watchdog containing src_path and is_directory attributes
Returns: None
async _delayed_process(self, file_path: Path)
Purpose: Asynchronously process a file after a 2-second delay to ensure it's fully written. Tracks processing state to prevent duplicates and ensures cleanup in finally block.
Parameters:
file_path: Path object representing the file to be processed
Returns: None (async coroutine)
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
processor |
InputProcessor (or compatible) | The processor instance used to handle actual file processing via its process_file() method | instance |
processing_files |
set | A set of Path objects currently being processed, used to prevent duplicate processing of the same file | instance |
Dependencies
asynciopathlibwatchdoginput_processor
Required Imports
import asyncio
from pathlib import Path
from watchdog.events import FileSystemEventHandler
from input_processor import InputProcessor
Usage Example
from pathlib import Path
import asyncio
from watchdog.observers import Observer
from input_processor import InputProcessor
from eink_file_handler import EInkFileHandler
# Create processor instance
processor = InputProcessor()
# Create file handler with processor
file_handler = EInkFileHandler(processor)
# Set up observer to watch a directory
observer = Observer()
watch_path = Path('./watched_folder')
observer.schedule(file_handler, str(watch_path), recursive=False)
# Start observing
observer.start()
try:
# Keep running (file handler will process files automatically)
while True:
asyncio.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
Best Practices
- Always ensure an async event loop is running before using this handler, as it creates async tasks
- The handler includes a 2-second delay before processing to ensure files are fully written - adjust if needed for larger files
- Files starting with 'RESPONSE_' are automatically excluded to prevent processing output files
- The processing_files set prevents duplicate processing - files are tracked during processing and removed when complete
- Must be used with watchdog.observers.Observer to actually monitor the file system
- The processor_instance must implement async process_file(file_path) method and static is_supported_file(file_path) method
- Handle exceptions in the processor's process_file method to prevent handler crashes
- The handler only processes files (not directories) and only supported file types
- Files are only processed if they exist at the time of delayed processing (handles quick deletions)
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class OneDriveProcessor 53.2% similar
-
class RemarkableEInkProcessor 48.0% similar
-
class RemarkableRestFileWatcher 45.6% similar
-
function run_demo 44.3% similar
-
function main_v68 43.7% similar