mini.apis.base_api 源代码

#!/usr/bin/env python3
import abc
import asyncio
import enum
from abc import ABC
from typing import Callable, Union

from ..channels.websocket_client import ubt_websocket as _UBTWebSocket, AbstractMsgHandler


socket = _UBTWebSocket()

[文档]@enum.unique class MiniApiResultType(enum.Enum): """ Api的返回结果状态类型 Success : 收到机器人回复 Timeout : 在超时时间内为收到机器人回复 Unsupported : 不支持该指令 """ Success = 1 # 成功 Timeout = 2 # 超时 Unsupported = 3 # 不支持的命令
[文档]class BaseApi(abc.ABC): """消息api基类 """
[文档] async def send(self, cmd_id: int, message, timeout: int) -> Union[object, bool]: """发送消息方法 注意:由子类函数内部调用,子类实例不可调用。 Args: cmd_id (int): 支持的命令id,例如:mini.apis.cmdid.PLAY_ACTION_REQUEST message (Message): 支持的消息实体,例如:mini.pb2.PlayActionRequest timeout (int): 超时时间,当timeout<=0时,表示不需要等待机器人回复,当timeout>0时,表示需要等待机器人回复, Returns: 如果不支持该指令,返回tuple(MiniApiResultType.Unsupported,None) 如果支持该指令 当timeout<=0时,返回bool,表示消息是否发送成功 当timeout>0时 如果消息超时或者发送失败,则返回tuple(MiniApiResultType.Timeout,None) 如果消息有回复,则返回tuple(MiniApiResultType.Success,result),result为相应的回复消息 """ assert cmd_id >= 0, 'cmdId should not be negative number in BaseApi' assert message is not None, 'message should not be none in BaseApi' # 通用的发送消息逻辑 if timeout <= 0: return await socket.send_msg0(cmd_id, message) else: result = await socket.send_msg(cmd_id, message, timeout) if result: if == -1: import logging log = logging.getLogger(__name__) log.addHandler(logging.StreamHandler()) log.setLevel(logging.WARNING) log.warning(f'当前机器人版本不支持命令:cmd={message.header.command}, 请升级机器人系统版本.') return MiniApiResultType.Unsupported, None else: return MiniApiResultType.Success, self._parse_msg(result) else: return MiniApiResultType.Timeout, None
[文档] async def execute(self): """发送指令 将支持的message序列化后,写入socket 由子类实现 """ raise NotImplementedError()
def _parse_msg(self, message): """解析回复指令 将收到的Message对象包含的消息数据反序列化为相应的response 由子类实现,仅供子类内部调用 Args: message: Message对象 """ raise NotImplementedError()
[文档]class BaseApiNeedResponse(BaseApi, abc.ABC): """ 需要回复的消息api基类,timeout不能为空 """
[文档] async def send(self, cmd_id, data, timeout: int): """重写父类方法 校验timeout,必须>0 """ assert timeout > 0, 'timeout should be Positive number in BaseApiNeedResponse' return await super().send(cmd_id, data, timeout)
[文档]class BaseApiNoNeedResponse(BaseApi, ABC): """不需要回复的消息api基类 """
[文档] async def send(self, cmd_id, message, timeout: int = 0): """重写父类方法 将timeout设置为0 """ return await super().send(cmd_id, message, 0)
[文档]class BaseEventApi(BaseApiNoNeedResponse, AbstractMsgHandler, ABC): """事件类消息api基类 当注册了事件处理器后,由机器人主动推送过来的事件消息 Args: cmd_id (int): 注册的指令id message (Message): 向机器人发送的注册消息 is_repeat (bool): 是否重复监听,默认为True timeout (int): 超时时间,默认为0 handler (Callable): 事件消息处理器,f(message) """ def __init__(self, cmd_id: int, message, is_repeat: bool = True, timeout: int = 0, handler: 'Callable[..., None]' = None): """初始化事件类消息 """ super().__init__() self.__cmd_id = cmd_id self.__request = message self.__is_repeat = is_repeat self.__timeout = timeout self.__handler = handler if is_repeat: self.__repeatCount = -1 else: self.__repeatCount = 1
[文档] def set_handler(self, handler: 'Callable[..., None]' = None): """设置事件消息处理器 Args: handler (Callable): 事件消息处理器,f(message) """ self.__handler = handler
[文档] def start(self): """启动监听器 """ # 发送消息 asyncio.create_task(self.send(cmd_id=self.__cmd_id, message=self.__request)) # 注册监听 socket.register_msg_handler(cmd=self.__cmd_id, handler=self)
[文档] def stop(self): """停止监听器 子类需要通知机器人停止事件上报 """ # 移除消息监听 socket.unregister_msg_handler(cmd=self.__cmd_id, handler=self)
# AbstractMsgHandler
[文档] def handle_msg(self, message): # 处理监听次数 if self.__repeatCount > 0: # 有监听次数 self.__handle_msg(message) self.__repeatCount -= 1 elif self.__repeatCount == -1: # 无限监听 self.__handle_msg(message)
def __handle_msg(self, message): if == -1: import logging log = logging.getLogger(__name__) log.addHandler(logging.StreamHandler()) log.setLevel(logging.WARNING) log.warning(f'当前机器人版本不支持命令:cmd={message.header.command}, 请升级机器人系统版本.') return if self.__handler is not None: self.__handler(self._parse_msg(message))