# Source code for mini.apis.api_action

#!/usr/bin/env python3
import enum

from ..apis.base_api import BaseApi, DEFAULT_TIMEOUT
from ..apis.cmdid import _PCProgramCmdId
from ..pb2.codemao_getactionlist_pb2 import GetActionListRequest, GetActionListResponse
from ..pb2.codemao_moverobot_pb2 import MoveRobotRequest, MoveRobotResponse
from ..pb2.codemao_playaction_pb2 import PlayActionRequest, PlayActionResponse
from ..pb2.codemao_playcustomaction_pb2 import PlayCustomActionRequest, PlayCustomActionResponse
from ..pb2.codemao_stopaction_pb2 import StopActionRequest, StopActionResponse
from ..pb2.codemao_stopcustomaction_pb2 import StopCustomActionRequest, StopCustomActionResponse
from ..pb2.pccodemao_message_pb2 import Message


class PlayAction(BaseApi):
    """API for playing a built-in action.

    The robot performs the built-in action with the given name. Action
    names can be obtained with GetActionList.

    Args:
        is_serial (bool): Whether to wait for the reply. Defaults to True.
        action_name (str): Name of the action; must not be None or an
            empty string.

    Response fields:
        PlayActionResponse.isSuccess: Whether the request succeeded.
        PlayActionResponse.resultCode: Return code.

    Raises:
        AssertionError: If ``action_name`` is not a non-empty string.
    """

    def __init__(self, is_serial: bool = True, action_name: str = None):
        """Validate the arguments and store them for ``execute``."""
        # Raised explicitly rather than through ``assert`` so the check is
        # not stripped under ``python -O``; the exception type and message
        # are kept as they were for existing callers.
        if not isinstance(action_name, str) or len(action_name) == 0:
            raise AssertionError('PlayAction actionName should be available')
        self.__is_serial = is_serial
        self.__action_name = action_name

    async def execute(self):
        """Send the play-action command.

        A timeout of DEFAULT_TIMEOUT is passed to ``send`` when
        ``is_serial`` is true, otherwise 0.

        Returns:
            PlayActionResponse
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0
        request = PlayActionRequest()
        request.actionName = self.__action_name
        cmd_id = _PCProgramCmdId.PLAY_ACTION_REQUEST.value
        return await self.send(cmd_id, request, timeout)

    def _parse_msg(self, message):
        """Parse a reply message.

        Args:
            message (Message): The Message object to parse.

        Returns:
            PlayActionResponse parsed from ``message.bodyData``, or None
            when ``message`` is not a Message instance.
        """
        if not isinstance(message, Message):
            return None
        response = PlayActionResponse()
        response.ParseFromString(message.bodyData)
        return response
class StopAllAction(BaseApi):
    """API for stopping all actions.

    Stops every action that is currently being executed.

    NOTE(review): the original documentation here described the reply in
    terms of StopCustomActionResponse (resultCode = 403 when the action
    cannot be interrupted). This class actually parses a
    StopActionResponse; confirm that the same fields and the 403 code
    apply to it.

    Args:
        is_serial (bool): Whether to wait for the reply. Defaults to True.
    """

    def __init__(self, is_serial: bool = True):
        """Store whether ``execute`` should wait for the reply."""
        self.__is_serial = is_serial

    async def execute(self):
        """Send the stop-all-actions command.

        A timeout of DEFAULT_TIMEOUT is passed to ``send`` when
        ``is_serial`` is true, otherwise 0.

        Returns:
            StopActionResponse
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0
        request = StopActionRequest()
        cmd_id = _PCProgramCmdId.STOP_ACTION_REQUEST.value
        return await self.send(cmd_id, request, timeout)

    def _parse_msg(self, message):
        """Parse a reply message.

        Args:
            message (Message): The Message object to parse.

        Returns:
            StopActionResponse parsed from ``message.bodyData``, or None
            when ``message`` is not a Message instance.
        """
        if not isinstance(message, Message):
            return None
        response = StopActionResponse()
        response.ParseFromString(message.bodyData)
        return response
@enum.unique
class MoveRobotDirection(enum.Enum):
    """Direction in which the robot moves.

    FORWARD: move forward.
    BACKWARD: move backward.
    LEFTWARD: move to the left.
    RIGHTWARD: move to the right.
    """

    FORWARD = 3  # forward
    BACKWARD = 4  # backward
    LEFTWARD = 1  # to the left
    RIGHTWARD = 2  # to the right
class MoveRobot(BaseApi):
    """API for moving the robot.

    Makes the robot move n steps in a given direction
    (MoveRobotDirection).

    Args:
        is_serial (bool): Whether to wait for the reply. Defaults to True.
        direction (MoveRobotDirection): Direction to move in. Defaults to
            FORWARD.
        step (int): Number of steps; must be positive. Defaults to 1.

    Response fields:
        MoveRobotResponse.isSuccess: Whether the request succeeded.
        MoveRobotResponse.code: Return code.

    Raises:
        AssertionError: If ``direction`` is not a MoveRobotDirection or
            ``step`` is not a positive int.
    """

    def __init__(self, is_serial: bool = True, direction: MoveRobotDirection = MoveRobotDirection.FORWARD,
                 step: int = 1):
        """Validate the arguments and store them for ``execute``."""
        # Raised explicitly rather than through ``assert`` so the checks are
        # not stripped under ``python -O``; the exception type and messages
        # are kept as they were for existing callers.
        if not isinstance(direction, MoveRobotDirection):
            raise AssertionError('MoveRobot : direction should be MoveRobotDirection instance')
        if not isinstance(step, int) or not step > 0:
            raise AssertionError('MoveRobot : step should be Positive')
        self.__is_serial = is_serial
        # The request carries the enum's integer value, not the enum member.
        self.__direction = direction.value
        self.__step = step

    async def execute(self):
        """Send the move-robot command.

        A timeout of DEFAULT_TIMEOUT is passed to ``send`` when
        ``is_serial`` is true, otherwise 0.

        Returns:
            MoveRobotResponse
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0
        request = MoveRobotRequest()
        request.direction = self.__direction
        request.step = self.__step
        cmd_id = _PCProgramCmdId.MOVE_ROBOT_REQUEST.value
        return await self.send(cmd_id, request, timeout)

    def _parse_msg(self, message):
        """Parse a reply message.

        Args:
            message (Message): The Message object to parse.

        Returns:
            MoveRobotResponse parsed from ``message.bodyData``, or None
            when ``message`` is not a Message instance.
        """
        if not isinstance(message, Message):
            return None
        response = MoveRobotResponse()
        response.ParseFromString(message.bodyData)
        return response
@enum.unique
class RobotActionType(enum.Enum):
    """Type of robot action.

    INNER (built-in): action files built into the robot that cannot be
        modified.
    CUSTOM (custom): action files placed under sdcard/customize/actions
        that developers can modify.
    """

    INNER = 0  # built-in
    CUSTOM = 1  # custom
class GetActionList(BaseApi):
    """API for fetching the robot's action list.

    Retrieves the list of action files (built-in or custom) stored
    locally on the robot.

    Args:
        is_serial (bool): Whether to wait for the reply. Defaults to True.
        action_type (RobotActionType): Kind of actions to list. Defaults
            to INNER (built-in actions).

    Response fields:
        GetActionListResponse.actionList ([str]): List of actions.
        GetActionListResponse.isSuccess: Whether the request succeeded.
        GetActionListResponse.resultCode: Return code.

    Raises:
        AssertionError: If ``action_type`` is not a RobotActionType.
    """

    def __init__(self, is_serial: bool = True, action_type: RobotActionType = RobotActionType.INNER):
        """Validate the arguments and store them for ``execute``."""
        # Raised explicitly rather than through ``assert`` so the check is
        # not stripped under ``python -O``; the exception type and message
        # (including its trailing space) are kept for existing callers.
        if not isinstance(action_type, RobotActionType):
            raise AssertionError('GetActionList : action_type should be RobotActionType '
                                 'instance ')
        self.__is_serial = is_serial
        # The request carries the enum's integer value, not the enum member.
        self.__action_type = action_type.value

    async def execute(self):
        """Send the get-action-list command.

        A timeout of DEFAULT_TIMEOUT is passed to ``send`` when
        ``is_serial`` is true, otherwise 0.

        Returns:
            GetActionListResponse
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0
        request = GetActionListRequest()
        request.actionType = self.__action_type
        cmd_id = _PCProgramCmdId.GET_ACTION_LIST.value
        return await self.send(cmd_id, request, timeout)

    def _parse_msg(self, message):
        """Parse a reply message.

        Args:
            message (Message): The Message object to parse.

        Returns:
            GetActionListResponse parsed from ``message.bodyData``, or
            None when ``message`` is not a Message instance.
        """
        if not isinstance(message, Message):
            return None
        response = GetActionListResponse()
        response.ParseFromString(message.bodyData)
        return response
class PlayCustomAction(BaseApi):
    """API for playing a custom action.

    Makes the robot perform the custom action with the given name. Action
    names can be obtained with GetActionList.

    Args:
        is_serial (bool): Whether to wait for the reply. Defaults to True.
        action_name (str): Name of the custom action; must not be empty
            or None.

    Response fields:
        PlayCustomActionResponse.isSuccess: Whether the request succeeded.
        PlayCustomActionResponse.resultCode: Return code.

    Raises:
        AssertionError: If ``action_name`` is not a non-empty string.
    """

    def __init__(self, is_serial: bool = True, action_name: str = None):
        """Validate the arguments and store them for ``execute``."""
        # Raised explicitly rather than through ``assert`` so the check is
        # not stripped under ``python -O``; the exception type and message
        # (including its trailing space) are kept for existing callers.
        if not isinstance(action_name, str) or len(action_name) == 0:
            raise AssertionError('PlayCustomAction : actionName should be '
                                 'available ')
        self.__is_serial = is_serial
        self.__action_name = action_name

    async def execute(self):
        """Send the play-custom-action command.

        A timeout of DEFAULT_TIMEOUT is passed to ``send`` when
        ``is_serial`` is true, otherwise 0.

        Returns:
            PlayCustomActionResponse
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0
        request = PlayCustomActionRequest()
        request.actionName = self.__action_name
        cmd_id = _PCProgramCmdId.PLAY_CUSTOM_ACTION_REQUEST.value
        return await self.send(cmd_id, request, timeout)

    def _parse_msg(self, message):
        """Parse a reply message.

        Args:
            message (Message): The Message object to parse.

        Returns:
            PlayCustomActionResponse parsed from ``message.bodyData``, or
            None when ``message`` is not a Message instance.
        """
        if not isinstance(message, Message):
            return None
        response = PlayCustomActionResponse()
        response.ParseFromString(message.bodyData)
        return response
class StopCustomAction(BaseApi):
    """API for stopping a custom action.

    Stops the specified custom action. If the action cannot be
    interrupted, StopCustomActionResponse.resultCode = 403.

    Args:
        is_serial (bool): Whether to wait for the reply. Defaults to True.
        action_name (str): Name of the custom action; must not be empty
            or None.

    Response fields:
        StopCustomActionResponse.isSuccess: Whether the request succeeded.
        StopCustomActionResponse.resultCode: Return code.

    Raises:
        AssertionError: If ``action_name`` is not a non-empty string.
    """

    def __init__(self, is_serial: bool = True, action_name: str = None):
        """Validate the arguments and store them for ``execute``."""
        # Raised explicitly rather than through ``assert`` so the check is
        # not stripped under ``python -O``; the exception type and message
        # are kept as they were for existing callers.
        if not isinstance(action_name, str) or len(action_name) == 0:
            raise AssertionError('StopCustomAction actionName should be available')
        self.__is_serial = is_serial
        self.__action_name = action_name

    async def execute(self):
        """Send the stop-custom-action command.

        A timeout of DEFAULT_TIMEOUT is passed to ``send`` when
        ``is_serial`` is true, otherwise 0.

        Returns:
            StopCustomActionResponse
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0
        request = StopCustomActionRequest()
        request.actionName = self.__action_name
        cmd_id = _PCProgramCmdId.STOP_CUSTOM_ACTION_REQUEST.value
        return await self.send(cmd_id, request, timeout)

    def _parse_msg(self, message):
        """Parse a reply message.

        Args:
            message (Message): The Message object to parse.

        Returns:
            StopCustomActionResponse parsed from ``message.bodyData``, or
            None when ``message`` is not a Message instance.
        """
        if not isinstance(message, Message):
            return None
        response = StopCustomActionResponse()
        response.ParseFromString(message.bodyData)
        return response