#!/usr/bin/env python3 """ HTTP Event Collector (HEC) log sender for Splunk testing Sends events directly to Splunk HEC endpoint """ import time import random import datetime import json import os import requests from urllib3.exceptions import InsecureRequestWarning # Disable SSL warnings for testing requests.packages.urllib3.disable_warnings(InsecureRequestWarning) class HECSender: def __init__(self): self.hec_url = os.getenv('SPLUNK_HEC_URL', 'http://localhost:8088/services/collector') self.hec_token = os.getenv('SPLUNK_HEC_TOKEN', '00000000-0000-0000-0000-000000000000') self.headers = { 'Authorization': f'Splunk {self.hec_token}', 'Content-Type': 'application/json' } def send_event(self, event_data, source_type=None, source=None, index=None): """Send a single event to Splunk HEC""" event = { 'time': int(time.time()), 'event': event_data } if source_type: event['sourcetype'] = source_type if source: event['source'] = source if index: event['index'] = index try: response = requests.post( self.hec_url, headers=self.headers, data=json.dumps(event), verify=False, timeout=10 ) response.raise_for_status() print(f"✓ Sent event: {event_data.get('message', 'Unknown event')}") return True except Exception as e: print(f"✗ Failed to send event: {e}") return False def generate_security_event(): """Generate security-related events""" event_types = [ { 'message': 'Failed login attempt detected', 'username': random.choice(['admin', 'root', 'test', 'user123']), 'source_ip': f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}", 'attempts': random.randint(1, 10), 'severity': 'high' }, { 'message': 'Successful user login', 'username': random.choice(['john.doe', 'jane.smith', 'admin']), 'source_ip': f"192.168.1.{random.randint(1, 254)}", 'session_duration': random.randint(300, 7200), 'severity': 'info' }, { 'message': 'Suspicious file access detected', 'filename': random.choice(['/etc/passwd', '/etc/shadow', '/var/log/auth.log']), 'user': random.choice(['root', 'www-data', 'unknown']), 'severity': 'medium' } ] return random.choice(event_types) def generate_application_event(): """Generate application performance events""" events = [ { 'message': 'Database connection pool exhausted', 'service': 'user-service', 'active_connections': random.randint(90, 100), 'max_connections': 100, 'severity': 'critical' }, { 'message': 'High memory usage detected', 'service': random.choice(['web-server', 'api-gateway', 'cache-service']), 'memory_usage_percent': random.randint(85, 99), 'threshold': 85, 'severity': 'warning' }, { 'message': 'Service health check passed', 'service': random.choice(['payment-service', 'notification-service']), 'response_time_ms': random.randint(50, 200), 'status': 'healthy', 'severity': 'info' } ] return random.choice(events) def generate_business_event(): """Generate business metrics events""" events = [ { 'message': 'Order completed', 'order_id': f"ORD-{random.randint(100000, 999999)}", 'customer_id': f"CUST-{random.randint(1000, 9999)}", 'amount': round(random.uniform(10.0, 500.0), 2), 'currency': 'USD', 'payment_method': random.choice(['credit_card', 'paypal', 'bank_transfer']) }, { 'message': 'Product viewed', 'product_id': f"PROD-{random.randint(1000, 9999)}", 'customer_id': f"CUST-{random.randint(1000, 9999)}", 'category': random.choice(['electronics', 'clothing', 'books', 'home']), 'price': round(random.uniform(5.0, 200.0), 2) }, { 'message': 'Cart abandoned', 'session_id': f"sess_{random.randint(100000, 999999)}", 'items_count': random.randint(1, 5), 'cart_value': round(random.uniform(20.0, 300.0), 2), 'time_on_site_minutes': random.randint(5, 60) } ] return random.choice(events) def main(): sender = HECSender() print(f"Starting HEC sender to {sender.hec_url}") print("Waiting for Splunk to be ready...") # Wait for Splunk to be ready max_retries = 30 for i in range(max_retries): try: response = requests.get( sender.hec_url.replace('/services/collector', '/services/collector/health'), headers={'Authorization': f'Splunk {sender.hec_token}'}, verify=False, timeout=5 ) if response.status_code == 200: print("✓ Splunk HEC is ready!") break except Exception: pass print(f"Waiting for Splunk... ({i+1}/{max_retries})") time.sleep(10) else: print("⚠ Could not connect to Splunk HEC, but continuing anyway...") event_generators = [ ('security', generate_security_event), ('application', generate_application_event), ('business', generate_business_event) ] while True: try: # Generate random event source_type, generator = random.choice(event_generators) event_data = generator() # Add timestamp event_data['timestamp'] = datetime.datetime.now().isoformat() event_data['host'] = 'log-generator' # Send to HEC sender.send_event( event_data, source_type=f'generator:{source_type}', source='log_generator_hec' ) # Random delay between 3-12 seconds time.sleep(random.uniform(3, 12)) except KeyboardInterrupt: print("Stopping HEC sender...") break except Exception as e: print(f"Error: {e}") time.sleep(10) if __name__ == "__main__": main()