splunk_local/generators/hec_sender.py
bpmcdevitt ecb505f159 Add comprehensive log generators for realistic test data
- Add four types of log generators: web access logs, syslog messages, JSON application logs, and HEC events
- Implement Docker Compose services with generators profile for easy activation
- Create Python scripts for realistic log generation with varied data patterns
- Update documentation in README.md and CLAUDE.md with usage instructions and generator details
- Support file-based log forwarding and direct HEC event submission for comprehensive testing scenarios

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 17:29:37 -05:00

198 lines
No EOL
6.6 KiB
Python

#!/usr/bin/env python3
"""
HTTP Event Collector (HEC) log sender for Splunk testing
Sends events directly to Splunk HEC endpoint
"""
import time
import random
import datetime
import json
import os
import requests
from urllib3.exceptions import InsecureRequestWarning
# Disable SSL warnings for testing
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class HECSender:
def __init__(self):
self.hec_url = os.getenv('SPLUNK_HEC_URL', 'http://localhost:8088/services/collector')
self.hec_token = os.getenv('SPLUNK_HEC_TOKEN', '00000000-0000-0000-0000-000000000000')
self.headers = {
'Authorization': f'Splunk {self.hec_token}',
'Content-Type': 'application/json'
}
def send_event(self, event_data, source_type=None, source=None, index=None):
"""Send a single event to Splunk HEC"""
event = {
'time': int(time.time()),
'event': event_data
}
if source_type:
event['sourcetype'] = source_type
if source:
event['source'] = source
if index:
event['index'] = index
try:
response = requests.post(
self.hec_url,
headers=self.headers,
data=json.dumps(event),
verify=False,
timeout=10
)
response.raise_for_status()
print(f"✓ Sent event: {event_data.get('message', 'Unknown event')}")
return True
except Exception as e:
print(f"✗ Failed to send event: {e}")
return False
def generate_security_event():
"""Generate security-related events"""
event_types = [
{
'message': 'Failed login attempt detected',
'username': random.choice(['admin', 'root', 'test', 'user123']),
'source_ip': f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}",
'attempts': random.randint(1, 10),
'severity': 'high'
},
{
'message': 'Successful user login',
'username': random.choice(['john.doe', 'jane.smith', 'admin']),
'source_ip': f"192.168.1.{random.randint(1, 254)}",
'session_duration': random.randint(300, 7200),
'severity': 'info'
},
{
'message': 'Suspicious file access detected',
'filename': random.choice(['/etc/passwd', '/etc/shadow', '/var/log/auth.log']),
'user': random.choice(['root', 'www-data', 'unknown']),
'severity': 'medium'
}
]
return random.choice(event_types)
def generate_application_event():
"""Generate application performance events"""
events = [
{
'message': 'Database connection pool exhausted',
'service': 'user-service',
'active_connections': random.randint(90, 100),
'max_connections': 100,
'severity': 'critical'
},
{
'message': 'High memory usage detected',
'service': random.choice(['web-server', 'api-gateway', 'cache-service']),
'memory_usage_percent': random.randint(85, 99),
'threshold': 85,
'severity': 'warning'
},
{
'message': 'Service health check passed',
'service': random.choice(['payment-service', 'notification-service']),
'response_time_ms': random.randint(50, 200),
'status': 'healthy',
'severity': 'info'
}
]
return random.choice(events)
def generate_business_event():
"""Generate business metrics events"""
events = [
{
'message': 'Order completed',
'order_id': f"ORD-{random.randint(100000, 999999)}",
'customer_id': f"CUST-{random.randint(1000, 9999)}",
'amount': round(random.uniform(10.0, 500.0), 2),
'currency': 'USD',
'payment_method': random.choice(['credit_card', 'paypal', 'bank_transfer'])
},
{
'message': 'Product viewed',
'product_id': f"PROD-{random.randint(1000, 9999)}",
'customer_id': f"CUST-{random.randint(1000, 9999)}",
'category': random.choice(['electronics', 'clothing', 'books', 'home']),
'price': round(random.uniform(5.0, 200.0), 2)
},
{
'message': 'Cart abandoned',
'session_id': f"sess_{random.randint(100000, 999999)}",
'items_count': random.randint(1, 5),
'cart_value': round(random.uniform(20.0, 300.0), 2),
'time_on_site_minutes': random.randint(5, 60)
}
]
return random.choice(events)
def main():
sender = HECSender()
print(f"Starting HEC sender to {sender.hec_url}")
print("Waiting for Splunk to be ready...")
# Wait for Splunk to be ready
max_retries = 30
for i in range(max_retries):
try:
response = requests.get(
sender.hec_url.replace('/services/collector', '/services/collector/health'),
headers={'Authorization': f'Splunk {sender.hec_token}'},
verify=False,
timeout=5
)
if response.status_code == 200:
print("✓ Splunk HEC is ready!")
break
except Exception:
pass
print(f"Waiting for Splunk... ({i+1}/{max_retries})")
time.sleep(10)
else:
print("⚠ Could not connect to Splunk HEC, but continuing anyway...")
event_generators = [
('security', generate_security_event),
('application', generate_application_event),
('business', generate_business_event)
]
while True:
try:
# Generate random event
source_type, generator = random.choice(event_generators)
event_data = generator()
# Add timestamp
event_data['timestamp'] = datetime.datetime.now().isoformat()
event_data['host'] = 'log-generator'
# Send to HEC
sender.send_event(
event_data,
source_type=f'generator:{source_type}',
source='log_generator_hec'
)
# Random delay between 3-12 seconds
time.sleep(random.uniform(3, 12))
except KeyboardInterrupt:
print("Stopping HEC sender...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(10)
if __name__ == "__main__":
main()