# Source listing metadata: 296 lines, 10 KiB, Python.
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Optional

import bcrypt
import jwt
logger = logging.getLogger(__name__)

# Values (compared case-insensitively) that switch a boolean env flag on.
_TRUTHY_VALUES = ("true", "1", "yes", "on")

# Placeholder secret used when JWT_SECRET is not provided. It is a fixed,
# publicly visible string, so tokens signed with it can be forged.
_DEFAULT_JWT_SECRET = "your-super-secret-jwt-key-change-in-production"


def _env_flag(name: str, default: str = "false") -> bool:
    """Return True when environment variable *name* holds a truthy value."""
    return os.getenv(name, default).strip().lower() in _TRUTHY_VALUES


def _env_int(name: str, default: int) -> int:
    """Read environment variable *name* as an int.

    Falls back to *default* (with a warning) when the variable is unset,
    empty, or not a valid integer, so a typo in the environment does not
    abort the import of this module.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning(f"Invalid integer for {name}: {raw!r}; using {default}")
        return default


# Configuration
AUTH_ENABLED = _env_flag("ENABLE_AUTH")
DISABLE_REGISTRATION = _env_flag("DISABLE_REGISTRATION")
JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_HOURS = _env_int("JWT_EXPIRATION_HOURS", 24)

if AUTH_ENABLED and JWT_SECRET == _DEFAULT_JWT_SECRET:
    # Keep working for backward compatibility, but make the risk visible.
    logger.warning(
        "JWT_SECRET is not set; tokens are signed with the built-in placeholder "
        "secret. Set JWT_SECRET to a private random value."
    )

# Paths
# Relative to the process working directory.
USERS_DIR = Path("./data/users")
USERS_FILE = USERS_DIR / "users.json"
class User:
    """Plain data holder for one user account.

    The object carries profile and SSO fields only; it has no password or
    password-hash attribute.
    """

    def __init__(
        self,
        username: str,
        email: Optional[str] = None,
        role: str = "user",
        created_at: Optional[str] = None,
        last_login: Optional[str] = None,
        sso_provider: Optional[str] = None,
        sso_id: Optional[str] = None,
    ):
        """Build a user.

        Args:
            username: Account name.
            email: Optional e-mail address.
            role: Role name; defaults to ``"user"``.
            created_at: ISO-8601 creation timestamp. When omitted (or empty)
                the current UTC time is used.
            last_login: ISO-8601 timestamp of the last login, if any.
            sso_provider: Name of the SSO provider, or None for a
                password-based account.
            sso_id: Identifier of the user at the SSO provider.
        """
        self.username = username
        self.email = email
        self.role = role
        # datetime.utcnow() is deprecated; take an aware UTC "now" and drop
        # tzinfo so the stored string keeps the same naive ISO format.
        self.created_at = created_at or (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        self.last_login = last_login
        self.sso_provider = sso_provider
        self.sso_id = sso_id

    def to_dict(self) -> Dict[str, Any]:
        """Return every attribute as a dict (includes ``sso_id``)."""
        return {
            "username": self.username,
            "email": self.email,
            "role": self.role,
            "created_at": self.created_at,
            "last_login": self.last_login,
            "sso_provider": self.sso_provider,
            "sso_id": self.sso_id,
        }

    def to_public_dict(self) -> Dict[str, Any]:
        """Return user data without sensitive information.

        ``sso_id`` is omitted and a derived ``is_sso_user`` flag is added.
        """
        return {
            "username": self.username,
            "email": self.email,
            "role": self.role,
            "created_at": self.created_at,
            "last_login": self.last_login,
            "sso_provider": self.sso_provider,
            "is_sso_user": self.sso_provider is not None,
        }
class UserManager:
    """JSON-file-backed user store with bcrypt password handling.

    All accounts live in one JSON object in ``USERS_FILE``, keyed by
    username. Each record holds the ``User.to_dict()`` fields plus a
    ``password_hash`` entry (``None`` for accounts created without a
    password, e.g. SSO users). Every operation re-reads the file and every
    mutation rewrites it in full; no locking is performed here.
    """

    # Minimum length enforced when a password is changed or reset.
    MIN_PASSWORD_LENGTH = 6

    # Record keys that map onto User.__init__ parameters.
    _USER_FIELDS = (
        "username",
        "email",
        "role",
        "created_at",
        "last_login",
        "sso_provider",
        "sso_id",
    )

    def __init__(self):
        self.ensure_users_file()

    @classmethod
    def _to_user(cls, user_data: Dict[str, Any]) -> User:
        """Build a User from a stored record, ignoring non-User keys.

        ``password_hash`` (and any other key User does not accept) is
        dropped, so extra fields in the file do not raise TypeError.
        """
        return User(**{k: user_data[k] for k in cls._USER_FIELDS if k in user_data})

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        # Same string format datetime.utcnow().isoformat() produced.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def ensure_users_file(self):
        """Ensure users directory and file exist"""
        USERS_DIR.mkdir(parents=True, exist_ok=True)
        if not USERS_FILE.exists():
            with open(USERS_FILE, "w", encoding="utf-8") as f:
                json.dump({}, f, indent=2)
            logger.info(f"Created users file at {USERS_FILE}")

    def load_users(self) -> Dict[str, Dict]:
        """Load users from file.

        Returns:
            The username -> record mapping, or an empty dict when the file
            cannot be read, is not valid JSON, or does not contain a JSON
            object. Failures are logged, not raised.
        """
        try:
            with open(USERS_FILE, "r", encoding="utf-8") as f:
                users = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error(f"Error loading users: {e}")
            return {}
        if not isinstance(users, dict):
            logger.error(f"Error loading users: expected a JSON object in {USERS_FILE}")
            return {}
        return users

    def save_users(self, users: Dict[str, Dict]):
        """Save users to file.

        The data is written to a temporary sibling file and then moved over
        ``USERS_FILE`` with ``os.replace``, so an interrupted write cannot
        leave a truncated users file behind.

        Raises:
            Exception: any error from serialising or writing is logged and
                re-raised.
        """
        tmp_path = USERS_FILE.with_name(USERS_FILE.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(users, f, indent=2)
            os.replace(tmp_path, USERS_FILE)
        except Exception as e:
            logger.error(f"Error saving users: {e}")
            try:
                # Best-effort cleanup; the original error is what matters.
                tmp_path.unlink()
            except OSError:
                pass
            raise

    def hash_password(self, password: str) -> str:
        """Hash password using bcrypt and return the hash as text."""
        return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")

    def verify_password(self, password: str, hashed: Optional[str]) -> bool:
        """Verify password against hash.

        Returns False (instead of raising) when there is no password, no
        stored hash, or the stored hash is not a valid bcrypt hash.
        """
        if password is None or not hashed:
            return False
        try:
            return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
        except ValueError:
            # bcrypt rejects malformed hashes with ValueError.
            return False

    def create_user(
        self,
        username: str,
        password: str = None,
        email: str = None,
        role: str = "user",
        sso_provider: str = None,
        sso_id: str = None,
    ) -> tuple[bool, str]:
        """Create a new user (traditional or SSO).

        Returns:
            ``(True, message)`` on success, ``(False, message)`` when the
            username is already taken.
        """
        users = self.load_users()

        if username in users:
            return False, "Username already exists"

        # For SSO users, password is None (an empty password is stored as
        # no hash as well).
        hashed_password = self.hash_password(password) if password else None
        user = User(
            username=username,
            email=email,
            role=role,
            sso_provider=sso_provider,
            sso_id=sso_id,
        )

        users[username] = {
            **user.to_dict(),
            "password_hash": hashed_password,
        }

        self.save_users(users)
        logger.info(f"Created user: {username} (SSO: {sso_provider or 'No'})")
        return True, "User created successfully"

    def authenticate_user(self, username: str, password: str) -> Optional[User]:
        """Authenticate user and return User object if successful.

        Returns None for an unknown username, a wrong password, or an
        account without a password hash (e.g. SSO users). On success the
        record's ``last_login`` is updated and persisted.
        """
        users = self.load_users()

        user_data = users.get(username)
        if user_data is None:
            return None

        # .get(): SSO records store password_hash as None.
        if not self.verify_password(password, user_data.get("password_hash")):
            return None

        # Update last login
        user_data["last_login"] = self._utc_now_iso()
        self.save_users(users)

        return self._to_user(user_data)

    def get_user(self, username: str) -> Optional[User]:
        """Get user by username, or None when it does not exist."""
        user_data = self.load_users().get(username)
        if user_data is None:
            return None
        return self._to_user(user_data)

    def list_users(self) -> list[User]:
        """List all users"""
        return [self._to_user(user_data) for user_data in self.load_users().values()]

    def delete_user(self, username: str) -> tuple[bool, str]:
        """Delete a user; returns ``(success, message)``."""
        users = self.load_users()

        if username not in users:
            return False, "User not found"

        del users[username]
        self.save_users(users)
        logger.info(f"Deleted user: {username}")
        return True, "User deleted successfully"

    def update_user_role(self, username: str, role: str) -> tuple[bool, str]:
        """Update user role; returns ``(success, message)``."""
        users = self.load_users()

        if username not in users:
            return False, "User not found"

        users[username]["role"] = role
        self.save_users(users)
        logger.info(f"Updated role for user {username} to {role}")
        return True, "User role updated successfully"

    def change_password(self, username: str, current_password: str, new_password: str) -> tuple[bool, str]:
        """Change user password after validating current password.

        Returns:
            ``(True, message)`` on success; otherwise ``(False, reason)``
            for an unknown user, an SSO/password-less account, a wrong
            current password, a too-short new password, or a new password
            equal to the current one.
        """
        users = self.load_users()

        if username not in users:
            return False, "User not found"

        user_data = users[username]

        # Check if user is SSO user
        if user_data.get("sso_provider"):
            return False, f"Cannot change password for SSO user. Please change your password through {user_data['sso_provider']}."

        # Check if user has a password hash
        if not user_data.get("password_hash"):
            return False, "Cannot change password for SSO user"

        # Verify current password
        if not self.verify_password(current_password, user_data["password_hash"]):
            return False, "Current password is incorrect"

        # Validate new password
        if len(new_password) < self.MIN_PASSWORD_LENGTH:
            return False, f"New password must be at least {self.MIN_PASSWORD_LENGTH} characters long"

        if current_password == new_password:
            return False, "New password must be different from current password"

        # Update password
        user_data["password_hash"] = self.hash_password(new_password)
        self.save_users(users)

        logger.info(f"Password changed for user: {username}")
        return True, "Password changed successfully"

    def admin_reset_password(self, username: str, new_password: str) -> tuple[bool, str]:
        """Admin reset user password (no current password verification required).

        Returns:
            ``(True, message)`` on success; otherwise ``(False, reason)``
            for an unknown user, an SSO/password-less account, or a
            too-short new password.
        """
        users = self.load_users()

        if username not in users:
            return False, "User not found"

        user_data = users[username]

        # Check if user is SSO user
        if user_data.get("sso_provider"):
            return False, f"Cannot reset password for SSO user. User manages password through {user_data['sso_provider']}."

        # Check if user has a password hash (should exist for non-SSO users)
        if not user_data.get("password_hash"):
            return False, "Cannot reset password for SSO user"

        # Validate new password
        if len(new_password) < self.MIN_PASSWORD_LENGTH:
            return False, f"New password must be at least {self.MIN_PASSWORD_LENGTH} characters long"

        # Update password
        user_data["password_hash"] = self.hash_password(new_password)
        self.save_users(users)

        logger.info(f"Password reset by admin for user: {username}")
        return True, "Password reset successfully"
class TokenManager:
    """Create and verify JWTs signed with ``JWT_SECRET`` / ``JWT_ALGORITHM``."""

    @staticmethod
    def create_token(user: User) -> str:
        """Create JWT token for user.

        The payload carries ``username``, ``role``, ``iat`` (issue time) and
        ``exp`` (issue time plus ``JWT_EXPIRATION_HOURS``).
        """
        # Read the clock once so exp is exactly iat + the configured lifetime;
        # a timezone-aware UTC value replaces the deprecated datetime.utcnow().
        issued_at = datetime.now(timezone.utc)
        payload = {
            "username": user.username,
            "role": user.role,
            "exp": issued_at + timedelta(hours=JWT_EXPIRATION_HOURS),
            "iat": issued_at,
        }
        return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

    @staticmethod
    def verify_token(token: str) -> Optional[Dict[str, Any]]:
        """Verify JWT token and return payload.

        Returns:
            The decoded payload, or None when the token is expired or
            otherwise invalid.
        """
        try:
            return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        except jwt.InvalidTokenError:
            # Also covers jwt.ExpiredSignatureError, which subclasses it.
            return None
# Global instances
# Created at import time. Constructing UserManager calls ensure_users_file(),
# so importing this module creates the users directory and file when missing.
user_manager = UserManager()
token_manager = TokenManager()
def create_default_admin():
    """Create default admin user if no users exist.

    Does nothing when ``AUTH_ENABLED`` is false or when the user store
    already contains at least one user. Credentials are read from
    ``DEFAULT_ADMIN_USERNAME`` / ``DEFAULT_ADMIN_PASSWORD``; an unset or
    empty value falls back to ``"admin"`` / ``"admin123"`` (an empty
    password would otherwise create an admin that can never log in).
    """
    if not AUTH_ENABLED:
        return

    if user_manager.load_users():
        return

    env_password = os.getenv("DEFAULT_ADMIN_PASSWORD")
    default_username = os.getenv("DEFAULT_ADMIN_USERNAME") or "admin"
    default_password = env_password or "admin123"

    success, message = user_manager.create_user(
        username=default_username,
        password=default_password,
        role="admin",
    )

    if not success:
        logger.error(f"Failed to create default admin: {message}")
        return

    logger.info(f"Created default admin user: {default_username}")
    if env_password:
        # Never write an operator-supplied secret to the log.
        logger.warning("Default admin password was taken from DEFAULT_ADMIN_PASSWORD (value not logged)")
    else:
        # Only the built-in, publicly known fallback is echoed.
        logger.warning(f"Default admin password is: {default_password}")
    logger.warning("Please change the default admin password immediately!")
# Initialize default admin on import
# Import-time side effect: create_default_admin() returns immediately when
# AUTH_ENABLED is false.
create_default_admin()

# SSO functionality will be imported separately to avoid circular imports
# NOTE(review): the flag is set unconditionally; nothing in this module checks
# that SSO support can actually be imported — confirm against its consumers.
SSO_AVAILABLE = True