import asyncio
import logging
from asyncio.futures import Future
import enum
from google.protobuf import message as _message
from typing import Any, Set, Optional
from mini import MoveRobotDirection, MiniApiResultType, MouthLampMode, \
MouthLampColor, ServicePlatform, LanType
from mini.pb2.codemao_faceanalyze_pb2 import FaceAnalyzeResponse
from mini.pb2.codemao_facedetect_pb2 import FaceDetectResponse
from mini.pb2.codemao_facerecognise_pb2 import FaceRecogniseResponse
from mini.pb2.codemao_getaudiolist_pb2 import GetAudioListResponse
from mini.pb2.codemao_getinfrareddistance_pb2 import GetInfraredDistanceResponse
from mini.pb2.codemao_getregisterfaces_pb2 import GetRegisterFacesResponse
from mini.pb2.codemao_recogniseobject_pb2 import RecogniseObjectResponse
from mini.pb2.codemao_takepicture_pb2 import TakePictureResponse
from .channels.websocket_client import AbstractMsgHandler as _AbstractMsgHandler
from .channels.websocket_client import ubt_websocket as _websocket
from .dns.dns_browser import WiFiDeviceListener, WiFiDevice
from .dns.dns_browser import browser as _browser
_log = logging.getLogger(__name__)
_log.addHandler(logging.StreamHandler())
if _log.level == logging.NOTSET:
_log.setLevel(logging.INFO)
browser = _browser()
websocket = _websocket()
[docs]def set_log_level(level: int, save_file: str = None):
"""Set the sdk log level
Args:
level: logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR
save_file: Need to save to a file, fill in the log file path
"""
_log.setLevel(level)
from .dns.dns_browser import log as log1
log1.setLevel(level)
from .channels.websocket_client import log as log2
log2.setLevel(level)
from .dns.zeroconf import log as log3
log3.setLevel(level)
log3.addHandler(logging.StreamHandler())
if save_file is not None:
file_handler = logging.FileHandler(save_file)
_log.addHandler(file_handler)
log1.addHandler(file_handler)
log2.addHandler(file_handler)
[docs]@enum.unique
class RobotType(enum.Enum):
"""
Robot Product Type
"""
DEDU = 1
"""
AlphaMini China Education Edition
"""
MINI = 2
"""
AlphaMini China Standard Edition
"""
EDU = 3
"""
AlphaMini Oversea Education Edition
"""
KOR = 4
"""
AlphaMini Korea Edition
"""
OTOC = 5
"""
AlphaMini Oversea ToC Edition
"""
[docs]def set_robot_type(robot: RobotType):
"""Set the robot product type to be linked
Args:
robot: Value is: RobotType.EDU, RobotType.KOR, RobotType.MINI or RobotType.DEDU
"""
if robot == RobotType.MINI:
from .dns import dns_browser
dns_browser.service_type = "_Mini_mini_channel_server._tcp.local."
elif robot == RobotType.DEDU:
from .dns import dns_browser
dns_browser.service_type = "_Dedu_mini_channel_server._tcp.local."
elif robot == RobotType.EDU:
from .dns import dns_browser
dns_browser.service_type = "_Edu_mini_channel_server._tcp.local."
elif robot == RobotType.KOR:
from .dns import dns_browser
dns_browser.service_type = "_Kor_mini_channel_server._tcp.local."
elif robot == RobotType.OTOC:
from .dns import dns_browser
dns_browser.service_type = "_OToC_mini_channel_server._tcp.local."
else:
print(f"不支持的机器人产品类型")
class _GetWiFiDeviceListListener(WiFiDeviceListener):
"""Obtain the monitoring of robot equipment in batches
Args:
devices: Set[WiFiDevice] or None
"""
devices: Set[WiFiDevice]
def __init__(self, devices):
self.devices: Set[WiFiDevice] = devices or set()
def on_device_updated(self, device: WiFiDevice) -> None:
"""
机器人设备更新了
Args:
device: WiFiDevice
"""
self.devices.update([device])
def on_device_removed(self, device: WiFiDevice) -> None:
"""
机器人设备从局域网中移除了
Args:
device: WFiDevice
"""
self.devices.remove(device)
def on_device_found(self, device: WiFiDevice) -> None:
"""
扫描到一个机器人设备
Args:
device: WFiDevice
"""
self.devices.add(device)
async def _get_user_input(devices: tuple) -> int:
try:
i: int = 0
for device in devices:
print('{0}.{1}'.format(i, device))
i += 1
num_text = input(f'请输入选择连接的机器人序号:')
except Exception as e:
raise e
else:
return int(num_text)
def _start_scan(loop: asyncio.AbstractEventLoop, name: str) -> Future:
"""
开启一个扫描机器人设备的Future
Args:
loop: 当前事件loop
name: 指定设备名称
Returns:
asyncio.Future
"""
fut = loop.create_future()
class _InnerLister(WiFiDeviceListener):
@staticmethod
def set_result(future: Future, device: WiFiDevice):
if future.cancelled() or future.done():
return
_log.info(f"found device : {device}")
future.set_result(device)
def on_device_found(self, device: WiFiDevice) -> None:
if device.name.endswith(name):
if fut.cancelled() or fut.done():
return
loop.run_in_executor(None, browser.stop_scan)
loop.call_soon(_InnerLister.set_result, fut, device)
def on_device_updated(self, device: WiFiDevice) -> None:
if device.name.endswith(name):
if fut.cancelled() or fut.done():
return
loop.run_in_executor(None, browser.stop_scan)
loop.call_soon(_InnerLister.set_result, fut, device)
def on_device_removed(self, device: WiFiDevice) -> None:
if device.name.endswith(name):
if fut.cancelled() or fut.done():
return
loop.call_soon(_InnerLister.set_result, fut, device)
_log.info("start scanning...")
browser.add_listener(_InnerLister())
browser.start_scan(0)
return fut
async def _get_device_by_name(name: str, timeout: int) -> Optional[WiFiDevice]:
"""
获取当前局域网内,指定名字的机器人设备信息
Args:
name: 设备序列号
timeout: 扫描超时时间
Returns:
Optional[WiFiDevice]
"""
async def start_scan_async():
return await _start_scan(asyncio.get_running_loop(), name)
try:
device: WiFiDevice = await asyncio.wait_for(start_scan_async(), timeout)
return device
except asyncio.TimeoutError:
_log.warning(f'scan device timeout')
return None
finally:
browser.stop_scan()
_log.info("stop scan finished.")
async def _get_device_list(timeout: int) -> tuple:
"""
获取当前局域网内所有机器人设备信息
Args:
timeout: 超时时间
Returns:
Optional[WiFiDevice]
"""
devices: Set[WiFiDevice] = set()
browser.add_listener(_GetWiFiDeviceListListener(devices))
browser.start_scan(0)
await asyncio.sleep(timeout)
browser.remove_all_listener()
browser.stop_scan()
return tuple(devices)
async def _connect(device: WiFiDevice) -> bool:
"""
连接机器人设备
Args:
device (WiFiDevice): 指定机器人设备对象
Returns:
bool: 是否连接设备成功
"""
return await websocket.connect(device.address)
def _register_msg_handler(cmd: int, handler: _AbstractMsgHandler):
"""
注册命令监听器
Args:
cmd: 支持的命令请查看: mini.apis.cmdid
handler: 命令处理器
"""
websocket.register_msg_handler(cmd, handler)
def _unregister_msg_handler(cmd: int, handler: _AbstractMsgHandler):
"""
反注册命令监听器
Args:
cmd: 支持的命令请查看: mini.apis.cmdid
handler: 命令处理器
"""
websocket.unregister_msg_handler(cmd, handler)
async def _send_msg(cmd: int, message: _message.Message, timeout: int) -> Any:
"""
发送一个消息给机器人
Args:
cmd:支持的命令请查看: mini.apis.cmdid
message: 消息类在: mini.pb2 包内
timeout: 超时时间
"""
return await websocket.send_msg(cmd, message, timeout)
async def _release():
"""
断开链接,释放资源
"""
await websocket.shutdown()
# -----------------------------------------------------------------#
[docs]async def get_device_by_name(name: str, timeout: int) -> Optional[WiFiDevice]:
"""
Get the robot device information with the specified name in the current LAN
Args:
name: device serial number
timeout: scan timeout
Returns:
Optional[WiFiDevice]
"""
return await _get_device_by_name(name, timeout)
[docs]async def get_device_list(timeout: int) -> tuple:
"""Get all the robot device information in the current LAN
Args:
timeout: timeout
Returns:
Optional[WiFiDevice]
"""
return await _get_device_list(timeout)
[docs]async def connect(device: WiFiDevice) -> bool:
"""Connect robot equipment
Args:
device (WiFiDevice): Specify the robot device object
Returns:
bool: Whether the device is connected successfully
"""
return await _connect(device)
[docs]async def release():
"""
Disconnect the link and release resources
"""
await _release()
[docs]async def enter_program() -> bool:
"""Enter programming mode api
Robot enters programming mode
Returns:
bool
"""
from mini.apis.api_setup import StartRunProgram
(resultType, response) = await StartRunProgram().execute()
await asyncio.sleep(6)
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def quit_program() -> bool:
"""Exit programming mode api
Robot exits programming mode
Returns:
bool
"""
from mini.apis.api_setup import StopRunProgram
return await StopRunProgram(is_serial=False).execute()
[docs]async def play_action(action_name: str = None) -> bool:
"""Execute built-in action api
The robot performs a built-in action with a specified name
The action name can be obtained with get_action_list
Args:
action_name (str): action name
Returns:
bool
"""
from mini.apis.api_action import PlayAction
block: PlayAction = PlayAction(True, action_name)
(resultType, response) = await block.execute()
_log.info(f'play_action result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def stop_action() -> bool:
"""Stop action
If the action is an uninterruptible action, stop_action returns False
Returns:
bool
"""
from mini.apis.api_action import StopAllAction
block: StopAllAction = StopAllAction()
(resultType, response) = await block.execute()
_log.info(f'stop_action result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def play_custom_action(action_name: str = None) -> bool:
"""Execute custom action api
Let the robot perform a custom action with a specified name and wait for the result
The action name can be obtained by get_custom_action_list
Args:
action_name (str): custom action name
Returns:
bool
"""
from mini.apis.api_action import PlayCustomAction
block: PlayCustomAction = PlayCustomAction(True, action_name)
(resultType, response) = await block.execute()
_log.info(f'play_custom_action result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def stop_custom_action() -> bool:
"""Stop custom action
If the action is an uninterruptible action, stop_action returns False
Returns:
bool
"""
from mini.apis.api_action import StopCustomAction
block: StopCustomAction = StopCustomAction()
(resultType, response) = await block.execute()
_log.info(f'stop_custom_action result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def move(direction: MoveRobotDirection = MoveRobotDirection.FORWARD,
step: int = 1) -> bool:
"""Control robot movement
Control the robot to move 10 steps to the left (LEFTWARD) and wait for the execution result
Args:
direction (MoveRobotDirection): direction
step (int): number of steps
Returns:
bool
"""
from mini.apis.api_action import MoveRobot
block: MoveRobot = MoveRobot(True, direction, step)
(resultType, response) = await block.execute()
_log.info(f'move result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def get_action_list() -> list:
"""Get action list
Get the list of actions built into the robot system, and wait for the reply result
Returns:
[]: Action list
"""
from mini.apis.api_action import GetActionList
from mini import RobotActionType
block: GetActionList = GetActionList(True, RobotActionType.INNER)
(resultType, response) = await block.execute()
if resultType == MiniApiResultType.Success and response.isSuccess:
return response.actionList
else:
return []
[docs]async def get_custom_action_list() -> list:
"""Get a list of custom actions
Get the action list under the robot/sdcard/customize/actions and wait for the reply result
Returns:
[]: Custom action list
"""
from mini.apis.api_action import GetActionList
from mini import RobotActionType
block: GetActionList = GetActionList(True, RobotActionType.INNER)
(resultType, response) = await block.execute()
if resultType == MiniApiResultType.Success and response.isSuccess:
return response.actionList
else:
return []
[docs]async def wiki(query: str) -> bool:
"""Query encyclopedia demo
Query encyclopedia, for example: query content "you must choose", and wait for the result, the robot broadcasts the
query result
Args:
query (str): query keyword
Returns:
bool
"""
from mini.apis.api_content import QueryWiKi
block: QueryWiKi = QueryWiKi(True, query)
(resultType, response) = await block.execute()
_log.info(f'wiki result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def translate(query: str,
from_lan: LanType = None, to_lan: LanType = None,
platform: ServicePlatform = ServicePlatform.BAIDU) -> bool:
"""translation
For example: Use Baidu Translate to translate "张学友" from Chinese to English, and wait for the result,
the robot broadcasts the translation result:
translate(query="张学友",from_lan=CN,to_lan=EN,platform=GOOGLE)
Args:
query (str): keyword
from_lan (LanType): source language
to_lan (LanType): target language
platform (ServicePlatform): GOOGLE, Only Support Google
Returns:
bool
"""
from mini.apis.api_content import StartTranslate
block: StartTranslate = StartTranslate(True, query, from_lan=from_lan, to_lan=to_lan, platform=platform)
(resultType, response) = await block.execute()
_log.info(f'translate result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def play_expression(express_name: str) -> bool:
"""Do an Expression
Let the robot play a built-in expression of express_name and wait for the reply result
Args:
express_name (str): emoticon file name, such as: "codemao1"
Returns:
bool
"""
from mini.apis.api_expression import PlayExpression
block: PlayExpression = PlayExpression(True, express_name)
(resultType, response) = await block.execute()
_log.info(f'play expression result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def play_behavior(behavior_name: str) -> bool:
"""Do a dance
Let the robot start to dance a dance named behavior_name and wait for the response result
Args:
behavior_name (str): e.g. "dance_0004"
Returns:
bool
"""
from mini.apis.api_behavior import StartBehavior
block: StartBehavior = StartBehavior(True, behavior_name)
(resultType, response) = await block.execute()
_log.info(f'play behavior result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def stop_behavior() -> bool:
"""stop dance
Returns:
bool
"""
from mini.apis.api_behavior import StopBehavior
block: StopBehavior = StopBehavior(True)
(resultType, response) = await block.execute()
_log.info(f'stop behavior result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def set_MouthLamp_NormalMode(color: MouthLampColor = MouthLampColor.RED, duration: int = 1000) -> bool:
"""
Set mouth light always on mode and wait for the result
Args:
color (MouthLampColor): Mouth lamp color, default RED
duration (int): unit ms, duration
Returns:
bool
"""
return await _set_mouthlamp_mode(mode=MouthLampMode.NORMAL, color=color, duration=duration)
[docs]async def set_MouthLamp_BreathMode(breath_duration: int = 1000,
color: MouthLampColor = MouthLampColor.RED, duration: int = 1000) -> bool:
"""
Set mouth light breathing mode and wait for the result
Args:
breath_duration (int): unit ms, the duration of one breath
color (MouthLampColor): Mouth lamp color, default RED
duration (int): unit ms, duration
Returns:
bool
"""
return await _set_mouthlamp_mode(mode=MouthLampMode.BREATH, breath_duration=breath_duration, color=color,
duration=duration)
async def _set_mouthlamp_mode(mode: MouthLampMode = MouthLampMode.NORMAL, color: MouthLampColor = MouthLampColor.RED,
duration: int = 1000, breath_duration: int = 1000) -> bool:
"""Set mouth light mode
Set the robot's mouth light to normal mode, green and always on for 3s, and wait for the reply result
When mode=NORMAL, the duration parameter works, indicating how long it will be on
When mode=BREATH, the breath_duration parameter indicates how often to breathe
After the setting takes effect, the robot will immediately return the
setting result (it has nothing to do with the set duration parameter)
Args:
mode: mouth light mode, 0: normal mode, 1: breathing mode
color: the color of the mouth light, 1: red, 2: green, 3: blue
duration: duration, in milliseconds, -1 means always on
breath_duration: the duration of one blink, in milliseconds
Returns:
bool
"""
from mini.apis.api_expression import SetMouthLamp
block: SetMouthLamp = SetMouthLamp(True, mode, color, duration, breath_duration)
(resultType, response) = await block.execute()
_log.info(f'set MouthLamp mode result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def switch_MouthLamp(is_open: bool = True) -> bool:
"""Switch mouth light
Turn on and off the robot's mouth light and wait for the result
Args:
is_open: bool
Returns:
bool
"""
from mini.apis.api_expression import ControlMouthLamp
block: ControlMouthLamp = ControlMouthLamp(True, is_open)
(resultType, response) = await block.execute()
_log.info(f'switch MouthLamp result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def play_tts(text: str) -> bool:
"""Play tts
Make the robot start playing a period of tts and wait for the result
Args:
text: str, for example: "Hello, I am Wukong, la la la",
Returns:
bool
"""
from mini.apis.api_sound import StartPlayTTS
block: StartPlayTTS = StartPlayTTS(True, text)
(resultType, response) = await block.execute()
_log.info(f'play tts result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def stop_tts() -> bool:
"""Stop speech synthesis broadcast, and wait for the result
Returns:
bool
"""
from mini.apis.api_sound import StopPlayTTS
block: StopPlayTTS = StopPlayTTS(True)
(resultType, response) = await block.execute()
_log.info(f'stop tts result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def play_online_audio(url: str) -> bool:
"""Play online sound effects
Make the robot play an online sound effect,
And wait for the result
Args:
url: str, for example: http://yun.lnpan.com/music/download/ring/000/075/5653bae83917a892589b372782175dd8.amr
Supported formats are mp3, amr, wav, etc.
Returns:
bool
"""
from mini import AudioStorageType
from mini.apis.api_sound import PlayAudio
block: PlayAudio = PlayAudio(True,
url,
AudioStorageType.NET_PUBLIC)
(resultType, response) = await block.execute()
_log.info(f'play online audio result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def play_local_audio(local_file: str) -> bool:
"""Play local sound effects
Make the robot play a local built-in sound effect, the sound effect name is "read_016", and wait for the result
Args:
local_file:
Returns:
bool
"""
from mini.apis.api_sound import PlayAudio
from mini import AudioStorageType
block: PlayAudio = PlayAudio(True,
local_file,
AudioStorageType.PRESET_LOCAL)
(resultType, response) = await block.execute()
_log.info(f'play local audio result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def stop_audio() -> bool:
"""Stop all audio that is playing
Stop all sound effects and wait for the result
Returns:
bool
"""
from mini.apis.api_sound import StopAllAudio
block: StopAllAudio = StopAllAudio(True)
(resultType, response) = await block.execute()
_log.info(f'stop audio result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def get_system_audio_list() -> GetAudioListResponse:
"""Get a list of sound effects
Get the list of sound effects built into the robot and wait for the result
#GetAudioListResponse.audio ([Audio]): audio effect list
#Audio.name: Audio effect name
#Audio.suffix: audio suffix
#GetAudioListResponse.isSuccess: Is it successful, True or False
#GetAudioListResponse.resultCode: Result code
Returns:
GetAudioListResponse
"""
from mini.apis.api_sound import FetchAudioList
from mini import AudioSearchType
block: FetchAudioList = FetchAudioList(True, search_type=AudioSearchType.INNER)
(resultType, response) = await block.execute()
_log.info(f'stop audio result:{response}')
return response
[docs]async def get_custom_audio_list() -> GetAudioListResponse:
"""Get a list of sound effects
Get the list of sound effects placed by the robot developer under /sdcard/customize/music/, and wait for the result
#GetAudioListResponse.audio ([Audio]): Audio effect list
#Audio.name: Audio effect name
#Audio.suffix: audio suffix
#GetAudioListResponse.isSuccess: Is it successful, True or False
#GetAudioListResponse.resultCode: Result code
Returns:
GetAudioListResponse
"""
from mini.apis.api_sound import FetchAudioList
from mini import AudioSearchType
block: FetchAudioList = FetchAudioList(True, search_type=AudioSearchType.CUSTOM)
(resultType, response) = await block.execute()
_log.info(f'stop audio result:{response}')
return response
[docs]async def change_volume(volume: float = 0.5) -> bool:
"""Adjust the robot volume demo
Set the robot volume to 0~1 and wait for the reply result
Args:
volume: float default 0.5
Returns:
bool
"""
from mini.apis.api_sound import ChangeRobotVolume
block: ChangeRobotVolume = ChangeRobotVolume(True, volume)
(resultType, response) = await block.execute()
_log.info(f'change volume result:{response}')
return resultType == MiniApiResultType.Success and response.isSuccess
[docs]async def face_detect() -> FaceDetectResponse:
"""Face count detection
Do a face count detection and wait for the reply result
#FaceDetectResponse.count: the number of faces
#FaceDetectResponse.isSuccess: Is it successful, True or False
#FaceDetectResponse.resultCode: Return code
Returns:
FaceDetectResponse
"""
from mini.apis.api_sence import FaceDetect
block: FaceDetect = FaceDetect(True, 10)
(resultType, response) = await block.execute()
_log.info(f'face detect result:{response}')
return response
[docs]async def face_analysis() -> FaceAnalyzeResponse:
"""Face analysis (gender)
Do a face information (gender, age) detection, and wait for the reply result
When multiple people are in front of the camera, return the face information that accounts for the
largest proportion of the screen.
Return value: Example {"age": 24, "gender": 99, "height": 238, "width": 238}
age: age
gender: [1, 100], females less than 50 are females, males greater than 50
height: the height of the face in the camera image
width: the width of the face in the camera image
#FaceAnalyzeResponse.faceInfos: face information array [FaceInfoResponse]
#FaceInfoResponse.gender (int) :[1,100], females less than 50, males greater than 50
#FaceInfoResponse.age: age
#FaceInfoResponse.width: The width of the face in the camera screen
#FaceInfoResponse.height: The height of the face in the camera screen
#FaceAnalyzeResponse.isSuccess: Is it successful, True or False
#FaceAnalyzeResponse.resultCode: Return code
Returns:
FaceAnalyzeResponse
"""
from mini.apis.api_sence import FaceAnalysis
block: FaceAnalysis = FaceAnalysis(True, 10)
(resultType, response) = await block.execute()
_log.info(f'face analysis result:{response}')
return response
[docs]async def face_recognise() -> FaceRecogniseResponse:
"""Face recognition
Do a face recognition test and wait for the result
#FaceRecogniseResponse.faceInfos: [FaceInfoResponse] face information array
#FaceInfoResponse.id: face id
#FaceInfoResponse.name: name, if it is a stranger, the default name is "stranger"
#FaceInfoResponse.gender: gender
#FaceInfoResponse.age: age
#FaceRecogniseResponse.isSuccess: Is it successful, True or False
#FaceRecogniseResponse.resultCode: Return code
Returns:
RecogniseObjectResponse
"""
from mini.apis.api_sence import FaceRecognise
(resultType, response) = await FaceRecognise(True, 10).execute()
_log.info(f'face recognise result:{response}')
return response
[docs]async def flower_recognise() -> RecogniseObjectResponse:
"""Flower and grass recognition
Let the robot do a flower and grass recognition (you need to manually place the flower or flower photo in front of the robot),
and wait for the result.
#RecogniseObjectResponse.objects: Recognition result array [str]
#RecogniseObjectResponse.isSuccess: Is it successful, True or False
#RecogniseObjectResponse.resultCode: Return code
Returns:
RecogniseObjectResponse
"""
from mini.apis.api_sence import ObjectRecognise
from mini import ObjectRecogniseType
block: ObjectRecognise = ObjectRecognise(True, ObjectRecogniseType.FLOWER, 10)
(resultType, response) = await block.execute()
_log.info(f'flower_recognise result:{response}')
return response
[docs]async def fruit_recognise() -> RecogniseObjectResponse:
"""Fruit recognition
Let the robot do a fruit recognition (you need to manually put the fruit or fruit photo in front of the robot),
and wait for the result.
#RecogniseObjectResponse.objects: Recognition result array [str]
#RecogniseObjectResponse.isSuccess: Is it successful
#RecogniseObjectResponse.resultCode: Return code
Returns:
RecogniseObjectResponse
"""
from mini.apis.api_sence import ObjectRecognise
from mini import ObjectRecogniseType
block: ObjectRecognise = ObjectRecognise(True, ObjectRecogniseType.FRUIT, 10)
(resultType, response) = await block.execute()
_log.info(f'fruit_recognize result:{response}')
return response
[docs]async def gesture_recognise() -> RecogniseObjectResponse:
"""Gesture Recognition
Let the robot do a user gesture recognition, (need to make a gesture in front of the robot), and wait for the result
#RecogniseObjectResponse.objects: Recognition result array [str]
#RecogniseObjectResponse.isSuccess: Is it successful
#RecogniseObjectResponse.resultCode: Return code
Returns:
RecogniseObjectResponse
"""
from mini.apis.api_sence import ObjectRecognise
from mini import ObjectRecogniseType
block: ObjectRecognise = ObjectRecognise(True, ObjectRecogniseType.GESTURE, 10)
(resultType, response) = await block.execute()
_log.info(f'gesture_recognize result:{response}')
return response
[docs]async def take_picture() -> TakePictureResponse:
"""Take pictures
Let the robot find the face before taking a picture and wait for the result
#TakePictureResponse.isSuccess: Is it successful, True or False
#TakePictureResponse.code: Return code
#TakePictureResponse.picPath: The storage path of the photo in the robot
Returns:
TakePictureResponse
"""
from mini.apis.api_sence import TakePicture
from mini import TakePictureType
(resultType, response) = await TakePicture(take_picture_type=TakePictureType.FINDFACE).execute()
_log.info(f'take picture result:{response}')
return response
[docs]async def get_register_faces() -> GetRegisterFacesResponse:
"""Get registered face information
Get all the face information registered in the robot and wait for the result
#GetRegisterFacesResponse.faceInfos: [FaceInfoResponse] face information array
#FaceInfoResponse.id: face id
#FaceInfoResponse.name: name
#FaceInfoResponse.gender: gender
#FaceInfoResponse.age: age
#GetRegisterFacesResponse.isSuccess: Is it successful, True or False
#GetRegisterFacesResponse.resultCode: Return code
Returns:
GetRegisterFacesResponse
"""
from mini.apis.api_sence import GetRegisterFaces
(resultType, response) = await GetRegisterFaces().execute()
_log.info(f'get register faces result:{response}')
return response
[docs]async def get_infrared_distance() -> GetInfraredDistanceResponse:
"""Infrared distance detection
Get the infrared distance detected by the current robot and wait for the result
#GetInfraredDistanceResponse.distance: Infrared distance
Returns:
GetInfraredDistanceResponse
"""
from mini.apis.api_sence import GetInfraredDistance
(resultType, response) = await GetInfraredDistance().execute()
_log.info(f'get infrared distance result:{response}')
return response