Source code for mini.apis.api_action

#!/usr/bin/env python3
import enum

from ..apis.base_api import BaseApi, DEFAULT_TIMEOUT
from ..apis.cmdid import _PCProgramCmdId
from ..pb2.codemao_getactionlist_pb2 import GetActionListRequest, GetActionListResponse
from ..pb2.codemao_moverobot_pb2 import MoveRobotRequest, MoveRobotResponse
from ..pb2.codemao_playaction_pb2 import PlayActionRequest, PlayActionResponse
from ..pb2.codemao_playcustomaction_pb2 import PlayCustomActionRequest, PlayCustomActionResponse
from ..pb2.codemao_stopaction_pb2 import StopActionRequest, StopActionResponse
from ..pb2.codemao_stopcustomaction_pb2 import StopCustomActionRequest, StopCustomActionResponse
from ..pb2.pccodemao_message_pb2 import Message


class PlayAction(BaseApi):
    """Execute a built-in action.

    The robot performs the built-in action with the given name. Valid
    action names can be obtained with the ``GetActionList`` API.

    Args:
        is_serial (bool): Whether to wait for a reply. Defaults to True.
        action_name (str): Action name; must be a non-empty string.

    The reply is documented as a ``PlayActionResponse`` with:
        isSuccess: True or False.
        resultCode: Result code.
    """

    def __init__(self, is_serial: bool = True, action_name: str = None):
        """Validate and store the parameters of the play-action request.

        Raises:
            AssertionError: If ``action_name`` is not a non-empty string.
        """
        # Raised explicitly rather than through ``assert`` so the check is
        # not stripped when Python runs with ``-O``; the exception type and
        # message are kept for backward compatibility.
        if not (isinstance(action_name, str) and action_name):
            raise AssertionError('PlayAction actionName should be available')
        self.__is_serial = is_serial
        self.__action_name = action_name

    async def execute(self):
        """Send the play-action command.

        ``DEFAULT_TIMEOUT`` is passed to ``self.send`` when ``is_serial`` is
        truthy, otherwise a timeout of 0 is passed.

        Returns:
            The value returned by ``self.send`` (documented as a
            ``PlayActionResponse``).
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0

        request = PlayActionRequest()
        request.actionName = self.__action_name

        return await self.send(
            _PCProgramCmdId.PLAY_ACTION_REQUEST.value, request, timeout)

    def _parse_msg(self, message):
        """Decode a reply message.

        Args:
            message (Message): The message object to parse.

        Returns:
            A ``PlayActionResponse`` parsed from ``message.bodyData``, or
            None when ``message`` is not a ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None

        response = PlayActionResponse()
        response.ParseFromString(message.bodyData)
        return response
class StopAllAction(BaseApi):
    """Stop all actions that are currently being performed.

    NOTE(review): the original documentation mentions result code 403 for
    an uninterruptible action, but did so in terms of
    ``StopCustomActionResponse``; confirm that this also applies to the
    ``StopActionResponse`` used here.

    Args:
        is_serial (bool): Whether to wait for a reply. Defaults to True.
    """

    def __init__(self, is_serial: bool = True):
        """Store whether the caller wants to wait for the reply."""
        self.__is_serial = is_serial

    async def execute(self):
        """Send the stop-all-actions command.

        ``DEFAULT_TIMEOUT`` is passed to ``self.send`` when ``is_serial`` is
        truthy, otherwise a timeout of 0 is passed.

        Returns:
            The value returned by ``self.send`` (documented as a
            ``StopActionResponse``).
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0

        return await self.send(
            _PCProgramCmdId.STOP_ACTION_REQUEST.value,
            StopActionRequest(),
            timeout,
        )

    def _parse_msg(self, message):
        """Decode a reply message.

        Args:
            message (Message): The message object to parse.

        Returns:
            A ``StopActionResponse`` parsed from ``message.bodyData``, or
            None when ``message`` is not a ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None

        response = StopActionResponse()
        response.ParseFromString(message.bodyData)
        return response
@enum.unique
class MoveRobotDirection(enum.Enum):
    """Direction in which the robot moves.

    FORWARD: move forward
    BACKWARD: move backward
    LEFTWARD: move leftward
    RIGHTWARD: move rightward
    """

    FORWARD = 3  # forward
    BACKWARD = 4  # backward
    LEFTWARD = 1  # leftward
    RIGHTWARD = 2  # rightward
class MoveRobot(BaseApi):
    """Move the robot a number of steps in a given direction.

    Args:
        is_serial (bool): Whether to wait for a reply. Defaults to True.
        direction (MoveRobotDirection): Movement direction. Defaults to
            ``MoveRobotDirection.FORWARD``.
        step (int): Number of steps; must be a positive integer.
            Defaults to 1.

    The reply is documented as a ``MoveRobotResponse`` with:
        isSuccess: True or False.
        code: Result code.
    """

    def __init__(self, is_serial: bool = True,
                 direction: MoveRobotDirection = MoveRobotDirection.FORWARD,
                 step: int = 1):
        """Validate and store the parameters of the move request.

        Raises:
            AssertionError: If ``direction`` is not a ``MoveRobotDirection``
                or ``step`` is not a positive int.
        """
        # Raised explicitly rather than through ``assert`` so the checks are
        # not stripped when Python runs with ``-O``; the exception type and
        # messages are kept for backward compatibility.
        if not isinstance(direction, MoveRobotDirection):
            raise AssertionError(
                'MoveRobot : direction should be MoveRobotDirection instance')
        if not (isinstance(step, int) and step > 0):
            raise AssertionError('MoveRobot : step should be Positive')

        self.__is_serial = is_serial
        # Only the enum's underlying value is kept; it is what gets copied
        # into the request.
        self.__direction = direction.value
        self.__step = step

    async def execute(self):
        """Send the move command.

        ``DEFAULT_TIMEOUT`` is passed to ``self.send`` when ``is_serial`` is
        truthy, otherwise a timeout of 0 is passed.

        Returns:
            The value returned by ``self.send`` (documented as a
            ``MoveRobotResponse``).
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0

        request = MoveRobotRequest()
        request.direction = self.__direction
        request.step = self.__step

        return await self.send(
            _PCProgramCmdId.MOVE_ROBOT_REQUEST.value, request, timeout)

    def _parse_msg(self, message):
        """Decode a reply message.

        Args:
            message (Message): The message object to parse.

        Returns:
            A ``MoveRobotResponse`` parsed from ``message.bodyData``, or
            None when ``message`` is not a ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None

        response = MoveRobotResponse()
        response.ParseFromString(message.bodyData)
        return response
@enum.unique
class RobotActionType(enum.Enum):
    """Robot action resource type.

    INNER (built-in): the unmodifiable motion files built into the robot.
    CUSTOM (custom): action files that can be modified by the developer,
        placed in the sdcard/customize/actions directory.
    """

    INNER = 0  # built-in
    CUSTOM = 1  # custom
class GetActionList(BaseApi):
    """Get the list of action resources stored on the robot.

    Retrieves the action files stored locally on the robot, either the
    built-in ones or the custom ones.

    Args:
        is_serial (bool): Whether to wait for a reply. Defaults to True.
        action_type (RobotActionType): Which kind of actions to list.
            Defaults to ``RobotActionType.INNER`` (built-in actions).

    The reply is documented as a ``GetActionListResponse`` with:
        actionList ([str]): Action list, an array of str.
        isSuccess: True or False.
        resultCode: Result code.
    """

    def __init__(self, is_serial: bool = True,
                 action_type: RobotActionType = RobotActionType.INNER):
        """Validate and store the parameters of the list request.

        Raises:
            AssertionError: If ``action_type`` is not a ``RobotActionType``.
        """
        # Raised explicitly rather than through ``assert`` so the check is
        # not stripped when Python runs with ``-O``; the exception type and
        # message (including its trailing space) are kept unchanged.
        if not isinstance(action_type, RobotActionType):
            raise AssertionError(
                'GetActionList : action_type should be RobotActionType '
                'instance ')
        self.__is_serial = is_serial
        # Only the enum's underlying value is kept; it is what gets copied
        # into the request.
        self.__action_type = action_type.value

    async def execute(self):
        """Send the get-action-list command.

        ``DEFAULT_TIMEOUT`` is passed to ``self.send`` when ``is_serial`` is
        truthy, otherwise a timeout of 0 is passed.

        Returns:
            The value returned by ``self.send`` (documented as a
            ``GetActionListResponse``).
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0

        request = GetActionListRequest()
        request.actionType = self.__action_type

        return await self.send(
            _PCProgramCmdId.GET_ACTION_LIST.value, request, timeout)

    def _parse_msg(self, message):
        """Decode a reply message.

        Args:
            message (Message): The message object to parse.

        Returns:
            A ``GetActionListResponse`` parsed from ``message.bodyData``,
            or None when ``message`` is not a ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None

        response = GetActionListResponse()
        response.ParseFromString(message.bodyData)
        return response
class PlayCustomAction(BaseApi):
    """Execute a custom action.

    The robot performs the custom action with the given name. Action names
    can be obtained with ``GetActionList``.

    Args:
        is_serial (bool): Whether to wait for a reply. Defaults to True.
        action_name (str): Custom action name; must be a non-empty string.

    The reply is documented as a ``PlayCustomActionResponse`` with:
        isSuccess: Whether the call succeeded.
        resultCode: Return code.
    """

    def __init__(self, is_serial: bool = True, action_name: str = None):
        """Validate and store the parameters of the play request.

        Raises:
            AssertionError: If ``action_name`` is not a non-empty string.
        """
        # Raised explicitly rather than through ``assert`` so the check is
        # not stripped when Python runs with ``-O``; the exception type and
        # message (including its trailing space) are kept unchanged.
        if not (isinstance(action_name, str) and len(action_name) > 0):
            raise AssertionError(
                'PlayCustomAction : actionName should be '
                'available ')
        self.__is_serial = is_serial
        self.__action_name = action_name

    async def execute(self):
        """Send the play-custom-action command.

        ``DEFAULT_TIMEOUT`` is passed to ``self.send`` when ``is_serial`` is
        truthy, otherwise a timeout of 0 is passed.

        Returns:
            The value returned by ``self.send`` (documented as a
            ``PlayCustomActionResponse``).
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0

        request = PlayCustomActionRequest()
        request.actionName = self.__action_name

        return await self.send(
            _PCProgramCmdId.PLAY_CUSTOM_ACTION_REQUEST.value, request, timeout)

    def _parse_msg(self, message):
        """Decode a reply message.

        Args:
            message (Message): The message object to parse.

        Returns:
            A ``PlayCustomActionResponse`` parsed from
            ``message.bodyData``, or None when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None

        response = PlayCustomActionResponse()
        response.ParseFromString(message.bodyData)
        return response
class StopCustomAction(BaseApi):
    """Stop a custom action.

    Stops the custom action with the given name. If the action is an
    uninterruptible action, ``StopCustomActionResponse.resultCode`` is 403.

    Args:
        is_serial (bool): Whether to wait for a reply. Defaults to True.
        action_name (str): Custom action name; must be a non-empty string.

    The reply is documented as a ``StopCustomActionResponse`` with:
        isSuccess: Whether the call succeeded.
        resultCode: Return code.
    """

    def __init__(self, is_serial: bool = True, action_name: str = None):
        """Validate and store the parameters of the stop request.

        Raises:
            AssertionError: If ``action_name`` is not a non-empty string.
        """
        # Raised explicitly rather than through ``assert`` so the check is
        # not stripped when Python runs with ``-O``; the exception type and
        # message are kept for backward compatibility.
        if not (isinstance(action_name, str) and len(action_name) > 0):
            raise AssertionError(
                'StopCustomAction actionName should be available')
        self.__is_serial = is_serial
        self.__action_name = action_name

    async def execute(self):
        """Send the stop-custom-action command.

        ``DEFAULT_TIMEOUT`` is passed to ``self.send`` when ``is_serial`` is
        truthy, otherwise a timeout of 0 is passed.

        Returns:
            The value returned by ``self.send`` (documented as a
            ``StopCustomActionResponse``).
        """
        timeout = DEFAULT_TIMEOUT if self.__is_serial else 0

        request = StopCustomActionRequest()
        request.actionName = self.__action_name

        return await self.send(
            _PCProgramCmdId.STOP_CUSTOM_ACTION_REQUEST.value, request, timeout)

    def _parse_msg(self, message):
        """Decode a reply message.

        Args:
            message (Message): The message object to parse.

        Returns:
            A ``StopCustomActionResponse`` parsed from
            ``message.bodyData``, or None when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None

        response = StopCustomActionResponse()
        response.ParseFromString(message.bodyData)
        return response