✨ New Features: - AI-powered habit creation with natural language processing - HuggingFace transformers integration for sentiment analysis (tracked via Git LFS) - Advanced predictive analytics and behavioral insights - Voice & image input capabilities for hands-free habit tracking - Real-time notifications and community features - Plugin system with extensible architecture 🔧 Technical Improvements: - Comprehensive FastAPI backend with 30+ endpoints - React frontend with PWA capabilities - Advanced authentication with 2FA support - RBAC authorization system - Comprehensive security features (CSRF, rate limiting, audit logging) - Database migrations and health monitoring - Docker containerization support - Git LFS configured for large AI model files (2+ GB) 📚 Documentation & DevOps: - Complete deployment guides for multiple platforms - Professional README with feature highlights - GitHub Actions CI/CD workflows - Comprehensive API documentation - Security audit roadmap and compliance framework - Setup scripts for development environment 🧪 Testing & Quality: - Comprehensive test suite with 20+ test modules - Setup verification scripts - Working development environment with both backend and frontend - Health checks and monitoring systems 🌟 Ready for: - Portfolio showcasing - Community contributions - Production deployment - Professional presentation
261 lines
9.6 KiB
Python
261 lines
9.6 KiB
Python
"""
|
|
Security monitoring and alerting system
|
|
"""
|
|
import logging
|
|
import json
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any, List
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
import hashlib
|
|
|
|
|
|
class SecurityEventType(Enum):
    """Kinds of security-relevant events the monitor can record.

    Each member's value is the string written to the security log as the
    event's ``event_type``.
    """

    # Authentication and throttling
    LOGIN_FAILURE = "login_failure"
    LOGIN_SUCCESS = "login_success"
    RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
    INVALID_2FA = "invalid_2fa"
    ACCOUNT_LOCKOUT = "account_lockout"

    # Access violations and request-level attacks
    UNAUTHORIZED_ACCESS = "unauthorized_access"
    SQL_INJECTION_ATTEMPT = "sql_injection_attempt"
    XSS_ATTEMPT = "xss_attempt"
    CSRF_VIOLATION = "csrf_violation"

    # Suspicious client behaviour
    SUSPICIOUS_USER_AGENT = "suspicious_user_agent"
    ANOMALOUS_LOGIN_LOCATION = "anomalous_login_location"
    PASSWORD_BRUTE_FORCE = "password_brute_force"

    # Privileged or bulk operations
    PRIVILEGE_ESCALATION = "privilege_escalation"
    DATA_EXPORT_LARGE = "data_export_large"
    ADMIN_ACTION = "admin_action"
|
|
|
|
|
|
@dataclass
class SecurityEvent:
    """One recorded security event.

    Only ``event_type`` is required. Every other field defaults to ``None``
    except ``severity``; ``timestamp`` and ``details`` are replaced with
    per-instance values in ``__post_init__`` when left as ``None``.
    """

    event_type: SecurityEventType
    user_id: str = None
    ip_address: str = None
    user_agent: str = None
    request_path: str = None
    timestamp: datetime = None  # set to datetime.utcnow() when not supplied
    details: Dict[str, Any] = None  # set to a fresh empty dict when not supplied
    severity: str = "medium"  # low, medium, high, critical

    def __post_init__(self):
        """Fill in the defaults that have to be computed per instance."""
        # A None default plus this check gives each event its own dict
        # instead of sharing one mutable default across instances.
        if self.details is None:
            self.details = {}

        # Naive UTC time of construction.
        if self.timestamp is None:
            self.timestamp = datetime.utcnow()
|
|
|
|
|
|
class SecurityMonitor:
    """In-memory security event monitor with threshold alerts and IP blocking.

    Every event passed to ``log_event`` is kept in ``self.events`` (bounded
    to the most recent ``MAX_EVENTS``), written as one JSON line to the
    ``"security"`` logger, and compared against ``self.alert_thresholds``.
    When a threshold is met an alert is logged and, for some event types,
    the source IP is added to ``self.blocked_ips`` for a limited time.

    All time comparisons use naive datetimes from ``datetime.utcnow()``.
    """

    # Maximum number of events retained in memory; must be positive.
    MAX_EVENTS = 1000

    def __init__(self):
        self.events: List[SecurityEvent] = []
        self.logger = self._setup_security_logger()
        # Per event type: alert once `count` events from the same IP fall
        # inside the trailing `window_minutes`.
        self.alert_thresholds = {
            SecurityEventType.LOGIN_FAILURE: {"count": 5, "window_minutes": 5},
            SecurityEventType.RATE_LIMIT_EXCEEDED: {"count": 10, "window_minutes": 1},
            SecurityEventType.INVALID_2FA: {"count": 3, "window_minutes": 5},
            SecurityEventType.UNAUTHORIZED_ACCESS: {"count": 1, "window_minutes": 1},
        }
        # Raw IP address -> moment the block expires.
        self.blocked_ips: Dict[str, datetime] = {}

    def _setup_security_logger(self):
        """Return the shared ``"security"`` logger, configured at INFO level.

        ``logging.getLogger`` returns the same logger object on every call,
        so the stream handler is attached only when the logger has no
        handler yet. Without that guard each new ``SecurityMonitor`` would
        add another handler and every message would be emitted repeatedly.
        """
        logger = logging.getLogger("security")
        logger.setLevel(logging.INFO)

        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter('%(asctime)s - SECURITY - %(levelname)s - %(message)s')
            )
            logger.addHandler(handler)

        return logger

    def log_event(self, event: "SecurityEvent"):
        """Record ``event``, log it as JSON, and evaluate alert thresholds.

        The IP address and user agent are logged only as truncated hashes;
        the raw values stay on the in-memory event.
        """
        self.events.append(event)

        log_data = {
            "event_type": event.event_type.value,
            "user_id": event.user_id,
            "ip_address": self._hash_ip(event.ip_address) if event.ip_address else None,
            "user_agent_hash": self._hash_user_agent(event.user_agent) if event.user_agent else None,
            "request_path": event.request_path,
            "timestamp": event.timestamp.isoformat(),
            "severity": event.severity,
            "details": event.details,
        }

        # `details` is caller-supplied and may hold values json cannot encode
        # (a datetime, for example). Stringify those rather than let a
        # TypeError abort the call before the alert check below runs.
        self.logger.info(json.dumps(log_data, default=str))

        self._check_alert_conditions(event)

        # Trim after the alert check so the check sees the newest event
        # together with everything retained so far.
        if len(self.events) > self.MAX_EVENTS:
            self.events = self.events[-self.MAX_EVENTS:]

    def _hash_ip(self, ip: str) -> str:
        """Return the first 16 hex characters of SHA-256 over ``"ip_" + ip``.

        NOTE(review): the hash is unsalted, so it obscures the address in
        logs but should not be treated as strong anonymisation.
        """
        return hashlib.sha256(f"ip_{ip}".encode()).hexdigest()[:16]

    def _hash_user_agent(self, user_agent: str) -> str:
        """Return the first 16 hex characters of SHA-256 over ``"ua_" + user_agent``."""
        return hashlib.sha256(f"ua_{user_agent}".encode()).hexdigest()[:16]

    def _check_alert_conditions(self, event: "SecurityEvent"):
        """Trigger an alert if ``event`` brings its type/IP pair to the threshold.

        Event types without an entry in ``self.alert_thresholds`` are ignored.
        """
        threshold = self.alert_thresholds.get(event.event_type)
        if threshold is None:
            return

        window_start = datetime.utcnow() - timedelta(minutes=threshold["window_minutes"])

        # Events of the same type from the same IP inside the window. Events
        # with no IP (None) compare equal to each other and are counted together.
        recent_events = [
            e for e in self.events
            if e.event_type == event.event_type
            and e.ip_address == event.ip_address
            and e.timestamp >= window_start
        ]

        if len(recent_events) >= threshold["count"]:
            self._trigger_alert(event, recent_events)

    def _trigger_alert(self, event: "SecurityEvent", recent_events: "List[SecurityEvent]"):
        """Log a threshold alert and auto-block the IP for selected event types.

        Expects ``event.event_type`` to have an entry in
        ``self.alert_thresholds``; a missing entry raises ``KeyError``.
        """
        alert_data = {
            "alert_type": "security_threshold_exceeded",
            "event_type": event.event_type.value,
            "ip_address": self._hash_ip(event.ip_address) if event.ip_address else None,
            "event_count": len(recent_events),
            "time_window": self.alert_thresholds[event.event_type]["window_minutes"],
            "timestamp": datetime.utcnow().isoformat(),
            "recommended_action": self._get_recommended_action(event.event_type),
        }

        self.logger.warning(f"SECURITY ALERT: {json.dumps(alert_data)}")

        # PASSWORD_BRUTE_FORCE has no default threshold, so it only reaches
        # this point if a threshold for it is added to alert_thresholds.
        if event.event_type in (
            SecurityEventType.PASSWORD_BRUTE_FORCE,
            SecurityEventType.RATE_LIMIT_EXCEEDED,
        ):
            self._block_ip(event.ip_address)

    def _get_recommended_action(self, event_type: "SecurityEventType") -> str:
        """Return the advisory text included in alerts for ``event_type``."""
        actions = {
            SecurityEventType.LOGIN_FAILURE: "Consider IP blocking or account lockout",
            SecurityEventType.RATE_LIMIT_EXCEEDED: "IP temporarily blocked",
            SecurityEventType.INVALID_2FA: "Monitor for account compromise",
            SecurityEventType.UNAUTHORIZED_ACCESS: "Investigate immediately",
            SecurityEventType.PASSWORD_BRUTE_FORCE: "IP blocked, notify user",
        }
        return actions.get(event_type, "Monitor and investigate")

    def _block_ip(self, ip_address: str, duration_minutes: int = 30):
        """Block ``ip_address`` for ``duration_minutes``; no-op for an empty/None IP.

        Blocking an already-blocked IP replaces its expiry time.
        """
        if ip_address:
            block_until = datetime.utcnow() + timedelta(minutes=duration_minutes)
            self.blocked_ips[ip_address] = block_until

            self.logger.warning(f"IP {self._hash_ip(ip_address)} blocked until {block_until}")

    def is_ip_blocked(self, ip_address: str) -> bool:
        """Return True while ``ip_address`` has an unexpired block.

        An expired entry is removed as a side effect of the check.
        """
        if not ip_address or ip_address not in self.blocked_ips:
            return False

        if datetime.utcnow() > self.blocked_ips[ip_address]:
            # Block expired, remove it
            del self.blocked_ips[ip_address]
            return False

        return True

    def _purge_expired_blocks(self, now: datetime) -> None:
        """Drop every entry of ``self.blocked_ips`` whose expiry is before ``now``."""
        expired = [ip for ip, block_until in self.blocked_ips.items() if now > block_until]
        for ip in expired:
            del self.blocked_ips[ip]

    def get_security_metrics(self) -> Dict[str, Any]:
        """Return summary counts over the retained events and active blocks.

        Expired IP blocks are purged first, so ``blocked_ips_count`` reflects
        only blocks that are still in force (``is_ip_blocked`` otherwise
        removes an expired entry only when that specific IP is checked).
        """
        now = datetime.utcnow()
        last_hour = now - timedelta(hours=1)
        last_day = now - timedelta(days=1)

        self._purge_expired_blocks(now)

        recent_events = [e for e in self.events if e.timestamp >= last_hour]
        daily_events = [e for e in self.events if e.timestamp >= last_day]

        return {
            "events_last_hour": len(recent_events),
            "events_last_24h": len(daily_events),
            "blocked_ips_count": len(self.blocked_ips),
            "top_event_types_hour": self._get_top_event_types(recent_events),
            "top_event_types_day": self._get_top_event_types(daily_events),
            "critical_events_hour": len([e for e in recent_events if e.severity == "critical"]),
        }

    def _get_top_event_types(self, events: "List[SecurityEvent]") -> Dict[str, int]:
        """Return up to five ``{event_type_value: count}`` pairs, most frequent first."""
        event_counts: Dict[str, int] = {}
        for event in events:
            type_name = event.event_type.value
            event_counts[type_name] = event_counts.get(type_name, 0) + 1

        # sorted() is stable, so ties keep first-seen order.
        ranked = sorted(event_counts.items(), key=lambda item: item[1], reverse=True)
        return dict(ranked[:5])
|
|
|
|
|
|
# Global security monitor instance
|
|
# Created once at import time; the module-level helper functions below
# record events on, and query, this single shared instance.
security_monitor = SecurityMonitor()
|
|
|
|
|
|
# Helper functions for easy integration
|
|
def log_login_failure(user_id: str, ip_address: str, user_agent: str = None):
    """Record a failed login on the shared security monitor.

    Builds a medium-severity ``LOGIN_FAILURE`` event from the arguments and
    passes it to ``security_monitor.log_event``.
    """
    security_monitor.log_event(
        SecurityEvent(
            event_type=SecurityEventType.LOGIN_FAILURE,
            severity="medium",
            user_id=user_id,
            ip_address=ip_address,
            user_agent=user_agent,
        )
    )
|
|
|
|
|
|
def log_unauthorized_access(user_id: str, ip_address: str, request_path: str, user_agent: str = None):
    """Record an unauthorized access attempt on the shared security monitor.

    Builds a high-severity ``UNAUTHORIZED_ACCESS`` event, including the
    requested path, and passes it to ``security_monitor.log_event``.
    """
    security_monitor.log_event(
        SecurityEvent(
            event_type=SecurityEventType.UNAUTHORIZED_ACCESS,
            severity="high",
            user_id=user_id,
            ip_address=ip_address,
            request_path=request_path,
            user_agent=user_agent,
        )
    )
|
|
|
|
|
|
def log_rate_limit_exceeded(ip_address: str, request_path: str, user_agent: str = None):
    """Record a rate-limit violation on the shared security monitor.

    Builds a medium-severity ``RATE_LIMIT_EXCEEDED`` event (no user id is
    attached) and passes it to ``security_monitor.log_event``.
    """
    security_monitor.log_event(
        SecurityEvent(
            event_type=SecurityEventType.RATE_LIMIT_EXCEEDED,
            severity="medium",
            ip_address=ip_address,
            request_path=request_path,
            user_agent=user_agent,
        )
    )
|
|
|
|
|
|
def check_ip_blocked(ip_address: str) -> bool:
    """Return whether the shared security monitor currently blocks ``ip_address``."""
    blocked = security_monitor.is_ip_blocked(ip_address)
    return blocked