Source code for kxspy.ws

import asyncio
import logging
import aiohttp
import kxspy
from .emitter import Emitter
from time import perf_counter
from .utils import get_random_username
from .events import *


_LOG = logging.getLogger("kxspy.ws")

MESSAGE_QUEUE_MAX_SIZE = 25

OP_EVENT_NAMES = {
    1: "HeartBeatEvent",
    2: "IdentifyEvent",
    3: "GameStart",
    4: "GameEnd",
    5: "KillEvent",
    6: "VersionUpdate",
    7: "ChatMessage",
    10: "HelloEvent",
    12: "ExchangejoinEvent",
    13: "ExchangeOnlineEvent",
    14: "ExchangeOfflineEvent",
    15: "ExchangeGameAliveEvent",
    16: "ExchangeGameEnd",
    87: "BroadCasteEvent",
    98: "VoiceChatUpdate",
    99: "VoiceData",
}

[docs] class WS: """Handles the WebSocket connection to the Kxs network.""" def __init__( self, ws_url: str = "wss://network.kxs.rip/", username: str | None = None, enable_voice_chat: bool = False, exchange_key: str | None = None, connect: bool = True, isMobile: bool = False, isSecure: bool = True, session: aiohttp.ClientSession | None = None ): self.ws_url = ws_url self.username = username or get_random_username() self.enable_voice_chat = enable_voice_chat self.exchange_key = exchange_key self.isMobile = isMobile self.isSecure = isSecure self._loop = asyncio.get_event_loop() self._session = session or aiohttp.ClientSession() self._ws: aiohttp.ClientWebSocketResponse | None = None self._message_queue: list[dict] = [] self._listen_task: asyncio.Task | None = None self._heartbeat_task: asyncio.Task | None = None self._destroyed = False self.is_connect: bool = False self.is_authenticated: bool = False self._uuid = None self.emitter = Emitter() self.version = f"kxspy/{kxspy.__version__}" if connect: self.connect()
[docs] def connect(self) -> asyncio.Task: if self._destroyed: raise IOError("Cannot connect: transport destroyed.") if self._listen_task and not self._listen_task.done(): self._listen_task.cancel() return self._loop.create_task(self._connect())
async def _connect(self): await self.close() attempt = 0 while not self._destroyed and not self.is_connect: attempt += 1 try: _LOG.info(f"Connecting to WebSocket: {self.ws_url}") self._ws = await self._session.ws_connect(self.ws_url, heartbeat=60) self.is_connect = True _LOG.info("WebSocket connection established.") self._listen_task = self._loop.create_task(self._listen()) break except (aiohttp.ClientError, asyncio.TimeoutError) as exc: delay = min(10 * attempt, 60) _LOG.error( f"Connection failed ({type(exc).__name__}: {exc}), retrying in {delay}s..." ) await asyncio.sleep(delay) except (aiohttp.ClientConnectorError, aiohttp.WSServerHandshakeError, aiohttp.ServerDisconnectedError) as error: if isinstance(error, (aiohttp.ClientConnectorError, aiohttp.ServerDisconnectedError)): delay = min(10 * attempt, 60) _LOG.error( f"Connection failed ({type(error).__name__}: {error}), retrying in {delay}s..." ) await asyncio.sleep(delay) else: _LOG.error(f"Exception WS: {error}")
[docs] async def measure_latency(self, timeout: float = 5.0) -> float: if not self.is_connected: raise ConnectionError("WebSocket is not connected.") fut = asyncio.get_event_loop().create_future() start = perf_counter() async def on_version_update(_event): try: self.emitter.remove_listener("VersionUpdate", on_version_update) except ValueError: pass if not fut.done(): fut.set_result((perf_counter() - start) * 1000) self.emitter.add_listener("VersionUpdate", on_version_update) await self.send({"op": 6, "d": {}}) try: latency = await asyncio.wait_for(fut, timeout) return latency except asyncio.TimeoutError: _LOG.error("Timeout : VersionUpdate did not arrive in time ( mesure_latency ).") return None
[docs] async def close(self, code=aiohttp.WSCloseCode.OK): if self._listen_task: self._listen_task.cancel() if self._heartbeat_task: self._heartbeat_task.cancel() if self._ws: try: await self._ws.close(code=code) except Exception: pass finally: self._ws = None self.is_connect = False
[docs] async def destroy(self): self._closing = True tasks = [] for task in [self._listen_task, self._heartbeat_task]: if task and not task.done(): task.cancel() tasks.append(task) if self._ws and not self._ws.closed: await self._ws.close() if self._session and not self._session.closed: await self._session.close() if tasks: await asyncio.gather(*tasks, return_exceptions=True) self._ws = None self.is_connect = False self._destroyed = True _LOG.info("Kxspy WS destroyed cleanly.")
async def _listen(self): assert self._ws is not None try: async for msg in self._ws: if msg.type == aiohttp.WSMsgType.TEXT: _LOG.debug(f"Received message: {msg.data}") self._loop.create_task(self._handle_message_safe(msg)) elif msg.type in ( aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING, aiohttp.WSMsgType.CLOSED, ): _LOG.warning("WebSocket closed by server.") break elif msg.type == aiohttp.WSMsgType.ERROR: _LOG.error(f"WebSocket error: {msg.data}") break except asyncio.CancelledError: pass except Exception as e: _LOG.exception(f"Unexpected error while listening websocket message: {e}") self.is_connect = False self._ws = None if not self._destroyed: _LOG.info("Reconnecting after disconnection...") self._loop.create_task(self._connect()) async def _handle_message_safe(self, msg: aiohttp.WSMessage): try: await self._handle_message(msg.json()) except Exception: _LOG.exception("Error while handling websocket message") async def _handle_message(self, payload: dict): op = payload.get("op") d = payload.get("d", {}) if d.get("error", None) is not None: event_name = OP_EVENT_NAMES.get(op, f"UnknownEvent(op={op})") self.emitter.emit( "ErrorEvent", ErrorEvent(event=event_name, error=d.get("error", "Unknown error"),op=op) ) return if op == 1: # Heartbeat self.emitter.emit("HeartBeatEvent", HeartBeatEvent.from_kwargs(**d)) elif op == 2: # Identify self._uuid = d.get("uuid") self.emitter.emit("IdentifyEvent", IdentifyEvent.from_kwargs(**d)) elif op == 3: # Game start if d.get("system", None) is not None: self.emitter.emit("GameStart", GameStart.from_kwargs(**payload["d"])) else: self.emitter.emit("ConfirmGameStart", ConfirmGameStart.from_kwargs(**payload["d"])) elif op == 4: # Game end if d.get("left", None) is not None: self.emitter.emit("GameEnd", GameEnd.from_kwargs(**payload["d"])) else: self.emitter.emit("ConfirmGameEnd", ConfirmGameEnd.from_kwargs(**payload["d"])) elif op == 5: # KILL EVENT self.emitter.emit("KillEvent", KillEvent.from_kwargs(**d)) elif op == 6: # VERSION UPDATE self.emitter.emit("VersionUpdate", VersionUpdate.from_kwargs(**d)) elif op == 7: # CHAT MESSAGE if d.get("user", None) is not None: self.emitter.emit("ChatMessage", ChatMessage.from_kwargs(**payload["d"])) else: self.emitter.emit("ConfirmChatMessage", ConfirmChatMessage.from_kwargs(**payload["d"])) elif op == 10: # Hello (heartbeat interval) interval = d.get("heartbeat_interval", 3000) await self.send({"op": 2,"d":{"username":self.username,"isVoiceChat":self.enable_voice_chat,"v":self.version,"isMobile":self.isMobile,"isSecure":self.isSecure,"exchangeKey":self.exchange_key}}) await self._start_heartbeat(interval) self.emitter.emit("HelloEvent", HelloEvent.from_kwargs(**d)) elif op == 12: # EXCHANGE KEY JOIN self.emitter.emit("ExchangejoinEvent", ExchangejoinEvent.from_kwargs(**d)) elif op == 13: # EXCHANGE KEY ONLINE self.emitter.emit("ExchangeOnlineEvent", ExchangeOnlineEvent.from_kwargs(**d)) elif op == 14: # EXCHANGE KEY OFFLINE self.emitter.emit("ExchangeOfflineEvent", ExchangeOfflineEvent.from_kwargs(**d)) elif op == 15: # GAME ALIVE EXCHANGE KEY self.emitter.emit("ExchangeGameAliveEvent", ExchangeGameAliveEvent.from_kwargs(**d)) elif op == 16: # GAME END EXCHANGE KEY d["data"]["stuff"] = Stuff.from_kwargs(**d["data"]["stuff"]) self.emitter.emit("ExchangeGameEnd", ExchangeGameEnd.from_kwargs(**d["data"])) elif op == 87: # BROADCAST MESSAGE self.emitter.emit("BroadCasteEvent", BroadCasteEvent.from_kwargs(**d)) _LOG.info("Received BroadcastEvent (op 87).") elif op == 98: # VOICE CHAT UPDATE if d.get("user", None) is not None: self.emitter.emit("VoiceChatUpdate", VoiceChatUpdate.from_kwargs(**payload["d"])) else: self.emitter.emit("ConfirmVoiceChatUpdate", ConfirmVoiceChatUpdate.from_kwargs(**payload["d"])) elif op == 99: # VOICE DATA self.emitter.emit("VoiceData", VoiceData(d=d, u=payload.get("u"))) else: _LOG.warning(f"Unknown opcode: {op} — payload: {payload}")
[docs] async def send(self, payload: dict): if not self.is_connect or not self._ws: if len(self._message_queue) >= MESSAGE_QUEUE_MAX_SIZE: _LOG.warning("Message queue full, discarding payload.") return _LOG.debug("Queueing payload until reconnected.") self._message_queue.append(payload) return try: await self._ws.send_json(payload) except ConnectionResetError: _LOG.warning("Connection reset during send, requeueing payload.") self._message_queue.append(payload) await self._connect()
async def _start_heartbeat(self, interval: int): if self._heartbeat_task and not self._heartbeat_task.done(): self._heartbeat_task.cancel() self._heartbeat_task = self._loop.create_task(self._heartbeat(interval)) async def _heartbeat(self, interval: int): while self.is_connect and self._ws and not self._ws.closed: try: await asyncio.sleep(interval / 3000) await self.send({"op": 1, "d": {}}) except asyncio.CancelledError: break except Exception as e: _LOG.error(f"Heartbeat error: {e}") break @property def is_connected(self) -> bool: return self.is_connect and self._ws.closed is False @property def uuid(self) -> bool: return self._uuid