# Source code for mini.apis.api_observe

#!/usr/bin/env python3

# 每种传感器做一个单例对象进行监听

import asyncio
import enum

from ..apis.base_api import BaseEventApi, BaseApiNoNeedResponse
from ..apis.cmdid import _PCProgramCmdId
from ..pb2.codemao_facedetecttask_pb2 import FaceDetectTaskRequest, FaceDetectTaskResponse
from ..pb2.codemao_facerecognisetask_pb2 import FaceRecogniseTaskRequest, FaceRecogniseTaskResponse
from ..pb2.codemao_observefallclimb_pb2 import ObserveFallClimbRequest, ObserveFallClimbResponse
from ..pb2.codemao_observeheadracket_pb2 import ObserveHeadRacketRequest, ObserveHeadRacketResponse
from ..pb2.codemao_observeinfrareddistance_pb2 import ObserveInfraredDistanceRequest, ObserveInfraredDistanceResponse
from ..pb2.codemao_speechrecognise_pb2 import SpeechRecogniseRequest, SpeechRecogniseResponse
from ..pb2.codemao_stopspeechrecognise_pb2 import StopSpeechRecogniseRequest, StopSpeechRecogniseResponse
from ..pb2.pccodemao_message_pb2 import Message


class ObserveSpeechRecognise(BaseEventApi):
    """Event API that listens for speech-recognition results.

    The robot reports the text it recognised from speech. Each incoming
    event is parsed into a ``SpeechRecogniseResponse``:

    - ``text``: the recognised text
    - ``isSuccess``: whether the operation succeeded
    - ``resultCode``: the result code
    """

    def __init__(self):
        BaseEventApi.__init__(
            self,
            cmd_id=_PCProgramCmdId.SPEECH_RECOGNISE.value,
            message=SpeechRecogniseRequest(),
        )

    async def execute(self):
        """No-op override; the body is empty."""
        pass

    def _parse_msg(self, message):
        """Decode an incoming message into a ``SpeechRecogniseResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``SpeechRecogniseResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = SpeechRecogniseResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed

    def stop(self):
        """Stop speech recognition and stop listening for its events.

        Schedules the stop command as an asyncio task, then calls the
        parent class's ``stop``.

        Returns:
            None
        """
        # NOTE(review): create_task requires a running event loop and the
        # returned task is not retained -- confirm callers only invoke
        # stop() from inside the loop.
        stopper = _StopSpeechRecognise()
        asyncio.create_task(stopper.execute())
        super().stop()


class _StopSpeechRecognise(BaseApiNoNeedResponse):
    """Command API that stops speech recognition.

    Replies are parsed as ``StopSpeechRecogniseResponse``:

    - ``isSuccess``: whether the operation succeeded
    - ``resultCode``: the result code
    """

    async def execute(self):
        """Send the stop-speech-recognition command.

        Returns:
            The awaited result of ``self.send``; the original docstring
            describes it as a bool telling whether the command was sent
            successfully.
        """
        payload = StopSpeechRecogniseRequest()
        return await self.send(
            _PCProgramCmdId.STOP_SPEECH_RECOGNISE_REQUEST.value, payload
        )

    def _parse_msg(self, message):
        """Decode an incoming message into a ``StopSpeechRecogniseResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``StopSpeechRecogniseResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = StopSpeechRecogniseResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed


class ObserveFaceDetect(BaseEventApi):
    """Event API that listens for the number of detected faces.

    The robot reports how many faces it detected. A single detection
    times out after 1 s and detections run at a 1 s interval. Each event
    is parsed into a ``FaceDetectTaskResponse``:

    - ``count`` (int): number of faces
    - ``isSuccess``: whether the operation succeeded
    - ``resultCode``: the result code
    """

    def __init__(self):
        command = _PCProgramCmdId.FACE_DETECT_TASK_REQUEST.value
        task = FaceDetectTaskRequest()
        task.timeout = 1000  # timeout of a single detection
        task.period = 1000  # interval between detections
        task.delay = 0  # task delay
        task.switch = True  # detection switch: on
        BaseEventApi.__init__(self, cmd_id=command, message=task)

    async def execute(self):
        """No-op override; the body is empty."""
        pass

    def _parse_msg(self, message):
        """Decode an incoming message into a ``FaceDetectTaskResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``FaceDetectTaskResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = FaceDetectTaskResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed

    def stop(self):
        """Stop face-count detection and stop listening for its events.

        Schedules the stop command as an asyncio task, then calls the
        parent class's ``stop``.

        Returns:
            None
        """
        # NOTE(review): create_task requires a running event loop and the
        # returned task is not retained -- confirm callers only invoke
        # stop() from inside the loop.
        stopper = _StopFaceDetect()
        asyncio.create_task(stopper.execute())
        super().stop()


class _StopFaceDetect(BaseApiNoNeedResponse):
    """Command API that stops face-count detection."""

    async def execute(self):
        """Send a face-detect task request with the switch turned off.

        Returns:
            The awaited result of ``self.send``.
        """
        payload = FaceDetectTaskRequest()
        payload.switch = False  # detection switch: off
        return await self.send(
            _PCProgramCmdId.FACE_DETECT_TASK_REQUEST.value, payload
        )

    def _parse_msg(self, message):
        """Decode an incoming message into a ``FaceDetectTaskResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``FaceDetectTaskResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = FaceDetectTaskResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed


class ObserveFaceRecognise(BaseEventApi):
    """Event API that listens for face-recognition results.

    The robot reports the faces it recognised as an array. For a
    registered face the details are returned (id, name, gender, age);
    for a stranger the name is ``"stranger"``. A single detection times
    out after 1 s and detections run at a 1 s interval. Each event is
    parsed into a ``FaceRecogniseTaskResponse``:

    - ``faceInfos``: array of ``FaceInfoResponse`` entries, each with
      ``id``, ``name``, ``gender`` and ``age``
    - ``isSuccess``: whether the operation succeeded
    - ``resultCode``: the result code
    """

    def __init__(self):
        command = _PCProgramCmdId.FACE_RECOGNISE_TASK_REQUEST.value
        task = FaceRecogniseTaskRequest()
        task.timeout = 1000  # timeout of a single detection
        task.period = 1000  # interval between detections
        task.delay = 0  # task delay
        task.switch = True  # detection switch: on
        BaseEventApi.__init__(self, cmd_id=command, message=task)

    async def execute(self):
        """No-op override; the body is empty."""
        pass

    def _parse_msg(self, message):
        """Decode an incoming message into a ``FaceRecogniseTaskResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``FaceRecogniseTaskResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = FaceRecogniseTaskResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed

    def stop(self):
        """Stop face recognition and stop listening for its events.

        Schedules the stop command as an asyncio task, then calls the
        parent class's ``stop``.

        Returns:
            None
        """
        # NOTE(review): create_task requires a running event loop and the
        # returned task is not retained -- confirm callers only invoke
        # stop() from inside the loop.
        stopper = _StopFaceRecognise()
        asyncio.create_task(stopper.execute())
        super().stop()


class _StopFaceRecognise(BaseApiNoNeedResponse):
    """Command API that stops face recognition."""

    async def execute(self):
        """Send a face-recognise task request with the switch turned off.

        Returns:
            The awaited result of ``self.send``.
        """
        payload = FaceRecogniseTaskRequest()
        payload.switch = False  # detection switch: off
        return await self.send(
            _PCProgramCmdId.FACE_RECOGNISE_TASK_REQUEST.value, payload
        )

    def _parse_msg(self, message):
        """Decode an incoming message into a ``FaceRecogniseTaskResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``FaceRecogniseTaskResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = FaceRecogniseTaskResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed


class ObserveInfraredDistance(BaseEventApi):
    """Event API that listens for infrared distance readings.

    The robot reports the infrared distance to the nearest obstacle in
    front of it; the detection period is 1 s. Each event is parsed into
    an ``ObserveInfraredDistanceResponse``:

    - ``distance``: the infrared distance
    """

    def __init__(self):
        command = _PCProgramCmdId.SUBSCRIBE_INFRARED_DISTANCE_REQUEST.value
        subscription = ObserveInfraredDistanceRequest()
        subscription.samplingPeriod = 1000  # detection period
        subscription.isSubscribe = True  # detection switch: on
        BaseEventApi.__init__(self, cmd_id=command, message=subscription)

    async def execute(self):
        """No-op override; the body is empty."""
        pass

    def _parse_msg(self, message):
        """Decode a message into an ``ObserveInfraredDistanceResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``ObserveInfraredDistanceResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = ObserveInfraredDistanceResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed

    def stop(self):
        """Stop infrared distance detection and stop listening for it.

        Schedules the stop command as an asyncio task, then calls the
        parent class's ``stop``.

        Returns:
            None
        """
        # NOTE(review): create_task requires a running event loop and the
        # returned task is not retained -- confirm callers only invoke
        # stop() from inside the loop.
        stopper = _StopObserveInfraredDistance()
        asyncio.create_task(stopper.execute())
        super().stop()


class _StopObserveInfraredDistance(BaseApiNoNeedResponse):
    """Command API that stops infrared distance monitoring."""

    async def execute(self):
        """Send an infrared-distance request with the subscription off.

        Returns:
            The awaited result of ``self.send``.
        """
        request = ObserveInfraredDistanceRequest()
        # Detection switch: off
        request.isSubscribe = False

        # Use the same command id that ObserveInfraredDistance subscribes
        # with. The previous code sent this infrared request under
        # FACE_RECOGNISE_TASK_REQUEST, a copy-paste slip that addressed the
        # unsubscribe to the face-recognition command instead.
        cmd_id = _PCProgramCmdId.SUBSCRIBE_INFRARED_DISTANCE_REQUEST.value

        return await self.send(cmd_id, request)

    def _parse_msg(self, message):
        """Decode a message into an ``ObserveInfraredDistanceResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``ObserveInfraredDistanceResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if isinstance(message, Message):
            data = message.bodyData
            response = ObserveInfraredDistanceResponse()
            response.ParseFromString(data)
            return response
        else:
            return None


@enum.unique
class RobotPosture(enum.Enum):
    """Postures the robot can report.

    Members:
        STAND: standing
        SPLITS_LEFT: left lunge
        SPLITS_RIGHT: right lunge
        SIT_DOWN: sitting
        SQUAT_DOWN: squatting
        KNEELING: kneeling
        LYING: lying on its side
        LYING_DOWN: lying flat
        SPLITS_LEFT_1: left split
        SPLITS_RIGHT_2: right split
        BEND: bending over
    """

    # Upright and lunge postures
    STAND = 1
    SPLITS_LEFT = 2
    SPLITS_RIGHT = 3

    # Lowered postures
    SIT_DOWN = 4
    SQUAT_DOWN = 5
    KNEELING = 6

    # Lying postures: on the side, then flat
    LYING = 7
    LYING_DOWN = 8

    # Splits and bending over
    SPLITS_LEFT_1 = 9
    SPLITS_RIGHT_2 = 10
    BEND = 11


class ObserveRobotPosture(BaseEventApi):
    """Event API that listens for robot posture changes.

    The robot reports its current posture when the posture changes.
    Each event is parsed into an ``ObserveFallClimbResponse``:

    - ``status``: the posture; the number corresponds to ``RobotPosture``
    """

    def __init__(self):
        command = _PCProgramCmdId.SUBSCRIBE_ROBOT_POSTURE_REQUEST.value
        subscription = ObserveFallClimbRequest()
        subscription.isSubscribe = True  # detection switch: on
        BaseEventApi.__init__(self, cmd_id=command, message=subscription)

    async def execute(self):
        """No-op override; the body is empty."""
        pass

    def _parse_msg(self, message):
        """Decode an incoming message into an ``ObserveFallClimbResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``ObserveFallClimbResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = ObserveFallClimbResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed

    def stop(self):
        """Stop posture detection and stop listening for posture events.

        Schedules the stop command as an asyncio task, then calls the
        parent class's ``stop``.

        Returns:
            None
        """
        # NOTE(review): create_task requires a running event loop and the
        # returned task is not retained -- confirm callers only invoke
        # stop() from inside the loop.
        stopper = _StopObserveRobotPosture()
        asyncio.create_task(stopper.execute())
        super().stop()


class _StopObserveRobotPosture(BaseApiNoNeedResponse):
    """Command API that stops listening for robot posture."""

    async def execute(self):
        """Send a posture request with the subscription turned off.

        Returns:
            The awaited result of ``self.send``.
        """
        payload = ObserveFallClimbRequest()
        payload.isSubscribe = False  # detection switch: off
        return await self.send(
            _PCProgramCmdId.SUBSCRIBE_ROBOT_POSTURE_REQUEST.value, payload
        )

    def _parse_msg(self, message):
        """Decode an incoming message into an ``ObserveFallClimbResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``ObserveFallClimbResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = ObserveFallClimbResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed


@enum.unique
class HeadRacketType(enum.Enum):
    """Ways the robot's head can be tapped.

    Members:
        SINGLE_CLICK: single tap
        LONG_PRESS: long press
        DOUBLE_CLICK: double tap
    """

    SINGLE_CLICK = 1
    LONG_PRESS = 2
    DOUBLE_CLICK = 3


class ObserveHeadRacket(BaseEventApi):
    """Event API that listens for head-tap events.

    When the robot's head is tapped, the robot reports the tap type.
    Each event is parsed into an ``ObserveHeadRacketResponse``:

    - ``type``: the tap type, see ``HeadRacketType``
    """

    def __init__(self):
        command = _PCProgramCmdId.SUBSCRIBE_HEAD_RACKET_REQUEST.value
        subscription = ObserveHeadRacketRequest()
        subscription.isSubscribe = True  # detection switch: on
        BaseEventApi.__init__(self, cmd_id=command, message=subscription)

    async def execute(self):
        """No-op override; the body is empty."""
        pass

    def _parse_msg(self, message):
        """Decode an incoming message into an ``ObserveHeadRacketResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``ObserveHeadRacketResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if not isinstance(message, Message):
            return None
        parsed = ObserveHeadRacketResponse()
        parsed.ParseFromString(message.bodyData)
        return parsed

    def stop(self):
        """Stop head-tap detection and stop listening for head-tap events.

        Schedules the stop command as an asyncio task, then calls the
        parent class's ``stop``.

        Returns:
            None
        """
        # NOTE(review): create_task requires a running event loop and the
        # returned task is not retained -- confirm callers only invoke
        # stop() from inside the loop.
        stopper = _StopObserveHeadRacket()
        asyncio.create_task(stopper.execute())
        super().stop()


class _StopObserveHeadRacket(BaseApiNoNeedResponse):
    """Command API that stops head-tap event monitoring."""

    async def execute(self):
        """Send a head-racket request with the subscription turned off.

        Returns:
            The awaited result of ``self.send``.
        """
        request = ObserveHeadRacketRequest()
        # Detection switch: off
        request.isSubscribe = False

        # Use the same command id that ObserveHeadRacket subscribes with.
        # The previous code sent this head-racket request under
        # SUBSCRIBE_ROBOT_POSTURE_REQUEST, a copy-paste slip that addressed
        # the unsubscribe to the posture command instead.
        cmd_id = _PCProgramCmdId.SUBSCRIBE_HEAD_RACKET_REQUEST.value

        return await self.send(cmd_id, request)

    def _parse_msg(self, message):
        """Decode an incoming message into an ``ObserveHeadRacketResponse``.

        Args:
            message (Message): the Message object to parse.

        Returns:
            The ``ObserveHeadRacketResponse`` parsed from
            ``message.bodyData``, or ``None`` when ``message`` is not a
            ``Message`` instance.
        """
        if isinstance(message, Message):
            data = message.bodyData
            response = ObserveHeadRacketResponse()
            response.ParseFromString(data)
            return response
        else:
            return None