Created Twitter CLI for security use cases; added a free-tier usage warning to the README

Brendan McDevitt 2025-07-21 08:52:49 -05:00
parent 0b852c3124
commit 2f44964301
7 changed files with 548 additions and 0 deletions

12
.env.example Normal file

@@ -0,0 +1,12 @@
# Twitter API Credentials
# Get these from https://developer.twitter.com/
TWITTER_BEARER_TOKEN=your_bearer_token_here
TWITTER_API_KEY=your_api_key_here
TWITTER_API_SECRET=your_api_secret_here
TWITTER_ACCESS_TOKEN=your_access_token_here
TWITTER_ACCESS_TOKEN_SECRET=your_access_token_secret_here
# Application Settings
MAX_TWEETS=100
CACHE_DURATION=300
OUTPUT_FORMAT=json

42
.gitignore vendored Normal file

@@ -0,0 +1,42 @@
# Environment files
.env
.env.local
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Data and logs
data/
*.log
# Docker
docker-compose.override.yml
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db

24
Dockerfile Normal file

@@ -0,0 +1,24 @@
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
jq \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create data and config directories
RUN mkdir -p /app/data /app/config
# Set executable permissions
RUN chmod +x main.py
CMD ["python", "main.py"]

102
README.md

@@ -0,0 +1,102 @@
# Security Twitter CLI
A Docker Compose application that provides a Python-based CLI tool for security researchers to monitor Twitter for vulnerability data, threat intelligence, and security-related discussions.
## Features
- **Hashtag Search**: Monitor security-related hashtags like #cybersecurity, #malware, #vulnerability
- **Trending Analysis**: Get security-related trending topics
- **Vulnerability Detection**: Automatic CVE pattern detection and extraction
- **Security Scoring**: Intelligent relevance scoring for tweets
- **Multiple Formats**: Output in table, JSON, or CSV format
- **Caching**: Redis-based caching to manage API rate limits
- **Logging**: Comprehensive logging for monitoring and analysis
## Quick Start
1. **Get Twitter API Credentials**
- Apply for a Twitter Developer account at https://developer.twitter.com/
- Create a new app and generate API keys
2. **Configure Environment**
```bash
cp .env.example .env
# Edit .env with your Twitter API credentials
```
3. **Build and Run**
```bash
docker-compose up --build
```
4. **Search for Security Data**
```bash
# Search for cybersecurity and malware tweets
docker-compose run twitter-cli python main.py search -h cybersecurity -h malware
# Get trending security topics
docker-compose run twitter-cli python main.py trending
# Analyze vulnerability mentions
docker-compose run twitter-cli python main.py analyze -h vulnerability -h cve
```
## CLI Commands
### Search Command
```bash
python main.py search [OPTIONS]
```
- `-h, --hashtags`: Security hashtags to search (required; can be repeated)
- `-n, --max-results`: Maximum tweets per hashtag (default: 100)
- `-o, --output`: Output format: table/json/csv (default: table)
- `-s, --min-score`: Minimum security score threshold (default: 1.0)
### Trending Command
```bash
python main.py trending [OPTIONS]
```
- `-a, --analyze`: Show detailed analysis
### Analyze Command
```bash
python main.py analyze [OPTIONS]
```
- `-h, --hashtags`: Hashtags to analyze (required)
- `-n, --max-results`: Maximum tweets to analyze (default: 100)
## Example Usage
```bash
# Search for high-priority security alerts
docker-compose run twitter-cli python main.py search -h cybersecurity -h breach -s 3.0
# Export vulnerability data as JSON
docker-compose run twitter-cli python main.py search -h cve -h vulnerability -o json > vulns.json
# Monitor multiple security topics
docker-compose run twitter-cli python main.py search -h malware -h ransomware -h phishing -n 50
```
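The image also installs `jq` (see the Dockerfile), so JSON results can be filtered without leaving the container. A minimal sketch; the status banner goes to stderr, so stdout carries only JSON:
```bash
# Print just the tweet text from JSON results (illustrative)
docker-compose run twitter-cli sh -c "python main.py search -h cve -o json | jq '.[].text'"
```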
## Security Focus
This tool is designed specifically for **defensive security research**:
- Vulnerability disclosure monitoring
- Threat intelligence gathering
- CVE tracking and analysis
- Security trend identification
- Incident response support
## Data Storage
- Logs: `./data/security_twitter.log`
- Cache: Redis container with persistent volume
- Config: `./config/` directory (mounted volume)
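To peek at what the tool has stored, two hedged examples (the `hashtag:<tag>:<max_results>` key layout comes from `main.py`):
```bash
# Follow the application log from the host
tail -f ./data/security_twitter.log
# List cached hashtag searches inside the Redis container
docker-compose exec redis redis-cli --scan --pattern 'hashtag:*'
```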
## Requirements
- Docker and Docker Compose
- Twitter Developer Account with API access
- Internet connection for API calls
⚠️ **Twitter API Limitation**: The Twitter API free tier allows reading only 100 posts per month. Consider upgrading to a paid tier for production use or extensive monitoring.
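One way to stay inside that budget is to keep `-n` low and lean on the Redis cache, which reuses results for repeated queries within the TTL. For example:
```bash
# Pull at most 10 tweets for a single hashtag; repeat runs within the cache window hit Redis instead of the API
docker-compose run twitter-cli python main.py search -h cve -n 10
```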

36
docker-compose.yml Normal file

@@ -0,0 +1,36 @@
version: '3.8'
services:
twitter-cli:
build: .
container_name: security-twitter-cli
volumes:
- ./data:/app/data
- ./config:/app/config
environment:
- TWITTER_BEARER_TOKEN=${TWITTER_BEARER_TOKEN}
- TWITTER_API_KEY=${TWITTER_API_KEY}
- TWITTER_API_SECRET=${TWITTER_API_SECRET}
- TWITTER_ACCESS_TOKEN=${TWITTER_ACCESS_TOKEN}
- TWITTER_ACCESS_TOKEN_SECRET=${TWITTER_ACCESS_TOKEN_SECRET}
stdin_open: true
tty: true
command: ["python", "main.py"]
networks:
- security-net
redis:
image: redis:7-alpine
container_name: security-redis
volumes:
- redis_data:/data
networks:
- security-net
command: redis-server --appendonly yes
networks:
security-net:
driver: bridge
volumes:
redis_data:

323
main.py Normal file

@@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
Security-focused Twitter CLI for threat monitoring and vulnerability research.
"""
import os
import re
import sys
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import click
import tweepy
import redis
import pandas as pd
from colorama import init, Fore, Style
from tabulate import tabulate
from dotenv import load_dotenv
# Initialize colorama
init()
# Load environment variables
load_dotenv()
# Configure logging (data/ must exist before the FileHandler is attached)
os.makedirs('data', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data/security_twitter.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
class SecurityTwitterCLI:
def __init__(self):
self.setup_twitter_api()
self.setup_redis()
self.security_keywords = [
'vulnerability', 'exploit', 'malware', 'ransomware', 'phishing',
'databreach', 'cybersecurity', 'infosec', 'threathunting', 'apt',
'zeroday', '0day', 'cve', 'security', 'breach', 'attack'
]
def setup_twitter_api(self):
"""Initialize Twitter API client."""
try:
bearer_token = os.getenv('TWITTER_BEARER_TOKEN')
if not bearer_token:
raise ValueError("Twitter Bearer Token not found in environment")
self.twitter_client = tweepy.Client(
bearer_token=bearer_token,
consumer_key=os.getenv('TWITTER_API_KEY'),
consumer_secret=os.getenv('TWITTER_API_SECRET'),
access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'),
wait_on_rate_limit=True
)
logger.info("Twitter API client initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Twitter API: {e}")
sys.exit(1)
def setup_redis(self):
"""Initialize Redis connection for caching."""
try:
self.redis_client = redis.Redis(host='redis', port=6379, db=0, decode_responses=True)
self.redis_client.ping()
logger.info("Redis connection established")
except Exception as e:
logger.warning(f"Redis connection failed: {e}. Caching disabled.")
self.redis_client = None
def cache_get(self, key: str) -> Optional[str]:
"""Get data from cache."""
if self.redis_client:
return self.redis_client.get(key)
return None
def cache_set(self, key: str, value: str, ttl: int = 300):
"""Set data in cache with TTL."""
if self.redis_client:
self.redis_client.setex(key, ttl, value)
def search_security_hashtags(self, hashtags: List[str], max_results: int = 100) -> List[Dict]:
"""Search for tweets containing security-related hashtags."""
all_tweets = []
for hashtag in hashtags:
cache_key = f"hashtag:{hashtag}:{max_results}"
cached_data = self.cache_get(cache_key)
if cached_data:
logger.info(f"Using cached data for #{hashtag}")
all_tweets.extend(json.loads(cached_data))
continue
try:
query = f"#{hashtag} -is:retweet lang:en"
                tweets = tweepy.Paginator(
                    self.twitter_client.search_recent_tweets,
                    query=query,
                    tweet_fields=['created_at', 'author_id', 'public_metrics', 'context_annotations'],
                    max_results=max(10, min(max_results, 100))  # recent search accepts 10-100 results per page
                ).flatten(limit=max_results)
tweet_data = []
for tweet in tweets:
tweet_info = {
'id': tweet.id,
'text': tweet.text,
'created_at': tweet.created_at.isoformat(),
'author_id': tweet.author_id,
'retweet_count': tweet.public_metrics['retweet_count'],
'like_count': tweet.public_metrics['like_count'],
'hashtag': hashtag,
'security_score': self.calculate_security_score(tweet.text)
}
tweet_data.append(tweet_info)
self.cache_set(cache_key, json.dumps(tweet_data))
all_tweets.extend(tweet_data)
logger.info(f"Found {len(tweet_data)} tweets for #{hashtag}")
except Exception as e:
logger.error(f"Error searching #{hashtag}: {e}")
return all_tweets
def get_trending_topics(self, woeid: int = 1) -> List[Dict]:
"""Get trending topics (worldwide by default)."""
cache_key = f"trending:{woeid}"
cached_data = self.cache_get(cache_key)
if cached_data:
logger.info("Using cached trending data")
return json.loads(cached_data)
try:
# Note: This requires Twitter API v1.1 access
auth = tweepy.OAuth1UserHandler(
consumer_key=os.getenv('TWITTER_API_KEY'),
consumer_secret=os.getenv('TWITTER_API_SECRET'),
access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET')
)
api = tweepy.API(auth)
trends = api.get_place_trends(woeid)[0]['trends']
# Filter for security-related trends
security_trends = []
for trend in trends:
name = trend['name'].lower()
if any(keyword in name for keyword in self.security_keywords):
security_trends.append({
'name': trend['name'],
'url': trend['url'],
'tweet_volume': trend['tweet_volume'],
'security_relevance': 'high'
})
self.cache_set(cache_key, json.dumps(security_trends), ttl=600)
return security_trends
except Exception as e:
logger.error(f"Error getting trending topics: {e}")
return []
def calculate_security_score(self, text: str) -> float:
"""Calculate a security relevance score for tweet text."""
text_lower = text.lower()
score = 0.0
# High-value security keywords
high_value_keywords = ['cve', '0day', 'zeroday', 'exploit', 'malware', 'breach']
for keyword in high_value_keywords:
if keyword in text_lower:
score += 2.0
# Medium-value security keywords
medium_value_keywords = ['vulnerability', 'security', 'threat', 'attack']
for keyword in medium_value_keywords:
if keyword in text_lower:
score += 1.0
# Urgency indicators
urgency_keywords = ['urgent', 'critical', 'immediate', 'alert']
for keyword in urgency_keywords:
if keyword in text_lower:
score += 1.5
return min(score, 10.0) # Cap at 10.0
def analyze_vulnerability_mentions(self, tweets: List[Dict]) -> Dict[str, Any]:
"""Analyze tweets for vulnerability mentions and patterns."""
analysis = {
'total_tweets': len(tweets),
'high_priority': [],
'cve_mentions': [],
'threat_actors': [],
'common_keywords': {},
'timeline_analysis': {}
}
for tweet in tweets:
# High priority tweets (score > 5)
if tweet['security_score'] > 5.0:
analysis['high_priority'].append(tweet)
            # CVE pattern matching (e.g. CVE-2024-12345)
            cve_pattern = r'CVE-\d{4}-\d{4,7}'
            cves = re.findall(cve_pattern, tweet['text'], re.IGNORECASE)
if cves:
analysis['cve_mentions'].extend(cves)
# Keyword frequency
words = tweet['text'].lower().split()
for word in words:
if word in self.security_keywords:
analysis['common_keywords'][word] = analysis['common_keywords'].get(word, 0) + 1
return analysis
@click.group()
def cli():
"""Security-focused Twitter CLI for threat monitoring and vulnerability research."""
pass
@cli.command()
@click.option('--hashtags', '-h', multiple=True, required=True,
help='Security hashtags to search for (e.g., -h cybersecurity -h malware)')
@click.option('--max-results', '-n', default=100,
help='Maximum number of tweets to retrieve per hashtag')
@click.option('--output', '-o', type=click.Choice(['table', 'json', 'csv']), default='table',
help='Output format')
@click.option('--min-score', '-s', default=1.0,
help='Minimum security score threshold')
def search(hashtags, max_results, output, min_score):
"""Search for security-related tweets by hashtags."""
click.echo(f"{Fore.CYAN}🔍 Searching for security tweets...{Style.RESET_ALL}")
twitter_cli = SecurityTwitterCLI()
tweets = twitter_cli.search_security_hashtags(list(hashtags), max_results)
# Filter by security score
filtered_tweets = [t for t in tweets if t['security_score'] >= min_score]
if output == 'json':
click.echo(json.dumps(filtered_tweets, indent=2))
elif output == 'csv':
df = pd.DataFrame(filtered_tweets)
click.echo(df.to_csv(index=False))
else: # table
if filtered_tweets:
table_data = []
for tweet in filtered_tweets[:20]: # Show top 20
table_data.append([
tweet['created_at'][:10],
tweet['hashtag'],
tweet['text'][:80] + '...' if len(tweet['text']) > 80 else tweet['text'],
f"{tweet['security_score']:.1f}",
tweet['like_count']
])
headers = ['Date', 'Hashtag', 'Tweet', 'Score', 'Likes']
click.echo(tabulate(table_data, headers=headers, tablefmt='grid'))
click.echo(f"{Fore.GREEN}Found {len(filtered_tweets)} relevant tweets{Style.RESET_ALL}")
else:
click.echo(f"{Fore.YELLOW}No tweets found matching criteria{Style.RESET_ALL}")
@cli.command()
@click.option('--analyze', '-a', is_flag=True, help='Show detailed analysis')
def trending(analyze):
"""Get security-related trending topics."""
click.echo(f"{Fore.CYAN}📈 Getting security-related trending topics...{Style.RESET_ALL}")
twitter_cli = SecurityTwitterCLI()
trends = twitter_cli.get_trending_topics()
if trends:
for trend in trends:
volume = trend['tweet_volume'] if trend['tweet_volume'] else 'N/A'
click.echo(f"{Fore.GREEN}{trend['name']}{Style.RESET_ALL} (Volume: {volume})")
else:
click.echo(f"{Fore.YELLOW}No security-related trending topics found{Style.RESET_ALL}")
@cli.command()
@click.option('--hashtags', '-h', multiple=True, required=True)
@click.option('--max-results', '-n', default=100)
def analyze(hashtags, max_results):
"""Analyze vulnerability mentions and threat patterns."""
click.echo(f"{Fore.CYAN}🔬 Analyzing security threats...{Style.RESET_ALL}")
twitter_cli = SecurityTwitterCLI()
tweets = twitter_cli.search_security_hashtags(list(hashtags), max_results)
analysis = twitter_cli.analyze_vulnerability_mentions(tweets)
click.echo(f"\n{Fore.GREEN}📊 Analysis Results:{Style.RESET_ALL}")
click.echo(f"Total tweets analyzed: {analysis['total_tweets']}")
click.echo(f"High priority alerts: {len(analysis['high_priority'])}")
click.echo(f"CVE mentions found: {len(set(analysis['cve_mentions']))}")
if analysis['cve_mentions']:
click.echo(f"\n{Fore.YELLOW}🚨 CVEs mentioned:{Style.RESET_ALL}")
for cve in set(analysis['cve_mentions']):
click.echo(f"{cve}")
if analysis['common_keywords']:
click.echo(f"\n{Fore.BLUE}🔑 Top security keywords:{Style.RESET_ALL}")
sorted_keywords = sorted(analysis['common_keywords'].items(), key=lambda x: x[1], reverse=True)
for keyword, count in sorted_keywords[:10]:
click.echo(f"{keyword}: {count}")
if __name__ == '__main__':
# Create data directory if it doesn't exist
os.makedirs('data', exist_ok=True)
cli()

9
requirements.txt Normal file

@@ -0,0 +1,9 @@
tweepy==4.14.0
click==8.1.7
redis==5.0.1
python-dotenv==1.0.0
requests==2.31.0
pandas==2.1.4
colorama==0.4.6
tabulate==0.9.0
python-dateutil==2.8.2