import abc
import asyncio
import logging
import threading
from asyncio import Future
from typing import Type, Any
import websockets
import websockets.exceptions
from google.protobuf import message as _message
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK
from ..channels import msg_utils as msg_utils
from ..pb2.pccodemao_message_pb2 import Message
# Module-level logger. It always gets its own stream handler; the WARNING
# threshold is applied only when no level has been set on this logger yet.
log = logging.getLogger(__name__)
log.addHandler(logging.StreamHandler())
if not log.level:  # a level of 0 is logging.NOTSET
    log.setLevel(logging.WARNING)
class AbstractMsgHandler(abc.ABC):
    """Base class for objects that react to an incoming message.

    A handler carries two pieces of configuration:

    * ``identify`` - the value the dispatcher in this module compares (after
      ``str()``) with the ``id`` field of an incoming message header.
    * ``msg_clazz`` - the protobuf message class used by :meth:`parse_msg`
      to decode a message body.
    """

    def __init__(self, identify=0, msg_clazz: Type[_message.Message] = Message):
        """Store the handler configuration.

        :param identify: identifier this handler answers to.
        :param msg_clazz: protobuf message class for body decoding; a falsy
            value falls back to ``Message``.
        :raises TypeError: if ``msg_clazz`` is not a protobuf message class.
        """
        # Apply the fallback before validating; issubclass() itself rejects
        # non-class arguments such as None.
        msg_clazz = msg_clazz or Message
        if not (isinstance(msg_clazz, type) and issubclass(msg_clazz, _message.Message)):
            # TypeError is still an Exception, so callers catching Exception
            # keep working; the message now names the offending argument.
            raise TypeError(f'{msg_clazz!r} is not subclass of {_message.Message.__name__}')
        self.__msg_clazz = msg_clazz
        self.__identify = identify

    @abc.abstractmethod
    def handle_msg(self, message: _message.Message):
        """Process one incoming message; subclasses must implement this."""
        pass

    def parse_msg(self, body_data):
        """Decode ``body_data`` with this handler's message class.

        :param body_data: serialized message body.
        :return: ``body_data`` unchanged when the handler uses the generic
            ``Message`` class, otherwise a populated ``msg_clazz`` instance.
        """
        if self.msg_clazz is Message:
            log.info('no need parse body message.')
            return body_data
        # ParseFromString fills the instance in place; its return value is
        # not the message, so return the instance itself.
        parsed = self.msg_clazz()
        parsed.ParseFromString(body_data)
        return parsed

    @property
    def msg_clazz(self):
        """The protobuf message class used for decoding bodies."""
        return self.__msg_clazz

    @property
    def identify(self):
        """The identifier this handler was created with."""
        return self.__identify

    def __repr__(self):
        class_name = type(self).__name__
        # msg_clazz is already a class, so take its own name (type() of it
        # would yield the metaclass).
        msg_clazz_name = self.msg_clazz.__name__
        return '({!r}, {!r}, {!r})'.format(class_name, msg_clazz_name, self.__identify)
class DefaultMsgHandler(AbstractMsgHandler):
    """Handler that does nothing but log the messages it is given."""

    def handle_msg(self, message: _message.Message):
        """Log ``message`` at WARNING level."""
        log.warning(f'handle msg: {message}')
class _CoroutineHandler(AbstractMsgHandler):
    """Handler that passes a received message to a waiting future."""

    def __init__(self, identify, future: Future):
        """Remember the future to complete; the message class stays at the base default."""
        super().__init__(identify)
        self.coroutine = None
        self.__future = future

    def handle_msg(self, message: _message.Message):
        """Set ``message`` as the future's result unless the future is already finished."""
        log.debug(f'receiver message = {message}')
        pending = self.__future
        # Only a future that is neither cancelled nor completed may still
        # receive a result.
        if not pending.cancelled() and not pending.done():
            pending.set_result(message)
class _MessageDispatcher(object):
    """Registry that routes an incoming message to the handlers of its command.

    Handlers are stored per command in registration order. A handler is
    invoked when ``str(handler.identify)`` equals the ``id`` of the message
    header.
    """

    def __init__(self):
        # command -> list of handlers registered for it
        self.__handlers = {}

    def add_handler(self, cmd, msg_handler: AbstractMsgHandler = None):
        """Register ``msg_handler`` for ``cmd``.

        :param cmd: command key the handler is stored under.
        :param msg_handler: handler to register; when omitted (``None``) a
            fresh :class:`DefaultMsgHandler` is created for this call.
        :raises Exception: if ``msg_handler`` is not an ``AbstractMsgHandler``.
        """
        if msg_handler is None:
            # Build the default per call (no instance shared through the
            # signature) and pass Message as the message class, not as the
            # identifier.
            msg_handler = DefaultMsgHandler(msg_clazz=Message)
        if not isinstance(msg_handler, AbstractMsgHandler):
            class_name = type(msg_handler).__name__
            raise Exception(f'{class_name} is not instance of {AbstractMsgHandler.__name__}')
        self.handlers.setdefault(cmd, []).append(msg_handler)

    def remove_handler(self, cmd):
        """Drop every handler registered for ``cmd``; unknown commands are ignored."""
        self.handlers.pop(cmd, None)

    def remove_handler0(self, cmd, handler: AbstractMsgHandler):
        """Drop one ``handler`` from ``cmd``.

        Does nothing when the command or the handler is not registered. The
        command entry is deleted once its last handler is gone.
        """
        registered = self.handlers.get(cmd)
        if not registered:
            return
        try:
            registered.remove(handler)
        except ValueError:
            return
        if not registered:
            del self.handlers[cmd]

    def __len__(self):
        """Number of commands that currently have an entry."""
        return len(self.handlers)

    def __iter__(self):
        """Iterate over the registered commands."""
        # __iter__ must return an iterator, not a tuple.
        return iter(self.handlers)

    @property
    def handlers(self):
        """The underlying ``command -> [handlers]`` mapping."""
        return self.__handlers

    def __repr__(self):
        return '{}({!r})'.format(type(self).__name__, self.handlers)

    def dispatch(self, message: Message):
        """Deliver ``message`` to every handler matching its command and id.

        A message whose header ``target`` is ``-1`` is only logged as
        unsupported and not passed to the handler. A matching
        ``_CoroutineHandler`` is one-shot and is unregistered after the
        match; other handlers of the same command stay registered.
        """
        header = message.header
        handler_list = self.handlers.get(header.command)
        if handler_list is None:
            log.warning(f'2.ignore:cmd={header.command}, cmd no handlers.')
            return

        found = False
        # Iterate over a snapshot because matching one-shot handlers are
        # removed from the live list inside the loop.
        for handler in list(handler_list):
            if header.id != str(handler.identify):
                continue
            log.debug(f'find handler = {handler}')
            found = True
            if header.target == -1:
                log.warning(f"cmd={header.command} is unsupported by current robot.")
            else:
                handler.handle_msg(message)
            if isinstance(handler, _CoroutineHandler):
                log.debug(f'remove cmd={header.command} handlers.')
                # Remove only this handler: requests with other ids may
                # still be waiting on the same command.
                self.remove_handler0(header.command, handler)
        if not found:
            log.warning(f'1.ignore: cmd={header.command}, cmd no handlers')
class _UBTWebSocketClient(object):
    """WebSocket client shared by the whole process.

    ``__new__`` always returns the same instance and ``__init__`` initialises
    it only on the first construction. The client connects to
    ``ws://<ip>:<port>``, runs a background receive task that decodes
    incoming frames and hands them to a ``_MessageDispatcher``, and offers
    fire-and-forget (:meth:`send_msg0`) and request/response
    (:meth:`send_msg`) sending.
    """
    __init_flag = False
    _instance_lock = threading.Lock()

    def __init__(self):
        # Every construction yields the same object, so state is set up once.
        if not _UBTWebSocketClient.__init_flag:
            _UBTWebSocketClient.__init_flag = True
            log.info(f'init {_UBTWebSocketClient.__name__}')
            self._client = None
            # Strong reference to the receive task so it is not
            # garbage-collected while running.
            self._recv_task = None
            self.__ip = 'localhost'
            self.__port = 8800
            self.__dispatcher = _MessageDispatcher()
            self.__generator = msg_utils.id_generator()

    def __new__(cls, *args, **kwargs):
        # Check, take the lock, check again: only one instance is created.
        if not hasattr(_UBTWebSocketClient, "_instance"):
            with _UBTWebSocketClient._instance_lock:
                if not hasattr(_UBTWebSocketClient, "_instance"):
                    _UBTWebSocketClient._instance = object.__new__(cls)
        return _UBTWebSocketClient._instance

    @property
    def ip(self):
        """Host given to the most recent :meth:`connect` call."""
        return self.__ip

    @property
    def port(self):
        """Port given to the most recent :meth:`connect` call."""
        return self.__port

    @property
    def alive(self):
        """``True`` while a connection object exists and is not closed."""
        client = self._client
        return client is not None and not client.closed

    def __format__(self, format_spec=''):
        # format_spec is accepted but not used.
        return 'ip={!r},port={!r}'.format(self.__ip, self.__port)

    def __repr__(self):
        class_name = type(self).__name__
        return '{!r}({!r}, {!r})'.format(class_name, self.__ip, self.__port)

    async def connect(self, ip_address='localhost', port=8800, timeout=15000) -> bool:
        """Connect (or reconnect) to ``ws://ip_address:port``.

        :param ip_address: server host.
        :param port: server port.
        :param timeout: passed unchanged to ``asyncio.wait_for``.
        :return: ``True`` on success, ``False`` if an ``OSError`` occurred.
        :raises asyncio.TimeoutError: if the attempt exceeds ``timeout``.
        """
        self.__ip = ip_address
        self.__port = port
        # NOTE(review): asyncio.wait_for takes seconds, so the default of
        # 15000 is over four hours; it looks like milliseconds were
        # intended - confirm with callers before changing the unit.
        return await asyncio.wait_for(self.__connect(), timeout)

    async def __connect(self) -> bool:
        """Close any live connection, open a new one and start the receive task."""
        if self.alive:
            await self._client.close(reason='exit for reconnecting.')
        try:
            log.info('connect begin')
            # Plain formatting so a port given as a string is not quoted.
            self._client = await websockets.connect('ws://{}:{}'.format(self.ip, self.port))
            log.info('connect success')
            # Keep the task referenced; the event loop itself only holds a
            # weak reference to tasks.
            self._recv_task = asyncio.create_task(self.__loop(self._client))
            return True
        except OSError as error:
            log.error(f'WebSocket server no startUp: error{error}')
            return False

    async def __loop(self, client):
        """Receive, decode and dispatch frames from ``client`` until it is closed.

        The loop is tied to the connection it was started for, so a loop
        left over from an earlier connection cannot start reading from a
        newer one after a reconnect.
        """
        log.debug('begin loop.')
        while not client.closed:
            try:
                _data = await client.recv()
                # do parse
                _bytes = msg_utils.base64_decode(_data)
                msg = msg_utils.parse_msg(_bytes)
                log.info(f"recv msg: {msg}")
                # do dispatch
                self.__dispatch(msg)
            except ConnectionClosedOK:
                log.warning("connection closed ok!")
            except Exception as e:
                # Decode or handler failures must not stop the receive loop.
                log.exception('recv fail : %r', e)
        log.debug('end loop.')

    def __dispatch(self, message: Message):
        self.__dispatcher.dispatch(message)

    async def send_msg0(self, cmd, message: _message.Message) -> bool:
        """Send a request without waiting for a response.

        The request is built with serial ``0``.

        :return: ``True`` if the frame was sent, ``False`` if the client is
            not connected or sending failed.
        """
        if not self.alive:
            log.warning("Waring: Client not connected!")
            return False

        client = self._client
        identify = 0
        pccode_mao_message: Message = msg_utils.build_request_msg(cmd, send_serial=identify, request=message)
        try:
            await client.send(msg_utils.base64_encode(pccode_mao_message.SerializeToString()))
            log.info(
                'send cmd={!r}, identify={!r}, message={!r}'.format(pccode_mao_message.header.command,
                                                                    pccode_mao_message.header.id,
                                                                    message))
            return True
        except ConnectionClosed as close:
            log.warning(f'Waring: connect is closed: {close}')
            return False
        except ConnectionError as error:
            log.warning(f'Waring: connect error: {error}')
            return False
        except Exception as e:
            log.warning(f'receiver response failure: {e}')
            return False

    def register_msg_handler(self, cmd, handler: AbstractMsgHandler):
        """Register ``handler`` for ``cmd`` on the dispatcher."""
        self.__dispatcher.add_handler(cmd, handler)

    def unregister_msg_handler(self, cmd, handler: AbstractMsgHandler = None):
        """Remove ``handler`` from ``cmd``, or every handler of ``cmd`` when none is given."""
        if handler is None:
            self.__dispatcher.remove_handler(cmd)
        else:
            self.__dispatcher.remove_handler0(cmd, handler)

    def _discard_handler(self, cmd, handler):
        """Remove ``handler`` from ``cmd`` if it is still registered; never raises."""
        registry = self.__dispatcher.handlers
        registered = registry.get(cmd)
        if registered and handler in registered:
            registered.remove(handler)
            if not registered:
                registry.pop(cmd, None)

    async def send_msg(self, cmd, message: _message.Message, timeout) -> Any:
        """Send a request and wait for the matching response.

        A one-shot handler keyed by a freshly generated id is registered
        before sending and is always unregistered again afterwards, whether
        the call succeeded, failed or timed out.

        :param timeout: passed to ``asyncio.wait_for``.
        :return: the response message, or ``None`` if the client is not
            connected, sending failed, or no response arrived in time.
        """
        if not self.alive:
            log.warning('client is not alive')
            return None

        client = self._client
        identify = self.generate_id()
        pccode_mao_message: Message = msg_utils.build_request_msg(cmd, send_serial=identify, request=message)
        command = pccode_mao_message.header.command
        future = asyncio.get_running_loop().create_future()
        handler = _CoroutineHandler(identify, future)
        log.debug(f'register cmd={command} handler={handler}')
        self.register_msg_handler(command, handler)

        async def send1():
            try:
                await client.send(msg_utils.base64_encode(pccode_mao_message.SerializeToString()))
                log.info(
                    'send cmd={!r}, identify={!r}, message={!r}'.format(pccode_mao_message.header.command,
                                                                        pccode_mao_message.header.id,
                                                                        message))
                return await future
            except (ConnectionClosed, ConnectionError) as close:
                log.warning(f'connect close. {close}')
            except Exception as e2:
                log.warning(f'send message failure: {e2}')

        try:
            return await asyncio.wait_for(send1(), timeout)
        except Exception as e:
            log.warning(f'recv response  failure: {e}')
            return None
        finally:
            # Without this a handler whose response never arrives would
            # stay registered.
            self._discard_handler(command, handler)

    def generate_id(self) -> int:
        """Return the next request id from the id generator."""
        return self.__generator.send(0)

    async def shutdown(self):
        """Close the connection if it is alive, otherwise just log a warning."""
        if self.alive:
            self.__generator.close()
            # __init__ never runs again for this shared instance, so install
            # a fresh id source; the closed one (presumably a generator,
            # given send()/close()) could not serve ids after a reconnect.
            self.__generator = msg_utils.id_generator()
            await self._client.close(reason="client closed")
        else:
            log.warning('client is not alive.')
# Public alias for the client class; because the class hands out one shared
# instance from __new__, ubt_websocket() always yields that same client.
ubt_websocket: Type[_UBTWebSocketClient] = _UBTWebSocketClient