splunk_local/generators/syslog_generator.py
bpmcdevitt ecb505f159 Add comprehensive log generators for realistic test data
- Add four types of log generators: web access logs, syslog messages, JSON application logs, and HEC events
- Implement Docker Compose services with generators profile for easy activation
- Create Python scripts for realistic log generation with varied data patterns
- Update documentation in README.md and CLAUDE.md with usage instructions and generator details
- Support file-based log forwarding and direct HEC event submission for comprehensive testing scenarios

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 17:29:37 -05:00

124 lines
No EOL
4 KiB
Python

#!/usr/bin/env python3
"""
Syslog message generator for Splunk testing
Generates RFC3164 compliant syslog messages
"""
import time
import random
import datetime
from pathlib import Path
# Syslog facility name -> numeric code. The twelve named facilities take
# codes 0-11 in the order listed; local0-local7 take codes 16-23.
# Codes 12-15 are not included in this table.
FACILITIES = {
    **{
        name: code
        for code, name in enumerate((
            'kern', 'user', 'mail', 'daemon', 'auth', 'syslog',
            'lpr', 'news', 'uucp', 'cron', 'authpriv', 'ftp',
        ))
    },
    **{
        name: code
        for code, name in enumerate((
            'local0', 'local1', 'local2', 'local3',
            'local4', 'local5', 'local6', 'local7',
        ), start=16)
    },
}

# Syslog severity name -> numeric level (0 = most severe, 7 = least severe).
SEVERITIES = {
    name: level
    for level, name in enumerate((
        'emergency', 'alert', 'critical', 'error',
        'warning', 'notice', 'info', 'debug',
    ))
}
# Host names a generated message may claim to originate from.
HOSTNAMES = [
    "web01",
    "db01",
    "app01",
    "cache01",
    "lb01",
    "monitor01",
]

# Program names a generated message may be attributed to. Only some of them
# have dedicated entries in MESSAGE_TEMPLATES; the others receive one of the
# generic messages.
PROGRAMS = [
    "sshd",
    "httpd",
    "mysqld",
    "nginx",
    "systemd",
    "kernel",
    "postfix",
    "cron",
    "sudo",
    "firewall",
    "docker",
    "kubelet",
]
# Program-specific message bodies, keyed by program name.
#
# Each template is expanded with str.format() and may reference any of these
# placeholders: {ip}, {port}, {user}, {pid}, {process}, {score}, {service}.
# Because str.format() is used, a literal brace in a template would have to
# be doubled ("{{" / "}}").
MESSAGE_TEMPLATES = {
    'sshd': [
        # The account name is a placeholder so that successful logins vary by
        # user instead of always naming the literal account "user".
        "Accepted publickey for {user} from {ip} port {port} ssh2",
        "Failed password for invalid user admin from {ip} port {port} ssh2",
        "Connection closed by {ip} port {port}",
        "pam_unix(sshd:session): session opened for user {user}",
    ],
    'httpd': [
        "Server seems busy, (you may need to increase StartServers, or Min/MaxSpareServers)",
        "child pid {pid} exit signal Segmentation fault (11)",
        "caught SIGTERM, shutting down",
    ],
    'systemd': [
        "Started {service}.service",
        "Stopped {service}.service",
        "Failed to start {service}.service",
        "Reloading.",
    ],
    'kernel': [
        "Out of memory: Kill process {pid} ({process}) score {score}",
        "TCP: time wait bucket table overflow",
        "device eth0 entered promiscuous mode",
    ],
}
def generate_ip():
    """Return a random IPv4 address as a dotted-quad string.

    The first octet is drawn from 1-223, the two middle octets from 0-255
    and the last octet from 1-254 (all bounds inclusive).
    """
    first = random.randint(1, 223)
    second = random.randint(0, 255)
    third = random.randint(0, 255)
    last = random.randint(1, 254)
    return ".".join(str(octet) for octet in (first, second, third, last))
def _rfc3164_timestamp(moment):
    """Format *moment* as an RFC 3164 TIMESTAMP field ("Mmm dd hh:mm:ss").

    RFC 3164 requires English month abbreviations and a day of month that is
    padded with a space (not a zero) when it is below 10, e.g.
    "Jul  5 09:03:07". ``strftime("%b %d ...")`` guarantees neither: ``%b``
    follows the process locale and ``%d`` zero-pads.
    """
    months = (
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )
    return f"{months[moment.month - 1]} {moment.day:2d} {moment:%H:%M:%S}"


def generate_syslog_message():
    """Build one randomized syslog line in RFC 3164 layout.

    Facility, severity, hostname and program are each picked independently
    at random, so the severity is not correlated with the message text. The
    timestamp is the current local time.

    Returns:
        A string of the form
        ``<PRI>Mmm dd hh:mm:ss hostname program[pid]: message`` without a
        trailing newline.
    """
    facility = random.choice(list(FACILITIES))
    severity = random.choice(list(SEVERITIES))
    # PRI value: facility code * 8 + severity level.
    priority = FACILITIES[facility] * 8 + SEVERITIES[severity]

    timestamp = _rfc3164_timestamp(datetime.datetime.now())
    hostname = random.choice(HOSTNAMES)
    program = random.choice(PROGRAMS)
    pid = random.randint(1000, 99999)

    if program in MESSAGE_TEMPLATES:
        template = random.choice(MESSAGE_TEMPLATES[program])
        # Every supported placeholder is supplied on each call; str.format()
        # ignores keyword arguments the chosen template does not reference.
        message = template.format(
            ip=generate_ip(),
            port=random.randint(1024, 65535),
            user=random.choice(['root', 'admin', 'ubuntu', 'centos', 'deploy']),
            pid=pid,
            process=random.choice(['apache2', 'nginx', 'mysql', 'postgres']),
            score=random.randint(0, 1000),
            service=random.choice(['nginx', 'mysql', 'redis', 'postgresql']),
        )
    else:
        # Programs without dedicated templates get a generic message.
        message = random.choice([
            "Process started successfully",
            "Configuration reloaded",
            "Connection established",
            "Service is running normally",
            "Warning: high memory usage detected",
            "Error: unable to connect to database",
            "Critical: disk space low",
        ])

    # RFC3164 format: <priority>timestamp hostname program[pid]: message
    return f"<{priority}>{timestamp} {hostname} {program}[{pid}]: {message}"
def main():
    """Append generated syslog lines to /var/log/app/syslog.log until interrupted.

    The parent directory is created if it does not exist. Each iteration
    generates one message, appends it to the log file (the file is opened and
    closed per write), echoes it to stdout and then sleeps for a random 2-15
    seconds. Any ordinary exception is reported and followed by a 5 second
    back-off before the loop continues. Ctrl-C (KeyboardInterrupt) ends the
    loop cleanly at any point, including during the back-off sleep.
    """
    log_file = Path("/var/log/app/syslog.log")
    log_file.parent.mkdir(parents=True, exist_ok=True)

    # flush=True: stdout is block-buffered when it is not a terminal (e.g.
    # piped or captured), which would otherwise delay these status lines.
    print("Starting syslog generator...", flush=True)

    # KeyboardInterrupt is handled around the whole loop so that an interrupt
    # arriving while the error handler is sleeping is also caught.
    try:
        while True:
            try:
                log_entry = generate_syslog_message()

                with open(log_file, "a", encoding="utf-8") as f:
                    f.write(log_entry + "\n")

                print(f"Generated: {log_entry}", flush=True)

                # Random delay between 2-15 seconds
                time.sleep(random.uniform(2, 15))
            except Exception as e:
                # Best-effort generator: report the problem, back off, retry.
                print(f"Error: {e}", flush=True)
                time.sleep(5)
    except KeyboardInterrupt:
        print("Stopping syslog generator...", flush=True)
# Start the generator loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()