splunk_local/generators/cloud_logs.py
bpmcdevitt fa8fd73f1a Add security-focused log generators for SOC and SIEM testing
- Implement 5 new security log generators: Windows events, firewall logs, DNS queries, authentication logs, and cloud service logs
- Add 'security' Docker Compose profile for easy deployment of security generators
- Windows generator creates realistic Security/System/Application events with attack patterns (failed logins, account creation, service events)
- Firewall generator supports pfSense, iptables, and Cisco ASA formats with malicious traffic blocking simulation
- DNS generator includes DGA domains, suspicious lookups, and multiple DNS server formats (BIND, Pi-hole, Windows DNS)
- Authentication generator creates LDAP, RADIUS, and SSH logs with brute force attack patterns
- Cloud generator produces AWS CloudTrail, Azure Activity, and GCP audit logs with security-relevant events
- Update documentation with comprehensive security use cases for SOC training, threat hunting, and compliance testing
- Enhance Docker Compose configuration with new security profile and service definitions

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 17:44:47 -05:00

395 lines
No EOL
14 KiB
Python

#!/usr/bin/env python3
"""
Cloud service log generator for Splunk security testing
Generates realistic AWS CloudTrail, Azure Activity, and GCP audit logs with security events
"""
import time
import random
import datetime
import json
import uuid
from pathlib import Path
# --- AWS reference data -----------------------------------------------------

# Short service identifiers.
AWS_SERVICES = [
    'ec2',
    's3',
    'iam',
    'cloudformation',
    'lambda',
    'rds',
    'vpc',
    'cloudwatch',
    'route53',
    'elb',
    'autoscaling',
    'sns',
    'sqs',
    'dynamodb',
    'kms',
    'sts',
]

# Regions an event may be attributed to.
AWS_REGIONS = [
    'us-east-1',
    'us-west-2',
    'eu-west-1',
    'ap-southeast-1',
    'ap-northeast-1',
]

# IAM users and roles used for routine (non-suspicious) activity.
AWS_USERS = [
    'admin-user',
    'dev-user-01',
    'backup-service',
    'monitoring-role',
    'lambda-execution-role',
    'ec2-instance-role',
    'contractor-access',
    'security-audit-user',
    'automation-user',
    'temp-user-123',
]

# Identities used when simulating attacker activity.
SUSPICIOUS_AWS_USERS = [
    'root',
    'admin',
    'test-user',
    'deleted-user',
    'unknown-user',
    'external-user',
]

# --- Azure reference data ---------------------------------------------------

# Resource provider namespaces.
AZURE_SERVICES = [
    'Microsoft.Compute',
    'Microsoft.Storage',
    'Microsoft.Network',
    'Microsoft.Authorization',
    'Microsoft.Resources',
    'Microsoft.KeyVault',
    'Microsoft.Sql',
    'Microsoft.Web',
    'Microsoft.Security',
    'Microsoft.Insights',
]

# Locations an event may be attributed to.
AZURE_LOCATIONS = [
    'East US',
    'West US 2',
    'West Europe',
    'Southeast Asia',
    'Japan East',
]

# Users and service principals used for routine (non-suspicious) activity.
AZURE_USERS = [
    'admin@company.com',
    'john.doe@company.com',
    'service-principal-01',
    'backup-automation',
    'monitoring-agent',
    'security-scanner',
    'contractor@external.com',
    'guest-user@company.com',
]
def generate_aws_ip():
    """Generate a random IPv4 address from AWS-like or internal prefixes.

    A prefix is picked at random and the remaining octets are filled in so
    the result always has exactly four dotted octets.

    Returns:
        str: A dotted-quad address such as ``'54.12.200.7'`` or
        ``'192.168.3.44'``. The final octet is always in 1-254.
    """
    aws_ranges = [
        '54.', '52.', '18.', '34.', '35.', '13.', '3.', '15.',
        '172.', '10.', '192.168.',  # Internal ranges
    ]
    base = random.choice(aws_ranges)
    # Every prefix ends with a dot, so its dot count is the number of octets
    # it already supplies. Counting pieces of split('.') would be off by one
    # because of the trailing empty piece.
    missing = 4 - base.count('.')
    middle = [str(random.randint(0, 255)) for _ in range(missing - 1)]
    last = str(random.randint(1, 254))
    return base + '.'.join(middle + [last])
def generate_aws_cloudtrail_event():
    """Generate a single synthetic AWS CloudTrail event.

    About 20% of events come from a "suspicious" pool (IAM user/key creation,
    bucket policy changes, failed console logins, instance launches). These
    are attributed to a user from ``SUSPICIOUS_AWS_USERS`` and a fully random
    source IP. All other events are routine read/metric calls from a user in
    ``AWS_USERS`` with an IP from ``generate_aws_ip()``.

    Returns:
        dict: A JSON-serializable CloudTrail-style record. ``errorCode`` and
        ``errorMessage`` are present only for patterns that define an error;
        ``requestParameters`` only for ``RunInstances`` and ``CreateUser``.
    """
    account_id = '123456789012'
    # Timezone-aware clock (utcnow() is deprecated); the fixed-width format
    # keeps microseconds even when they happen to be zero.
    event_time = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')

    # Determine if this should be suspicious (20% chance)
    is_suspicious = random.random() < 0.2

    if is_suspicious:
        # 'risk_level' is informational only; it is not copied into the event.
        event_patterns = [
            {'eventName': 'CreateUser', 'eventSource': 'iam.amazonaws.com',
             'errorCode': None, 'risk_level': 'high'},
            {'eventName': 'PutBucketPolicy', 'eventSource': 's3.amazonaws.com',
             'errorCode': None, 'risk_level': 'medium'},
            {'eventName': 'CreateAccessKey', 'eventSource': 'iam.amazonaws.com',
             'errorCode': None, 'risk_level': 'high'},
            {'eventName': 'ConsoleLogin', 'eventSource': 'signin.amazonaws.com',
             'errorCode': 'Failed authentication', 'risk_level': 'medium'},
            {'eventName': 'RunInstances', 'eventSource': 'ec2.amazonaws.com',
             'errorCode': None, 'risk_level': 'medium'},
        ]
        identity_types = ['IAMUser', 'Root', 'AssumedRole']
        user_pool = SUSPICIOUS_AWS_USERS
    else:
        event_patterns = [
            {'eventName': 'DescribeInstances', 'eventSource': 'ec2.amazonaws.com',
             'errorCode': None, 'risk_level': 'low'},
            {'eventName': 'GetObject', 'eventSource': 's3.amazonaws.com',
             'errorCode': None, 'risk_level': 'low'},
            {'eventName': 'ListBuckets', 'eventSource': 's3.amazonaws.com',
             'errorCode': None, 'risk_level': 'low'},
            {'eventName': 'PutMetricData', 'eventSource': 'monitoring.amazonaws.com',
             'errorCode': None, 'risk_level': 'low'},
        ]
        identity_types = ['IAMUser', 'AssumedRole']
        user_pool = AWS_USERS

    pattern = random.choice(event_patterns)

    # Pick the user ONCE so userName and the ARN describe the same principal.
    user_name = random.choice(user_pool)
    user_identity = {
        'type': random.choice(identity_types),
        'userName': user_name,
        'arn': f"arn:aws:iam::{account_id}:user/{user_name}",
    }

    if is_suspicious:
        # Suspicious traffic may originate from anywhere.
        source_ip = f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"
    else:
        source_ip = generate_aws_ip()

    cloudtrail_event = {
        'eventVersion': '1.05',
        'userIdentity': user_identity,
        'eventTime': event_time,
        'eventSource': pattern['eventSource'],
        'eventName': pattern['eventName'],
        'awsRegion': random.choice(AWS_REGIONS),
        'sourceIPAddress': source_ip,
        'userAgent': random.choice([
            'aws-cli/2.1.34 Python/3.8.8',
            'console.aws.amazon.com',
            'Boto3/1.17.49 Python/3.8.8',
            'terraform/1.0.0',
        ]),
        'requestID': str(uuid.uuid4()),
        'eventID': str(uuid.uuid4()),
        'eventType': 'AwsApiCall',
        'managementEvent': True,
        'recipientAccountId': account_id,
        'serviceEventDetails': None,
    }

    if pattern['errorCode']:
        cloudtrail_event['errorCode'] = pattern['errorCode']
        cloudtrail_event['errorMessage'] = 'Authentication failed'

    # Add request parameters based on event
    if pattern['eventName'] == 'RunInstances':
        cloudtrail_event['requestParameters'] = {
            'instanceType': random.choice(['t3.micro', 't3.small', 'm5.large']),
            'maxCount': random.randint(1, 5),
            'minCount': 1,
        }
    elif pattern['eventName'] == 'CreateUser':
        cloudtrail_event['requestParameters'] = {
            'userName': f"new-user-{random.randint(1, 999)}",
        }

    return cloudtrail_event
def generate_azure_activity_event():
    """Generate a single synthetic Azure Activity Log event.

    About 18% of events come from a "suspicious" pool (role assignment and
    policy writes, VM writes, storage account deletion) with an unusual
    caller. All other events are read operations by a caller from
    ``AZURE_USERS``.

    Returns:
        dict: A JSON-serializable Activity-Log-style record. The subscription
        id, correlation id, tenant id and caller IP are freshly randomized on
        every call.
    """
    # Timezone-aware clock (utcnow() is deprecated); the fixed-width format
    # keeps microseconds even when they happen to be zero.
    event_time = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')

    # Determine if this should be suspicious (18% chance)
    is_suspicious = random.random() < 0.18

    if is_suspicious:
        # Suspicious Azure events
        operations = [
            {
                'operationName': 'Microsoft.Authorization/roleAssignments/write',
                'resourceType': 'Microsoft.Authorization/roleAssignments',
                'level': 'Warning',
                'status': 'Succeeded',
            },
            {
                'operationName': 'Microsoft.Compute/virtualMachines/write',
                'resourceType': 'Microsoft.Compute/virtualMachines',
                'level': 'Informational',
                'status': 'Succeeded',
            },
            {
                'operationName': 'Microsoft.Storage/storageAccounts/delete',
                'resourceType': 'Microsoft.Storage/storageAccounts',
                'level': 'Warning',
                'status': 'Succeeded',
            },
            {
                'operationName': 'Microsoft.Authorization/policies/write',
                'resourceType': 'Microsoft.Authorization/policies',
                'level': 'Warning',
                'status': 'Succeeded',
            },
        ]
        caller = random.choice([
            'external-user@company.com',
            'temp-admin@company.com',
            'service-principal-unknown',
        ])
    else:
        # Normal Azure events
        operations = [
            {
                'operationName': 'Microsoft.Compute/virtualMachines/read',
                'resourceType': 'Microsoft.Compute/virtualMachines',
                'level': 'Informational',
                'status': 'Succeeded',
            },
            {
                'operationName': 'Microsoft.Storage/storageAccounts/read',
                'resourceType': 'Microsoft.Storage/storageAccounts',
                'level': 'Informational',
                'status': 'Succeeded',
            },
            {
                'operationName': 'Microsoft.Network/networkSecurityGroups/read',
                'resourceType': 'Microsoft.Network/networkSecurityGroups',
                'level': 'Informational',
                'status': 'Succeeded',
            },
        ]
        caller = random.choice(AZURE_USERS)

    operation = random.choice(operations)

    azure_event = {
        'time': event_time,
        'resourceId': f"/subscriptions/{uuid.uuid4()}/resourceGroups/rg-production/providers/{operation['resourceType']}/resource-{random.randint(1, 999)}",
        'operationName': operation['operationName'],
        'operationVersion': '2020-06-01',
        'category': 'Administrative',
        'resultType': operation['status'],
        # NOTE(review): resultSignature and the properties.* status fields are
        # chosen independently of resultType, so they can contradict it.
        'resultSignature': random.choice(['Started', 'Succeeded', 'Failed']),
        'durationMs': random.randint(100, 5000),
        'callerIpAddress': f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}",
        'correlationId': str(uuid.uuid4()),
        'identity': {
            'claims': {
                'name': caller,
                'aud': 'https://management.azure.com/',
                'iss': 'https://sts.windows.net/' + str(uuid.uuid4()) + '/',
            },
        },
        'level': operation['level'],
        'location': random.choice(AZURE_LOCATIONS),
        'properties': {
            'statusCode': random.choice(['OK', 'Created', 'Accepted', 'BadRequest', 'Unauthorized', 'Forbidden']),
            'serviceRequestId': str(uuid.uuid4()),
            'statusMessage': random.choice(['Success', 'Resource created', 'Access denied', 'Resource not found']),
        },
    }
    return azure_event
def generate_gcp_audit_log():
    """Generate a single synthetic Google Cloud Platform audit log entry.

    A service is chosen at random. Compute and IAM get a method from a small
    fixed list; every other service gets ``"<service>.list"``.

    Returns:
        dict: A JSON-serializable record with ``protoPayload``, ``insertId``,
        ``resource``, ``timestamp``, ``severity`` and ``logName`` keys.
    """
    # Timezone-aware clock (utcnow() is deprecated); the fixed-width format
    # keeps microseconds even when they happen to be zero.
    timestamp = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')

    # GCP services and methods
    gcp_services = [
        'compute.googleapis.com',
        'storage.googleapis.com',
        'iam.googleapis.com',
        'cloudresourcemanager.googleapis.com',
        'logging.googleapis.com',
    ]
    # Services with a dedicated method list; anything else falls back to a
    # generic "<service>.list" call.
    methods_by_service = {
        'compute.googleapis.com': [
            'v1.compute.instances.insert',
            'v1.compute.instances.delete',
            'v1.compute.instances.list',
        ],
        'iam.googleapis.com': [
            'google.iam.admin.v1.CreateServiceAccount',
            'google.iam.admin.v1.DeleteServiceAccount',
        ],
    }

    service = random.choice(gcp_services)
    known_methods = methods_by_service.get(service)
    if known_methods:
        method_name = random.choice(known_methods)
    else:
        method_name = f"{service}.list"

    gcp_event = {
        'protoPayload': {
            '@type': 'type.googleapis.com/google.cloud.audit.AuditLog',
            'serviceName': service,
            'methodName': method_name,
            'authenticationInfo': {
                'principalEmail': random.choice([
                    'user@company.com',
                    'service-account@project.iam.gserviceaccount.com',
                    'admin@company.com',
                ]),
            },
            'requestMetadata': {
                'callerIp': f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}",
                'callerSuppliedUserAgent': 'google-cloud-sdk gcloud/345.0.0',
            },
        },
        # UUID with the dashes removed.
        'insertId': str(uuid.uuid4()).replace('-', ''),
        'resource': {
            'type': 'gce_instance',
            'labels': {
                'project_id': 'my-project-123',
                'zone': random.choice(['us-central1-a', 'us-east1-b', 'europe-west1-c']),
            },
        },
        'timestamp': timestamp,
        'severity': random.choice(['INFO', 'WARNING', 'ERROR']),
        'logName': 'projects/my-project-123/logs/cloudaudit.googleapis.com%2Factivity',
    }
    return gcp_event
def _event_label(event):
    """Return the operation name to show on the console for any provider's event."""
    # AWS uses 'eventName' and Azure 'operationName', both at the top level.
    for key in ('eventName', 'operationName'):
        if key in event:
            return event[key]
    # GCP nests its method name under 'protoPayload'.
    return event.get('protoPayload', {}).get('methodName', 'unknown')


def main():
    """Continuously append synthetic cloud events to per-provider log files.

    Each iteration picks a provider (AWS 50%, Azure 35%, GCP 15%), generates
    one event, appends it as a single compact JSON line to that provider's
    file under ``/var/log/app``, prints a short progress message, and sleeps
    5-25 seconds. Errors are printed and retried after 5 seconds. The loop
    runs until interrupted with Ctrl-C.
    """
    # Create log files for different cloud providers
    aws_log = Path("/var/log/app/aws_cloudtrail.json")
    azure_log = Path("/var/log/app/azure_activity.json")
    gcp_log = Path("/var/log/app/gcp_audit.json")

    for log_file in [aws_log, azure_log, gcp_log]:
        log_file.parent.mkdir(parents=True, exist_ok=True)

    print("Starting cloud service log generator...")

    # The KeyboardInterrupt handler wraps the whole loop so Ctrl-C also exits
    # cleanly while the error back-off sleep is in progress.
    try:
        while True:
            try:
                # Generate different cloud provider logs
                cloud_provider = random.choices(
                    ['aws', 'azure', 'gcp'],
                    weights=[50, 35, 15],
                    k=1,
                )[0]

                if cloud_provider == 'aws':
                    event = generate_aws_cloudtrail_event()
                    log_file = aws_log
                elif cloud_provider == 'azure':
                    event = generate_azure_activity_event()
                    log_file = azure_log
                else:
                    event = generate_gcp_audit_log()
                    log_file = gcp_log

                # Write as JSON line
                json_line = json.dumps(event, separators=(',', ':'))
                with open(log_file, "a", encoding="utf-8") as f:
                    f.write(json_line + "\n")

                print(f"Generated {cloud_provider.upper()} event: {_event_label(event)}")

                # Random delay between 5-25 seconds (cloud events are less frequent)
                time.sleep(random.uniform(5, 25))
            except Exception as e:
                # Top-level boundary: report and keep the generator alive.
                print(f"Error: {e}")
                time.sleep(5)
    except KeyboardInterrupt:
        print("Stopping cloud service log generator...")


if __name__ == "__main__":
    main()