LifeRPG_v2.0/modern/backend/security_monitor.py
TLimoges33 2b961611fd
🚀 Major Enhancement: Complete AI-Powered LifeRPG Platform with Git LFS
 New Features:
- AI-powered habit creation with natural language processing
- HuggingFace transformers integration for sentiment analysis (tracked via Git LFS)
- Advanced predictive analytics and behavioral insights
- Voice & image input capabilities for hands-free habit tracking
- Real-time notifications and community features
- Plugin system with extensible architecture

🔧 Technical Improvements:
- Comprehensive FastAPI backend with 30+ endpoints
- React frontend with PWA capabilities
- Advanced authentication with 2FA support
- RBAC authorization system
- Comprehensive security features (CSRF, rate limiting, audit logging)
- Database migrations and health monitoring
- Docker containerization support
- Git LFS configured for large AI model files (2+ GB)

📚 Documentation & DevOps:
- Complete deployment guides for multiple platforms
- Professional README with feature highlights
- GitHub Actions CI/CD workflows
- Comprehensive API documentation
- Security audit roadmap and compliance framework
- Setup scripts for development environment

🧪 Testing & Quality:
- Comprehensive test suite with 20+ test modules
- Setup verification scripts
- Working development environment with both backend and frontend
- Health checks and monitoring systems

🌟 Ready for:
- Portfolio showcasing
- Community contributions
- Production deployment
- Professional presentation
2025-09-28 21:29:19 +00:00

261 lines
9.6 KiB
Python

"""
Security monitoring and alerting system
"""
import logging
import json
import time
from datetime import datetime, timedelta
from typing import Dict, Any, List
from dataclasses import dataclass
from enum import Enum
import hashlib
class SecurityEventType(Enum):
"""Security event types for monitoring"""
LOGIN_FAILURE = "login_failure"
LOGIN_SUCCESS = "login_success"
RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
INVALID_2FA = "invalid_2fa"
ACCOUNT_LOCKOUT = "account_lockout"
UNAUTHORIZED_ACCESS = "unauthorized_access"
SQL_INJECTION_ATTEMPT = "sql_injection_attempt"
XSS_ATTEMPT = "xss_attempt"
CSRF_VIOLATION = "csrf_violation"
SUSPICIOUS_USER_AGENT = "suspicious_user_agent"
ANOMALOUS_LOGIN_LOCATION = "anomalous_login_location"
PASSWORD_BRUTE_FORCE = "password_brute_force"
PRIVILEGE_ESCALATION = "privilege_escalation"
DATA_EXPORT_LARGE = "data_export_large"
ADMIN_ACTION = "admin_action"
@dataclass
class SecurityEvent:
"""Security event data structure"""
event_type: SecurityEventType
user_id: str = None
ip_address: str = None
user_agent: str = None
request_path: str = None
timestamp: datetime = None
details: Dict[str, Any] = None
severity: str = "medium" # low, medium, high, critical
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.utcnow()
if self.details is None:
self.details = {}
class SecurityMonitor:
"""Security monitoring and alerting system"""
def __init__(self):
self.events: List[SecurityEvent] = []
self.logger = self._setup_security_logger()
self.alert_thresholds = {
SecurityEventType.LOGIN_FAILURE: {"count": 5, "window_minutes": 5},
SecurityEventType.RATE_LIMIT_EXCEEDED: {"count": 10, "window_minutes": 1},
SecurityEventType.INVALID_2FA: {"count": 3, "window_minutes": 5},
SecurityEventType.UNAUTHORIZED_ACCESS: {"count": 1, "window_minutes": 1},
}
self.blocked_ips: Dict[str, datetime] = {}
def _setup_security_logger(self):
"""Set up dedicated security event logger"""
logger = logging.getLogger("security")
logger.setLevel(logging.INFO)
# Create security log handler
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - SECURITY - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def log_event(self, event: SecurityEvent):
"""Log a security event"""
self.events.append(event)
# Log to security logger
log_data = {
"event_type": event.event_type.value,
"user_id": event.user_id,
"ip_address": self._hash_ip(event.ip_address) if event.ip_address else None,
"user_agent_hash": self._hash_user_agent(event.user_agent) if event.user_agent else None,
"request_path": event.request_path,
"timestamp": event.timestamp.isoformat(),
"severity": event.severity,
"details": event.details,
}
self.logger.info(json.dumps(log_data))
# Check for alert conditions
self._check_alert_conditions(event)
# Cleanup old events (keep last 1000)
if len(self.events) > 1000:
self.events = self.events[-1000:]
def _hash_ip(self, ip: str) -> str:
"""Hash IP for privacy while maintaining uniqueness"""
return hashlib.sha256(f"ip_{ip}".encode()).hexdigest()[:16]
def _hash_user_agent(self, user_agent: str) -> str:
"""Hash user agent for privacy"""
return hashlib.sha256(f"ua_{user_agent}".encode()).hexdigest()[:16]
def _check_alert_conditions(self, event: SecurityEvent):
"""Check if event triggers an alert"""
event_type = event.event_type
if event_type not in self.alert_thresholds:
return
threshold = self.alert_thresholds[event_type]
window_start = datetime.utcnow() - timedelta(minutes=threshold["window_minutes"])
# Count recent events of this type from same IP
recent_events = [
e for e in self.events
if (e.event_type == event_type and
e.ip_address == event.ip_address and
e.timestamp >= window_start)
]
if len(recent_events) >= threshold["count"]:
self._trigger_alert(event, recent_events)
def _trigger_alert(self, event: SecurityEvent, recent_events: List[SecurityEvent]):
"""Trigger security alert"""
alert_data = {
"alert_type": "security_threshold_exceeded",
"event_type": event.event_type.value,
"ip_address": self._hash_ip(event.ip_address) if event.ip_address else None,
"event_count": len(recent_events),
"time_window": self.alert_thresholds[event.event_type]["window_minutes"],
"timestamp": datetime.utcnow().isoformat(),
"recommended_action": self._get_recommended_action(event.event_type),
}
self.logger.warning(f"SECURITY ALERT: {json.dumps(alert_data)}")
# Auto-block IP for certain event types
if event.event_type in [SecurityEventType.PASSWORD_BRUTE_FORCE,
SecurityEventType.RATE_LIMIT_EXCEEDED]:
self._block_ip(event.ip_address)
def _get_recommended_action(self, event_type: SecurityEventType) -> str:
"""Get recommended action for event type"""
actions = {
SecurityEventType.LOGIN_FAILURE: "Consider IP blocking or account lockout",
SecurityEventType.RATE_LIMIT_EXCEEDED: "IP temporarily blocked",
SecurityEventType.INVALID_2FA: "Monitor for account compromise",
SecurityEventType.UNAUTHORIZED_ACCESS: "Investigate immediately",
SecurityEventType.PASSWORD_BRUTE_FORCE: "IP blocked, notify user",
}
return actions.get(event_type, "Monitor and investigate")
def _block_ip(self, ip_address: str, duration_minutes: int = 30):
"""Block IP address temporarily"""
if ip_address:
block_until = datetime.utcnow() + timedelta(minutes=duration_minutes)
self.blocked_ips[ip_address] = block_until
self.logger.warning(f"IP {self._hash_ip(ip_address)} blocked until {block_until}")
def is_ip_blocked(self, ip_address: str) -> bool:
"""Check if IP is currently blocked"""
if not ip_address or ip_address not in self.blocked_ips:
return False
block_until = self.blocked_ips[ip_address]
if datetime.utcnow() > block_until:
# Block expired, remove it
del self.blocked_ips[ip_address]
return False
return True
def get_security_metrics(self) -> Dict[str, Any]:
"""Get security metrics for dashboard"""
now = datetime.utcnow()
last_hour = now - timedelta(hours=1)
last_day = now - timedelta(days=1)
recent_events = [e for e in self.events if e.timestamp >= last_hour]
daily_events = [e for e in self.events if e.timestamp >= last_day]
metrics = {
"events_last_hour": len(recent_events),
"events_last_24h": len(daily_events),
"blocked_ips_count": len(self.blocked_ips),
"top_event_types_hour": self._get_top_event_types(recent_events),
"top_event_types_day": self._get_top_event_types(daily_events),
"critical_events_hour": len([e for e in recent_events if e.severity == "critical"]),
}
return metrics
def _get_top_event_types(self, events: List[SecurityEvent]) -> Dict[str, int]:
"""Get top event types by count"""
event_counts = {}
for event in events:
event_type = event.event_type.value
event_counts[event_type] = event_counts.get(event_type, 0) + 1
# Return top 5
return dict(sorted(event_counts.items(), key=lambda x: x[1], reverse=True)[:5])
# Global security monitor instance
security_monitor = SecurityMonitor()
# Helper functions for easy integration
def log_login_failure(user_id: str, ip_address: str, user_agent: str = None):
"""Log login failure event"""
event = SecurityEvent(
event_type=SecurityEventType.LOGIN_FAILURE,
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent,
severity="medium"
)
security_monitor.log_event(event)
def log_unauthorized_access(user_id: str, ip_address: str, request_path: str, user_agent: str = None):
"""Log unauthorized access attempt"""
event = SecurityEvent(
event_type=SecurityEventType.UNAUTHORIZED_ACCESS,
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent,
request_path=request_path,
severity="high"
)
security_monitor.log_event(event)
def log_rate_limit_exceeded(ip_address: str, request_path: str, user_agent: str = None):
"""Log rate limit exceeded event"""
event = SecurityEvent(
event_type=SecurityEventType.RATE_LIMIT_EXCEEDED,
ip_address=ip_address,
user_agent=user_agent,
request_path=request_path,
severity="medium"
)
security_monitor.log_event(event)
def check_ip_blocked(ip_address: str) -> bool:
"""Check if IP is blocked"""
return security_monitor.is_ip_blocked(ip_address)