Source code for kxspy.client

import logging
import typing as t
import numpy as np
import aiohttp
from .ws import WS
from .events import Event
from .utils import get_random_username
from typing import Union, List, Optional
from .rest import RestApi

# Module-level logger for this module, registered under the name "kxspy.client".
_LOG = logging.getLogger("kxspy.client")

class Client:
    """
    The main class.

    Builds a :class:`WS` websocket wrapper (``self.ws``) and a
    :class:`RestApi` helper (``self.rest``), and re-exports the websocket's
    event emitter as ``self.emitter``.

    :param ws_url: Websocket URL handed to :class:`WS`.
    :param rest_url: Base URL handed to :class:`RestApi`.
    :param username: Name used for this client. When ``None`` (the default) a
        fresh one is generated with :func:`get_random_username` for each
        instance.
    :param enablevoicechat: Forwarded to :class:`WS` as ``enable_voice_chat``.
    :param exchangekey: Forwarded to :class:`WS` as ``exchange_key``.
    :param isMobile: Forwarded to :class:`WS` unchanged.
    :param isSecure: Forwarded to :class:`WS` unchanged.
    :param admin_key: Forwarded to :class:`RestApi`.
    :param connect: Forwarded to :class:`WS` unchanged (presumably controls
        whether the socket connects right away -- confirm against ``WS``).
    :param session: Optional ``aiohttp.ClientSession`` passed to both
        :class:`WS` and :class:`RestApi`.
    """

    def __init__(
        self,
        ws_url: str = "wss://network.kxs.rip/",
        rest_url: str = "https://network.kxs.rip",
        username: Optional[str] = None,
        enablevoicechat: bool = False,
        exchangekey: Optional[str] = None,
        isMobile: bool = False,
        isSecure: bool = True,
        admin_key: Optional[str] = None,
        connect: bool = True,
        session: t.Optional["aiohttp.ClientSession"] = None,
    ) -> None:
        # Resolve the default here rather than in the signature: a call placed
        # in the signature runs once, at definition time, and every Client
        # created without a username would then share the same "random" name.
        if username is None:
            username = get_random_username()

        self.ws = WS(
            ws_url=ws_url,
            username=username,
            enable_voice_chat=enablevoicechat,
            exchange_key=exchangekey,
            connect=connect,
            isMobile=isMobile,
            isSecure=isSecure,
            session=session,
        )
        self.username = username
        self.rest = RestApi(rest_url, admin_key, session)
        self.emitter = self.ws.emitter
        # (owner object, bound callable, event or None) for every listener
        # installed through add_event_hooks, so it can be undone later.
        self._registered_listeners: t.List[
            t.Tuple[t.Any, t.Callable, t.Optional[t.Type[Event]]]
        ] = []

    def add_event_hooks(self, obj: t.Any) -> None:
        """
        Scans the provided ``obj`` for functions decorated with
        :func:`listener`, and sets them up to process Kxs Network events.

        An attribute counts as decorated when it carries a ``_kxspy_events``
        attribute. A non-empty collection registers the callable once per
        listed event; an empty collection registers it with ``event=None``.
        Every registration is remembered so that :meth:`remove_event_hooks`
        can undo it.

        :param obj: Object whose attributes are scanned.
        """
        for attr_name in dir(obj):
            method = getattr(obj, attr_name)
            events = getattr(method, "_kxspy_events", None)
            # Only a *missing* marker means "not a listener"; an empty marker
            # must still fall through to the event=None registration below.
            if events is None:
                continue
            for ev in (events or (None,)):
                self.emitter.add_listener(event=ev, func=method)
                self._registered_listeners.append((obj, method, ev))

    def remove_event_hooks(self, obj: t.Any) -> None:
        """
        Removes all previously registered listeners for an object.

        Only listeners installed through :meth:`add_event_hooks` for this
        exact object are touched. A failure to detach one listener is logged
        and does not stop the others from being removed; the bookkeeping
        entry is dropped either way.

        :param obj: The object previously passed to :meth:`add_event_hooks`.
        """
        kept = []
        for entry in self._registered_listeners:
            owner, method, ev = entry
            if owner is not obj:
                kept.append(entry)
                continue
            name = getattr(method, "__name__", repr(method))
            try:
                self.emitter.remove_listener(event=ev, func=method)
            except Exception as e:  # best effort: keep detaching the rest
                _LOG.warning("Failed to remove listener %s: %s", name, e)
            else:
                _LOG.debug("Removed listener: %s for %s", name, ev)
        # Mutate in place so any outside reference to the list stays valid.
        self._registered_listeners[:] = kept

    async def connect(self) -> None:
        """Connect to Kxs Network."""
        await self.ws.connect()

    async def close(self) -> None:
        """Close connection to Kxs Network."""
        await self.ws.close()

    async def join_game(self, gameId) -> None:
        """Join a game by its ID (sends op 3 with this client's username)."""
        await self.ws.send({"op": 3, "d": {"gameId": gameId, "user": self.username}})

    async def leave_game(self) -> None:
        """Leave the current game (sends op 4)."""
        await self.ws.send({"op": 4, "d": {}})

    async def report_kill(self, killer: str, killed: str) -> None:
        """Report a kill in the game (sends op 5)."""
        await self.ws.send({"op": 5, "d": {"killer": killer, "killed": killed}})

    async def check_version(self) -> None:
        """Check for the latest version of Kxs (sends op 6)."""
        await self.ws.send({"op": 6, "d": {}})

    async def send_message(self, text: str) -> None:
        """Send a message to the in-game chat (sends op 7)."""
        await self.ws.send({"op": 7, "d": {"text": text}})

    async def update_voicechat(self, isVoiceChat: bool) -> None:
        """Update the voice chat status (sends op 98)."""
        await self.ws.send({"op": 98, "d": {"isVoiceChat": isVoiceChat}})

    async def send_voicedata(
        self,
        audio_data: Union[bytes, bytearray, np.ndarray, List[int]],
        user_id: Optional[str] = None,
    ) -> None:
        """
        Send a chunk of voice data (op 99).

        The payload is always sent as a plain list: ``bytes``/``bytearray``
        are reinterpreted as ``int16`` values, an ``int16`` array is converted
        with ``tolist()``, and a list is sent as given.

        :param audio_data: Raw bytes, an ``int16`` numpy array, or a list of
            ints.
        :param user_id: Value for the ``"u"`` field; falls back to
            ``self.ws.uuid`` when omitted or falsy.
        :raises TypeError: If ``audio_data`` is none of the accepted types
            (including a numpy array whose dtype is not ``int16``).
        :raises ValueError: From ``numpy.frombuffer`` when the byte length is
            not a multiple of two.
        """
        if isinstance(audio_data, (bytes, bytearray)):
            data_to_send = np.frombuffer(audio_data, dtype=np.int16).tolist()
        elif isinstance(audio_data, np.ndarray) and audio_data.dtype == np.int16:
            data_to_send = audio_data.tolist()
        elif isinstance(audio_data, list):
            data_to_send = audio_data
        else:
            raise TypeError(
                "audio_data must be bytes, bytearray, numpy.ndarray[int16], or list[int]"
            )
        await self.ws.send({"op": 99, "d": data_to_send, "u": user_id or self.ws.uuid})

    async def ws_latency(self):
        """Measure the websocket latency and return the result of ``WS.measure_latency``."""
        return await self.ws.measure_latency()