splunk_local/generators/windows_events.py
bpmcdevitt fa8fd73f1a Add security-focused log generators for SOC and SIEM testing
- Implement 5 new security log generators: Windows events, firewall logs, DNS queries, authentication logs, and cloud service logs
- Add 'security' Docker Compose profile for easy deployment of security generators
- Windows generator creates realistic Security/System/Application events with attack patterns (failed logins, account creation, service events)
- Firewall generator supports pfSense, iptables, and Cisco ASA formats with malicious traffic blocking simulation
- DNS generator includes DGA domains, suspicious lookups, and multiple DNS server formats (BIND, Pi-hole, Windows DNS)
- Authentication generator creates LDAP, RADIUS, and SSH logs with brute force attack patterns
- Cloud generator produces AWS CloudTrail, Azure Activity, and GCP audit logs with security-relevant events
- Update documentation with comprehensive security use cases for SOC training, threat hunting, and compliance testing
- Enhance Docker Compose configuration with new security profile and service definitions

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 17:44:47 -05:00

290 lines
No EOL
9.4 KiB
Python

#!/usr/bin/env python3
"""
Windows Event Log generator for Splunk security testing
Generates realistic Windows Security, System, and Application event logs
"""
import time
import random
import datetime
import xml.etree.ElementTree as ET
from pathlib import Path
# Common Windows usernames and computer names
USERNAMES = [
'Administrator', 'admin', 'john.doe', 'jane.smith', 'service_account',
'backup_user', 'guest', 'SYSTEM', 'LOCAL SERVICE', 'NETWORK SERVICE',
'test.user', 'contractor01', 'dev.account', 'sql_service'
]
COMPUTER_NAMES = [
'DC01', 'WS001', 'WS002', 'SQL01', 'FILE01', 'PRINT01',
'LAPTOP-USER01', 'DESKTOP-ADMIN', 'SRV-EXCHANGE', 'WEB01'
]
DOMAINS = ['CORP', 'DOMAIN', 'LOCAL', 'COMPANY']
# Windows Event IDs and descriptions
SECURITY_EVENTS = {
4624: 'An account was successfully logged on',
4625: 'An account failed to log on',
4648: 'A logon was attempted using explicit credentials',
4720: 'A user account was created',
4722: 'A user account was enabled',
4725: 'A user account was disabled',
4726: 'A user account was deleted',
4728: 'A member was added to a security-enabled global group',
4732: 'A member was added to a security-enabled local group',
4740: 'A user account was locked out',
4767: 'A user account was unlocked',
4768: 'A Kerberos authentication ticket (TGT) was requested',
4769: 'A Kerberos service ticket was requested',
4771: 'Kerberos pre-authentication failed'
}
SYSTEM_EVENTS = {
7034: 'A service crashed unexpectedly',
7035: 'The Service Control Manager sent a control to a service',
7036: 'A service was started or stopped',
7040: 'The start type of a service was changed',
6005: 'The Event Log service was started',
6006: 'The Event Log service was stopped',
6009: 'The system was started',
6013: 'The system uptime'
}
APPLICATION_EVENTS = {
1000: 'Application Error',
1001: 'Application Hang',
1002: 'Application Recovery',
11707: 'Installation completed successfully',
11708: 'Installation failed',
11724: 'Removal completed successfully'
}
LOGON_TYPES = {
2: 'Interactive',
3: 'Network',
4: 'Batch',
5: 'Service',
7: 'Unlock',
8: 'NetworkCleartext',
9: 'NewCredentials',
10: 'RemoteInteractive',
11: 'CachedInteractive'
}
def generate_ip():
"""Generate realistic internal IP addresses"""
ranges = [
(192, 168, random.randint(1, 254), random.randint(1, 254)),
(10, random.randint(0, 255), random.randint(0, 255), random.randint(1, 254)),
(172, random.randint(16, 31), random.randint(0, 255), random.randint(1, 254))
]
return '.'.join(map(str, random.choice(ranges)))
def generate_security_event():
"""Generate Windows Security event"""
event_id = random.choice(list(SECURITY_EVENTS.keys()))
description = SECURITY_EVENTS[event_id]
username = random.choice(USERNAMES)
computer = random.choice(COMPUTER_NAMES)
domain = random.choice(DOMAINS)
source_ip = generate_ip()
timestamp = datetime.datetime.now().strftime('%m/%d/%Y %I:%M:%S %p')
# Special handling for different event types
if event_id in [4624, 4625]: # Logon events
logon_type = random.choice(list(LOGON_TYPES.keys()))
logon_type_desc = LOGON_TYPES[logon_type]
process_name = random.choice(['winlogon.exe', 'explorer.exe', 'svchost.exe'])
if event_id == 4625: # Failed logon
failure_reason = random.choice([
'Unknown user name or bad password',
'User account restriction',
'Account currently disabled',
'Account logon time restriction violation'
])
status = '0xc000006d'
else:
failure_reason = ''
status = '0x0'
event_data = f"""EventCode={event_id}
EventType=Audit Success
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Microsoft Windows security auditing
User={domain}\\{username}
LogonType={logon_type}
LogonTypeDescription={logon_type_desc}
WorkstationName={computer}
SourceNetworkAddress={source_ip}
ProcessName=C:\\Windows\\System32\\{process_name}
Status={status}
FailureReason={failure_reason}
Description={description}"""
elif event_id in [4720, 4722, 4725, 4726]: # Account management
target_user = random.choice(USERNAMES)
event_data = f"""EventCode={event_id}
EventType=Audit Success
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Microsoft Windows security auditing
User={domain}\\{username}
TargetUserName={target_user}
TargetDomainName={domain}
Description={description}"""
else: # Other security events
event_data = f"""EventCode={event_id}
EventType=Audit Success
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Microsoft Windows security auditing
User={domain}\\{username}
Description={description}"""
return event_data
def generate_system_event():
"""Generate Windows System event"""
event_id = random.choice(list(SYSTEM_EVENTS.keys()))
description = SYSTEM_EVENTS[event_id]
computer = random.choice(COMPUTER_NAMES)
timestamp = datetime.datetime.now().strftime('%m/%d/%Y %I:%M:%S %p')
if event_id in [7034, 7035, 7036]: # Service events
services = [
'Windows Update', 'Print Spooler', 'Task Scheduler',
'Windows Search', 'DHCP Client', 'DNS Client',
'SQL Server', 'IIS Admin Service', 'Apache2.4'
]
service_name = random.choice(services)
if event_id == 7036:
state = random.choice(['running', 'stopped'])
event_data = f"""EventCode={event_id}
EventType=Information
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Service Control Manager
ServiceName={service_name}
State={state}
Description=The {service_name} service entered the {state} state."""
else:
event_data = f"""EventCode={event_id}
EventType=Warning
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Service Control Manager
ServiceName={service_name}
Description={description}"""
else: # Other system events
event_data = f"""EventCode={event_id}
EventType=Information
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=EventLog
Description={description}"""
return event_data
def generate_application_event():
"""Generate Windows Application event"""
event_id = random.choice(list(APPLICATION_EVENTS.keys()))
description = APPLICATION_EVENTS[event_id]
computer = random.choice(COMPUTER_NAMES)
timestamp = datetime.datetime.now().strftime('%m/%d/%Y %I:%M:%S %p')
applications = [
'chrome.exe', 'firefox.exe', 'outlook.exe', 'winword.exe',
'excel.exe', 'notepad.exe', 'calculator.exe', 'explorer.exe',
'java.exe', 'python.exe', 'svchost.exe'
]
app_name = random.choice(applications)
if event_id in [1000, 1001]: # Application errors
event_type = 'Error'
fault_module = random.choice(['ntdll.dll', 'kernel32.dll', 'user32.dll', app_name])
event_data = f"""EventCode={event_id}
EventType={event_type}
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Application Error
ApplicationName={app_name}
FaultingModule={fault_module}
Description={description}"""
else: # Installation events
event_type = 'Information'
product_name = random.choice([
'Microsoft Office 2019', 'Google Chrome', 'Adobe Reader',
'Java Runtime Environment', 'Visual Studio Code'
])
event_data = f"""EventCode={event_id}
EventType={event_type}
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Windows Installer
ProductName={product_name}
Description={description}"""
return event_data
def main():
# Create log files
security_log = Path("/var/log/app/windows_security.log")
system_log = Path("/var/log/app/windows_system.log")
application_log = Path("/var/log/app/windows_application.log")
for log_file in [security_log, system_log, application_log]:
log_file.parent.mkdir(parents=True, exist_ok=True)
print("Starting Windows Event Log generator...")
while True:
try:
# Generate random event type (weighted towards security events)
event_type = random.choices(
['security', 'system', 'application'],
weights=[50, 25, 25],
k=1
)[0]
if event_type == 'security':
event_data = generate_security_event()
log_file = security_log
elif event_type == 'system':
event_data = generate_system_event()
log_file = system_log
else:
event_data = generate_application_event()
log_file = application_log
# Write event to appropriate log file
with open(log_file, "a") as f:
f.write(event_data + "\n\n")
print(f"Generated {event_type} event: {event_data.split('EventCode=')[1].split()[0]}")
# Random delay between 5-30 seconds
time.sleep(random.uniform(5, 30))
except KeyboardInterrupt:
print("Stopping Windows Event Log generator...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(5)
if __name__ == "__main__":
main()