#!/usr/bin/env python3
import enum
from mini.pb2.codemao_speechrecognise_pb2 import SpeechRecogniseRequest, SpeechRecogniseResponse
from ..apis.base_api import BaseApi, DEFAULT_TIMEOUT
from ..apis.cmdid import _PCProgramCmdId
from ..pb2.codemao_faceanalyze_pb2 import FaceAnalyzeRequest, FaceAnalyzeResponse
from ..pb2.codemao_facedetect_pb2 import FaceDetectRequest, FaceDetectResponse
from ..pb2.codemao_facerecognise_pb2 import FaceRecogniseRequest, FaceRecogniseResponse
from ..pb2.codemao_getinfrareddistance_pb2 import GetInfraredDistanceRequest, GetInfraredDistanceResponse
from ..pb2.codemao_getregisterfaces_pb2 import GetRegisterFacesRequest, GetRegisterFacesResponse
from ..pb2.codemao_recogniseobject_pb2 import RecogniseObjectRequest, RecogniseObjectResponse
from ..pb2.codemao_takepicture_pb2 import TakePictureRequest, TakePictureResponse
from ..pb2.pccodemao_message_pb2 import Message
[文档]class FaceDetect(BaseApi):
"""检测人脸个数api
Args:
is_serial (bool): 是否等待回复,默认True
timeout (int): 超时时间,必须大于0
#FaceDetectResponse.count : 人脸个数
#FaceDetectResponse.isSuccess : 是否成功
#FaceDetectResponse.resultCode : 返回码
#FaceDetectResponse.commandId
"""
def __init__(self, is_serial: bool = True, timeout: int = 10):
assert isinstance(timeout, int) and timeout > 0, 'FaceDetect : timeout should be positive'
self.__is_serial = is_serial
self.__timeout = timeout
[文档] async def execute(self):
"""
执行检测人脸个数指令
Returns:
FaceDetectResponse
"""
timeout = 0
if self.__is_serial:
timeout = DEFAULT_TIMEOUT
request = FaceDetectRequest()
request.timeout = self.__timeout
cmd_id = _PCProgramCmdId.FACE_DETECT_REQUEST.value
return await self.send(cmd_id, request, timeout)
def _parse_msg(self, message):
"""
Args:
message (Message):待解析的Message对象
Returns:
FaceDetectResponse
"""
if isinstance(message, Message):
data = message.bodyData
response = FaceDetectResponse()
response.ParseFromString(data)
return response
else:
return None
[文档]class FaceAnalysis(BaseApi):
"""人脸分析api
通过人脸识别,分析性别、年龄
当多人存在摄像头前时,返回脸面积最大的那个
Args:
is_serial (bool): 是否等待回复,默认True
timeout (int): 超时时间
返回示例:FaceAnalyzeResponse{"age":24,"gender":99,"height":238,"width":238}
#FaceAnalyzeResponse.faceInfos : 人脸信息数组[FaceInfoResponse]
#FaceInfoResponse.gender (int) :[0,100],小于50为女性,大于50为男性
#FaceInfoResponse.age : 年龄
#FaceInfoResponse.width : 人脸在摄像头画面中的宽度
#FaceInfoResponse.height : 人脸在摄像头画面中的高度
#FaceAnalyzeResponse.isSuccess : 是否成功
#FaceAnalyzeResponse.resultCode : 返回码
"""
def __init__(self, is_serial: bool = True, timeout: int = 10):
assert isinstance(timeout, int) and timeout > 0, 'FaceAnalysis : timeout should be positive'
self.__is_serial = is_serial
self.__timeout = timeout
[文档] async def execute(self):
"""
执行人脸分析指令
Returns:
FaceAnalyzeResponse
"""
timeout = 0
if self.__is_serial:
timeout = DEFAULT_TIMEOUT
request = FaceAnalyzeRequest()
request.timeout = self.__timeout
cmd_id = _PCProgramCmdId.FACE_ANALYSIS_REQUEST.value
return await self.send(cmd_id, request, timeout)
def _parse_msg(self, message):
"""
Args:
message (Message):待解析的Message对象
Returns:
FaceAnalyzeResponse
"""
if isinstance(message, Message):
data = message.bodyData
response = FaceAnalyzeResponse()
response.ParseFromString(data)
return response
else:
return None
[文档]@enum.unique
class ObjectRecogniseType(enum.Enum):
"""
物体识别类型
FRUIT : 水果
GESTURE : 手势
FLOWER : 花
"""
FRUIT = 1 # 水果
GESTURE = 2 # 手势
FLOWER = 3 # 花
[文档]class ObjectRecognise(BaseApi):
"""物体识别api
机器人通过摄像头识别相应的物体(水果/手势/花)
Args:
is_serial (bool): 是否等待回复,默认True
object_type (ObjectRecogniseType): 物体识别类型,默认FRUIT,水果
timeout (int): 超时时间
#RecogniseObjectResponse.objects : 物体名数组[str]
#RecogniseObjectResponse.isSuccess : 是否成功
#RecogniseObjectResponse.resultCode : 返回码
"""
def __init__(self, is_serial: bool = True, object_type: ObjectRecogniseType = ObjectRecogniseType.FRUIT,
timeout: int = 10):
assert isinstance(timeout, int) and timeout > 0, 'ObjectRecognise : timeout should be positive'
assert isinstance(object_type, ObjectRecogniseType), 'ObjectRecognise : objectType should be ' \
'ObjectRecogniseType instance '
self.__is_serial = is_serial
self.__object_type = object_type.value
self.__timeout = timeout
[文档] async def execute(self):
"""
执行物体识别指令
Returns:
RecogniseObjectResponse
"""
timeout = 0
if self.__is_serial:
timeout = DEFAULT_TIMEOUT
request = RecogniseObjectRequest()
request.objectType = self.__object_type
request.timeout = self.__timeout
cmd_id = _PCProgramCmdId.RECOGNISE_OBJECT_REQUEST.value
return await self.send(cmd_id, request, timeout)
def _parse_msg(self, message):
"""
Args:
message (Message):待解析的Message对象
Returns:
RecogniseObjectResponse
"""
if isinstance(message, Message):
data = message.bodyData
response = RecogniseObjectResponse()
response.ParseFromString(data)
return response
else:
return None
[文档]class FaceRecognise(BaseApi):
"""人脸识别api
机器人通过摄像头识别出所有的人脸信息(熟人/陌生人)
Args:
is_serial (bool): 是否等待回复,默认True
timeout (int): 超时时间
#FaceRecogniseResponse.faceInfos([FaceInfoResponse]) : 人脸信息数组
#FaceInfoResponse.id : 人脸id
#FaceInfoResponse.name : 姓名,如果是陌生人,则默认name为"stranger"
#FaceRecogniseResponse.isSuccess : 是否成功
#FaceRecogniseResponse.resultCode : 返回码
#FaceRecogniseResponse.commandId
"""
def __init__(self, is_serial: bool = True, timeout: int = 10):
assert isinstance(timeout, int) and timeout > 0, 'ObjectRecognise : timeout should be positive'
self.__is_serial = is_serial
self.__timeout = timeout
[文档] async def execute(self):
"""
执行人脸识别指令
Returns:
FaceRecogniseResponse
"""
timeout = 0
if self.__is_serial:
timeout = DEFAULT_TIMEOUT
request = FaceRecogniseRequest()
request.timeout = self.__timeout
cmd_id = _PCProgramCmdId.FACE_RECOGNISE_REQUEST.value
return await self.send(cmd_id, request, timeout)
def _parse_msg(self, message):
"""
Args:
message (Message):待解析的Message对象
Returns:
FaceRecogniseResponse
"""
if isinstance(message, Message):
data = message.bodyData
response = FaceRecogniseResponse()
response.ParseFromString(data)
return response
else:
return None
[文档]@enum.unique
class TakePictureType(enum.Enum):
"""
拍照类型
IMMEDIATELY : 立即拍照
FINDFACE : 寻找人脸拍照
"""
IMMEDIATELY = 0 # 立即拍照
FINDFACE = 1 # 寻找人脸拍照
[文档]class TakePicture(BaseApi):
"""拍照api
控制机器人拍照
Args:
is_serial (bool): 是否等待回复,默认True
take_picture_type (TakePictureType): 拍照类型,默认IMMEDIATELY,立即拍照
#TakePictureResponse.isSuccess : 是否成功
#TakePictureResponse.code : 返回码
#TakePictureResponse.picPath : 照片在机器人里的存储路径(sdcard/)
"""
def __init__(self, is_serial: bool = True, take_picture_type: TakePictureType = TakePictureType.IMMEDIATELY):
assert isinstance(take_picture_type, TakePictureType), 'TakePicture : take_picture_type should be ' \
'TakePictureType instance '
self.__is_serial = is_serial
self.__type = take_picture_type.value
[文档] async def execute(self):
"""
执行拍照指令
Returns:
TakePictureResponse
"""
timeout = 0
if self.__is_serial:
timeout = DEFAULT_TIMEOUT
request = TakePictureRequest()
request.type = self.__type
cmd_id = _PCProgramCmdId.TAKE_PICTURE_REQUEST.value
return await self.send(cmd_id, request, timeout)
def _parse_msg(self, message):
"""
Args:
message (Message):待解析的Message对象
Returns:
TakePictureResponse
"""
if isinstance(message, Message):
data = message.bodyData
response = TakePictureResponse()
response.ParseFromString(data)
return response
else:
return None
[文档]class GetInfraredDistance(BaseApi):
"""获取红外距离api
获取距离机器人正面最近的障碍物的红外距离
Args:
is_serial (bool): 是否等待回复,默认True
#GetInfraredDistanceResponse.distance : 红外距离
"""
def __init__(self, is_serial: bool = True):
self.__is_serial = is_serial
[文档] async def execute(self):
"""
执行获取红外距离的指令
Returns:
GetInfraredDistanceResponse
"""
timeout = 0
if self.__is_serial:
timeout = DEFAULT_TIMEOUT
request = GetInfraredDistanceRequest()
cmd_id = _PCProgramCmdId.GET_INFRARED_DISTANCE_REQUEST.value
return await self.send(cmd_id, request, timeout)
def _parse_msg(self, message):
"""
Args:
message (Message):待解析的Message对象
Returns:
GetInfraredDistanceResponse
"""
if isinstance(message, Message):
data = message.bodyData
response = GetInfraredDistanceResponse()
response.ParseFromString(data)
return response
else:
return None
[文档]class GetRegisterFaces(BaseApi):
"""获取已注册的人脸列表api
获取机器人中已注册的人脸列表数据
Args:
is_serial (bool): 是否等待回复,默认True
#GetRegisterFacesResponse.faceInfos([FaceInfoResponse]) : 人脸信息数组
#FaceInfoResponse.id : 人脸id
#FaceInfoResponse.name : 姓名
#GetRegisterFacesResponse.isSuccess : 是否成功
#GetRegisterFacesResponse.resultCode : 返回码
"""
def __init__(self, is_serial: bool = True):
self.__is_serial = is_serial
[文档] async def execute(self):
"""
执行获取已注册人脸列表指令
Returns:
GetRegisterFacesResponse
"""
timeout = 0
if self.__is_serial:
timeout = DEFAULT_TIMEOUT
request = GetRegisterFacesRequest()
cmd_id = _PCProgramCmdId.GET_REGISTER_FACES_REQUEST.value
return await self.send(cmd_id, request, timeout)
def _parse_msg(self, message):
"""
Args:
message (Message):待解析的Message对象
Returns:
GetRegisterFacesResponse
"""
if isinstance(message, Message):
data = message.bodyData
response = GetRegisterFacesResponse()
response.ParseFromString(data)
return response
else:
return None
[文档]class StartSpeechRecognise(BaseApi):
"""机器人语音识别Apiapi
控制机器人开始录音
Args:
is_serial (bool): 是否等待回复,默认True
time_limit (int): 录音时长,单位ms,默认60000ms,即60s
#ControlRobotRecordResponse.isSuccess : 是否成功
#ControlRobotRecordResponse.resultCode : 返回码
#ControlRobotRecordResponse.id : 生成的录音文件名
"""
def __init__(self, is_serial: bool = True, time_limit: int = 60000, asr_text: str = ""):
assert isinstance(time_limit, int) and time_limit > 0, f'{self.__class__.__name__} : time_limit should be ' \
f'positive '
self.__isSerial = is_serial
self.__timeLimit = time_limit
self.__asrText = asr_text
[文档] async def execute(self):
"""
执行语音识别指令
Returns:
SpeechRecogniseResponse
"""
timeout = 0
if self.__isSerial:
timeout = DEFAULT_TIMEOUT
request = SpeechRecogniseRequest()
request.timeLimit = self.__timeLimit
request.asrText = self.__asrText
cmd_id = _PCProgramCmdId.SPEECH_RECOGNISE.value
return await self.send(cmd_id, request, timeout)
def _parse_msg(self, message):
"""
Args:
message (Message):待解析的Message对象
Returns:
SpeechRecogniseResponse
"""
if isinstance(message, Message):
data = message.bodyData
response = SpeechRecogniseResponse()
response.ParseFromString(data)
return response
else:
return None