# Wazuh-DFN Architecture
This document describes the architecture of the wazuh-dfn integration, focusing on its asyncio-based design and component interaction.
## Overview

Wazuh-DFN is built as a set of asynchronous services that work together to process Wazuh alerts and forward them to DFN-CERT Kafka endpoints. The architecture is designed to be:

- **High performance**: Process large volumes of alerts with minimal overhead
- **Resilient**: Handle errors gracefully and recover from failures
- **Extensible**: Allow new alert handlers to be added easily
- **Configurable**: Support multiple configuration methods
## Main Service Orchestrator

The main module (`main.py`) acts as an orchestrator that:

- Loads and validates configuration
- Initializes all services
- Sets up signal handlers for graceful shutdown
- Uses `asyncio.TaskGroup` to manage concurrent service tasks
The orchestrator follows modern asyncio practices:

```python
async def setup_service(config: Config) -> None:
    """Set up and run the Wazuh DFN service using asyncio."""
    shutdown_event = asyncio.Event()
    alert_queue = AsyncMaxSizeQueue(maxsize=config.wazuh.json_alert_queue_size)

    # Initialize core services
    wazuh_service = WazuhService(config=config.wazuh)
    await wazuh_service.start()

    # Initialize more services...

    # Use Python 3.11+ task groups for cleaner task management
    async with asyncio.TaskGroup() as tg:
        # Start all services as concurrent tasks
        tg.create_task(kafka_service.start(), name="KafkaService")
        tg.create_task(alerts_worker_service.start(), name="AlertsWorkerService")
        tg.create_task(alerts_watcher_service.start(), name="AlertsWatcherService")

        # Wait until shutdown is signaled
        await shutdown_event.wait()
```
## Services

All services are designed to work asynchronously and fall into the following categories:

### Core Services

- **WazuhService**: Handles communication with the Wazuh server
    - Manages socket connections (Unix or TCP)
    - Sends events and errors
    - Handles reconnection logic
    - Uses asyncio streams for efficient I/O
- **KafkaService**: Handles communication with Kafka
    - Uses aiokafka for asynchronous Kafka operations
    - Manages producer connections
    - Sends messages to Kafka topics
    - Validates topic existence
    - Handles TLS/SSL security
    - Implements retry logic with exponential backoff
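Retry with exponential backoff can be sketched as follows. This is an illustrative helper, not the actual KafkaService implementation; `send_with_retry`, its parameters, and the caught exception type are assumptions for the example.

```python
import asyncio
import random


async def send_with_retry(send, message, max_retries: int = 5, base_delay: float = 0.5):
    """Retry an async send with exponential backoff and jitter (sketch)."""
    for attempt in range(max_retries):
        try:
            return await send(message)
        except ConnectionError:
            if attempt == max_retries - 1:
                raise  # out of retries: surface the error to the caller
            # Delay doubles each attempt (0.5s, 1s, 2s, ...) plus small jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```

The jitter spreads out retries from concurrent tasks so they do not all hammer the broker at the same instant.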
### Alert Processing Services

- **AlertsWatcherService**: Monitors alert files for new alerts
    - Uses `FileMonitor` to read and parse JSON alerts
    - Adds alerts to the processing queue
    - Handles file rotation detection
- **AlertsWorkerService**: Processes alerts from the queue
    - Creates a pool of worker tasks using `asyncio.TaskGroup`
    - Distributes alerts to workers
    - Handles processing errors
    - Manages worker lifecycle
- **AlertsService**: Delegates alerts to specialized handlers
    - Determines the handler based on alert type
    - Coordinates processing between handlers
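The delegation step could look roughly like this simplified sketch. The constructor signature and the `"type"` discriminator field are assumptions; the real `AlertsService` inspects alert contents to pick a handler.

```python
class AlertsService:
    """Sketch of type-based handler dispatch (simplified illustration)."""

    def __init__(self, syslog_handler, windows_handler):
        # Map an alert category to the handler responsible for it
        self._handlers = {
            "syslog": syslog_handler,
            "windows": windows_handler,
        }

    async def process_alert(self, alert: dict) -> None:
        # Hypothetical discriminator; alerts with no matching handler are skipped
        handler = self._handlers.get(alert.get("type"))
        if handler is not None:
            await handler.process_alert(alert)
```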
### Auxiliary Services

- **LoggingService**: Handles logging and statistics
    - Configures logging
    - Periodically logs statistics
    - Monitors system resources using psutil
    - Reports queue sizes and processing rates
### Specialized Handlers

- **SyslogHandler**: Processes syslog-specific alerts (e.g., fail2ban)
    - Formats events according to RFC 5424
    - Filters internal IPs based on configuration
    - Sends events to Kafka and Wazuh
- **WindowsHandler**: Processes Windows event log alerts
    - Handles specific Windows event IDs
    - Formats events in Windows XML format
    - Preserves original event structure
### Helper Components
- **AsyncMaxSizeQueue**: Queue implementation with specific features:
    - Discards oldest items when full
    - Implements asynchronous put/get operations
    - Tracks discarded items and provides statistics
    - Prevents memory overflow during traffic spikes
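The discard-oldest behavior can be sketched on top of `asyncio.Queue`. This is a minimal illustration, not the actual `AsyncMaxSizeQueue` code, which also tracks richer statistics.

```python
import asyncio


class DiscardOldestQueue(asyncio.Queue):
    """Bounded queue that drops the oldest item when full (sketch)."""

    def __init__(self, maxsize: int):
        super().__init__(maxsize=maxsize)
        self.discarded = 0  # count of items dropped during overflow

    async def put(self, item) -> None:
        while self.full():
            # Drop the oldest queued item instead of blocking the producer
            self.get_nowait()
            self.discarded += 1
        await super().put(item)
```

Dropping old items keeps the producer (the file monitor) responsive during traffic spikes at the cost of losing the least recent alerts, which the statistics counter makes visible.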
- **FileMonitor**: Advanced file monitoring with:
    - Asynchronous file operations using aiofiles
    - Rotation detection via inode tracking
    - Partial alert handling across reads
    - Buffer management for large alerts
    - Character encoding handling
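The inode-tracking idea behind rotation detection can be illustrated with a small helper. This is a sketch only; the real `FileMonitor` also handles truncation and performs its I/O asynchronously via aiofiles.

```python
import os


def file_was_rotated(path: str, last_inode: int) -> bool:
    """Return True if the file at `path` was replaced since `last_inode`
    was recorded (sketch of inode-based rotation detection)."""
    try:
        # Rotation replaces the file, so the path now points at a new inode
        return os.stat(path).st_ino != last_inode
    except FileNotFoundError:
        # File briefly missing mid-rotation also counts as rotated
        return True
```

On a rotation the monitor would reopen the file, record the new inode, and resume reading from offset zero.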
## Asynchronous Flow

The asynchronous flow of the application follows these steps:

1. **Initialization**: Services are initialized and connected
2. **Alert Monitoring**: The `FileMonitor` reads new alerts from Wazuh alert files
3. **Queueing**: New alerts are added to the `AsyncMaxSizeQueue`
4. **Processing**: Worker tasks process alerts from the queue
5. **Handling**: Specialized handlers format and process alerts based on type
6. **Sending**: Processed alerts are sent to Kafka, and a confirmation is sent to Wazuh
This flow is fully asynchronous, allowing for:

- Concurrent processing of multiple alerts
- Non-blocking I/O operations
- Efficient use of system resources
- Graceful handling of backpressure
## Asyncio Task Management

The application uses modern asyncio patterns:

- **Task Groups**: Python 3.11+ `TaskGroup` for managing related tasks
- **Named Tasks**: All tasks are named for better observability
- **Cancellation Handling**: Proper handling of `CancelledError`
- **Lock Protection**: `AsyncLock` for safe access to resources shared between concurrent tasks
- **Event Signaling**: `AsyncEvent` for coordination between services
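A service loop combining the cancellation and event-signaling patterns might look like this sketch (the function name and sleep-based work loop are illustrative placeholders):

```python
import asyncio


async def service_loop(shutdown_event: asyncio.Event) -> str:
    """Run until shutdown is signaled, exiting cleanly on cancellation (sketch)."""
    try:
        while not shutdown_event.is_set():
            await asyncio.sleep(0.01)  # stand-in for real service work
        return "shutdown"
    except asyncio.CancelledError:
        # Perform cleanup here, then re-raise so the TaskGroup observes
        # the cancellation instead of a silently swallowed error
        raise
```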
## Error Handling and Recovery

The architecture includes several mechanisms for error handling and recovery:

- **Reconnection Logic**: Services automatically reconnect on connection failures
- **Retry Logic**: Failed operations are retried with exponential backoff
- **Queue Management**: Overflow protection ensures system stability under load
- **Task Management**: Proper task cancellation and cleanup during shutdown
- **Failed Alert Storage**: Option to store alerts that fail processing for later analysis
- **Character Encoding Handling**: Automatic handling of encoding issues in alerts
## Configuration

The system supports multiple configuration methods:

- **Files**: YAML or TOML configuration files
- **Environment Variables**: For containerized environments
- **Command-line Arguments**: For direct configuration

Configuration is validated using Pydantic models with:

- Type checking and custom validators
- Certificate validation for SSL/TLS
- Automatic generation of sample configurations
- Secure handling of sensitive information
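A custom validator on a Pydantic model might look like this small fragment. The field names and default values here are hypothetical; the actual configuration models cover many more fields (Kafka settings, SSL certificates, and so on).

```python
from pydantic import BaseModel, field_validator


class WazuhConfig(BaseModel):
    """Hypothetical fragment of a validated configuration model (sketch)."""

    json_alert_file: str = "/var/ossec/logs/alerts/alerts.json"
    json_alert_queue_size: int = 1000

    @field_validator("json_alert_queue_size")
    @classmethod
    def queue_size_positive(cls, v: int) -> int:
        # Reject nonsensical queue sizes at load time, before services start
        if v <= 0:
            raise ValueError("queue size must be positive")
        return v
```

Validating at load time means a misconfiguration fails fast with a clear error instead of surfacing later as a runtime fault.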
## Performance Considerations

The asyncio-based architecture provides several performance benefits:

- **Non-blocking I/O**: All I/O operations are non-blocking
- **Worker Pool**: Configurable number of worker tasks for alert processing
- **Buffer Management**: Efficient buffer management for file monitoring
- **Queue Sizing**: Configurable queue sizes to balance memory usage and throughput
- **Batching**: Message batching for efficient Kafka communication
- **Resource Monitoring**: Built-in monitoring of queue sizes and processing rates
## Extension Points

To extend the system with new functionality:

1. Add new handlers in the `handlers` directory
2. Register them in the `AlertsService`
3. Implement the necessary processing logic
Example handler implementation pattern:

```python
class NewHandler:
    """Handler for a new alert type."""

    def __init__(self, kafka_service, wazuh_service):
        self.kafka_service = kafka_service
        self.wazuh_service = wazuh_service

    async def process_alert(self, alert: dict) -> None:
        """Process a new type of alert asynchronously."""
        if self._is_relevant_alert(alert):
            message = self._create_message(alert)
            await self.kafka_service.send_message(message)
            await self.wazuh_service.send_event(alert)

    def _is_relevant_alert(self, alert: dict) -> bool:
        """Determine if this alert should be processed by this handler."""
        ...  # Implementation

    def _create_message(self, alert: dict) -> dict:
        """Create a message for Kafka from the alert."""
        ...  # Implementation
```