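#!/usr/bin/env python3
"""
Syslog message generator for Splunk testing

Generates RFC3164 compliant syslog messages and appends them, one per line,
to a log file that a forwarder can tail.

Everything emitted is synthetic: hostnames, programs, users, PIDs and IP
addresses are drawn at random from the tables below and do not describe real
systems. Several templates intentionally look like security events (failed
SSH logins, OOM kills, segfaults), so point the forwarder at a test index
unless you want those to exercise production alerting.
"""

import datetime
import random
import time
from pathlib import Path

# Syslog facility and severity codes (RFC3164 section 4.1.1).
FACILITIES = {
    'kern': 0, 'user': 1, 'mail': 2, 'daemon': 3, 'auth': 4, 'syslog': 5,
    'lpr': 6, 'news': 7, 'uucp': 8, 'cron': 9, 'authpriv': 10, 'ftp': 11,
    'local0': 16, 'local1': 17, 'local2': 18, 'local3': 19,
    'local4': 20, 'local5': 21, 'local6': 22, 'local7': 23,
}

SEVERITIES = {
    'emergency': 0, 'alert': 1, 'critical': 2, 'error': 3,
    'warning': 4, 'notice': 5, 'info': 6, 'debug': 7,
}

HOSTNAMES = ['web01', 'db01', 'app01', 'cache01', 'lb01', 'monitor01']

PROGRAMS = [
    'sshd', 'httpd', 'mysqld', 'nginx', 'systemd', 'kernel',
    'postfix', 'cron', 'sudo', 'firewall', 'docker', 'kubelet',
]

# Per-program templates. Placeholders are filled by _render_message(); every
# placeholder used by any template must be supplied there.
MESSAGE_TEMPLATES = {
    'sshd': [
        "Accepted publickey for user from {ip} port {port} ssh2",
        "Failed password for invalid user admin from {ip} port {port} ssh2",
        "Connection closed by {ip} port {port}",
        "pam_unix(sshd:session): session opened for user {user}",
    ],
    'httpd': [
        "Server seems busy, (you may need to increase StartServers, or Min/MaxSpareServers)",
        "child pid {pid} exit signal Segmentation fault (11)",
        "caught SIGTERM, shutting down",
    ],
    'systemd': [
        "Started {service}.service",
        "Stopped {service}.service",
        "Failed to start {service}.service",
        "Reloading.",
    ],
    'kernel': [
        "Out of memory: Kill process {pid} ({process}) score {score}",
        "TCP: time wait bucket table overflow",
        "device eth0 entered promiscuous mode",
    ],
}

# Fallback messages for programs that have no entry in MESSAGE_TEMPLATES.
GENERIC_MESSAGES = [
    "Process started successfully",
    "Configuration reloaded",
    "Connection established",
    "Service is running normally",
    "Warning: high memory usage detected",
    "Error: unable to connect to database",
    "Critical: disk space low",
]

# Value pools for template placeholders.
USERS = ['root', 'admin', 'ubuntu', 'centos', 'deploy']
PROCESSES = ['apache2', 'nginx', 'mysql', 'postgres']
SERVICES = ['nginx', 'mysql', 'redis', 'postgresql']

# Default output file; main() accepts an override.
DEFAULT_LOG_PATH = Path("/var/log/app/syslog.log")
# Bounds (seconds) of the random pause between messages, and the pause after
# a failed write before trying again.
DELAY_RANGE = (2, 15)
ERROR_RETRY_DELAY = 5


def generate_ip():
    """Return a random dotted-quad IPv4 address as a string.

    The first octet is drawn from 1-223 and the last from 1-254, so the result
    can be a private address or a publicly routable one. Downstream
    enrichment (geo/threat-intel lookups) will treat it as real; keep that in
    mind when reading dashboards built on this data.
    """
    return "{}.{}.{}.{}".format(
        random.randint(1, 223),
        random.randint(0, 255),
        random.randint(0, 255),
        random.randint(1, 254),
    )


def rfc3164_timestamp(now):
    """Format *now* as an RFC3164 TIMESTAMP: ``Mmm dd hh:mm:ss``.

    RFC3164 section 4.1.2 requires the day of month to be space-padded
    ("Oct  7", not "Oct 07"); ``strftime("%d")`` zero-pads, which some
    parsers reject, so the day is formatted separately.

    Args:
        now: a ``datetime.datetime``.

    Returns:
        A 15-character string.
    """
    return f"{now:%b} {now.day:2d} {now:%H:%M:%S}"


def _render_message(program, pid):
    """Build the free-text CONTENT part of a message for *program*.

    Uses a random template from MESSAGE_TEMPLATES when one exists for the
    program, otherwise a random line from GENERIC_MESSAGES. *pid* is the
    value already chosen for the TAG so the two agree.
    """
    templates = MESSAGE_TEMPLATES.get(program)
    if templates is None:
        return random.choice(GENERIC_MESSAGES)
    template = random.choice(templates)
    # All placeholders are passed every time; str.format ignores unused ones.
    return template.format(
        ip=generate_ip(),
        port=random.randint(1024, 65535),
        user=random.choice(USERS),
        pid=pid,
        process=random.choice(PROCESSES),
        score=random.randint(0, 1000),
        service=random.choice(SERVICES),
    )


def generate_syslog_message():
    """Return one random RFC3164 line: ``<PRI>TIMESTAMP HOST TAG[PID]: MSG``.

    Facility, severity, host and program are chosen uniformly from the
    module tables. The line contains no newline, so callers can append a
    single ``\\n`` to delimit records.
    """
    facility = random.choice(list(FACILITIES))
    severity = random.choice(list(SEVERITIES))
    # PRI is facility * 8 + severity, per RFC3164 section 4.1.1.
    priority = FACILITIES[facility] * 8 + SEVERITIES[severity]
    timestamp = rfc3164_timestamp(datetime.datetime.now())
    hostname = random.choice(HOSTNAMES)
    program = random.choice(PROGRAMS)
    pid = random.randint(1000, 99999)
    message = _render_message(program, pid)
    return f"<{priority}>{timestamp} {hostname} {program}[{pid}]: {message}"


def main(log_path=None):
    """Append random syslog lines to *log_path* until interrupted.

    Args:
        log_path: file to append to (str or Path). Defaults to
            DEFAULT_LOG_PATH. Parent directories are created if missing.

    Ctrl-C stops the loop cleanly. Any other exception (typically a write
    failure) is printed and the loop retries after ERROR_RETRY_DELAY seconds,
    so a transient disk or permission problem does not kill the generator.
    """
    log_file = DEFAULT_LOG_PATH if log_path is None else Path(log_path)
    log_file.parent.mkdir(parents=True, exist_ok=True)
    print("Starting syslog generator...")
    while True:
        try:
            log_entry = generate_syslog_message()
            # Open per message so the file can be rotated underneath us.
            with log_file.open("a", encoding="utf-8") as f:
                f.write(log_entry + "\n")
            print(f"Generated: {log_entry}")
            # Random spacing so the stream resembles live traffic, not a burst.
            time.sleep(random.uniform(*DELAY_RANGE))
        except KeyboardInterrupt:
            print("Stopping syslog generator...")
            break
        except Exception as e:
            print(f"Error: {e}")
            time.sleep(ERROR_RETRY_DELAY)


if __name__ == "__main__":
    main()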