Improved user management, added no-registration mode
This commit is contained in:
@@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# Configuration
|
||||
AUTH_ENABLED = os.getenv("ENABLE_AUTH", "false").lower() in ("true", "1", "yes", "on")
|
||||
DISABLE_REGISTRATION = os.getenv("DISABLE_REGISTRATION", "false").lower() in ("true", "1", "yes", "on")
|
||||
JWT_SECRET = os.getenv("JWT_SECRET", "your-super-secret-jwt-key-change-in-production")
|
||||
JWT_ALGORITHM = "HS256"
|
||||
JWT_EXPIRATION_HOURS = int(os.getenv("JWT_EXPIRATION_HOURS", "24"))
|
||||
|
||||
@@ -4,7 +4,7 @@ from pydantic import BaseModel
|
||||
from typing import Optional, List
|
||||
import logging
|
||||
|
||||
from . import AUTH_ENABLED, user_manager, token_manager, User
|
||||
from . import AUTH_ENABLED, DISABLE_REGISTRATION, user_manager, token_manager, User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -24,6 +24,19 @@ class RegisterRequest(BaseModel):
|
||||
email: Optional[str] = None
|
||||
|
||||
|
||||
class CreateUserRequest(BaseModel):
|
||||
"""Admin-only request to create users when registration is disabled"""
|
||||
username: str
|
||||
password: str
|
||||
email: Optional[str] = None
|
||||
role: str = "user"
|
||||
|
||||
|
||||
class RoleUpdateRequest(BaseModel):
|
||||
"""Request to update user role"""
|
||||
role: str
|
||||
|
||||
|
||||
class UserResponse(BaseModel):
|
||||
username: str
|
||||
email: Optional[str]
|
||||
@@ -46,6 +59,7 @@ class AuthStatusResponse(BaseModel):
|
||||
auth_enabled: bool
|
||||
authenticated: bool = False
|
||||
user: Optional[UserResponse] = None
|
||||
registration_enabled: bool = True
|
||||
|
||||
|
||||
# Dependency to get current user
|
||||
@@ -101,7 +115,8 @@ async def auth_status(current_user: Optional[User] = Depends(get_current_user)):
|
||||
return AuthStatusResponse(
|
||||
auth_enabled=AUTH_ENABLED,
|
||||
authenticated=current_user is not None,
|
||||
user=UserResponse(**current_user.to_public_dict()) if current_user else None
|
||||
user=UserResponse(**current_user.to_public_dict()) if current_user else None,
|
||||
registration_enabled=AUTH_ENABLED and not DISABLE_REGISTRATION
|
||||
)
|
||||
|
||||
|
||||
@@ -138,6 +153,12 @@ async def register(request: RegisterRequest):
|
||||
detail="Authentication is disabled"
|
||||
)
|
||||
|
||||
if DISABLE_REGISTRATION:
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="Public registration is disabled. Contact an administrator to create an account."
|
||||
)
|
||||
|
||||
# Check if this is the first user (should be admin)
|
||||
existing_users = user_manager.list_users()
|
||||
role = "admin" if len(existing_users) == 0 else "user"
|
||||
@@ -188,11 +209,11 @@ async def delete_user(username: str, current_user: User = Depends(require_admin)
|
||||
@router.put("/users/{username}/role", response_model=MessageResponse)
|
||||
async def update_user_role(
|
||||
username: str,
|
||||
role: str,
|
||||
request: RoleUpdateRequest,
|
||||
current_user: User = Depends(require_admin)
|
||||
):
|
||||
"""Update user role (admin only)"""
|
||||
if role not in ["user", "admin"]:
|
||||
if request.role not in ["user", "admin"]:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Role must be 'user' or 'admin'"
|
||||
@@ -204,13 +225,42 @@ async def update_user_role(
|
||||
detail="Cannot change your own role"
|
||||
)
|
||||
|
||||
success, message = user_manager.update_user_role(username, role)
|
||||
success, message = user_manager.update_user_role(username, request.role)
|
||||
if not success:
|
||||
raise HTTPException(status_code=404, detail=message)
|
||||
|
||||
return MessageResponse(message=message)
|
||||
|
||||
|
||||
@router.post("/users/create", response_model=MessageResponse)
|
||||
async def create_user_admin(request: CreateUserRequest, current_user: User = Depends(require_admin)):
|
||||
"""Create a new user (admin only) - for use when registration is disabled"""
|
||||
if not AUTH_ENABLED:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Authentication is disabled"
|
||||
)
|
||||
|
||||
# Validate role
|
||||
if request.role not in ["user", "admin"]:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Role must be 'user' or 'admin'"
|
||||
)
|
||||
|
||||
success, message = user_manager.create_user(
|
||||
username=request.username,
|
||||
password=request.password,
|
||||
email=request.email,
|
||||
role=request.role
|
||||
)
|
||||
|
||||
if not success:
|
||||
raise HTTPException(status_code=400, detail=message)
|
||||
|
||||
return MessageResponse(message=message)
|
||||
|
||||
|
||||
# Profile endpoints
|
||||
@router.get("/profile", response_model=UserResponse)
|
||||
async def get_profile(current_user: User = Depends(require_auth)):
|
||||
|
||||
Reference in New Issue
Block a user