Implemented SSO for google and github

This commit is contained in:
Xoconoch
2025-08-04 10:01:07 -06:00
parent 1da75a3fbc
commit 66ec587e5b
16 changed files with 1006 additions and 337 deletions

View File

@@ -22,12 +22,14 @@ USERS_FILE = USERS_DIR / "users.json"
class User:
def __init__(self, username: str, email: str = None, role: str = "user", created_at: str = None, last_login: str = None):
def __init__(self, username: str, email: str = None, role: str = "user", created_at: str = None, last_login: str = None, sso_provider: str = None, sso_id: str = None):
self.username = username
self.email = email
self.role = role
self.created_at = created_at or datetime.utcnow().isoformat()
self.last_login = last_login
self.sso_provider = sso_provider
self.sso_id = sso_id
def to_dict(self) -> Dict[str, Any]:
return {
@@ -35,7 +37,9 @@ class User:
"email": self.email,
"role": self.role,
"created_at": self.created_at,
"last_login": self.last_login
"last_login": self.last_login,
"sso_provider": self.sso_provider,
"sso_id": self.sso_id
}
def to_public_dict(self) -> Dict[str, Any]:
@@ -45,7 +49,9 @@ class User:
"email": self.email,
"role": self.role,
"created_at": self.created_at,
"last_login": self.last_login
"last_login": self.last_login,
"sso_provider": self.sso_provider,
"is_sso_user": self.sso_provider is not None
}
@@ -87,15 +93,16 @@ class UserManager:
"""Verify password against hash"""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def create_user(self, username: str, password: str, email: str = None, role: str = "user") -> tuple[bool, str]:
"""Create a new user"""
def create_user(self, username: str, password: str = None, email: str = None, role: str = "user", sso_provider: str = None, sso_id: str = None) -> tuple[bool, str]:
"""Create a new user (traditional or SSO)"""
users = self.load_users()
if username in users:
return False, "Username already exists"
hashed_password = self.hash_password(password)
user = User(username=username, email=email, role=role)
# For SSO users, password is None
hashed_password = self.hash_password(password) if password else None
user = User(username=username, email=email, role=role, sso_provider=sso_provider, sso_id=sso_id)
users[username] = {
**user.to_dict(),
@@ -103,7 +110,7 @@ class UserManager:
}
self.save_users(users)
logger.info(f"Created user: {username}")
logger.info(f"Created user: {username} (SSO: {sso_provider or 'No'})")
return True, "User created successfully"
def authenticate_user(self, username: str, password: str) -> Optional[User]:
@@ -220,3 +227,6 @@ def create_default_admin():
# Initialize default admin on import
create_default_admin()
# SSO functionality will be imported separately to avoid circular imports
SSO_AVAILABLE = True

View File

@@ -43,6 +43,8 @@ class UserResponse(BaseModel):
role: str
created_at: str
last_login: Optional[str]
sso_provider: Optional[str] = None
is_sso_user: bool = False
class LoginResponse(BaseModel):
@@ -60,6 +62,8 @@ class AuthStatusResponse(BaseModel):
authenticated: bool = False
user: Optional[UserResponse] = None
registration_enabled: bool = True
sso_enabled: bool = False
sso_providers: List[str] = []
# Dependency to get current user
@@ -112,11 +116,27 @@ async def require_admin(current_user: User = Depends(require_auth)) -> User:
@router.get("/status", response_model=AuthStatusResponse)
async def auth_status(current_user: Optional[User] = Depends(get_current_user)):
"""Get authentication status"""
# Check if SSO is enabled and get available providers
sso_enabled = False
sso_providers = []
try:
from . import sso
sso_enabled = sso.SSO_ENABLED and AUTH_ENABLED
if sso.google_sso:
sso_providers.append("google")
if sso.github_sso:
sso_providers.append("github")
except ImportError:
pass # SSO module not available
return AuthStatusResponse(
auth_enabled=AUTH_ENABLED,
authenticated=current_user is not None,
user=UserResponse(**current_user.to_public_dict()) if current_user else None,
registration_enabled=AUTH_ENABLED and not DISABLE_REGISTRATION
registration_enabled=AUTH_ENABLED and not DISABLE_REGISTRATION,
sso_enabled=sso_enabled,
sso_providers=sso_providers
)
@@ -301,4 +321,7 @@ async def change_password(
user_manager.save_users(users)
logger.info(f"Password changed for user: {current_user.username}")
return MessageResponse(message="Password changed successfully")
return MessageResponse(message="Password changed successfully")
# Note: SSO routes are included in the main app, not here to avoid circular imports

View File

@@ -34,6 +34,7 @@ class AuthMiddleware(BaseHTTPMiddleware):
"/api/auth/login",
"/api/auth/register",
"/api/auth/logout",
"/api/auth/sso", # All SSO endpoints
"/static",
"/favicon.ico"
]

285
routes/auth/sso.py Normal file
View File

@@ -0,0 +1,285 @@
"""
SSO (Single Sign-On) implementation for Google and GitHub authentication
"""
import os
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import RedirectResponse
from fastapi_sso.sso.google import GoogleSSO
from fastapi_sso.sso.github import GithubSSO
from fastapi_sso.sso.base import OpenID
from pydantic import BaseModel
from . import user_manager, token_manager, User, AUTH_ENABLED
logger = logging.getLogger(__name__)
router = APIRouter()
# SSO Configuration
SSO_ENABLED = os.getenv("SSO_ENABLED", "true").lower() in ("true", "1", "yes", "on")
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET")
GITHUB_CLIENT_ID = os.getenv("GITHUB_CLIENT_ID")
GITHUB_CLIENT_SECRET = os.getenv("GITHUB_CLIENT_SECRET")
SSO_BASE_REDIRECT_URI = os.getenv("SSO_BASE_REDIRECT_URI", "http://localhost:7171/api/auth/sso/callback")
# Initialize SSO providers
google_sso = None
github_sso = None
if GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET:
google_sso = GoogleSSO(
client_id=GOOGLE_CLIENT_ID,
client_secret=GOOGLE_CLIENT_SECRET,
redirect_uri=f"{SSO_BASE_REDIRECT_URI}/google",
allow_insecure_http=True, # Set to False in production with HTTPS
)
if GITHUB_CLIENT_ID and GITHUB_CLIENT_SECRET:
github_sso = GithubSSO(
client_id=GITHUB_CLIENT_ID,
client_secret=GITHUB_CLIENT_SECRET,
redirect_uri=f"{SSO_BASE_REDIRECT_URI}/github",
allow_insecure_http=True, # Set to False in production with HTTPS
)
class MessageResponse(BaseModel):
message: str
class SSOProvider(BaseModel):
name: str
display_name: str
enabled: bool
login_url: Optional[str] = None
class SSOStatusResponse(BaseModel):
sso_enabled: bool
providers: list[SSOProvider]
def create_or_update_sso_user(openid: OpenID, provider: str) -> User:
"""Create or update user from SSO provider data"""
# Generate username from email or use provider ID
email = openid.email
if not email:
raise HTTPException(status_code=400, detail="Email is required for SSO authentication")
# Use email prefix as username, fallback to provider + id
username = email.split("@")[0]
if not username:
username = f"{provider}_{openid.id}"
# Check if user already exists by email
existing_user = None
users = user_manager.load_users()
for user_data in users.values():
if user_data.get("email") == email:
existing_user = User(**{k: v for k, v in user_data.items() if k != "password_hash"})
break
if existing_user:
# Update last login
users[existing_user.username]["last_login"] = datetime.utcnow().isoformat()
users[existing_user.username]["sso_provider"] = provider
users[existing_user.username]["sso_id"] = openid.id
user_manager.save_users(users)
return existing_user
else:
# Create new user
# Ensure username is unique
counter = 1
original_username = username
while username in users:
username = f"{original_username}{counter}"
counter += 1
user = User(
username=username,
email=email,
role="user" # Default role for SSO users
)
users[username] = {
**user.to_dict(),
"sso_provider": provider,
"sso_id": openid.id,
"password_hash": None # SSO users don't have passwords
}
user_manager.save_users(users)
logger.info(f"Created SSO user: {username} via {provider}")
return user
@router.get("/sso/status", response_model=SSOStatusResponse)
async def sso_status():
"""Get SSO status and available providers"""
providers = []
if google_sso:
providers.append(SSOProvider(
name="google",
display_name="Google",
enabled=True,
login_url="/api/auth/sso/login/google"
))
if github_sso:
providers.append(SSOProvider(
name="github",
display_name="GitHub",
enabled=True,
login_url="/api/auth/sso/login/github"
))
return SSOStatusResponse(
sso_enabled=SSO_ENABLED and AUTH_ENABLED,
providers=providers
)
@router.get("/sso/login/google")
async def google_login():
"""Initiate Google SSO login"""
if not SSO_ENABLED or not AUTH_ENABLED:
raise HTTPException(status_code=400, detail="SSO is disabled")
if not google_sso:
raise HTTPException(status_code=400, detail="Google SSO is not configured")
async with google_sso:
return await google_sso.get_login_redirect(params={"prompt": "consent", "access_type": "offline"})
@router.get("/sso/login/github")
async def github_login():
"""Initiate GitHub SSO login"""
if not SSO_ENABLED or not AUTH_ENABLED:
raise HTTPException(status_code=400, detail="SSO is disabled")
if not github_sso:
raise HTTPException(status_code=400, detail="GitHub SSO is not configured")
async with github_sso:
return await github_sso.get_login_redirect()
@router.get("/sso/callback/google")
async def google_callback(request: Request):
"""Handle Google SSO callback"""
if not SSO_ENABLED or not AUTH_ENABLED:
raise HTTPException(status_code=400, detail="SSO is disabled")
if not google_sso:
raise HTTPException(status_code=400, detail="Google SSO is not configured")
try:
async with google_sso:
openid = await google_sso.verify_and_process(request)
# Create or update user
user = create_or_update_sso_user(openid, "google")
# Create JWT token
access_token = token_manager.create_token(user)
# Redirect to frontend with token (you might want to customize this)
frontend_url = os.getenv("FRONTEND_URL", "http://localhost:3000")
response = RedirectResponse(url=f"{frontend_url}?token={access_token}")
# Also set as HTTP-only cookie
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
secure=False, # Set to True in production with HTTPS
samesite="lax",
max_age=timedelta(hours=24).total_seconds()
)
return response
except Exception as e:
logger.error(f"Google SSO callback error: {e}")
raise HTTPException(status_code=400, detail="Authentication failed")
@router.get("/sso/callback/github")
async def github_callback(request: Request):
"""Handle GitHub SSO callback"""
if not SSO_ENABLED or not AUTH_ENABLED:
raise HTTPException(status_code=400, detail="SSO is disabled")
if not github_sso:
raise HTTPException(status_code=400, detail="GitHub SSO is not configured")
try:
async with github_sso:
openid = await github_sso.verify_and_process(request)
# Create or update user
user = create_or_update_sso_user(openid, "github")
# Create JWT token
access_token = token_manager.create_token(user)
# Redirect to frontend with token (you might want to customize this)
frontend_url = os.getenv("FRONTEND_URL", "http://localhost:3000")
response = RedirectResponse(url=f"{frontend_url}?token={access_token}")
# Also set as HTTP-only cookie
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
secure=False, # Set to True in production with HTTPS
samesite="lax",
max_age=timedelta(hours=24).total_seconds()
)
return response
except Exception as e:
logger.error(f"GitHub SSO callback error: {e}")
raise HTTPException(status_code=400, detail="Authentication failed")
@router.post("/sso/unlink/{provider}", response_model=MessageResponse)
async def unlink_sso_provider(
provider: str,
request: Request,
):
"""Unlink SSO provider from user account"""
if not SSO_ENABLED or not AUTH_ENABLED:
raise HTTPException(status_code=400, detail="SSO is disabled")
if provider not in ["google", "github"]:
raise HTTPException(status_code=400, detail="Invalid SSO provider")
# Get current user from request (avoiding circular imports)
from .middleware import require_auth_from_state
current_user = await require_auth_from_state(request)
if not current_user.sso_provider:
raise HTTPException(status_code=400, detail="User is not linked to any SSO provider")
if current_user.sso_provider != provider:
raise HTTPException(status_code=400, detail=f"User is not linked to {provider}")
# Update user to remove SSO linkage
users = user_manager.load_users()
if current_user.username in users:
users[current_user.username]["sso_provider"] = None
users[current_user.username]["sso_id"] = None
user_manager.save_users(users)
logger.info(f"Unlinked SSO provider {provider} from user {current_user.username}")
return MessageResponse(message=f"SSO provider {provider} unlinked successfully")