### Core Responsibilities
The Data Ingest Thread serves as the agent's primary interface to the streaming data ecosystem. Built on `KafkaConsumerBase`, it maintains a persistent connection to configured Kafka topics and transforms raw streaming messages into structured database records.
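
As a sketch of how the pieces fit together (the import path and class name here are assumptions; only the `KafkaConsumerBase` base class and the `store_message()` override point described below come from this document):

```python
from kafka_consumer_base import KafkaConsumerBase  # assumed module path

class SensorDataIngestThread(KafkaConsumerBase):
    """Consumes configured Kafka topics and writes parsed records to MySQL."""

    def store_message(self, message, topic, partition, offset) -> bool:
        # Domain-specific parsing and storage goes here; the base class
        # handles polling, offsets, and reconnects (see the sections below).
        ...
        return True
```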

### Processing Pipeline
The thread operates through a continuous polling loop that retrieves message batches from Kafka. Each message is parsed to extract sensor readings, timestamps, and metadata. The `AutoencoderDataIngestThread` implementation demonstrates this pattern: it parses JSON messages containing gymnasium environment state data, extracting numeric state values while filtering out non-numeric metadata fields.
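
A minimal sketch of that parsing step, assuming a UTF-8 JSON payload (the field handling is illustrative, not the exact `AutoencoderDataIngestThread` code):

```python
import json

def parse_state_message(raw: bytes) -> dict:
    """Decode a JSON message and keep only numeric state values."""
    record = json.loads(raw.decode("utf-8"))
    # Drop non-numeric metadata fields (ids, labels, booleans, nested objects)
    return {
        key: value
        for key, value in record.items()
        if isinstance(value, (int, float)) and not isinstance(value, bool)
    }
```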

### Data Storage Strategy
Parsed data flows into MySQL through the shared `DBManager` instance. The thread uses the `record_sensor_data()` method to store timestamped sensor readings with proper data type conversion (numpy arrays to binary blobs). This creates a persistent training dataset that accumulates over time, providing the foundation for ML model development.
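
A hedged sketch of the transform-and-store step; the dictionary keys are assumptions, while `record_sensor_data()` taking a data dictionary comes from the implementation steps below:

```python
from datetime import datetime, timezone

import numpy as np

def store_reading(db_manager, values: list) -> None:
    """Convert a parsed reading into the schema format and persist it."""
    data_dict = {
        "timestamp": datetime.now(tz=timezone.utc),              # illustrative key name
        "sensor_values": np.asarray(values, dtype=np.float32),   # readings -> numpy array
    }
    # DBManager persists the record to MySQL (e.g. serializing the array to a blob)
    db_manager.record_sensor_data(data_dict)
```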

### Error Handling and Resilience
The thread implements robust error handling at multiple levels: JSON parsing failures are logged but don't terminate processing, database connection issues trigger retry logic, and malformed messages are skipped with appropriate warnings. This ensures that transient data quality issues don't disrupt the overall data flow.
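
The skip-and-log pattern for malformed messages looks roughly like this (a sketch, assuming a module-level `logger`):

```python
import json
import logging

logger = logging.getLogger(__name__)

def try_parse(message: bytes, topic: str, partition: int, offset: int):
    """Return the parsed record, or None if the message should be skipped."""
    try:
        return json.loads(message.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Log and skip; one bad message must not stop the ingest loop.
        logger.warning("Skipping malformed message %s[%d]@%d: %s",
                       topic, partition, offset, exc)
        return None
```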

### Configuration Integration
Topic subscriptions, parsing rules, and storage parameters are all driven by the central configuration file. This allows agents to be reconfigured for different data sources without code changes, supporting the system's flexibility goals.

## User Implementation Requirements

### Single Required Method
```python
def store_message(self, message, topic, partition, offset) -> bool:
    # Parse message, validate data, store to database
    # Return True for success, False for failure
    ...
```

### Implementation Steps

1. **Parse Message**: Decode bytes to string, parse the JSON (or other message format)
2. **Extract Data**: Pull relevant fields (timestamps, sensor values, metadata)
3. **Validate**: Check data types, handle missing fields, filter invalid data
4. **Transform**: Convert to database schema format (arrays to numpy, timestamps to datetime)
5. **Store**: Call `self.db_manager.record_sensor_data(data_dict)`
6. **Return Status**: `True` if successful, `False` if failed (see the sketch after this list)
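
Putting the six steps together, a minimal sketch of `store_message()` might look like this (the message layout, field names, and `data_dict` keys are assumptions; the signature and the `record_sensor_data()` call come from this document):

```python
import json
import logging
from datetime import datetime, timezone

import numpy as np

logger = logging.getLogger(__name__)

def store_message(self, message, topic, partition, offset) -> bool:
    try:
        # 1. Parse: decode bytes and load JSON
        record = json.loads(message.decode("utf-8"))

        # 2-3. Extract and validate: keep numeric fields, drop metadata
        values = {k: v for k, v in record.items()
                  if isinstance(v, (int, float)) and not isinstance(v, bool)}
        if not values:
            logger.warning("No numeric fields in %s[%d]@%d", topic, partition, offset)
            return False

        # 4. Transform: readings to a numpy array, timestamp to datetime
        data_dict = {
            "timestamp": datetime.now(tz=timezone.utc),  # or parse one from the message
            "sensor_values": np.asarray(list(values.values()), dtype=np.float32),
        }

        # 5. Store through the shared DBManager
        self.db_manager.record_sensor_data(data_dict)
        return True   # 6. Report success
    except Exception as exc:
        logger.error("Failed to store %s[%d]@%d: %s", topic, partition, offset, exc)
        return False
```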

### Configuration Needed
```python
config = {
    'kafka_topics': {'input': 'your-topic-name'}
}
```

### Environment Variables Required
- `KAFKA_BROKER_URL`
- `MYSQL_HOST`, `MYSQL_PORT`, `MYSQL_USER`, `MYSQL_ROOT_PASSWORD`, `MYSQL_DATABASE`
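
The base classes read these variables themselves; a quick startup sanity check is still easy to add (illustrative only):

```python
import os

REQUIRED_ENV = [
    "KAFKA_BROKER_URL",
    "MYSQL_HOST", "MYSQL_PORT", "MYSQL_USER",
    "MYSQL_ROOT_PASSWORD", "MYSQL_DATABASE",
]

# Fail fast with a clear message instead of an opaque connection error later
missing = [name for name in REQUIRED_ENV if not os.environ.get(name)]
if missing:
    raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
```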

## What's Handled Automatically

- Kafka consumer setup/teardown
- Topic subscription and polling
- Database connection management
- Thread lifecycle and health monitoring
- Error recovery and restart logic
- Message batching and offset management

## Key Constraints

- **Single-threaded**: Keep `store_message()` fast and efficient
- **Error handling**: Catch exceptions, log errors, return `False` for failures
- **Database schema**: Match the expected format for `record_sensor_data()`
- **Memory management**: Don't accumulate state between messages

The base classes handle all infrastructure complexity; users only implement the domain-specific data transformation logic.