splunk_local/generators/firewall_logs.py
bpmcdevitt fa8fd73f1a Add security-focused log generators for SOC and SIEM testing
- Implement 5 new security log generators: Windows events, firewall logs, DNS queries, authentication logs, and cloud service logs
- Add 'security' Docker Compose profile for easy deployment of security generators
- Windows generator creates realistic Security/System/Application events with attack patterns (failed logins, account creation, service events)
- Firewall generator supports pfSense, iptables, and Cisco ASA formats with malicious traffic blocking simulation
- DNS generator includes DGA domains, suspicious lookups, and multiple DNS server formats (BIND, Pi-hole, Windows DNS)
- Authentication generator creates LDAP, RADIUS, and SSH logs with brute force attack patterns
- Cloud generator produces AWS CloudTrail, Azure Activity, and GCP audit logs with security-relevant events
- Update documentation with comprehensive security use cases for SOC training, threat hunting, and compliance testing
- Enhance Docker Compose configuration with new security profile and service definitions

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 17:44:47 -05:00

264 lines
No EOL
10 KiB
Python

#!/usr/bin/env python3
"""
Firewall log generator for Splunk security testing
Generates realistic firewall logs in pfSense, iptables, and Cisco ASA formats
"""
import time
import random
import datetime
from pathlib import Path
# Common protocols and ports
PROTOCOLS = ['TCP', 'UDP', 'ICMP']
COMMON_PORTS = {
'TCP': [22, 23, 25, 53, 80, 110, 143, 443, 993, 995, 1433, 3389, 5432, 8080, 8443],
'UDP': [53, 67, 68, 123, 161, 162, 514, 1812, 1813, 4500]
}
# Attack signatures and suspicious ports
ATTACK_PORTS = [1, 7, 9, 13, 17, 19, 135, 137, 138, 139, 445, 1024, 1025, 2745, 3127, 6129, 12345, 27374, 31337]
MALICIOUS_IPS = [
'185.220.100.', '185.220.101.', '198.98.51.', '162.142.125.',
'89.248.171.', '45.148.10.', '192.42.116.', '171.25.193.'
]
# Internal networks
INTERNAL_NETWORKS = [
'192.168.1.', '192.168.0.', '10.0.0.', '10.0.1.', '172.16.1.', '172.16.0.'
]
# External IPs for legitimate traffic
EXTERNAL_IPS = [
'8.8.8.8', '8.8.4.4', '1.1.1.1', '208.67.222.222',
'142.250.191.', '140.82.114.', '52.84.', '54.230.'
]
def generate_internal_ip():
"""Generate internal IP address"""
network = random.choice(INTERNAL_NETWORKS)
return f"{network}{random.randint(1, 254)}"
def generate_external_ip():
"""Generate external IP address"""
if random.random() < 0.1: # 10% chance of malicious IP
malicious_range = random.choice(MALICIOUS_IPS)
return f"{malicious_range}{random.randint(1, 254)}"
else:
return random.choice(EXTERNAL_IPS) + str(random.randint(1, 254))
def generate_mac_address():
"""Generate MAC address"""
return ':'.join([f'{random.randint(0, 255):02x}' for _ in range(6)])
def generate_pfsense_log():
"""Generate pfSense firewall log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
hostname = 'firewall'
# Determine traffic direction and IPs
outbound = random.choice([True, False])
if outbound:
src_ip = generate_internal_ip()
dst_ip = generate_external_ip()
interface = 'wan'
else:
src_ip = generate_external_ip()
dst_ip = generate_internal_ip()
interface = random.choice(['lan', 'dmz'])
protocol = random.choice(PROTOCOLS)
if protocol in COMMON_PORTS:
if random.random() < 0.8: # 80% normal traffic
dst_port = random.choice(COMMON_PORTS[protocol])
else: # 20% suspicious traffic
dst_port = random.choice(ATTACK_PORTS)
else: # ICMP
dst_port = 0
src_port = random.randint(1024, 65535) if protocol != 'ICMP' else 0
# Determine action (block suspicious traffic more often)
if dst_port in ATTACK_PORTS or any(malicious in src_ip for malicious in MALICIOUS_IPS):
action = 'block'
else:
action = random.choices(['pass', 'block'], weights=[85, 15])[0]
# pfSense log format
rule_num = random.randint(1, 100)
tracker = random.randint(1000000000, 9999999999)
if protocol == 'ICMP':
log_entry = f"{timestamp} {hostname} filterlog: {rule_num},{tracker},,1000000103,{interface},match,{action},in,4,0x0,,64,{protocol},{len(src_ip + dst_ip)},{src_ip},{dst_ip},unreachable,need-frag"
else:
flags = random.choice(['S', 'A', 'SA', 'F', 'R', 'P'])
log_entry = f"{timestamp} {hostname} filterlog: {rule_num},{tracker},,1000000103,{interface},match,{action},in,4,0x0,,64,{protocol},{len(str(src_port) + str(dst_port))},{src_ip},{dst_ip},{src_port},{dst_port},{len(str(src_port))},{flags},,"
return log_entry
def generate_iptables_log():
"""Generate iptables log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
hostname = 'gateway'
src_ip = generate_external_ip() if random.choice([True, False]) else generate_internal_ip()
dst_ip = generate_internal_ip() if src_ip.startswith(('192.168', '10.', '172.16')) else generate_external_ip()
protocol = random.choice(PROTOCOLS)
if protocol in COMMON_PORTS:
dst_port = random.choice(COMMON_PORTS[protocol] + ATTACK_PORTS)
else:
dst_port = 0
src_port = random.randint(1024, 65535) if protocol != 'ICMP' else 0
# Determine action
if dst_port in ATTACK_PORTS:
action = 'DROP'
chain = 'FORWARD'
else:
action = random.choices(['ACCEPT', 'DROP'], weights=[80, 20])[0]
chain = random.choice(['INPUT', 'OUTPUT', 'FORWARD'])
interface_in = random.choice(['eth0', 'eth1', 'eth2'])
interface_out = random.choice(['eth0', 'eth1', 'eth2'])
mac_src = generate_mac_address()
if protocol == 'ICMP':
log_entry = f"{timestamp} {hostname} kernel: iptables {action}: IN={interface_in} OUT={interface_out} MAC={mac_src} SRC={src_ip} DST={dst_ip} LEN=84 TOS=0x00 PREC=0x00 TTL=64 ID={random.randint(1, 65535)} DF PROTO={protocol} TYPE=8 CODE=0"
else:
ttl = random.randint(32, 128)
packet_id = random.randint(1, 65535)
window = random.randint(1024, 65535)
flags = random.choice(['SYN', 'ACK', 'FIN', 'RST', 'PSH'])
log_entry = f"{timestamp} {hostname} kernel: iptables {action}: IN={interface_in} OUT={interface_out} MAC={mac_src} SRC={src_ip} DST={dst_ip} LEN={random.randint(40, 1500)} TOS=0x00 PREC=0x00 TTL={ttl} ID={packet_id} DF PROTO={protocol} SPT={src_port} DPT={dst_port} WINDOW={window} RES=0x00 {flags} URGP=0"
return log_entry
def generate_cisco_asa_log():
"""Generate Cisco ASA firewall log entry"""
timestamp = datetime.datetime.now().strftime('%b %d %Y %H:%M:%S')
hostname = 'ASA-01'
# ASA message IDs and their descriptions
asa_messages = {
'106023': 'Deny tcp src {src_zone}:{src_ip}/{src_port} dst {dst_zone}:{dst_ip}/{dst_port} by access-group',
'106100': 'access-list {acl_name} {action} {protocol} {src_zone}/{src_ip}({src_port}) -> {dst_zone}/{dst_ip}({dst_port}) hit-cnt {count}',
'302013': 'Built {direction} TCP connection {conn_id} for {outside_zone}:{outside_ip}/{outside_port} to {inside_zone}:{inside_ip}/{inside_port}',
'302014': 'Teardown TCP connection {conn_id} for {outside_zone}:{outside_ip}/{outside_port} to {inside_zone}:{inside_ip}/{inside_port} duration {duration}',
'305011': 'Built {direction} UDP connection {conn_id} for {outside_zone}:{outside_ip}/{outside_port} to {inside_zone}:{inside_ip}/{inside_port}',
'313001': 'Denied ICMP type={icmp_type}, code={icmp_code} from {src_ip} on interface {interface}',
'733100': 'Object drop rate {rate} exceeded. Current burst rate is {burst_rate} per second, max configured rate is {max_rate}',
'710003': 'TCP access requested from {src_ip}/{src_port} to {dst_zone}:{dst_ip}/{dst_port}',
'419002': 'Received duplicate TCP SYN from {src_zone}:{src_ip}/{src_port} to {dst_zone}:{dst_ip}/{dst_port} with different initial sequence number'
}
msg_id = random.choice(list(asa_messages.keys()))
msg_template = asa_messages[msg_id]
# Generate IPs and zones
zones = ['inside', 'outside', 'dmz', 'management']
src_zone = random.choice(zones)
dst_zone = random.choice([z for z in zones if z != src_zone])
if src_zone == 'outside':
src_ip = generate_external_ip()
dst_ip = generate_internal_ip()
else:
src_ip = generate_internal_ip()
dst_ip = generate_external_ip() if dst_zone == 'outside' else generate_internal_ip()
protocol = random.choice(['tcp', 'udp', 'icmp'])
src_port = random.randint(1024, 65535)
if protocol in ['tcp', 'udp']:
if protocol == 'tcp':
dst_port = random.choice(COMMON_PORTS['TCP'] + ATTACK_PORTS)
else:
dst_port = random.choice(COMMON_PORTS['UDP'])
else:
dst_port = 0
# Fill in template variables
variables = {
'src_zone': src_zone,
'dst_zone': dst_zone,
'src_ip': src_ip,
'dst_ip': dst_ip,
'src_port': src_port,
'dst_port': dst_port,
'protocol': protocol,
'acl_name': f'acl_{random.choice(["inside", "outside", "dmz"])}',
'action': random.choice(['permit', 'deny']),
'conn_id': random.randint(1000, 9999),
'direction': random.choice(['inbound', 'outbound']),
'outside_zone': 'outside',
'inside_zone': 'inside',
'outside_ip': generate_external_ip(),
'inside_ip': generate_internal_ip(),
'outside_port': random.randint(1024, 65535),
'inside_port': random.choice(COMMON_PORTS.get('TCP', [80])),
'duration': f"{random.randint(1, 3600):02d}:{random.randint(0, 59):02d}:{random.randint(0, 59):02d}",
'interface': random.choice(['inside', 'outside', 'dmz']),
'icmp_type': random.randint(0, 18),
'icmp_code': random.randint(0, 15),
'rate': random.randint(1, 1000),
'burst_rate': random.randint(1, 100),
'max_rate': random.randint(100, 1000),
'count': random.randint(1, 100)
}
try:
message = msg_template.format(**variables)
except KeyError:
# Fallback for missing variables
message = f"Connection {random.choice(['established', 'denied', 'terminated'])} from {src_ip} to {dst_ip}"
# ASA log format: timestamp hostname %ASA-level-msgid: message
level = random.choice([1, 2, 3, 4, 5, 6, 7])
log_entry = f"{timestamp} {hostname} %ASA-{level}-{msg_id}: {message}"
return log_entry
def main():
log_file = Path("/var/log/app/firewall.log")
log_file.parent.mkdir(parents=True, exist_ok=True)
print("Starting firewall log generator...")
while True:
try:
# Generate different firewall log types
log_type = random.choices(
['pfsense', 'iptables', 'cisco_asa'],
weights=[40, 30, 30],
k=1
)[0]
if log_type == 'pfsense':
log_entry = generate_pfsense_log()
elif log_type == 'iptables':
log_entry = generate_iptables_log()
else:
log_entry = generate_cisco_asa_log()
with open(log_file, "a") as f:
f.write(log_entry + "\n")
print(f"Generated {log_type} log: {log_entry[:100]}...")
# Random delay between 2-15 seconds
time.sleep(random.uniform(2, 15))
except KeyboardInterrupt:
print("Stopping firewall log generator...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(5)
if __name__ == "__main__":
main()