Source code for mini.channels.websocket_client

import abc
import asyncio
import logging
import threading
from asyncio import Future
from typing import Type, Any

import websockets
import websockets.exceptions
from google.protobuf import message as _message
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK

from ..channels import msg_utils as msg_utils
from ..pb2.pccodemao_message_pb2 import Message

log = logging.getLogger(__name__)
log.addHandler(logging.StreamHandler())
if log.level == logging.NOTSET:
    log.setLevel(logging.WARNING)


[docs]class AbstractMsgHandler(abc.ABC): def __init__(self, identify=0, msg_clazz: type(_message.Message) = Message): if issubclass(msg_clazz, _message.Message): self.__msg_clazz = msg_clazz or Message self.__identify = identify else: class_name = type(self).__name__ raise Exception(f'{class_name} is not subclass of {Message.__name__}')
[docs] @abc.abstractmethod def handle_msg(self, message: _message.Message): pass
[docs] def parse_msg(self, body_data): if self.msg_clazz == Message: log.info('no need parse body message.') return body_data return self.msg_clazz().ParseFromString(body_data)
@property def msg_clazz(self): return self.__msg_clazz @property def identify(self): return self.__identify def __repr__(self): class_name = type(self).__name__ msg_clazz_name = type(self.msg_clazz).__name__ return '({!r}, {!r}, {!r})'.format(class_name, msg_clazz_name, self.__identify)
[docs]class DefaultMsgHandler(AbstractMsgHandler):
[docs] def handle_msg(self, message: _message.Message): log.warning(f'handle msg: {message}')
class _CoroutineHandler(AbstractMsgHandler): def __init__(self, identify, future: Future): super().__init__(identify) self.coroutine = None self.__future = future def handle_msg(self, message: _message.Message): log.debug(f'receiver message = {message}') if self.__future.cancelled() or self.__future.done(): return self.__future.set_result(message) class _MessageDispatcher(object): def __init__(self): self.__handlers = {} def add_handler(self, cmd, msg_handler: AbstractMsgHandler = DefaultMsgHandler(Message)): if isinstance(msg_handler, AbstractMsgHandler): self.handlers.setdefault(cmd, []).append(msg_handler) else: class_name = type(msg_handler).__name__ raise Exception(f'{class_name} is not instance of {AbstractMsgHandler.__name__}') def remove_handler(self, cmd): del self.handlers[cmd] def remove_handler0(self, cmd, handler: AbstractMsgHandler): self.handlers.get(cmd).remove(handler) def __len__(self): return len(self.handlers) def __iter__(self): return tuple(self.handlers) @property def handlers(self): return self.__handlers def __repr__(self): return '{}({!r})'.format(*self) def dispatch(self, message: Message): header = message.header handler_list = self.handlers.get(header.command) if handler_list is not None: found: bool = False for handler in handler_list: if header.id == str(handler.identify): log.debug(f'find handler = {handler}') found = True if header.target == -1: log.warning(f"cmd={header.command} is unsupported by current robot.") else: handler.handle_msg(message) if isinstance(handler, _CoroutineHandler): log.debug(f'remove cmd={header.command} handlers.') self.remove_handler(header.command) if not found: log.warning(f'1.ignore: cmd={header.command}, cmd no handlers') else: log.warning(f'2.ignore:cmd={header.command}, cmd no handlers.') class _UBTWebSocketClient(object): __init_flag = False _instance_lock = threading.Lock() def __init__(self): if not _UBTWebSocketClient.__init_flag: _UBTWebSocketClient.__init_flag = True log.info(f'init {_UBTWebSocketClient.__name__}') self._client = None self.__ip = 'localhost' self.__port = 8800 self.__dispatcher = _MessageDispatcher() self.__generator = msg_utils.id_generator() def __new__(cls, *args, **kwargs): if not hasattr(_UBTWebSocketClient, "_instance"): with _UBTWebSocketClient._instance_lock: if not hasattr(_UBTWebSocketClient, "_instance"): _UBTWebSocketClient._instance = object.__new__(cls) return _UBTWebSocketClient._instance @property def ip(self): return self.__ip @property def port(self): return self.__port @property def alive(self): return self._client and (not self._client.closed) def __format__(self, format_spec=''): return 'ip={!r},port={!r}'.format(self.__ip, self.__port) def __repr__(self): class_name = type(self).__name__ return '{!r}({!r}, {!r})'.format(class_name, self.__ip, self.__port) async def connect(self, ip_address='localhost', port=8800, timeout=15000) -> bool: self.__ip = ip_address self.__port = port return await asyncio.wait_for(self.__connect(), timeout) async def __connect(self) -> bool: if self.alive: await self._client.close(reason='exit for reconnecting.') try: log.info(f'connect begin') self._client = await websockets.connect('ws://{}:{!r}'.format(self.ip, self.port)) log.info(f'connect success') # self.__loop() asyncio.create_task(self.__loop()) # asyncio.get_running_loop().run_forever() return True except OSError as error: log.error(f'WebSocket server no startUp: error{error}') return False async def __loop(self): log.debug(f'begin loop.') while self.alive: _data = None try: _data = await self._client.recv() # do parse _bytes = msg_utils.base64_decode(_data) msg = msg_utils.parse_msg(_bytes) log.info(f"recv msg: {msg}") # do dispatch self.__dispatch(msg) except Exception as e: if isinstance(e, ConnectionClosedOK): log.warning("connection closed ok!") else: log.exception('recv fail : %r', e) log.debug(f'end loop.') # loop = asyncio.get_running_loop() # if loop is not None: # loop.close() def __dispatch(self, message: Message): self.__dispatcher.dispatch(message) async def send_msg0(self, cmd, message: _message.Message) -> bool: if self.alive: client = self._client identify = 0 pccode_mao_message: Message = msg_utils.build_request_msg(cmd, send_serial=identify, request=message) async def send0(): try: await client.send(msg_utils.base64_encode(pccode_mao_message.SerializeToString())) log.info( 'send cmd={!r}, identify={!r}, message={!r}'.format(pccode_mao_message.header.command, pccode_mao_message.header.id, message)) return True except (ConnectionClosed, ConnectionClosedOK) as close: log.warning(f'Waring: connect is closed: {close}') return False except (ConnectionError, ConnectionResetError, ConnectionAbortedError, ConnectionRefusedError) as error: log.warning(f'Waring: connect error: {error}') return False try: await asyncio.create_task(send0()) return True except Exception as e: log.warning(f'receiver response failure: {e}') return False else: log.warning(f"Waring: Client not connected!") return False def register_msg_handler(self, cmd, handler: AbstractMsgHandler): self.__dispatcher.add_handler(cmd, handler) def unregister_msg_handler(self, cmd, handler: AbstractMsgHandler = None): if handler is None: self.__dispatcher.remove_handler(cmd) else: self.__dispatcher.remove_handler0(cmd, handler) async def send_msg(self, cmd, message: _message.Message, timeout) -> Any: if self.alive: client = self._client identify = self.generate_id() pccode_mao_message: Message = msg_utils.build_request_msg(cmd, send_serial=identify, request=message) future = asyncio.get_running_loop().create_future() handler = _CoroutineHandler(identify, future) log.debug(f'register cmd={pccode_mao_message.header.command} handler={handler}') self.register_msg_handler(pccode_mao_message.header.command, handler) async def send1(): try: await client.send(msg_utils.base64_encode(pccode_mao_message.SerializeToString())) log.info( 'send cmd={!r}, identify={!r}, message={!r}'.format(pccode_mao_message.header.command, pccode_mao_message.header.id, message)) return await future except ( ConnectionClosed, ConnectionClosedOK, ConnectionError, ConnectionResetError, ConnectionAbortedError, ConnectionRefusedError) as close: log.warning(f'connect close. {close}') except Exception as e2: log.warning(f'send message failure: {e2}') try: return await asyncio.wait_for(send1(), timeout) except Exception as e: log.warning(f'recv response failure: {e}') return None else: log.warning(f'client is not alive') return None def generate_id(self) -> int: return self.__generator.send(0) async def shutdown(self): if self.alive: self.__generator.close() await self._client.close(reason="client closed") else: log.warning(f'client is not alive.') ubt_websocket: Type[_UBTWebSocketClient] = _UBTWebSocketClient