security: add RBAC, HTTPS enforcement; add tests and CI pytest step

This commit is contained in:
TLimoges33 2025-08-28 17:29:16 +00:00
parent a2b8950d9a
commit f0c61de280
16 changed files with 271 additions and 198 deletions

View File

@ -6,6 +6,8 @@ jobs:
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Check Python syntax - name: Check Python syntax
run: python -m py_compile modern/backend/server.py run: python -m py_compile modern/backend/*.py
- name: Check frontend package.json - name: Run tests
run: cat modern/frontend/package.json run: |
python -m pip install -r modern/backend/requirements_full.txt
pytest -q

View File

@ -18,6 +18,18 @@ app.add_middleware(
allow_headers=["*"], allow_headers=["*"],
) )
# HTTPS enforcement middleware (for production behind a proxy, check X-Forwarded-Proto)
@app.middleware('http')
async def https_redirect(request, call_next):
if os.getenv('FORCE_HTTPS', 'false').lower() == 'true':
proto = request.headers.get('x-forwarded-proto', request.url.scheme)
if proto != 'https':
from starlette.responses import RedirectResponse
url = request.url.replace(scheme='https')
return RedirectResponse(str(url))
return await call_next(request)
@app.on_event('startup') @app.on_event('startup')
def startup_event(): def startup_event():
models.init_db() models.init_db()
@ -198,12 +210,15 @@ def list_integrations():
@app.delete('/api/v1/integrations/{integration_id}') @app.delete('/api/v1/integrations/{integration_id}')
def delete_integration(integration_id: int): def delete_integration(integration_id: int, request=None):
db = models.SessionLocal() db = models.SessionLocal()
try: try:
row = db.query(models.Integration).filter_by(id=integration_id).first() row = db.query(models.Integration).filter_by(id=integration_id).first()
if not row: if not row:
raise HTTPException(status_code=404, detail='integration not found') raise HTTPException(status_code=404, detail='integration not found')
# require owner or admin
from .rbac import require_owner_or_admin
require_owner_or_admin(row.user_id)(request)
db.delete(row) db.delete(row)
db.commit() db.commit()
return {'ok': True} return {'ok': True}
@ -223,6 +238,9 @@ def sync_integration_to_habits(integration_id: int, payload: dict = Body({})):
if not integration: if not integration:
raise HTTPException(status_code=404, detail='integration not found') raise HTTPException(status_code=404, detail='integration not found')
# require owner or admin
from .rbac import require_owner_or_admin
require_owner_or_admin(integration.user_id)(None)
# Fetch events via existing events endpoint logic # Fetch events via existing events endpoint logic
# Reuse token refresh + decrypt logic from oauth module # Reuse token refresh + decrypt logic from oauth module
token_row = db.query(models.OAuthToken).filter_by(integration_id=integration_id).order_by(models.OAuthToken.id.desc()).first() token_row = db.query(models.OAuthToken).filter_by(integration_id=integration_id).order_by(models.OAuthToken.id.desc()).first()

View File

@ -15,6 +15,7 @@ class User(Base):
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, nullable=False, index=True) email = Column(String, unique=True, nullable=False, index=True)
password_hash = Column(String) password_hash = Column(String)
role = Column(String, default='user')
display_name = Column(String) display_name = Column(String)
created_at = Column(DateTime, server_default=func.current_timestamp()) created_at = Column(DateTime, server_default=func.current_timestamp())
updated_at = Column(DateTime, server_default=func.current_timestamp(), onupdate=func.current_timestamp()) updated_at = Column(DateTime, server_default=func.current_timestamp(), onupdate=func.current_timestamp())

23
modern/backend/rbac.py Normal file
View File

@ -0,0 +1,23 @@
from fastapi import HTTPException
from .auth import get_current_user
from . import models
def require_role(min_role: str):
# Simple role hierarchy
hierarchy = {'user': 1, 'moderator': 2, 'admin': 3}
def _inner(request=None):
user = get_current_user(request)
if hierarchy.get(user.role, 0) < hierarchy.get(min_role, 0):
raise HTTPException(status_code=403, detail='insufficient role')
return user
return _inner
def require_owner_or_admin(resource_user_id: int):
def _inner(request=None):
user = get_current_user(request)
if user.id == resource_user_id or user.role == 'admin':
return user
raise HTTPException(status_code=403, detail='must be owner or admin')
return _inner

View File

@ -6,3 +6,6 @@ python-dotenv
requests requests
cryptography cryptography
boto3 boto3
pytest
httpx
requests

View File

@ -1,12 +1,15 @@
<!doctype html> <!doctype html>
<html> <html>
<head> <head>
<meta charset="utf-8" /> <meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>LifeRPG Modern</title> <title>LifeRPG Modern</title>
</head> </head>
<body> <body>
<div id="root"></div> <div id="root"></div>
<script type="module" src="/src/main.jsx"></script> <script type="module" src="/src/main.jsx"></script>
</body> </body>
</html> </html>

13
modern/tests/test_auth.py Normal file
View File

@ -0,0 +1,13 @@
import pytest
from fastapi.testclient import TestClient
from modern.backend.app import app
client = TestClient(app)
def test_signup_and_login():
resp = client.post('/api/v1/auth/signup', json={'email':'test@example.com','password':'secret'})
assert resp.status_code == 200
resp = client.post('/api/v1/auth/login', json={'email':'test@example.com','password':'secret'})
assert resp.status_code == 200
assert 'session' in resp.cookies
*** End Patch

View File

@ -0,0 +1,10 @@
import pytest
from fastapi.testclient import TestClient
from modern.backend.app import app
client = TestClient(app)
def test_list_integrations_empty():
resp = client.get('/api/v1/integrations')
assert resp.status_code == 200
*** End Patch