feat: improve Login5 authentication with token renewal

Signed-off-by: Lev Rusanov <30170278+JDM170@users.noreply.github.com>
This commit is contained in:
2025-09-20 23:38:20 +07:00
parent 56dade95e7
commit 72dffca271
6 changed files with 214 additions and 140 deletions

View File

@@ -825,8 +825,6 @@ class Session(Closeable, MessageListener, SubListener):
__stored_str: str = ""
__token_provider: typing.Union[TokenProvider, None]
__user_attributes = {}
__login5_access_token: typing.Union[str, None] = None
__login5_token_expiry: typing.Union[int, None] = None
def __init__(self, inner: Inner, address: str) -> None:
self.__client = Session.create_client(inner.conf)
@@ -867,12 +865,10 @@ class Session(Closeable, MessageListener, SubListener):
"""
self.__authenticate_partial(credential, False)
# Try Login5 authentication for access token
self.__authenticate_login5(credential)
with self.__auth_lock:
self.__mercury_client = MercuryClient(self)
self.__token_provider = TokenProvider(self)
self.__audio_key_manager = AudioKeyManager(self)
self.__channel_manager = ChannelManager(self)
self.__api = ApiClient(self)
@@ -882,6 +878,12 @@ class Session(Closeable, MessageListener, SubListener):
self.__dealer_client = DealerClient(self)
self.__search = SearchManager(self)
self.__event_service = EventService(self)
# Try Login5 authentication for access token (after session is fully initialized)
try:
self.__token_provider.authenticate_login5(credential)
except Exception as e:
self.logger.warning(f"Login5 authentication failed: {e}")
self.__auth_lock_bool = False
self.__auth_lock.notify_all()
self.dealer().connect()
@@ -1287,65 +1289,6 @@ class Session(Closeable, MessageListener, SubListener):
def __send_unchecked(self, cmd: bytes, payload: bytes) -> None:
self.cipher_pair.send_encoded(self.connection, cmd, payload)
def __authenticate_login5(self, credential: Authentication.LoginCredentials) -> None:
"""Authenticate using Login5 to get access token"""
try:
# Build Login5 request
login5_request = Login5.LoginRequest()
# Set client info
login5_request.client_info.client_id = Session.CLIENT_ID
login5_request.client_info.device_id = self.__inner.device_id
# Set stored credential from APWelcome
if hasattr(self, '_Session__ap_welcome') and self.__ap_welcome:
stored_cred = Login5Credentials.StoredCredential()
stored_cred.username = self.__ap_welcome.canonical_username
stored_cred.data = self.__ap_welcome.reusable_auth_credentials
login5_request.stored_credential.CopyFrom(stored_cred)
# Send Login5 request
login5_url = "https://login5.spotify.com/v3/login"
headers = {
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf"
}
response = requests.post(
login5_url,
data=login5_request.SerializeToString(),
headers=headers
)
if response.status_code == 200:
login5_response = Login5.LoginResponse()
login5_response.ParseFromString(response.content)
if login5_response.HasField('ok'):
self.__login5_access_token = login5_response.ok.access_token
self.__login5_token_expiry = int(time.time()) + login5_response.ok.access_token_expires_in
self.logger.info("Login5 authentication successful, got access token")
else:
error_msg = "Login5 authentication failed"
if login5_response.HasField('error'):
error_msg += f": {login5_response.error}"
self.logger.error(error_msg)
# Don't raise exception here, as the session can still work with some limitations
else:
self.logger.error("Login5 request failed with status: {}".format(response.status_code))
# Don't raise exception here, as the session can still work with some limitations
except Exception as e:
self.logger.error("Failed to authenticate with Login5: {}".format(e))
# Don't raise exception here, as the session can still work with some limitations
def get_login5_token(self) -> typing.Union[str, None]:
"""Get the Login5 access token if available and not expired"""
if self.__login5_access_token and self.__login5_token_expiry:
if int(time.time()) < self.__login5_token_expiry - 60: # 60 second buffer
return self.__login5_access_token
else:
self.logger.debug("Login5 token expired, need to re-authenticate")
return None
def __wait_auth_lock(self) -> None:
if self.__closing and self.connection is None:
@@ -2210,6 +2153,123 @@ class TokenProvider:
def __init__(self, session: Session):
self._session = session
def authenticate_login5(self, credential: Authentication.LoginCredentials) -> bool:
"""Authenticate using Login5 to get initial access token"""
self.logger.debug("Starting initial Login5 authentication process...")
# Use the login5 method to get initial token with common scopes
initial_scopes = ["playlist-read", "user-read-private", "user-read-playback-state"]
token = self.login5(initial_scopes)
if token is not None:
self.__tokens.append(token)
self.logger.info("Initial Login5 authentication successful, token cached")
return True
else:
self.logger.error("Initial Login5 authentication failed")
return False
def login5(self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]:
"""Get a Login5 token for the specified scopes"""
self.logger.debug("Requesting Login5 token for scopes: {}".format(scopes))
try:
# Build Login5 request
login5_request = Login5.LoginRequest()
# Set client info
login5_request.client_info.client_id = Session.CLIENT_ID
login5_request.client_info.device_id = self._session.device_id()
# Set stored credential from APWelcome
has_attr = hasattr(self._session, '_Session__ap_welcome')
if has_attr:
try:
# Use a different approach since signal doesn't work in threads
import threading
result = [None] # Use list for mutable reference
exception = [None]
def get_ap_welcome():
try:
result[0] = self._session.ap_welcome()
except Exception as e:
exception[0] = e
thread = threading.Thread(target=get_ap_welcome)
thread.daemon = True
thread.start()
thread.join(timeout=3) # 3-second timeout
if thread.is_alive():
self.logger.warning("ap_welcome() call timed out")
return None
if exception[0]:
raise exception[0]
ap_welcome = result[0]
if ap_welcome:
stored_cred = Login5Credentials.StoredCredential()
stored_cred.username = ap_welcome.canonical_username
stored_cred.data = ap_welcome.reusable_auth_credentials
login5_request.stored_credential.CopyFrom(stored_cred)
else:
self.logger.warning("Login5 token request skipped: No APWelcome credentials available")
return None
except Exception as e:
self.logger.warning("Login5 token request skipped: No APWelcome credentials available")
return None
else:
self.logger.warning("Login5 token request skipped: No APWelcome credentials available")
return None
# Send Login5 request
login5_url = "https://login5.spotify.com/v3/login"
headers = {
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf"
}
self.logger.debug("Sending Login5 token request to: {}".format(login5_url))
response = requests.post(
login5_url,
data=login5_request.SerializeToString(),
headers=headers,
timeout=10 # Add timeout to prevent hanging
)
if response.status_code == 200:
login5_response = Login5.LoginResponse()
login5_response.ParseFromString(response.content)
if login5_response.HasField('ok'):
# Create StoredToken object
token_data = {
"expiresIn": login5_response.ok.access_token_expires_in,
"accessToken": login5_response.ok.access_token,
"scope": scopes
}
stored_token = TokenProvider.StoredToken(token_data)
self.logger.info("Login5 token obtained successfully with {} seconds validity".format(
login5_response.ok.access_token_expires_in))
return stored_token
else:
error_msg = "Login5 token request failed"
if login5_response.HasField('error'):
error_msg += f": {login5_response.error}"
self.logger.error(error_msg)
return None
else:
self.logger.error("Login5 token request failed with status: {}".format(response.status_code))
return None
except Exception as e:
self.logger.error("Failed to obtain Login5 token: {} - {}".format(type(e).__name__, str(e)))
return None
def find_token_with_all_scopes(
self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]:
"""
@@ -2240,45 +2300,28 @@ class TokenProvider:
if len(scopes) == 0:
raise RuntimeError("The token doesn't have any scope")
# Use Login5 token
login5_token = self._session.get_login5_token()
if login5_token:
# Create a StoredToken-compatible object using Login5 token
login5_stored_token = TokenProvider.Login5StoredToken(login5_token, scopes)
self.logger.debug("Using Login5 access token for scopes: {}".format(scopes))
return login5_stored_token
# Check existing tokens
# Check existing tokens first
token = self.find_token_with_all_scopes(scopes)
if token is not None:
if token.expired():
self.logger.debug("Token expired, removing from cache")
self.__tokens.remove(token)
else:
self.logger.debug("Using cached token for scopes: {}".format(scopes))
return token
# Try to get a new Login5 token
self.logger.debug("No valid cached token found, attempting Login5 renewal")
login5_token = self.login5(scopes)
if login5_token is not None:
self.__tokens.append(login5_token)
self.logger.debug("Login5 token obtained and cached for scopes: {}".format(scopes))
return login5_token
# No valid token available
raise RuntimeError("Unable to obtain access token. Login5 authentication may have failed.")
self.logger.warning("Unable to obtain access token through Login5")
raise RuntimeError("Unable to obtain access token. Login5 authentication failed.")
class Login5StoredToken:
"""StoredToken-compatible wrapper for Login5 access tokens"""
access_token: str
scopes: typing.List[str]
def __init__(self, access_token: str, scopes: typing.List[str]):
self.access_token = access_token
self.scopes = scopes
def expired(self) -> bool:
"""Login5 tokens are managed by Session, so delegate expiry check"""
return False # Session handles expiry
def has_scope(self, scope: str) -> bool:
"""Login5 tokens are general-purpose, assume they have all scopes"""
return True
def has_scopes(self, sc: typing.List[str]) -> bool:
"""Login5 tokens are general-purpose, assume they have all scopes"""
return True
class StoredToken:
""" """

View File

@@ -19,7 +19,7 @@ from librespot.proto.spotify.login5.v3.credentials import \
Credentials_pb2 as \
spotify_dot_login5_dot_v3_dot_credentials_dot_credentials__pb2
from librespot.proto.spotify.login5.v3.identifiers import \
Identifiers as \
Identifiers_pb2 as \
spotify_dot_login5_dot_v3_dot_identifiers_dot_identifiers__pb2
# @@protoc_insertion_point(imports)

View File

@@ -63,15 +63,16 @@ class ZeroconfServer(Closeable):
def __init__(self, inner: Inner, listen_port):
self.__inner = inner
self.__keys = DiffieHellman()
if listen_port == -1:
listen_port = random.randint(self.__min_port + 1, self.__max_port)
self.__runner = ZeroconfServer.HttpRunner(self, listen_port)
threading.Thread(target=self.__runner.run,
name="zeroconf-http-server",
daemon=True).start()
self.__zeroconf = zeroconf.Zeroconf()
advertised_ip_str = self._get_local_ip()
server_hostname = socket.gethostname()
@@ -105,7 +106,12 @@ class ZeroconfServer(Closeable):
addresses=service_addresses # Pass resolved IP, or None if 0.0.0.0 or conversion failed
)
try:
self.__zeroconf.register_service(self.__service_info)
self.logger.info("Zeroconf service registered successfully!")
except Exception as e:
self.logger.error("Failed to register zeroconf service: {}".format(e))
raise
def _get_local_ip(self) -> str:
"""Tries to determine a non-loopback local IP address for network advertisement."""
@@ -184,6 +190,7 @@ class ZeroconfServer(Closeable):
def handle_add_user(self, __socket: socket.socket, params: dict[str, str],
http_version: str) -> None:
self.logger.info("Spotify Connect transfer detected")
username = params.get("userName")
if not username:
self.logger.error("Missing userName!")
@@ -236,11 +243,13 @@ class ZeroconfServer(Closeable):
initial_value=int.from_bytes(
iv, "big")))
decrypted = aes.decrypt(encrypted)
try:
self.close_session()
with self.__connection_lock:
self.__connecting_username = username
self.logger.info("Accepted new user from {}. [deviceId: {}]".format(
params.get("deviceName"), self.__inner.device_id))
response = json.dumps(self.__default_successful_add_user)
__socket.send(http_version.encode())
__socket.send(b" 200 OK")
@@ -250,6 +259,7 @@ class ZeroconfServer(Closeable):
__socket.send(self.__eol)
__socket.send(self.__eol)
__socket.send(response.encode())
self.__session = Session.Builder(self.__inner.conf) \
.set_device_id(self.__inner.device_id) \
.set_device_name(self.__inner.device_name) \
@@ -257,11 +267,19 @@ class ZeroconfServer(Closeable):
.set_preferred_locale(self.__inner.preferred_locale) \
.blob(username, decrypted) \
.create()
with self.__connection_lock:
self.__connecting_username = None
self.logger.info("Session created for user: {}".format(self.__session.username()))
for session_listener in self.__session_listeners:
session_listener.session_changed(self.__session)
except Exception as e:
self.logger.error("Error in handle_add_user: {}".format(e))
with self.__connection_lock:
self.__connecting_username = None
def handle_get_info(self, __socket: socket.socket,
http_version: str) -> None:
info = copy.deepcopy(self.__default_get_info_fields)
@@ -289,6 +307,10 @@ class ZeroconfServer(Closeable):
self.__session = None
return valid
def get_session(self) -> typing.Union[Session, None]:
"""Get the current session if valid, None otherwise"""
return self.__session if self.has_valid_session() else None
def parse_path(self, path: str) -> dict[str, str]:
url = "https://host" + path
parsed = {}
@@ -309,10 +331,10 @@ class ZeroconfServer(Closeable):
return self
def create(self) -> ZeroconfServer:
return ZeroconfServer(
ZeroconfServer.Inner(self.device_type, self.device_name,
inner = ZeroconfServer.Inner(self.device_type, self.device_name,
self.device_id, self.preferred_locale,
self.conf), self.listen_port)
self.conf)
return ZeroconfServer(inner, self.listen_port)
class HttpRunner(Closeable, Runnable):
__should_stop = False
@@ -392,6 +414,7 @@ class ZeroconfServer(Closeable):
def handle_request(self, __socket: socket.socket, http_version: str,
action: str, params: dict[str, str]) -> None:
self.__zeroconf_server.logger.debug("HTTP request received - action: {}, params: {}".format(action, params.keys() if params else "None"))
if action == "addUser":
if params is None:
raise RuntimeError

View File

@@ -26,12 +26,15 @@ def test_with_stored_credentials():
token = token_provider.get("playlist-read")
print(f"✓ Successfully got playlist-read token: {token[:20]}...")
# Check if Login5 token is available
login5_token = session.get_login5_token()
if login5_token:
print(f"✓ Login5 token available: {login5_token[:20]}...")
# Test Login5 token by requesting a different scope
try:
login5_token = token_provider.get_token("user-read-email", "user-read-private")
if login5_token and login5_token.access_token:
print(f"✓ Login5 token available: {login5_token.access_token[:20]}...")
else:
print("⚠ Login5 token not available")
except Exception as login5_error:
print(f"⚠ Login5 token test failed: {login5_error}")
session.close()
return True

View File

@@ -14,11 +14,15 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(level
def test_zeroconf_login5():
"""Test Login5 using Zeroconf authentication"""
print("=== Testing Login5 with Zeroconf ===")
print("IMPORTANT: This test requires a Spotify Connect transfer!")
print("1. Open Spotify on your phone/computer")
print("2. Look for 'librespot-spotizerr' in Spotify Connect devices")
print("3. Transfer playback to it")
print("4. Wait for credentials to be stored...")
print("2. Start playing some music")
print("3. Look for 'librespot-spotizerr' in Spotify Connect devices")
print("4. Click on 'librespot-spotizerr' to transfer playback to it")
print("5. Wait for credentials to be stored...")
print("\nWaiting for Spotify Connect transfer...")
print("NOTE: You'll see 'Created new session!' logs - this is normal startup.")
print("The test will only succeed when you transfer playback from Spotify Connect.")
zs = ZeroconfServer.Builder().create()
@@ -37,8 +41,9 @@ def test_zeroconf_login5():
print("- Transfer playback to it")
return False
if zs._ZeroconfServer__session:
session = zs._ZeroconfServer__session
# Check for session
session = zs.get_session()
if session:
print(f"\n✓ Got session for user: {session.username()}")
# Test token retrieval
@@ -47,10 +52,10 @@ def test_zeroconf_login5():
token = token_provider.get("playlist-read")
print(f"✓ Got playlist-read token: {token[:20]}...")
# Test Login5 token
login5_token = session.get_login5_token()
# Test Login5 token by requesting a different scope using get_token
login5_token = token_provider.get_token("user-read-email", "user-read-private")
if login5_token:
print(f"✓ Login5 token available: {login5_token[:20]}...")
print(f"✓ Login5 token available: {login5_token.access_token[:20]}...")
else:
print("⚠ Login5 token not available")