Add security-focused log generators for SOC and SIEM testing

- Implement 5 new security log generators: Windows events, firewall logs, DNS queries, authentication logs, and cloud service logs
- Add 'security' Docker Compose profile for easy deployment of security generators
- Windows generator creates realistic Security/System/Application events with attack patterns (failed logins, account creation, service events)
- Firewall generator supports pfSense, iptables, and Cisco ASA formats with malicious traffic blocking simulation
- DNS generator includes DGA domains, suspicious lookups, and multiple DNS server formats (BIND, Pi-hole, Windows DNS)
- Authentication generator creates LDAP, RADIUS, and SSH logs with brute force attack patterns
- Cloud generator produces AWS CloudTrail, Azure Activity, and GCP audit logs with security-relevant events
- Update documentation with comprehensive security use cases for SOC training, threat hunting, and compliance testing
- Enhance Docker Compose configuration with new security profile and service definitions

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Brendan McDevitt 2025-07-19 17:44:47 -05:00
parent ecb505f159
commit fa8fd73f1a
8 changed files with 1620 additions and 12 deletions

View file

@ -16,7 +16,8 @@ This is a Docker Compose-based local Splunk testing environment. The setup inclu
- Start Splunk: `docker-compose up -d`
- Start with forwarder: `docker-compose --profile forwarder up -d`
- Start with log generators: `docker-compose --profile generators up -d`
- Start everything: `docker-compose --profile forwarder --profile generators up -d`
- Start with security generators: `docker-compose --profile security up -d`
- Start everything: `docker-compose --profile forwarder --profile generators --profile security up -d`
- Stop services: `docker-compose down`
- Reset all data: `docker-compose down -v`
- View logs: `docker-compose logs splunk`
@ -36,6 +37,11 @@ This is a Docker Compose-based local Splunk testing environment. The setup inclu
- **log_generator_syslog**: Syslog message generator (profile: generators)
- **log_generator_json**: JSON application log generator (profile: generators)
- **log_generator_hec**: HTTP Event Collector sender (profile: generators)
- **log_generator_windows**: Windows Event Log generator (profile: security)
- **log_generator_firewall**: Firewall log generator (profile: security)
- **log_generator_dns**: DNS query log generator (profile: security)
- **log_generator_auth**: Authentication log generator (profile: security)
- **log_generator_cloud**: Cloud service log generator (profile: security)
### Key Directories
- `config/`: Splunk configuration files mounted to container
@ -59,32 +65,66 @@ This is a Docker Compose-based local Splunk testing environment. The setup inclu
## Log Generators
The environment includes four types of log generators:
The environment includes multiple log generators organized into two profiles:
### Web Access Logs
### Basic Generators (Profile: generators)
#### Web Access Logs
- **File**: `generators/web_logs.py`
- **Output**: `/logs/web_access.log`
- **Format**: Apache Common Log Format with User-Agent
- **Content**: Realistic web server access logs with various IPs, paths, status codes
### Syslog Messages
#### Syslog Messages
- **File**: `generators/syslog_generator.py`
- **Output**: `/logs/syslog.log`
- **Format**: RFC3164 compliant syslog
- **Content**: System messages from various services (sshd, httpd, systemd, etc.)
### JSON Application Logs
#### JSON Application Logs
- **File**: `generators/json_logs.py`
- **Output**: `/logs/application.json`
- **Format**: Structured JSON logs
- **Content**: User events, API calls, payments, errors, performance metrics
### HTTP Event Collector (HEC)
#### HTTP Event Collector (HEC)
- **File**: `generators/hec_sender.py`
- **Target**: Direct HEC endpoint
- **Format**: JSON events via HTTP
- **Content**: Security events, application metrics, business events
### Security Generators (Profile: security)
#### Windows Event Logs
- **File**: `generators/windows_events.py`
- **Output**: `/logs/windows_security.log`, `/logs/windows_system.log`, `/logs/windows_application.log`
- **Format**: Windows Event Log format
- **Content**: Security events (4624, 4625, 4720), system events, application errors with attack patterns
#### Firewall Logs
- **File**: `generators/firewall_logs.py`
- **Output**: `/logs/firewall.log`
- **Format**: pfSense, iptables, Cisco ASA formats
- **Content**: Allow/deny rules, attack blocking, suspicious traffic patterns
#### DNS Query Logs
- **File**: `generators/dns_logs.py`
- **Output**: `/logs/dns_queries.log`
- **Format**: BIND, syslog, Windows DNS, Pi-hole formats
- **Content**: Normal queries, DGA domains, suspicious lookups, malicious domain patterns
#### Authentication Logs
- **File**: `generators/auth_logs.py`
- **Output**: `/logs/ldap_auth.log`, `/logs/radius_auth.log`, `/logs/ssh_auth.log`
- **Format**: LDAP, RADIUS, SSH authentication formats
- **Content**: Login attempts, brute force attacks, account management events
#### Cloud Service Logs
- **File**: `generators/cloud_logs.py`
- **Output**: `/logs/aws_cloudtrail.json`, `/logs/azure_activity.json`, `/logs/gcp_audit.json`
- **Format**: Native cloud provider JSON formats
- **Content**: API calls, resource changes, privilege escalation, security events
## Testing Workflows
The environment is designed for:
@ -94,3 +134,8 @@ The environment is designed for:
- API integration testing with HEC
- Performance testing with high-volume log generation
- Different log format parsing and field extraction
- **SOC/SIEM Testing**: Detection rule validation with realistic attack patterns
- **Security Training**: Hands-on experience with security event analysis
- **Threat Hunting**: Practice identifying advanced persistent threats
- **Incident Response**: Simulated security incidents for response training
- **Compliance Testing**: Generate logs for security framework validation

View file

@ -35,15 +35,21 @@ docker-compose --profile forwarder up -d
```
### Log Generators (Optional)
To enable log generators for testing data ingestion:
To enable basic log generators for testing data ingestion:
```bash
docker-compose --profile generators up -d
```
### Combined Setup
To run everything together (Splunk + forwarder + generators):
### Security Log Generators (Optional)
To enable security-focused log generators for SOC/SIEM testing:
```bash
docker-compose --profile forwarder --profile generators up -d
docker-compose --profile security up -d
```
### Combined Setup
To run everything together (Splunk + forwarder + all generators):
```bash
docker-compose --profile forwarder --profile generators --profile security up -d
```
## Configuration
@ -69,21 +75,47 @@ docker-compose down -v
The environment includes multiple log generators to create realistic test data:
### Available Generators
#### Basic Generators (Profile: `generators`)
- **Web Access Logs** (`log_generator_web`): Apache-style access logs with realistic traffic patterns
- **Syslog Messages** (`log_generator_syslog`): RFC3164 compliant system logs from various services
- **JSON Application Logs** (`log_generator_json`): Structured application logs with user events, API calls, and metrics
- **HTTP Event Collector** (`log_generator_hec`): Direct event submission to Splunk HEC endpoint
#### Security Generators (Profile: `security`)
- **Windows Event Logs** (`log_generator_windows`): Windows Security, System, and Application event logs with attack patterns
- **Firewall Logs** (`log_generator_firewall`): Multi-format firewall logs (pfSense, iptables, Cisco ASA) with blocked attacks
- **DNS Query Logs** (`log_generator_dns`): DNS queries with DGA domains, suspicious lookups, and malicious domain patterns
- **Authentication Logs** (`log_generator_auth`): LDAP, RADIUS, and SSH authentication events with brute force patterns
- **Cloud Service Logs** (`log_generator_cloud`): AWS CloudTrail, Azure Activity, and GCP audit logs with security events
### Log Output Locations
#### Basic Generator Outputs
- Web logs: `./logs/web_access.log`
- Syslog: `./logs/syslog.log`
- JSON logs: `./logs/application.json`
- HEC events: Sent directly to Splunk HEC
#### Security Generator Outputs
- Windows events: `./logs/windows_security.log`, `./logs/windows_system.log`, `./logs/windows_application.log`
- Firewall logs: `./logs/firewall.log`
- DNS queries: `./logs/dns_queries.log`
- Authentication: `./logs/ldap_auth.log`, `./logs/radius_auth.log`, `./logs/ssh_auth.log`
- Cloud logs: `./logs/aws_cloudtrail.json`, `./logs/azure_activity.json`, `./logs/gcp_audit.json`
### Adding New Generators
1. Create your generator script in the `./generators/` directory
2. Add a new service to `docker-compose.yml` under the `generators` profile
2. Add a new service to `docker-compose.yml` under the appropriate profile (`generators` or `security`)
3. Mount `./generators:/app` and optionally `./logs:/var/log/app` volumes
4. Update this README with your new generator's details
### Security Use Cases
The security generators are designed for:
- **SOC Training**: Realistic attack patterns and security events for analyst training
- **SIEM Testing**: Detection rule validation and alert tuning
- **Threat Hunting**: Practice identifying advanced persistent threats and anomalous behavior
- **Incident Response**: Simulated security incidents for response procedure testing
- **Compliance**: Generate logs for security framework compliance testing
The Universal Forwarder will automatically pick up and forward any new log files placed in the `./logs/` directory.

View file

@ -89,6 +89,67 @@ services:
profiles:
- generators
# Security-focused log generators
log_generator_windows:
image: python:3.11-slim
container_name: log_generator_windows
working_dir: /app
command: python windows_events.py
volumes:
- ./generators:/app
- ./logs:/var/log/app
restart: unless-stopped
profiles:
- security
log_generator_firewall:
image: python:3.11-slim
container_name: log_generator_firewall
working_dir: /app
command: python firewall_logs.py
volumes:
- ./generators:/app
- ./logs:/var/log/app
restart: unless-stopped
profiles:
- security
log_generator_dns:
image: python:3.11-slim
container_name: log_generator_dns
working_dir: /app
command: python dns_logs.py
volumes:
- ./generators:/app
- ./logs:/var/log/app
restart: unless-stopped
profiles:
- security
log_generator_auth:
image: python:3.11-slim
container_name: log_generator_auth
working_dir: /app
command: python auth_logs.py
volumes:
- ./generators:/app
- ./logs:/var/log/app
restart: unless-stopped
profiles:
- security
log_generator_cloud:
image: python:3.11-slim
container_name: log_generator_cloud
working_dir: /app
command: python cloud_logs.py
volumes:
- ./generators:/app
- ./logs:/var/log/app
restart: unless-stopped
profiles:
- security
volumes:
splunk_etc:
splunk_var:

277
generators/auth_logs.py Normal file
View file

@ -0,0 +1,277 @@
#!/usr/bin/env python3
"""
Authentication log generator for Splunk security testing
Generates realistic LDAP and RADIUS authentication logs with attack patterns
"""
import time
import random
import datetime
from pathlib import Path
# User accounts for authentication
USERNAMES = [
'john.doe', 'jane.smith', 'admin', 'administrator', 'service_account',
'backup_user', 'test.user', 'contractor01', 'dev.account', 'sql_service',
'web_service', 'ldap_bind', 'monitoring', 'guest', 'temp_user'
]
# Common password attack usernames
ATTACK_USERNAMES = [
'admin', 'administrator', 'root', 'test', 'guest', 'user', 'sa', 'oracle',
'postgres', 'mysql', 'ftp', 'www', 'mail', 'email', 'operator', 'manager'
]
# Organizational units for LDAP
ORGANIZATIONAL_UNITS = [
'ou=Users,dc=company,dc=com',
'ou=Service Accounts,dc=company,dc=com',
'ou=Contractors,dc=company,dc=com',
'ou=IT,dc=company,dc=com',
'ou=Finance,dc=company,dc=com',
'ou=HR,dc=company,dc=com',
'ou=Sales,dc=company,dc=com'
]
# LDAP servers
LDAP_SERVERS = ['ldap01.company.com', 'ldap02.company.com', 'dc01.company.com']
# RADIUS clients (Network Access Servers)
RADIUS_CLIENTS = [
{'name': 'WiFi-Controller-01', 'ip': '10.0.1.50', 'type': 'wireless'},
{'name': 'VPN-Gateway', 'ip': '192.168.1.1', 'type': 'vpn'},
{'name': 'Switch-Core-01', 'ip': '10.0.1.10', 'type': 'switch'},
{'name': 'Firewall-01', 'ip': '10.0.1.1', 'type': 'firewall'},
{'name': 'AP-Floor2-01', 'ip': '10.0.2.100', 'type': 'wireless'},
{'name': 'Router-Branch-01', 'ip': '192.168.100.1', 'type': 'router'}
]
# Client IP ranges for different access types
WIFI_CLIENTS = ['10.10.1.', '10.10.2.', '10.10.3.']
VPN_CLIENTS = ['172.16.100.', '172.16.101.']
WIRED_CLIENTS = ['192.168.1.', '192.168.2.']
def generate_client_ip(access_type='random'):
"""Generate client IP based on access type"""
if access_type == 'wifi':
network = random.choice(WIFI_CLIENTS)
elif access_type == 'vpn':
network = random.choice(VPN_CLIENTS)
elif access_type == 'wired':
network = random.choice(WIRED_CLIENTS)
else:
network = random.choice(WIFI_CLIENTS + VPN_CLIENTS + WIRED_CLIENTS)
return f"{network}{random.randint(1, 254)}"
def generate_mac_address():
"""Generate MAC address"""
return ':'.join([f'{random.randint(0, 255):02x}' for _ in range(6)])
def generate_ldap_log():
"""Generate LDAP authentication log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
ldap_server = random.choice(LDAP_SERVERS)
# Determine if this is an attack (15% chance)
is_attack = random.random() < 0.15
if is_attack:
username = random.choice(ATTACK_USERNAMES)
# Attack patterns
if random.random() < 0.7:
result = 'FAIL'
reason = random.choice([
'Invalid credentials',
'Account locked',
'Account disabled',
'Bad password'
])
else:
result = 'SUCCESS'
reason = 'Authentication successful'
else:
username = random.choice(USERNAMES)
result = random.choices(['SUCCESS', 'FAIL'], weights=[85, 15])[0]
if result == 'FAIL':
reason = random.choice([
'Bad password',
'Account expired',
'Password expired',
'Time restriction'
])
else:
reason = 'Authentication successful'
client_ip = generate_client_ip()
connection_id = random.randint(1000, 9999)
ou = random.choice(ORGANIZATIONAL_UNITS)
# Different LDAP log formats
log_formats = [
# OpenLDAP format
f"{timestamp} {ldap_server} slapd[{random.randint(1000, 9999)}]: conn={connection_id} fd={random.randint(10, 100)} ACCEPT from IP={client_ip}::{random.randint(1024, 65535)}",
# Microsoft AD format
f"{timestamp} {ldap_server} LDAP: {result} - User: {username} - Client: {client_ip} - Reason: {reason}",
# Detailed bind attempt
f"{timestamp} {ldap_server} slapd[{random.randint(1000, 9999)}]: conn={connection_id} op={random.randint(0, 10)} BIND dn=\"cn={username},{ou}\" method=128 result={0 if result == 'SUCCESS' else 49}"
]
return random.choice(log_formats)
def generate_radius_log():
"""Generate RADIUS authentication log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
radius_server = 'radius-01'
# Select NAS (Network Access Server)
nas = random.choice(RADIUS_CLIENTS)
nas_ip = nas['ip']
nas_type = nas['type']
# Generate client info based on NAS type
if nas_type == 'wireless':
client_ip = generate_client_ip('wifi')
calling_station = generate_mac_address().replace(':', '-')
called_station = f"WiFi-Corp-{random.choice(['5G', '2.4G'])}"
elif nas_type == 'vpn':
client_ip = generate_client_ip('vpn')
calling_station = generate_client_ip() # External IP for VPN
called_station = 'VPN-Gateway'
else:
client_ip = generate_client_ip('wired')
calling_station = generate_mac_address()
called_station = 'Wired-Network'
# Determine authentication result
is_attack = random.random() < 0.12 # 12% attack rate
if is_attack:
username = random.choice(ATTACK_USERNAMES)
auth_result = random.choices(['Access-Accept', 'Access-Reject'], weights=[20, 80])[0]
else:
username = random.choice(USERNAMES)
auth_result = random.choices(['Access-Accept', 'Access-Reject'], weights=[90, 10])[0]
packet_id = random.randint(1, 255)
session_id = f"{random.randint(10000000, 99999999):08x}"
# RADIUS attributes
attributes = []
if auth_result == 'Access-Accept':
attributes.extend([
f"Session-Timeout={random.randint(3600, 28800)}",
f"Idle-Timeout={random.randint(300, 1800)}",
f"Framed-IP-Address={client_ip}"
])
if nas_type == 'wireless':
attributes.append(f"Tunnel-Type=VLAN")
attributes.append(f"Tunnel-Medium-Type=802")
attributes.append(f"Tunnel-Private-Group-Id={random.randint(10, 100)}")
else:
reject_reasons = [
'Authentication-Failure',
'User-Account-Disabled',
'Invalid-User',
'Password-Expired',
'Access-Time-Restriction'
]
attributes.append(f"Reply-Message={random.choice(reject_reasons)}")
# Different RADIUS log formats
log_formats = [
# FreeRADIUS format
f"{timestamp} {radius_server} radiusd[{random.randint(1000, 9999)}]: {auth_result} user '{username}' from client {nas_ip} port {random.randint(0, 100)} cli {calling_station}",
# Detailed RADIUS format
f"{timestamp} {radius_server} RADIUS: User={username} NAS-IP={nas_ip} Calling-Station={calling_station} Called-Station={called_station} Result={auth_result} Session={session_id}",
# Microsoft NPS format
f"{timestamp} {radius_server} NPS: {auth_result} for user {username} from {calling_station} via {nas_ip} ({nas['name']})"
]
base_log = random.choice(log_formats)
# Add attributes for detailed logs
if random.choice([True, False]) and attributes:
attr_string = " ".join(attributes)
base_log += f" [{attr_string}]"
return base_log
def generate_ssh_auth_log():
"""Generate SSH authentication log entry (bonus auth type)"""
timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
hostname = random.choice(['web01', 'db01', 'app01', 'jump01'])
is_attack = random.random() < 0.25 # 25% attack rate for SSH
if is_attack:
username = random.choice(ATTACK_USERNAMES + ['root', 'ubuntu', 'centos'])
client_ip = f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}" # External IP
auth_result = random.choices(['success', 'failure'], weights=[10, 90])[0]
else:
username = random.choice(USERNAMES)
client_ip = generate_client_ip()
auth_result = random.choices(['success', 'failure'], weights=[95, 5])[0]
port = random.randint(1024, 65535)
ssh_pid = random.randint(1000, 9999)
if auth_result == 'success':
auth_methods = ['publickey', 'password', 'keyboard-interactive']
method = random.choice(auth_methods)
log_entry = f"{timestamp} {hostname} sshd[{ssh_pid}]: Accepted {method} for {username} from {client_ip} port {port} ssh2"
else:
if random.random() < 0.7: # Failed password
log_entry = f"{timestamp} {hostname} sshd[{ssh_pid}]: Failed password for {username} from {client_ip} port {port} ssh2"
else: # Invalid user
log_entry = f"{timestamp} {hostname} sshd[{ssh_pid}]: Failed password for invalid user {username} from {client_ip} port {port} ssh2"
return log_entry
def main():
# Create separate log files for different auth types
ldap_log = Path("/var/log/app/ldap_auth.log")
radius_log = Path("/var/log/app/radius_auth.log")
ssh_log = Path("/var/log/app/ssh_auth.log")
for log_file in [ldap_log, radius_log, ssh_log]:
log_file.parent.mkdir(parents=True, exist_ok=True)
print("Starting authentication log generator...")
while True:
try:
# Generate different authentication log types
auth_type = random.choices(
['ldap', 'radius', 'ssh'],
weights=[35, 35, 30],
k=1
)[0]
if auth_type == 'ldap':
log_entry = generate_ldap_log()
log_file = ldap_log
elif auth_type == 'radius':
log_entry = generate_radius_log()
log_file = radius_log
else:
log_entry = generate_ssh_auth_log()
log_file = ssh_log
with open(log_file, "a") as f:
f.write(log_entry + "\n")
print(f"Generated {auth_type} auth log")
# Random delay between 3-20 seconds
time.sleep(random.uniform(3, 20))
except KeyboardInterrupt:
print("Stopping authentication log generator...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(5)
if __name__ == "__main__":
main()

395
generators/cloud_logs.py Normal file
View file

@ -0,0 +1,395 @@
#!/usr/bin/env python3
"""
Cloud service log generator for Splunk security testing
Generates realistic AWS CloudTrail and Azure Activity logs with security events
"""
import time
import random
import datetime
import json
import uuid
from pathlib import Path
# AWS Services and regions
AWS_SERVICES = [
'ec2', 's3', 'iam', 'cloudformation', 'lambda', 'rds', 'vpc', 'cloudwatch',
'route53', 'elb', 'autoscaling', 'sns', 'sqs', 'dynamodb', 'kms', 'sts'
]
AWS_REGIONS = [
'us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1', 'ap-northeast-1'
]
# AWS IAM users and roles
AWS_USERS = [
'admin-user', 'dev-user-01', 'backup-service', 'monitoring-role',
'lambda-execution-role', 'ec2-instance-role', 'contractor-access',
'security-audit-user', 'automation-user', 'temp-user-123'
]
# Suspicious AWS users for attack simulation
SUSPICIOUS_AWS_USERS = [
'root', 'admin', 'test-user', 'deleted-user', 'unknown-user', 'external-user'
]
# Azure services and locations
AZURE_SERVICES = [
'Microsoft.Compute', 'Microsoft.Storage', 'Microsoft.Network',
'Microsoft.Authorization', 'Microsoft.Resources', 'Microsoft.KeyVault',
'Microsoft.Sql', 'Microsoft.Web', 'Microsoft.Security', 'Microsoft.Insights'
]
AZURE_LOCATIONS = [
'East US', 'West US 2', 'West Europe', 'Southeast Asia', 'Japan East'
]
# Azure users and service principals
AZURE_USERS = [
'admin@company.com', 'john.doe@company.com', 'service-principal-01',
'backup-automation', 'monitoring-agent', 'security-scanner',
'contractor@external.com', 'guest-user@company.com'
]
def generate_aws_ip():
"""Generate AWS-like IP addresses"""
aws_ranges = [
'54.', '52.', '18.', '34.', '35.', '13.', '3.', '15.',
'172.', '10.', '192.168.' # Internal ranges
]
base = random.choice(aws_ranges)
if len(base.split('.')) == 1:
return f"{base}{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"
else:
return f"{base}{random.randint(0, 255)}.{random.randint(1, 254)}"
def generate_aws_cloudtrail_event():
"""Generate AWS CloudTrail event"""
event_time = datetime.datetime.utcnow().isoformat() + 'Z'
# Determine if this should be suspicious (20% chance)
is_suspicious = random.random() < 0.2
if is_suspicious:
# Suspicious events
event_patterns = [
{
'eventName': 'CreateUser',
'eventSource': 'iam.amazonaws.com',
'errorCode': None,
'risk_level': 'high'
},
{
'eventName': 'PutBucketPolicy',
'eventSource': 's3.amazonaws.com',
'errorCode': None,
'risk_level': 'medium'
},
{
'eventName': 'CreateAccessKey',
'eventSource': 'iam.amazonaws.com',
'errorCode': None,
'risk_level': 'high'
},
{
'eventName': 'ConsoleLogin',
'eventSource': 'signin.amazonaws.com',
'errorCode': 'Failed authentication',
'risk_level': 'medium'
},
{
'eventName': 'RunInstances',
'eventSource': 'ec2.amazonaws.com',
'errorCode': None,
'risk_level': 'medium'
}
]
pattern = random.choice(event_patterns)
user_identity = {
'type': random.choice(['IAMUser', 'Root', 'AssumedRole']),
'userName': random.choice(SUSPICIOUS_AWS_USERS),
'arn': f"arn:aws:iam::123456789012:user/{random.choice(SUSPICIOUS_AWS_USERS)}"
}
source_ip = f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"
else:
# Normal events
event_patterns = [
{
'eventName': 'DescribeInstances',
'eventSource': 'ec2.amazonaws.com',
'errorCode': None,
'risk_level': 'low'
},
{
'eventName': 'GetObject',
'eventSource': 's3.amazonaws.com',
'errorCode': None,
'risk_level': 'low'
},
{
'eventName': 'ListBuckets',
'eventSource': 's3.amazonaws.com',
'errorCode': None,
'risk_level': 'low'
},
{
'eventName': 'PutMetricData',
'eventSource': 'monitoring.amazonaws.com',
'errorCode': None,
'risk_level': 'low'
}
]
pattern = random.choice(event_patterns)
user_identity = {
'type': random.choice(['IAMUser', 'AssumedRole']),
'userName': random.choice(AWS_USERS),
'arn': f"arn:aws:iam::123456789012:user/{random.choice(AWS_USERS)}"
}
source_ip = generate_aws_ip()
cloudtrail_event = {
'eventVersion': '1.05',
'userIdentity': user_identity,
'eventTime': event_time,
'eventSource': pattern['eventSource'],
'eventName': pattern['eventName'],
'awsRegion': random.choice(AWS_REGIONS),
'sourceIPAddress': source_ip,
'userAgent': random.choice([
'aws-cli/2.1.34 Python/3.8.8',
'console.aws.amazon.com',
'Boto3/1.17.49 Python/3.8.8',
'terraform/1.0.0'
]),
'requestID': str(uuid.uuid4()),
'eventID': str(uuid.uuid4()),
'eventType': 'AwsApiCall',
'managementEvent': True,
'recipientAccountId': '123456789012',
'serviceEventDetails': None
}
if pattern['errorCode']:
cloudtrail_event['errorCode'] = pattern['errorCode']
cloudtrail_event['errorMessage'] = 'Authentication failed'
# Add request parameters based on event
if pattern['eventName'] == 'RunInstances':
cloudtrail_event['requestParameters'] = {
'instanceType': random.choice(['t3.micro', 't3.small', 'm5.large']),
'maxCount': random.randint(1, 5),
'minCount': 1
}
elif pattern['eventName'] == 'CreateUser':
cloudtrail_event['requestParameters'] = {
'userName': f"new-user-{random.randint(1, 999)}"
}
return cloudtrail_event
def generate_azure_activity_event():
"""Generate Azure Activity Log event"""
event_time = datetime.datetime.utcnow().isoformat() + 'Z'
# Determine if this should be suspicious (18% chance)
is_suspicious = random.random() < 0.18
if is_suspicious:
# Suspicious Azure events
operations = [
{
'operationName': 'Microsoft.Authorization/roleAssignments/write',
'resourceType': 'Microsoft.Authorization/roleAssignments',
'level': 'Warning',
'status': 'Succeeded'
},
{
'operationName': 'Microsoft.Compute/virtualMachines/write',
'resourceType': 'Microsoft.Compute/virtualMachines',
'level': 'Informational',
'status': 'Succeeded'
},
{
'operationName': 'Microsoft.Storage/storageAccounts/delete',
'resourceType': 'Microsoft.Storage/storageAccounts',
'level': 'Warning',
'status': 'Succeeded'
},
{
'operationName': 'Microsoft.Authorization/policies/write',
'resourceType': 'Microsoft.Authorization/policies',
'level': 'Warning',
'status': 'Succeeded'
}
]
caller = random.choice([
'external-user@company.com',
'temp-admin@company.com',
'service-principal-unknown'
])
else:
# Normal Azure events
operations = [
{
'operationName': 'Microsoft.Compute/virtualMachines/read',
'resourceType': 'Microsoft.Compute/virtualMachines',
'level': 'Informational',
'status': 'Succeeded'
},
{
'operationName': 'Microsoft.Storage/storageAccounts/read',
'resourceType': 'Microsoft.Storage/storageAccounts',
'level': 'Informational',
'status': 'Succeeded'
},
{
'operationName': 'Microsoft.Network/networkSecurityGroups/read',
'resourceType': 'Microsoft.Network/networkSecurityGroups',
'level': 'Informational',
'status': 'Succeeded'
}
]
caller = random.choice(AZURE_USERS)
operation = random.choice(operations)
azure_event = {
'time': event_time,
'resourceId': f"/subscriptions/{uuid.uuid4()}/resourceGroups/rg-production/providers/{operation['resourceType']}/resource-{random.randint(1, 999)}",
'operationName': operation['operationName'],
'operationVersion': '2020-06-01',
'category': 'Administrative',
'resultType': operation['status'],
'resultSignature': random.choice(['Started', 'Succeeded', 'Failed']),
'durationMs': random.randint(100, 5000),
'callerIpAddress': f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}",
'correlationId': str(uuid.uuid4()),
'identity': {
'claims': {
'name': caller,
'aud': 'https://management.azure.com/',
'iss': 'https://sts.windows.net/' + str(uuid.uuid4()) + '/'
}
},
'level': operation['level'],
'location': random.choice(AZURE_LOCATIONS),
'properties': {
'statusCode': random.choice(['OK', 'Created', 'Accepted', 'BadRequest', 'Unauthorized', 'Forbidden']),
'serviceRequestId': str(uuid.uuid4()),
'statusMessage': random.choice(['Success', 'Resource created', 'Access denied', 'Resource not found'])
}
}
return azure_event
def generate_gcp_audit_log():
"""Generate Google Cloud Platform audit log event (bonus cloud provider)"""
timestamp = datetime.datetime.utcnow().isoformat() + 'Z'
# GCP services and methods
gcp_services = [
'compute.googleapis.com',
'storage.googleapis.com',
'iam.googleapis.com',
'cloudresourcemanager.googleapis.com',
'logging.googleapis.com'
]
service = random.choice(gcp_services)
if service == 'compute.googleapis.com':
method_name = random.choice([
'v1.compute.instances.insert',
'v1.compute.instances.delete',
'v1.compute.instances.list'
])
elif service == 'iam.googleapis.com':
method_name = random.choice([
'google.iam.admin.v1.CreateServiceAccount',
'google.iam.admin.v1.DeleteServiceAccount'
])
else:
method_name = f"{service}.list"
gcp_event = {
'protoPayload': {
'@type': 'type.googleapis.com/google.cloud.audit.AuditLog',
'serviceName': service,
'methodName': method_name,
'authenticationInfo': {
'principalEmail': random.choice([
'user@company.com',
'service-account@project.iam.gserviceaccount.com',
'admin@company.com'
])
},
'requestMetadata': {
'callerIp': f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}",
'callerSuppliedUserAgent': 'google-cloud-sdk gcloud/345.0.0'
}
},
'insertId': str(uuid.uuid4()).replace('-', ''),
'resource': {
'type': 'gce_instance',
'labels': {
'project_id': 'my-project-123',
'zone': random.choice(['us-central1-a', 'us-east1-b', 'europe-west1-c'])
}
},
'timestamp': timestamp,
'severity': random.choice(['INFO', 'WARNING', 'ERROR']),
'logName': f"projects/my-project-123/logs/cloudaudit.googleapis.com%2Factivity"
}
return gcp_event
def main():
# Create log files for different cloud providers
aws_log = Path("/var/log/app/aws_cloudtrail.json")
azure_log = Path("/var/log/app/azure_activity.json")
gcp_log = Path("/var/log/app/gcp_audit.json")
for log_file in [aws_log, azure_log, gcp_log]:
log_file.parent.mkdir(parents=True, exist_ok=True)
print("Starting cloud service log generator...")
while True:
try:
# Generate different cloud provider logs
cloud_provider = random.choices(
['aws', 'azure', 'gcp'],
weights=[50, 35, 15],
k=1
)[0]
if cloud_provider == 'aws':
event = generate_aws_cloudtrail_event()
log_file = aws_log
elif cloud_provider == 'azure':
event = generate_azure_activity_event()
log_file = azure_log
else:
event = generate_gcp_audit_log()
log_file = gcp_log
# Write as JSON line
json_line = json.dumps(event, separators=(',', ':'))
with open(log_file, "a") as f:
f.write(json_line + "\n")
print(f"Generated {cloud_provider.upper()} event: {event.get('eventName', event.get('operationName', 'unknown'))}")
# Random delay between 5-25 seconds (cloud events are less frequent)
time.sleep(random.uniform(5, 25))
except KeyboardInterrupt:
print("Stopping cloud service log generator...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(5)
if __name__ == "__main__":
main()

244
generators/dns_logs.py Normal file
View file

@ -0,0 +1,244 @@
#!/usr/bin/env python3
"""
DNS query log generator for Splunk security testing
Generates realistic DNS query logs with suspicious domain patterns and DGA domains
"""
import time
import random
import datetime
import string
from pathlib import Path
# Legitimate domains for normal traffic
LEGITIMATE_DOMAINS = [
'google.com', 'microsoft.com', 'amazon.com', 'facebook.com', 'twitter.com',
'github.com', 'stackoverflow.com', 'wikipedia.org', 'youtube.com', 'linkedin.com',
'apple.com', 'netflix.com', 'adobe.com', 'salesforce.com', 'oracle.com',
'ibm.com', 'cisco.com', 'vmware.com', 'redhat.com', 'ubuntu.com',
'docker.com', 'kubernetes.io', 'python.org', 'nodejs.org', 'golang.org'
]
# Suspicious/malicious domains (for testing detection rules)
MALICIOUS_DOMAINS = [
'badactor.com', 'malware-c2.net', 'phishing-site.org', 'exploit-kit.ru',
'trojan-command.xyz', 'ransomware-payment.onion', 'credential-harvest.tk',
'fake-bank-login.ml', 'virus-download.ga', 'spam-relay.cf'
]
# Common TLDs for DGA generation
DGA_TLDS = ['.com', '.net', '.org', '.info', '.biz', '.tk', '.ml', '.ga', '.cf', '.xyz']
# DNS query types
QUERY_TYPES = ['A', 'AAAA', 'CNAME', 'MX', 'TXT', 'NS', 'PTR', 'SOA', 'SRV']
# Response codes
RESPONSE_CODES = {
'NOERROR': 0,
'FORMERR': 1,
'SERVFAIL': 2,
'NXDOMAIN': 3,
'NOTIMP': 4,
'REFUSED': 5
}
# Internal DNS servers
DNS_SERVERS = ['192.168.1.1', '10.0.0.1', '172.16.1.1', '8.8.8.8', '1.1.1.1']
# Client IP ranges
CLIENT_NETWORKS = [
'192.168.1.', '192.168.0.', '10.0.0.', '10.0.1.',
'172.16.1.', '172.16.0.', '172.17.0.'
]
def generate_client_ip():
"""Generate internal client IP"""
network = random.choice(CLIENT_NETWORKS)
return f"{network}{random.randint(1, 254)}"
def generate_dga_domain():
"""Generate Domain Generation Algorithm (DGA) style domain"""
# Common DGA patterns
patterns = [
# Random string + TLD
lambda: ''.join(random.choices(string.ascii_lowercase, k=random.randint(8, 16))) + random.choice(DGA_TLDS),
# Dictionary words + numbers
lambda: random.choice(['secure', 'update', 'service', 'system', 'admin']) + str(random.randint(100, 999)) + random.choice(DGA_TLDS),
# Mixed alphanumeric
lambda: ''.join(random.choices(string.ascii_lowercase + string.digits, k=random.randint(6, 12))) + random.choice(DGA_TLDS),
# Date-based (common in malware)
lambda: datetime.datetime.now().strftime('%Y%m%d') + ''.join(random.choices(string.ascii_lowercase, k=4)) + random.choice(DGA_TLDS)
]
return random.choice(patterns)()
def generate_suspicious_domain():
"""Generate suspicious domain patterns"""
patterns = [
# Typosquatting
lambda: random.choice(['googIe', 'microsft', 'amazom', 'facebok', 'twiter']) + '.com',
# Homograph attacks
lambda: random.choice(['gооgle', 'microѕoft', 'аmazon']) + '.com', # Using Cyrillic characters
# Long subdomains (common in phishing)
lambda: f"secure-login-{random.choice(LEGITIMATE_DOMAINS.copy())}.{random.choice(['tk', 'ml', 'ga'])}",
# Known malicious patterns
lambda: random.choice(MALICIOUS_DOMAINS),
# DGA domains
generate_dga_domain
]
return random.choice(patterns)()
def generate_bind_query_log():
"""Generate BIND DNS query log entry"""
timestamp = datetime.datetime.now().strftime('%d-%b-%Y %H:%M:%S.%f')[:-3]
client_ip = generate_client_ip()
dns_server = random.choice(DNS_SERVERS)
query_type = random.choice(QUERY_TYPES)
# Determine if this should be suspicious (20% chance)
if random.random() < 0.2:
domain = generate_suspicious_domain()
# Suspicious queries more likely to fail
response_code = random.choices(
list(RESPONSE_CODES.keys()),
weights=[60, 5, 10, 20, 3, 2],
k=1
)[0]
else:
domain = random.choice(LEGITIMATE_DOMAINS)
# Normal queries usually succeed
response_code = random.choices(
list(RESPONSE_CODES.keys()),
weights=[95, 1, 2, 1, 0.5, 0.5],
k=1
)[0]
query_id = random.randint(1, 65535)
# BIND query log format
log_entry = f"{timestamp} client {client_ip}#{random.randint(1024, 65535)}: query: {domain} IN {query_type} + ({dns_server})"
# Add response if not just query
if random.choice([True, False]):
response_time = random.randint(1, 500)
log_entry += f"\n{timestamp} client {client_ip}#{random.randint(1024, 65535)}: query response: {domain} IN {query_type} {response_code} {response_time}ms"
return log_entry
def generate_syslog_dns():
"""Generate syslog format DNS log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
hostname = random.choice(['dns-01', 'dns-02', 'resolver'])
client_ip = generate_client_ip()
query_type = random.choice(QUERY_TYPES)
# Determine domain type
if random.random() < 0.15: # 15% suspicious
domain = generate_suspicious_domain()
if random.random() < 0.7: # Suspicious domains often blocked
action = 'BLOCKED'
else:
action = 'ALLOWED'
else:
domain = random.choice(LEGITIMATE_DOMAINS)
action = 'ALLOWED'
response_code = list(RESPONSE_CODES.keys())[0] if action == 'ALLOWED' else random.choice(['NXDOMAIN', 'REFUSED'])
query_time = random.randint(1, 200)
# Syslog DNS format
log_entry = f"{timestamp} {hostname} named[{random.randint(1000, 9999)}]: client {client_ip}: query {domain} {query_type} {action} ({response_code}) {query_time}ms"
return log_entry
def generate_windows_dns_log():
"""Generate Windows DNS server log entry"""
timestamp = datetime.datetime.now().strftime('%m/%d/%Y %I:%M:%S %p')
client_ip = generate_client_ip()
query_type = random.choice(QUERY_TYPES)
if random.random() < 0.18: # 18% suspicious
domain = generate_suspicious_domain()
else:
domain = random.choice(LEGITIMATE_DOMAINS)
# Windows DNS log fields: Date, Time, Thread ID, Context, Internal packet identifier, UDP/TCP, Send/Recv, Remote IP, Xid, Query Type, Query Name, Result Code
thread_id = f"{random.randint(100, 999):03X}"
context = random.choice(['PACKET', 'UPDATE', 'NOTIFY'])
packet_id = f"{random.randint(0, 0xFFFF):04X}"
protocol = random.choice(['UDP', 'TCP'])
direction = random.choice(['Snd', 'Rcv'])
xid = f"{random.randint(0, 0xFFFF):04X}"
result_code = random.choice(['NOERROR', 'NXDOMAIN', 'SERVFAIL'])
log_entry = f"{timestamp} {thread_id} {context} {packet_id} {protocol} {direction} {client_ip} {xid} Q [{query_type}] {domain} {result_code}"
return log_entry
def generate_pi_hole_log():
"""Generate Pi-hole DNS log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
client_ip = generate_client_ip()
query_type = random.choice(QUERY_TYPES)
# Pi-hole blocks more aggressively
if random.random() < 0.3: # 30% blocked
domain = generate_suspicious_domain()
action = 'blocked'
status = 'gravity' # Pi-hole's blocklist
else:
domain = random.choice(LEGITIMATE_DOMAINS)
action = 'reply'
status = random.choice(['NODATA', 'NXDOMAIN', '192.168.1.100']) # Reply with IP or negative response
# Pi-hole log format
log_entry = f"{timestamp} dnsmasq[{random.randint(1000, 9999)}]: {action} {domain} is {status} (client {client_ip})"
return log_entry
def main():
log_file = Path("/var/log/app/dns_queries.log")
log_file.parent.mkdir(parents=True, exist_ok=True)
print("Starting DNS query log generator...")
while True:
try:
# Generate different DNS log formats
log_format = random.choices(
['bind', 'syslog', 'windows', 'pihole'],
weights=[30, 25, 25, 20],
k=1
)[0]
if log_format == 'bind':
log_entry = generate_bind_query_log()
elif log_format == 'syslog':
log_entry = generate_syslog_dns()
elif log_format == 'windows':
log_entry = generate_windows_dns_log()
else:
log_entry = generate_pi_hole_log()
with open(log_file, "a") as f:
f.write(log_entry + "\n")
print(f"Generated {log_format} DNS log")
# Random delay between 1-10 seconds (DNS queries are frequent)
time.sleep(random.uniform(1, 10))
except KeyboardInterrupt:
print("Stopping DNS query log generator...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(5)
if __name__ == "__main__":
main()

264
generators/firewall_logs.py Normal file
View file

@ -0,0 +1,264 @@
#!/usr/bin/env python3
"""
Firewall log generator for Splunk security testing
Generates realistic firewall logs in pfSense, iptables, and Cisco ASA formats
"""
import time
import random
import datetime
from pathlib import Path
# Common protocols and ports
PROTOCOLS = ['TCP', 'UDP', 'ICMP']
COMMON_PORTS = {
'TCP': [22, 23, 25, 53, 80, 110, 143, 443, 993, 995, 1433, 3389, 5432, 8080, 8443],
'UDP': [53, 67, 68, 123, 161, 162, 514, 1812, 1813, 4500]
}
# Attack signatures and suspicious ports
ATTACK_PORTS = [1, 7, 9, 13, 17, 19, 135, 137, 138, 139, 445, 1024, 1025, 2745, 3127, 6129, 12345, 27374, 31337]
MALICIOUS_IPS = [
'185.220.100.', '185.220.101.', '198.98.51.', '162.142.125.',
'89.248.171.', '45.148.10.', '192.42.116.', '171.25.193.'
]
# Internal networks
INTERNAL_NETWORKS = [
'192.168.1.', '192.168.0.', '10.0.0.', '10.0.1.', '172.16.1.', '172.16.0.'
]
# External IPs for legitimate traffic
EXTERNAL_IPS = [
'8.8.8.8', '8.8.4.4', '1.1.1.1', '208.67.222.222',
'142.250.191.', '140.82.114.', '52.84.', '54.230.'
]
def generate_internal_ip():
"""Generate internal IP address"""
network = random.choice(INTERNAL_NETWORKS)
return f"{network}{random.randint(1, 254)}"
def generate_external_ip():
"""Generate external IP address"""
if random.random() < 0.1: # 10% chance of malicious IP
malicious_range = random.choice(MALICIOUS_IPS)
return f"{malicious_range}{random.randint(1, 254)}"
else:
return random.choice(EXTERNAL_IPS) + str(random.randint(1, 254))
def generate_mac_address():
"""Generate MAC address"""
return ':'.join([f'{random.randint(0, 255):02x}' for _ in range(6)])
def generate_pfsense_log():
"""Generate pfSense firewall log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
hostname = 'firewall'
# Determine traffic direction and IPs
outbound = random.choice([True, False])
if outbound:
src_ip = generate_internal_ip()
dst_ip = generate_external_ip()
interface = 'wan'
else:
src_ip = generate_external_ip()
dst_ip = generate_internal_ip()
interface = random.choice(['lan', 'dmz'])
protocol = random.choice(PROTOCOLS)
if protocol in COMMON_PORTS:
if random.random() < 0.8: # 80% normal traffic
dst_port = random.choice(COMMON_PORTS[protocol])
else: # 20% suspicious traffic
dst_port = random.choice(ATTACK_PORTS)
else: # ICMP
dst_port = 0
src_port = random.randint(1024, 65535) if protocol != 'ICMP' else 0
# Determine action (block suspicious traffic more often)
if dst_port in ATTACK_PORTS or any(malicious in src_ip for malicious in MALICIOUS_IPS):
action = 'block'
else:
action = random.choices(['pass', 'block'], weights=[85, 15])[0]
# pfSense log format
rule_num = random.randint(1, 100)
tracker = random.randint(1000000000, 9999999999)
if protocol == 'ICMP':
log_entry = f"{timestamp} {hostname} filterlog: {rule_num},{tracker},,1000000103,{interface},match,{action},in,4,0x0,,64,{protocol},{len(src_ip + dst_ip)},{src_ip},{dst_ip},unreachable,need-frag"
else:
flags = random.choice(['S', 'A', 'SA', 'F', 'R', 'P'])
log_entry = f"{timestamp} {hostname} filterlog: {rule_num},{tracker},,1000000103,{interface},match,{action},in,4,0x0,,64,{protocol},{len(str(src_port) + str(dst_port))},{src_ip},{dst_ip},{src_port},{dst_port},{len(str(src_port))},{flags},,"
return log_entry
def generate_iptables_log():
"""Generate iptables log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
hostname = 'gateway'
src_ip = generate_external_ip() if random.choice([True, False]) else generate_internal_ip()
dst_ip = generate_internal_ip() if src_ip.startswith(('192.168', '10.', '172.16')) else generate_external_ip()
protocol = random.choice(PROTOCOLS)
if protocol in COMMON_PORTS:
dst_port = random.choice(COMMON_PORTS[protocol] + ATTACK_PORTS)
else:
dst_port = 0
src_port = random.randint(1024, 65535) if protocol != 'ICMP' else 0
# Determine action
if dst_port in ATTACK_PORTS:
action = 'DROP'
chain = 'FORWARD'
else:
action = random.choices(['ACCEPT', 'DROP'], weights=[80, 20])[0]
chain = random.choice(['INPUT', 'OUTPUT', 'FORWARD'])
interface_in = random.choice(['eth0', 'eth1', 'eth2'])
interface_out = random.choice(['eth0', 'eth1', 'eth2'])
mac_src = generate_mac_address()
if protocol == 'ICMP':
log_entry = f"{timestamp} {hostname} kernel: iptables {action}: IN={interface_in} OUT={interface_out} MAC={mac_src} SRC={src_ip} DST={dst_ip} LEN=84 TOS=0x00 PREC=0x00 TTL=64 ID={random.randint(1, 65535)} DF PROTO={protocol} TYPE=8 CODE=0"
else:
ttl = random.randint(32, 128)
packet_id = random.randint(1, 65535)
window = random.randint(1024, 65535)
flags = random.choice(['SYN', 'ACK', 'FIN', 'RST', 'PSH'])
log_entry = f"{timestamp} {hostname} kernel: iptables {action}: IN={interface_in} OUT={interface_out} MAC={mac_src} SRC={src_ip} DST={dst_ip} LEN={random.randint(40, 1500)} TOS=0x00 PREC=0x00 TTL={ttl} ID={packet_id} DF PROTO={protocol} SPT={src_port} DPT={dst_port} WINDOW={window} RES=0x00 {flags} URGP=0"
return log_entry
def generate_cisco_asa_log():
"""Generate Cisco ASA firewall log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %Y %H:%M:%S')
hostname = 'ASA-01'
# ASA message IDs and their descriptions
asa_messages = {
'106023': 'Deny tcp src {src_zone}:{src_ip}/{src_port} dst {dst_zone}:{dst_ip}/{dst_port} by access-group',
'106100': 'access-list {acl_name} {action} {protocol} {src_zone}/{src_ip}({src_port}) -> {dst_zone}/{dst_ip}({dst_port}) hit-cnt {count}',
'302013': 'Built {direction} TCP connection {conn_id} for {outside_zone}:{outside_ip}/{outside_port} to {inside_zone}:{inside_ip}/{inside_port}',
'302014': 'Teardown TCP connection {conn_id} for {outside_zone}:{outside_ip}/{outside_port} to {inside_zone}:{inside_ip}/{inside_port} duration {duration}',
'305011': 'Built {direction} UDP connection {conn_id} for {outside_zone}:{outside_ip}/{outside_port} to {inside_zone}:{inside_ip}/{inside_port}',
'313001': 'Denied ICMP type={icmp_type}, code={icmp_code} from {src_ip} on interface {interface}',
'733100': 'Object drop rate {rate} exceeded. Current burst rate is {burst_rate} per second, max configured rate is {max_rate}',
'710003': 'TCP access requested from {src_ip}/{src_port} to {dst_zone}:{dst_ip}/{dst_port}',
'419002': 'Received duplicate TCP SYN from {src_zone}:{src_ip}/{src_port} to {dst_zone}:{dst_ip}/{dst_port} with different initial sequence number'
}
msg_id = random.choice(list(asa_messages.keys()))
msg_template = asa_messages[msg_id]
# Generate IPs and zones
zones = ['inside', 'outside', 'dmz', 'management']
src_zone = random.choice(zones)
dst_zone = random.choice([z for z in zones if z != src_zone])
if src_zone == 'outside':
src_ip = generate_external_ip()
dst_ip = generate_internal_ip()
else:
src_ip = generate_internal_ip()
dst_ip = generate_external_ip() if dst_zone == 'outside' else generate_internal_ip()
protocol = random.choice(['tcp', 'udp', 'icmp'])
src_port = random.randint(1024, 65535)
if protocol in ['tcp', 'udp']:
if protocol == 'tcp':
dst_port = random.choice(COMMON_PORTS['TCP'] + ATTACK_PORTS)
else:
dst_port = random.choice(COMMON_PORTS['UDP'])
else:
dst_port = 0
# Fill in template variables
variables = {
'src_zone': src_zone,
'dst_zone': dst_zone,
'src_ip': src_ip,
'dst_ip': dst_ip,
'src_port': src_port,
'dst_port': dst_port,
'protocol': protocol,
'acl_name': f'acl_{random.choice(["inside", "outside", "dmz"])}',
'action': random.choice(['permit', 'deny']),
'conn_id': random.randint(1000, 9999),
'direction': random.choice(['inbound', 'outbound']),
'outside_zone': 'outside',
'inside_zone': 'inside',
'outside_ip': generate_external_ip(),
'inside_ip': generate_internal_ip(),
'outside_port': random.randint(1024, 65535),
'inside_port': random.choice(COMMON_PORTS.get('TCP', [80])),
'duration': f"{random.randint(1, 3600):02d}:{random.randint(0, 59):02d}:{random.randint(0, 59):02d}",
'interface': random.choice(['inside', 'outside', 'dmz']),
'icmp_type': random.randint(0, 18),
'icmp_code': random.randint(0, 15),
'rate': random.randint(1, 1000),
'burst_rate': random.randint(1, 100),
'max_rate': random.randint(100, 1000),
'count': random.randint(1, 100)
}
try:
message = msg_template.format(**variables)
except KeyError:
# Fallback for missing variables
message = f"Connection {random.choice(['established', 'denied', 'terminated'])} from {src_ip} to {dst_ip}"
# ASA log format: timestamp hostname %ASA-level-msgid: message
level = random.choice([1, 2, 3, 4, 5, 6, 7])
log_entry = f"{timestamp} {hostname} %ASA-{level}-{msg_id}: {message}"
return log_entry
def main():
log_file = Path("/var/log/app/firewall.log")
log_file.parent.mkdir(parents=True, exist_ok=True)
print("Starting firewall log generator...")
while True:
try:
# Generate different firewall log types
log_type = random.choices(
['pfsense', 'iptables', 'cisco_asa'],
weights=[40, 30, 30],
k=1
)[0]
if log_type == 'pfsense':
log_entry = generate_pfsense_log()
elif log_type == 'iptables':
log_entry = generate_iptables_log()
else:
log_entry = generate_cisco_asa_log()
with open(log_file, "a") as f:
f.write(log_entry + "\n")
print(f"Generated {log_type} log: {log_entry[:100]}...")
# Random delay between 2-15 seconds
time.sleep(random.uniform(2, 15))
except KeyboardInterrupt:
print("Stopping firewall log generator...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(5)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,290 @@
#!/usr/bin/env python3
"""
Windows Event Log generator for Splunk security testing
Generates realistic Windows Security, System, and Application event logs
"""
import time
import random
import datetime
import xml.etree.ElementTree as ET
from pathlib import Path
# Common Windows usernames and computer names
USERNAMES = [
'Administrator', 'admin', 'john.doe', 'jane.smith', 'service_account',
'backup_user', 'guest', 'SYSTEM', 'LOCAL SERVICE', 'NETWORK SERVICE',
'test.user', 'contractor01', 'dev.account', 'sql_service'
]
COMPUTER_NAMES = [
'DC01', 'WS001', 'WS002', 'SQL01', 'FILE01', 'PRINT01',
'LAPTOP-USER01', 'DESKTOP-ADMIN', 'SRV-EXCHANGE', 'WEB01'
]
DOMAINS = ['CORP', 'DOMAIN', 'LOCAL', 'COMPANY']
# Windows Event IDs and descriptions
SECURITY_EVENTS = {
4624: 'An account was successfully logged on',
4625: 'An account failed to log on',
4648: 'A logon was attempted using explicit credentials',
4720: 'A user account was created',
4722: 'A user account was enabled',
4725: 'A user account was disabled',
4726: 'A user account was deleted',
4728: 'A member was added to a security-enabled global group',
4732: 'A member was added to a security-enabled local group',
4740: 'A user account was locked out',
4767: 'A user account was unlocked',
4768: 'A Kerberos authentication ticket (TGT) was requested',
4769: 'A Kerberos service ticket was requested',
4771: 'Kerberos pre-authentication failed'
}
SYSTEM_EVENTS = {
7034: 'A service crashed unexpectedly',
7035: 'The Service Control Manager sent a control to a service',
7036: 'A service was started or stopped',
7040: 'The start type of a service was changed',
6005: 'The Event Log service was started',
6006: 'The Event Log service was stopped',
6009: 'The system was started',
6013: 'The system uptime'
}
APPLICATION_EVENTS = {
1000: 'Application Error',
1001: 'Application Hang',
1002: 'Application Recovery',
11707: 'Installation completed successfully',
11708: 'Installation failed',
11724: 'Removal completed successfully'
}
LOGON_TYPES = {
2: 'Interactive',
3: 'Network',
4: 'Batch',
5: 'Service',
7: 'Unlock',
8: 'NetworkCleartext',
9: 'NewCredentials',
10: 'RemoteInteractive',
11: 'CachedInteractive'
}
def generate_ip():
"""Generate realistic internal IP addresses"""
ranges = [
(192, 168, random.randint(1, 254), random.randint(1, 254)),
(10, random.randint(0, 255), random.randint(0, 255), random.randint(1, 254)),
(172, random.randint(16, 31), random.randint(0, 255), random.randint(1, 254))
]
return '.'.join(map(str, random.choice(ranges)))
def generate_security_event():
"""Generate Windows Security event"""
event_id = random.choice(list(SECURITY_EVENTS.keys()))
description = SECURITY_EVENTS[event_id]
username = random.choice(USERNAMES)
computer = random.choice(COMPUTER_NAMES)
domain = random.choice(DOMAINS)
source_ip = generate_ip()
timestamp = datetime.datetime.now().strftime('%m/%d/%Y %I:%M:%S %p')
# Special handling for different event types
if event_id in [4624, 4625]: # Logon events
logon_type = random.choice(list(LOGON_TYPES.keys()))
logon_type_desc = LOGON_TYPES[logon_type]
process_name = random.choice(['winlogon.exe', 'explorer.exe', 'svchost.exe'])
if event_id == 4625: # Failed logon
failure_reason = random.choice([
'Unknown user name or bad password',
'User account restriction',
'Account currently disabled',
'Account logon time restriction violation'
])
status = '0xc000006d'
else:
failure_reason = ''
status = '0x0'
event_data = f"""EventCode={event_id}
EventType=Audit Success
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Microsoft Windows security auditing
User={domain}\\{username}
LogonType={logon_type}
LogonTypeDescription={logon_type_desc}
WorkstationName={computer}
SourceNetworkAddress={source_ip}
ProcessName=C:\\Windows\\System32\\{process_name}
Status={status}
FailureReason={failure_reason}
Description={description}"""
elif event_id in [4720, 4722, 4725, 4726]: # Account management
target_user = random.choice(USERNAMES)
event_data = f"""EventCode={event_id}
EventType=Audit Success
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Microsoft Windows security auditing
User={domain}\\{username}
TargetUserName={target_user}
TargetDomainName={domain}
Description={description}"""
else: # Other security events
event_data = f"""EventCode={event_id}
EventType=Audit Success
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Microsoft Windows security auditing
User={domain}\\{username}
Description={description}"""
return event_data
def generate_system_event():
"""Generate Windows System event"""
event_id = random.choice(list(SYSTEM_EVENTS.keys()))
description = SYSTEM_EVENTS[event_id]
computer = random.choice(COMPUTER_NAMES)
timestamp = datetime.datetime.now().strftime('%m/%d/%Y %I:%M:%S %p')
if event_id in [7034, 7035, 7036]: # Service events
services = [
'Windows Update', 'Print Spooler', 'Task Scheduler',
'Windows Search', 'DHCP Client', 'DNS Client',
'SQL Server', 'IIS Admin Service', 'Apache2.4'
]
service_name = random.choice(services)
if event_id == 7036:
state = random.choice(['running', 'stopped'])
event_data = f"""EventCode={event_id}
EventType=Information
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Service Control Manager
ServiceName={service_name}
State={state}
Description=The {service_name} service entered the {state} state."""
else:
event_data = f"""EventCode={event_id}
EventType=Warning
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Service Control Manager
ServiceName={service_name}
Description={description}"""
else: # Other system events
event_data = f"""EventCode={event_id}
EventType=Information
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=EventLog
Description={description}"""
return event_data
def generate_application_event():
"""Generate Windows Application event"""
event_id = random.choice(list(APPLICATION_EVENTS.keys()))
description = APPLICATION_EVENTS[event_id]
computer = random.choice(COMPUTER_NAMES)
timestamp = datetime.datetime.now().strftime('%m/%d/%Y %I:%M:%S %p')
applications = [
'chrome.exe', 'firefox.exe', 'outlook.exe', 'winword.exe',
'excel.exe', 'notepad.exe', 'calculator.exe', 'explorer.exe',
'java.exe', 'python.exe', 'svchost.exe'
]
app_name = random.choice(applications)
if event_id in [1000, 1001]: # Application errors
event_type = 'Error'
fault_module = random.choice(['ntdll.dll', 'kernel32.dll', 'user32.dll', app_name])
event_data = f"""EventCode={event_id}
EventType={event_type}
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Application Error
ApplicationName={app_name}
FaultingModule={fault_module}
Description={description}"""
else: # Installation events
event_type = 'Information'
product_name = random.choice([
'Microsoft Office 2019', 'Google Chrome', 'Adobe Reader',
'Java Runtime Environment', 'Visual Studio Code'
])
event_data = f"""EventCode={event_id}
EventType={event_type}
TimeGenerated={timestamp}
ComputerName={computer}
SourceName=Windows Installer
ProductName={product_name}
Description={description}"""
return event_data
def main():
# Create log files
security_log = Path("/var/log/app/windows_security.log")
system_log = Path("/var/log/app/windows_system.log")
application_log = Path("/var/log/app/windows_application.log")
for log_file in [security_log, system_log, application_log]:
log_file.parent.mkdir(parents=True, exist_ok=True)
print("Starting Windows Event Log generator...")
while True:
try:
# Generate random event type (weighted towards security events)
event_type = random.choices(
['security', 'system', 'application'],
weights=[50, 25, 25],
k=1
)[0]
if event_type == 'security':
event_data = generate_security_event()
log_file = security_log
elif event_type == 'system':
event_data = generate_system_event()
log_file = system_log
else:
event_data = generate_application_event()
log_file = application_log
# Write event to appropriate log file
with open(log_file, "a") as f:
f.write(event_data + "\n\n")
print(f"Generated {event_type} event: {event_data.split('EventCode=')[1].split()[0]}")
# Random delay between 5-30 seconds
time.sleep(random.uniform(5, 30))
except KeyboardInterrupt:
print("Stopping Windows Event Log generator...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(5)
if __name__ == "__main__":
main()