Source code for mini.apis.base_api

#!/usr/bin/env python3
import abc
import asyncio
import enum
from abc import ABC
from typing import Callable, Union

from ..channels.websocket_client import ubt_websocket as _UBTWebSocket, AbstractMsgHandler

DEFAULT_TIMEOUT = 300

socket = _UBTWebSocket()


[docs]@enum.unique class MiniApiResultType(enum.Enum): """ Api return result status type Success: Received robot reply Timeout: Reply to the robot received within the timeout period Unsupported: The instruction is not supported """ Success = 1 # Success Timeout = 2 # Timeout Unsupported = 3 # Unsupported command
[docs]class BaseApi(abc.ABC): """Message api base class """
[docs] async def send(self, cmd_id: int, message, timeout: int) -> Union[object, bool]: """Send message method Note: Called internally by subclass functions, subclass instances cannot be called. Args: cmd_id (int): Supported command id, for example: mini.apis.cmdid.PLAY_ACTION_REQUEST message (Message): Supported message entity, for example: mini.pb2.PlayActionRequest timeout (int): timeout time. When timeout<=0, it means that there is no need to wait for the robot to reply.When timeout>0, it means that it needs to wait for the robot to reply. Returns: If the instruction is not supported, return tuple(MiniApiResultType.Unsupported,None) If the instruction is supported When timeout<=0, bool is returned, indicating whether the message was sent successfully When timeout>0 If the message times out or fails to be sent, tuple(MiniApiResultType.Timeout,None) is returned If the message has a reply, it will return tuple(MiniApiResultType.Success,result), result is the corresponding reply message """ assert cmd_id >= 0, 'cmdId should not be negative number in BaseApi' assert message is not None, 'message should not be none in BaseApi' # 通用的发送消息逻辑 if timeout <= 0: return await socket.send_msg0(cmd_id, message) else: result = await socket.send_msg(cmd_id, message, timeout) if result: if result.header.target == -1: import logging log = logging.getLogger(__name__) log.addHandler(logging.StreamHandler()) log.setLevel(logging.WARNING) log.warning(f'当前机器人版本不支持命令:cmd={message.header.command}, 请升级机器人系统版本.') return MiniApiResultType.Unsupported, None else: return MiniApiResultType.Success, self._parse_msg(result) else: return MiniApiResultType.Timeout, None
[docs] async def execute(self): """Send instructions After serializing the supported message, write it to the socket Implemented by subclasses """ raise NotImplementedError()
def _parse_msg(self, message): """解析回复指令 将收到的Message对象包含的消息数据反序列化为相应的response 由子类实现,仅供子类内部调用 Args: message: Message对象 """ raise NotImplementedError()
[docs]class BaseApiNeedResponse(BaseApi, abc.ABC): """ The message api base class that needs to be replied, timeout cannot be empty """
[docs] async def send(self, cmd_id, data, timeout: int): """Override the parent method Check timeout, must be >0 """ assert timeout > 0, 'timeout should be Positive number in BaseApiNeedResponse' return await super().send(cmd_id, data, timeout)
[docs]class BaseApiNoNeedResponse(BaseApi, ABC): """Message api base class without reply """
[docs] async def send(self, cmd_id, message, timeout: int = 0): """Override the parent method Set timeout to 0 """ return await super().send(cmd_id, message, 0)
[docs]class BaseEventApi(BaseApiNoNeedResponse, AbstractMsgHandler, ABC): """Event class message api base class When the event handler is registered, the event message actively pushed by the robot Args: cmd_id (int): registered instruction id message (Message): the registration message sent to the robot is_repeat (bool): Whether to repeat the monitoring, the default is True timeout (int): timeout time, the default is 0 handler (Callable): event message handler, f(message) """ def __init__(self, cmd_id: int, message, is_repeat: bool = True, timeout: int = 0, handler: 'Callable[..., None]' = None): """初始化事件类消息 """ super().__init__() self.__cmd_id = cmd_id self.__request = message self.__is_repeat = is_repeat self.__timeout = timeout self.__handler = handler if is_repeat: self.__repeatCount = -1 else: self.__repeatCount = 1
[docs] def set_handler(self, handler: 'Callable[..., None]' = None): """Set up event message handler Args: handler (Callable): event message handler, f(message) """ self.__handler = handler
[docs] def start(self): """Start the listener """ # 发送消息 asyncio.create_task(self.send(cmd_id=self.__cmd_id, message=self.__request)) # 注册监听 socket.register_msg_handler(cmd=self.__cmd_id, handler=self)
[docs] def stop(self): """Stop listener The subclass needs to notify the robot to stop reporting the event """ # 移除消息监听 socket.unregister_msg_handler(cmd=self.__cmd_id, handler=self)
# AbstractMsgHandler
[docs] def handle_msg(self, message): # 处理监听次数 if self.__repeatCount > 0: # 有监听次数 self.__handle_msg(message) self.__repeatCount -= 1 elif self.__repeatCount == -1: # 无限监听 self.__handle_msg(message)
def __handle_msg(self, message): if message.header.target == -1: import logging log = logging.getLogger(__name__) log.addHandler(logging.StreamHandler()) log.setLevel(logging.WARNING) log.warning(f'当前机器人版本不支持命令:cmd={message.header.command}, 请升级机器人系统版本.') return if self.__handler is not None: self.__handler(self._parse_msg(message))