splunk_local/generators/json_logs.py
bpmcdevitt ecb505f159 Add comprehensive log generators for realistic test data
- Add four types of log generators: web access logs, syslog messages, JSON application logs, and HEC events
- Implement Docker Compose services with generators profile for easy activation
- Create Python scripts for realistic log generation with varied data patterns
- Update documentation in README.md and CLAUDE.md with usage instructions and generator details
- Support file-based log forwarding and direct HEC event submission for comprehensive testing scenarios

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 17:29:37 -05:00

138 lines
No EOL
5.2 KiB
Python

#!/usr/bin/env python3
"""
JSON log generator for Splunk testing
Generates structured JSON logs for application monitoring
"""
import time
import random
import datetime
import json
from pathlib import Path
# Value pools the event generators draw from with random.choice().

# Service names reported in the "service" field.
SERVICES = [
    'user-service',
    'payment-service',
    'inventory-service',
    'notification-service',
]

# Severity labels, lowest to highest.
LOG_LEVELS = [
    'DEBUG',
    'INFO',
    'WARN',
    'ERROR',
    'FATAL',
]

# REST paths used for simulated API requests.
ENDPOINTS = [
    '/api/v1/users',
    '/api/v1/payments',
    '/api/v1/inventory',
    '/api/v1/orders',
]

# Identities attached to events as "user_id".
USERS = [
    'user123',
    'admin',
    'john.doe',
    'jane.smith',
    'api_client_001',
]

# Kinds of payment transactions.
TRANSACTION_TYPES = [
    'purchase',
    'refund',
    'transfer',
    'deposit',
    'withdrawal',
]
def generate_user_event():
    """Build one simulated user-service authentication event.

    Returns:
        dict: JSON-serializable record holding the current local time in
        ISO format, a random log level, user, action, dotted-quad IP
        address, session id and success flag.
    """
    # Fields are filled in one at a time, in the order they appear in the
    # emitted JSON object.
    event = {"timestamp": datetime.datetime.now().isoformat()}
    event["level"] = random.choice(LOG_LEVELS)
    event["service"] = "user-service"
    event["message"] = "User authentication event"
    event["user_id"] = random.choice(USERS)
    event["action"] = random.choice(['login', 'logout', 'password_change', 'profile_update'])

    # Four independent octets, each drawn from 1..255 inclusive.
    octets = [str(random.randint(1, 255)) for _ in range(4)]
    event["ip_address"] = ".".join(octets)

    event["user_agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    session_number = random.randint(100000, 999999)
    event["session_id"] = f"sess_{session_number}"
    event["success"] = random.choice([True, False])
    return event
def generate_api_event():
    """Build one simulated API request log record.

    The HTTP status is drawn from a list in which 200 appears three times,
    making it the most likely outcome. Statuses of 400 and above are logged
    at ERROR level, all others at INFO.

    Returns:
        dict: JSON-serializable record describing the request; ``user_id``
        may be ``None``.
    """
    status_code = random.choice([200, 200, 200, 201, 400, 401, 403, 404, 500])
    stamp = datetime.datetime.now().isoformat()

    if status_code >= 400:
        severity = "ERROR"
    else:
        severity = "INFO"

    service = random.choice(SERVICES)
    endpoint = random.choice(ENDPOINTS)
    method = random.choice(['GET', 'POST', 'PUT', 'DELETE'])
    elapsed_ms = random.randint(10, 2000)
    # None is one of the possible picks for the caller.
    caller = random.choice([*USERS, None])
    request_number = random.randint(1000000, 9999999)
    payload_size = random.randint(100, 50000)

    return {
        "timestamp": stamp,
        "level": severity,
        "service": service,
        "message": "API request processed",
        "endpoint": endpoint,
        "method": method,
        "status_code": status_code,
        "response_time_ms": elapsed_ms,
        "user_id": caller,
        "request_id": f"req_{request_number}",
        "bytes_sent": payload_size,
    }
def generate_payment_event():
    """Build one simulated payment-service transaction record.

    Returns:
        dict: JSON-serializable record with a random transaction id, user,
        amount (1.00-1000.00, rounded to two decimals), currency,
        transaction type, payment method, status and merchant id. The level
        is always INFO.
    """
    # An ordered list of (key, value) pairs keeps both the evaluation order
    # and the resulting key order explicit.
    fields = [
        ("timestamp", datetime.datetime.now().isoformat()),
        ("level", "INFO"),
        ("service", "payment-service"),
        ("message", "Payment transaction processed"),
        ("transaction_id", f"txn_{random.randint(1000000, 9999999)}"),
        ("user_id", random.choice(USERS)),
        ("amount", round(random.uniform(1.0, 1000.0), 2)),
        ("currency", random.choice(['USD', 'EUR', 'GBP'])),
        ("transaction_type", random.choice(TRANSACTION_TYPES)),
        ("payment_method", random.choice(['credit_card', 'debit_card', 'paypal', 'bank_transfer'])),
        ("status", random.choice(['completed', 'pending', 'failed'])),
        ("merchant_id", f"merchant_{random.randint(1000, 9999)}"),
    ]
    return dict(fields)
def generate_error_event():
    """Build one simulated application error record.

    The level is ERROR or FATAL. The error type and the error message are
    drawn independently of each other.

    Returns:
        dict: JSON-serializable record with a fixed sample stack trace, a
        random request id, and a ``user_id`` that may be ``None``.
    """
    error_types = ['DatabaseError', 'ValidationError', 'TimeoutError', 'AuthenticationError']
    error_messages = [
        "Connection timeout to database",
        "Invalid user credentials",
        "Required field missing",
        "External service unavailable",
        "Rate limit exceeded"
    ]

    stamp = datetime.datetime.now().isoformat()
    severity = random.choice(['ERROR', 'FATAL'])
    origin = random.choice(SERVICES)
    kind = random.choice(error_types)
    detail = random.choice(error_messages)
    affected_user = random.choice([*USERS, None])
    request_number = random.randint(1000000, 9999999)

    return {
        "timestamp": stamp,
        "level": severity,
        "service": origin,
        "message": "Application error occurred",
        "error_type": kind,
        "error_message": detail,
        "stack_trace": "java.lang.Exception: at com.example.service.method(Service.java:123)",
        "user_id": affected_user,
        "request_id": f"req_{request_number}",
    }
def generate_performance_event():
    """Build one simulated system performance metrics record.

    Returns:
        dict: JSON-serializable record from "monitoring-service" with CPU,
        memory and disk usage percentages (rounded to two decimals), plus
        integer connection, throughput and latency figures.
    """
    stamp = datetime.datetime.now().isoformat()

    # Percentages are drawn in CPU, memory, disk order.
    cpu_pct = round(random.uniform(10, 95), 2)
    memory_pct = round(random.uniform(20, 90), 2)
    disk_pct = round(random.uniform(30, 85), 2)

    connections = random.randint(10, 500)
    throughput = random.randint(1, 100)
    latency_ms = random.randint(50, 1500)

    return {
        "timestamp": stamp,
        "level": "INFO",
        "service": "monitoring-service",
        "message": "Performance metrics",
        "metric_type": "system_stats",
        "cpu_usage_percent": cpu_pct,
        "memory_usage_percent": memory_pct,
        "disk_usage_percent": disk_pct,
        "active_connections": connections,
        "requests_per_second": throughput,
        "avg_response_time_ms": latency_ms,
    }
# Pool of event factories. main() picks one of these with random.choice()
# on every iteration and calls it with no arguments to obtain the event dict.
EVENT_GENERATORS = [
generate_user_event,
generate_api_event,
generate_payment_event,
generate_error_event,
generate_performance_event
]
def main():
    """Append randomly chosen JSON events to the application log until interrupted.

    Creates ``/var/log/app`` if it does not exist, then loops forever:
    picks one of ``EVENT_GENERATORS`` at random, serializes the resulting
    event with ``json.dumps``, appends it as a single line to
    ``/var/log/app/application.json``, prints a short summary, and sleeps a
    random 1-8 seconds. Any ``Exception`` raised while producing or writing
    an event is printed and followed by a 5 second back-off. Ctrl-C
    (``KeyboardInterrupt``) ends the loop with a "Stopping" message.
    """
    log_file = Path("/var/log/app/application.json")
    log_file.parent.mkdir(parents=True, exist_ok=True)

    print("Starting JSON log generator...")

    # KeyboardInterrupt is handled around the whole loop rather than as a
    # sibling of ``except Exception``: an interrupt arriving during the
    # back-off sleep inside the Exception handler would otherwise escape
    # uncaught and end the program with a traceback.
    try:
        while True:
            try:
                # Generate random event type
                generator = random.choice(EVENT_GENERATORS)
                event = generator()

                # Write as JSON line. The file is opened and closed for
                # every event, so each line is flushed before sleeping.
                # The encoding is explicit so output does not depend on
                # the process locale.
                json_line = json.dumps(event)
                with open(log_file, "a", encoding="utf-8") as f:
                    f.write(json_line + "\n")

                print(f"Generated: {event['service']} - {event['message']}")

                # Random delay between 1-8 seconds
                time.sleep(random.uniform(1, 8))
            except Exception as e:
                # Best-effort generator: report the problem, wait, retry.
                print(f"Error: {e}")
                time.sleep(5)
    except KeyboardInterrupt:
        print("Stopping JSON log generator...")


if __name__ == "__main__":
    main()