- Add four types of log generators: web access logs, syslog messages, JSON application logs, and HEC events - Implement Docker Compose services with generators profile for easy activation - Create Python scripts for realistic log generation with varied data patterns - Update documentation in README.md and CLAUDE.md with usage instructions and generator details - Support file-based log forwarding and direct HEC event submission for comprehensive testing scenarios 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
198 lines
No EOL
6.6 KiB
Python
198 lines
No EOL
6.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
HTTP Event Collector (HEC) log sender for Splunk testing
|
|
Sends events directly to Splunk HEC endpoint
|
|
"""
|
|
|
|
import time
|
|
import random
|
|
import datetime
|
|
import json
|
|
import os
|
|
import requests
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
|
|
# Disable SSL warnings for testing
|
|
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
|
|
|
class HECSender:
|
|
def __init__(self):
|
|
self.hec_url = os.getenv('SPLUNK_HEC_URL', 'http://localhost:8088/services/collector')
|
|
self.hec_token = os.getenv('SPLUNK_HEC_TOKEN', '00000000-0000-0000-0000-000000000000')
|
|
self.headers = {
|
|
'Authorization': f'Splunk {self.hec_token}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
def send_event(self, event_data, source_type=None, source=None, index=None):
|
|
"""Send a single event to Splunk HEC"""
|
|
event = {
|
|
'time': int(time.time()),
|
|
'event': event_data
|
|
}
|
|
|
|
if source_type:
|
|
event['sourcetype'] = source_type
|
|
if source:
|
|
event['source'] = source
|
|
if index:
|
|
event['index'] = index
|
|
|
|
try:
|
|
response = requests.post(
|
|
self.hec_url,
|
|
headers=self.headers,
|
|
data=json.dumps(event),
|
|
verify=False,
|
|
timeout=10
|
|
)
|
|
response.raise_for_status()
|
|
print(f"✓ Sent event: {event_data.get('message', 'Unknown event')}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"✗ Failed to send event: {e}")
|
|
return False
|
|
|
|
def generate_security_event():
|
|
"""Generate security-related events"""
|
|
event_types = [
|
|
{
|
|
'message': 'Failed login attempt detected',
|
|
'username': random.choice(['admin', 'root', 'test', 'user123']),
|
|
'source_ip': f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}",
|
|
'attempts': random.randint(1, 10),
|
|
'severity': 'high'
|
|
},
|
|
{
|
|
'message': 'Successful user login',
|
|
'username': random.choice(['john.doe', 'jane.smith', 'admin']),
|
|
'source_ip': f"192.168.1.{random.randint(1, 254)}",
|
|
'session_duration': random.randint(300, 7200),
|
|
'severity': 'info'
|
|
},
|
|
{
|
|
'message': 'Suspicious file access detected',
|
|
'filename': random.choice(['/etc/passwd', '/etc/shadow', '/var/log/auth.log']),
|
|
'user': random.choice(['root', 'www-data', 'unknown']),
|
|
'severity': 'medium'
|
|
}
|
|
]
|
|
return random.choice(event_types)
|
|
|
|
def generate_application_event():
|
|
"""Generate application performance events"""
|
|
events = [
|
|
{
|
|
'message': 'Database connection pool exhausted',
|
|
'service': 'user-service',
|
|
'active_connections': random.randint(90, 100),
|
|
'max_connections': 100,
|
|
'severity': 'critical'
|
|
},
|
|
{
|
|
'message': 'High memory usage detected',
|
|
'service': random.choice(['web-server', 'api-gateway', 'cache-service']),
|
|
'memory_usage_percent': random.randint(85, 99),
|
|
'threshold': 85,
|
|
'severity': 'warning'
|
|
},
|
|
{
|
|
'message': 'Service health check passed',
|
|
'service': random.choice(['payment-service', 'notification-service']),
|
|
'response_time_ms': random.randint(50, 200),
|
|
'status': 'healthy',
|
|
'severity': 'info'
|
|
}
|
|
]
|
|
return random.choice(events)
|
|
|
|
def generate_business_event():
|
|
"""Generate business metrics events"""
|
|
events = [
|
|
{
|
|
'message': 'Order completed',
|
|
'order_id': f"ORD-{random.randint(100000, 999999)}",
|
|
'customer_id': f"CUST-{random.randint(1000, 9999)}",
|
|
'amount': round(random.uniform(10.0, 500.0), 2),
|
|
'currency': 'USD',
|
|
'payment_method': random.choice(['credit_card', 'paypal', 'bank_transfer'])
|
|
},
|
|
{
|
|
'message': 'Product viewed',
|
|
'product_id': f"PROD-{random.randint(1000, 9999)}",
|
|
'customer_id': f"CUST-{random.randint(1000, 9999)}",
|
|
'category': random.choice(['electronics', 'clothing', 'books', 'home']),
|
|
'price': round(random.uniform(5.0, 200.0), 2)
|
|
},
|
|
{
|
|
'message': 'Cart abandoned',
|
|
'session_id': f"sess_{random.randint(100000, 999999)}",
|
|
'items_count': random.randint(1, 5),
|
|
'cart_value': round(random.uniform(20.0, 300.0), 2),
|
|
'time_on_site_minutes': random.randint(5, 60)
|
|
}
|
|
]
|
|
return random.choice(events)
|
|
|
|
def main():
|
|
sender = HECSender()
|
|
|
|
print(f"Starting HEC sender to {sender.hec_url}")
|
|
print("Waiting for Splunk to be ready...")
|
|
|
|
# Wait for Splunk to be ready
|
|
max_retries = 30
|
|
for i in range(max_retries):
|
|
try:
|
|
response = requests.get(
|
|
sender.hec_url.replace('/services/collector', '/services/collector/health'),
|
|
headers={'Authorization': f'Splunk {sender.hec_token}'},
|
|
verify=False,
|
|
timeout=5
|
|
)
|
|
if response.status_code == 200:
|
|
print("✓ Splunk HEC is ready!")
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
print(f"Waiting for Splunk... ({i+1}/{max_retries})")
|
|
time.sleep(10)
|
|
else:
|
|
print("⚠ Could not connect to Splunk HEC, but continuing anyway...")
|
|
|
|
event_generators = [
|
|
('security', generate_security_event),
|
|
('application', generate_application_event),
|
|
('business', generate_business_event)
|
|
]
|
|
|
|
while True:
|
|
try:
|
|
# Generate random event
|
|
source_type, generator = random.choice(event_generators)
|
|
event_data = generator()
|
|
|
|
# Add timestamp
|
|
event_data['timestamp'] = datetime.datetime.now().isoformat()
|
|
event_data['host'] = 'log-generator'
|
|
|
|
# Send to HEC
|
|
sender.send_event(
|
|
event_data,
|
|
source_type=f'generator:{source_type}',
|
|
source='log_generator_hec'
|
|
)
|
|
|
|
# Random delay between 3-12 seconds
|
|
time.sleep(random.uniform(3, 12))
|
|
|
|
except KeyboardInterrupt:
|
|
print("Stopping HEC sender...")
|
|
break
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
time.sleep(10)
|
|
|
|
if __name__ == "__main__":
|
|
main() |