简介¶
这是优必选 AlphaMini
机器人 Python SDK 使用文档, 文档介绍SDK安装环境, 机器人支持Python的软件版本, 以及接口简要说明.
一. 安装环境¶
1.1 下载并安装Python¶
windows系统, 安装时默认勾选 “配置环境变量”
Linux/Unix系统, 参考 https://wiki.python.org/moin/BeginnersGuide/Download 下载安装介绍
验证安装
python –version & pip –version
1.2 安装/卸载AlphaMini
sdk¶
pip install alphamini
pip uninstall alphamini
1.3 安装PyCharm
¶
建议安装Professional,但是需要激活
1.4 准备一台悟空版机器人¶
如果是悟空教育版, 其版本号需要大于1.3.0; 如果是悟空标准版, 其版本号需要大于2.0.5
给机器人配上网,确保机器人和PC在同一个局域网内
1.5 下载demo
并运行¶
将demo导入pycharm, 结构如下
选中文件,右键运行 “Run xxx.py”
二. API
介绍¶
2.1 公共接口¶
导入包 :
import mini.mini_sdk as MiniSdk
2.1.1. 扫描局域网内机器人¶
# 搜索指定序列号(在机器人屁股后面)的机器人,可以只输入序列号尾部字符即可,长度任意, 建议5个字符以上可以准确匹配,10秒超时
# 搜索的结果WiFiDevice, 包含机器人名称,ip,port等信息
async def test_get_device_by_name():
result: WiFiDevice = await MiniSdk.get_device_by_name("00018", 10)
print(f"test_get_device_by_name result:{result}")
return result
# 搜索指定序列号(在机器人屁股后面)的机器人,
async def test_get_device_list():
results = await MiniSdk.get_device_list(10)
print(f"test_get_device_list results = {results}")
return results
2.1.2. 连接机器人¶
根据扫描到的机器人结果,调用MiniSdk.connect可连接机器人
# MiniSdk.connect 返回值为bool, 这里忽略返回值
async def test_connect(dev: WiFiDevice):
await MiniSdk.connect(dev)
2.1.3. 让机器人进入编程模式¶
机器人进入编程模式,可防止机器人在执行命令时被其他技能中断
# 进入编程模式,机器人有个tts播报,这里通过asyncio.sleep 让当前协程等6秒返回,让机器人播完
async def test_start_run_program():
await StartRunProgram().execute()
await asyncio.sleep(6)
2.1.4. 断开连接并释放资源¶
async def shutdown():
await asyncio.sleep(1)
await MiniSdk.release()
2.1.5. sdk日志开关¶
# 默认的日志级别是Warning, 设置为INFO
MiniSdk.set_log_level(logging.INFO)
2.2 API
简介¶
导入api
from mini.apis import *
每个api返回是个元组,元组第一个元素是bool类型,第二个元素是response(protobuf值), 如果第一个元素是False,则第二个元素是None值
每个api有个execute()方法, 这是一个async方法, 触发执行
每个api都有个is_serial参数,默认为True,表示接口可以串行执行, 可以await获取执行结果, is_serial=False,表示只需将指令发送给机器人,await不需要等机器人执行完结果再返回
2.2.1 声音控制¶
# 测试text合成声音
async def test_play_tts():
"""测试播放tts
使机器人开始播放一段tts,内容为"你好, 我是悟空, 啦啦啦",并等待结果
#ControlTTSResponse.isSuccess : 是否成功
#ControlTTSResponse.resultCode : 返回码
"""
# is_serial:串行执行
# text:要合成的文本
block: StartPlayTTS = StartPlayTTS(text="你好, 我是悟空, 啦啦啦")
# 返回元组, response是个ControlTTSResponse
(resultType, response) = await block.execute()
print(f'test_play_tts result: {response}')
# StartPlayTTS block的response包含resultCode和isSuccess
# 如果resultCode !=0 可以通过errors.get_speech_error_str(response.resultCode)) 查询错误描述信息
print('resultCode = {0}, error = {1}'.format(response.resultCode, errors.get_speech_error_str(response.resultCode)))
# 测试播放音效(在线)
async def test_play_online_audio():
"""测试播放在线音效
使机器人播放一段在线音效,例如:"http://hao.haolingsheng.com/ring/000/995/52513bb6a4546b8822c89034afb8bacb.mp3"
支持格式有mp3,amr,wav 等
并等待结果
#PlayAudioResponse.isSuccess : 是否成功
#PlayAudioResponse.resultCode : 返回码
"""
# 播放音效, url表示要播放的音效列表
block: PlayAudio = PlayAudio(
url="http://hao.haolingsheng.com/ring/000/995/52513bb6a4546b8822c89034afb8bacb.mp3",
storage_type=AudioStorageType.NET_PUBLIC)
# response是个PlayAudioResponse
(resultType, response) = await block.execute()
print(f'test_play_online_audio result: {response}')
print('resultCode = {0}, error = {1}'.format(response.resultCode, errors.get_speech_error_str(response.resultCode)))
# 测试获取机器人的音效资源
async def test_get_audio_list():
"""测试播放本地音效
使机器人播放一段本地内置音效,音效名称为"read_016",并等待结果
#PlayAudioResponse.isSuccess : 是否成功
#PlayAudioResponse.resultCode : 返回码
"""
block: PlayAudio = PlayAudio(
url="read_016",
storage_type=AudioStorageType.PRESET_LOCAL)
# response是个PlayAudioResponse
(resultType, response) = await block.execute()
print(f'test_play_local_audio result: {response}')
print('resultCode = {0}, error = {1}'.format(response.resultCode, errors.get_speech_error_str(response.resultCode)))
# 测试停止正在播放的声音
async def test_stop_audio():
"""测试停止所有正在播放的音频
先播放一段tts,3s后,停止所有所有音效,并等待结果
#StopAudioResponse.isSuccess : 是否成功
#StopAudioResponse.resultCode : 返回码
"""
# 设置is_serial=False, 表示只需将指令发送给机器人,await不需要等机器人执行完结果再返回
block: StartPlayTTS = StartPlayTTS(is_serial=False, text="你让我说,让我说,不要打断我,不要打断我,不要打断我")
response = await block.execute()
print(f'test_stop_audio.play_tts: {response}')
await asyncio.sleep(3)
# 停止所有声音
block: StopAllAudio = StopAllAudio()
(resultType, response) = await block.execute()
print(f'test_stop_audio:{response}')
block: StartPlayTTS = StartPlayTTS(text="第二次, 你让我说,让我说,不要打断我,不要打断我,不要打断我")
asyncio.create_task(block.execute())
print(f'test_stop_audio.play_tts: {response}')
await asyncio.sleep(3)
2.2.2 红外/视觉/摄像头等传感器接口¶
通过这些传感器接口,获取一次传感器结果
# 测试人脸侦测
async def test_face_detect():
"""测试人脸个数侦测
侦测人脸个数,10s超时,并等待回复结果
#FaceDetectResponse.count : 人脸个数
#FaceDetectResponse.isSuccess : 是否成功
#FaceDetectResponse.resultCode : 返回码
"""
# timeout: 指定侦测时长
block: FaceDetect = FaceDetect(timeout=10)
# response: FaceDetectResponse
(resultType, response) = await block.execute()
print(f'test_face_detect result: {response}')
# 测试人脸分析(性别)
async def test_face_analysis():
"""测试人脸分析(性别)
侦测人脸信息(性别、年龄),超时时间10s,并等待回复结果
当多人存在摄像头前时,返回占画面比例最大的那个人脸信息
返回值:示例 {"age": 24, "gender": 99, "height": 238, "width": 238}
age: 年龄
gender:[1, 100], 小于50为女性,大于50为男性
height:人脸在摄像头画面中的高度
width:人脸在摄像头画面中的宽度
"""
block: FaceAnalysis = FaceAnalysis(timeout=10)
# response: FaceAnalyzeResponse
(resultType, response) = await block.execute()
print(f'test_face_analysis result: {response}')
print('code = {0}, error={1}'.format(response.resultCode, errors.get_vision_error_str(response.resultCode)))
# 测试物体识别:识别花,10s超时
async def test_object_recognise_flower():
"""测试物体(花)识别
让机器人识别花(需手动把花或花的照片放到机器人面前),超时10s,并等待结果
#RecogniseObjectResponse.objects : 物体名数组[str]
#RecogniseObjectResponse.isSuccess : 是否成功
#RecogniseObjectResponse.resultCode : 返回码
"""
# object_type: 支持FLOWER, FRUIT, GESTURE 三类物体
block: ObjectRecognise = ObjectRecognise(object_type=ObjectRecogniseType.FLOWER, timeout=10)
# response : RecogniseObjectResponse
(resultType, response) = await block.execute()
print(f'test_object_recognise_flower result: {response}')
# 测试物体识别:识别水果,10s超时
async def test_object_recognise_fruit():
"""测试物体(水果)识别
让机器人识别花(需手动把水果或水果的照片放到机器人面前),超时10s,并等待结果
#RecogniseObjectResponse.objects : 物体名数组[str]
#RecogniseObjectResponse.isSuccess : 是否成功
#RecogniseObjectResponse.resultCode : 返回码
"""
# object_type: 支持FLOWER, FRUIT, GESTURE 三类物体
block: ObjectRecognise = ObjectRecognise(object_type=ObjectRecogniseType.FRUIT, timeout=10)
# response : RecogniseObjectResponse
(resultType, response) = await block.execute()
print(f'test_object_recognise_fruit result: {response}')
# 测试物体识别:识别手势,10s超时
async def test_object_recognise_gesture():
"""测试物体(手势)识别
让机器人识别花(需手动在机器人面前作出手势),超时10s,并等待结果
#RecogniseObjectResponse.objects : 物体名数组[str]
#RecogniseObjectResponse.isSuccess : 是否成功
#RecogniseObjectResponse.resultCode : 返回码
"""
# object_type: 支持FLOWER, FRUIT, GESTURE 三类物体
block: ObjectRecognise = ObjectRecognise(object_type=ObjectRecogniseType.GESTURE, timeout=10)
# response : RecogniseObjectResponse
(resultType, response) = await block.execute()
print(f'test_object_recognise_gesture result: {response}')
# 测试拍照
async def test_take_picture():
"""测试拍照
让机器人立即拍照,并等待结果
#TakePictureResponse.isSuccess : 是否成功
#TakePictureResponse.code : 返回码
#TakePictureResponse.picPath : 照片在机器人里的存储路径
"""
# response: TakePictureResponse
# take_picture_type: IMMEDIATELY-立即拍照, FINDFACE-找到人脸再拍照 两种拍照效果
(resultType, response) = await TakePicture(take_picture_type=TakePictureType.IMMEDIATELY).execute()
print(f'test_take_picture result: {response}')
# 测试人脸识别
async def test_face_recognise():
"""测试人脸识别
让机器人进行人脸识别检测,超时10s,并等待结果
#FaceRecogniseResponse.faceInfos : [FaceInfoResponse] 人脸信息数组
FaceInfoResponse.id : 人脸id
FaceInfoResponse.name : 姓名,如果是陌生人,则默认name为"stranger"
FaceInfoResponse.gender : 性别
FaceInfoResponse.age : 年龄
#FaceRecogniseResponse.isSuccess : 是否成功
#FaceRecogniseResponse.resultCode : 返回码
Returns:
"""
# response : FaceRecogniseResponse
(resultType, response) = await FaceRecognise(timeout=10).execute()
print(f'test_face_recognise result: {response}')
# 测试获取红外探测距离
async def test_get_infrared_distance():
"""测试红外距离检测
获取当前机器人检测到的红外距离,并等待结果
#GetInfraredDistanceResponse.distance : 红外距离
"""
# response: GetInfraredDistanceResponse
(resultType, response) = await GetInfraredDistance().execute()
print(f'test_get_infrared_distance result: {response}')
# 测试获取目前机器人内注册的人脸个数
async def test_get_register_faces():
"""测试获取已注册的人脸信息
获取在机器人中已注册的所有人脸信息,并等待结果
#GetRegisterFacesResponse.faceInfos : [FaceInfoResponse] 人脸信息数组
#FaceInfoResponse.id : 人脸id
#FaceInfoResponse.name : 姓名
#FaceInfoResponse.gender : 性别
#FaceInfoResponse.age : 年龄
#GetRegisterFacesResponse.isSuccess : 是否成功
#GetRegisterFacesResponse.resultCode : 返回码
Returns:
"""
# reponse : GetRegisterFacesResponse
(resultType, response) = await GetRegisterFaces().execute()
print(f'test_get_register_faces result: {response}')
2.2.3 机器人表现力¶
# 测试让眼睛演示个表情
async def test_play_expression():
"""测试播放表情
让机器人播放一个名为"codemao1"的内置表情,并等待回复结果
#PlayExpressionResponse.isSuccess : 是否成功
#PlayExpressionResponse.resultCode : 返回码
"""
block: PlayExpression = PlayExpression(express_name="codemao1")
# response: PlayExpressionResponse
(resultType, response) = await block.execute()
print(f'test_play_expression result: {response}')
# 测试, 让机器人跳舞/停止跳舞
async def test_control_behavior():
"""测试控制表现力
让机器人开始跳一个名为"dance_0004"的舞蹈,并等待回复结果
"""
# control_type: START, STOP
block: StartBehavior = StartBehavior(name="dance_0004")
# response ControlBehaviorResponse
(resultType, response) = await block.execute()
print(f'test_control_behavior result: {response}')
print(
'resultCode = {0}, error = {1}'.format(response.resultCode, errors.get_express_error_str(response.resultCode)))
# 测试, 设置嘴巴灯颜色为绿色 常亮
async def test_set_mouth_lamp():
# mode: 嘴巴灯模式,0:普通模式,1:呼吸模式
# color: 嘴巴灯颜色,1:红色,2:绿色
# duration: 持续时间,单位为毫秒,-1表示常亮
# breath_duration: 闪烁一次时长,单位为毫秒
"""测试设置嘴巴灯
设置机器人嘴巴灯正常模式、绿色、常亮3s,并等待回复结果
当mode=NORMAL时,duration参数起作用,表示常亮多久时间
当mode=BREATH,breath_duration参数起作用,表示多久呼吸一次
#SetMouthLampResponse.isSuccess : 是否成功
#SetMouthLampResponse.resultCode : 返回码
"""
block: SetMouthLamp = SetMouthLamp(color=MouthLampColor.GREEN, mode=MouthLampMode.NORMAL,
duration=3000, breath_duration=1000)
# response:SetMouthLampResponse
(resultType, response) = await block.execute()
print(f'test_set_mouth_lamp result: {response}')
# 测试,开关嘴巴灯
async def test_control_mouth_lamp():
"""测试控制嘴巴灯
让机器人嘴巴灯关闭,并等待结果
#ControlMouthResponse.isSuccess : 是否成功
#ControlMouthResponse.resultCode : 返回码
"""
# is_open: True,False
# response :ControlMouthResponse
(resultType, response) = await ControlMouthLamp(is_open=False).execute()
print(f'test_control_mouth_lamp result: {response}')
2.2.4 第三方内容接口¶
查询百科, 翻译
# 测试, 查询wiki
async def test_query_wiki():
"""查询百科demo
查询百科,查询内容"优必选",并等待结果,机器人播报查询结果
#WikiResponse.isSuccess : 是否成功
#WikiResponse.resultCode : 返回码
"""
# query:查询关键字
block: QueryWiKi = QueryWiKi(query='优必选')
# response : WikiResponse
(resultType, response) = await block.execute()
print(f'test_query_wiki result: {response}')
# 测试翻译接口
async def test_start_translate():
"""翻译demo
使用百度翻译,把"张学友",从中文翻译成英文,并等待结果,机器人播报翻译结果
#TranslateResponse.isSuccess : 是否成功
#TranslateResponse.resultCode : 返回码
# query: 关键字
# from_lan: 源语言
# to_lan: 目标语言
# platform: BAIDU, GOOGLE, TENCENT
"""
block: StartTranslate = StartTranslate(query="张学友", from_lan=LanType.CN, to_lan=LanType.EN)
# response: TranslateResponse
(resultType, response) = await block.execute()
print(f'test_start_translate result: {response}')
2.2.5 运动控制¶
# 测试, 执行一个动作文件
async def test_play_action():
"""执行一个动作demo
控制机器人执行一个指定名称的本地(内置/自定义)动作,并等待执行结果回复
动作名称可用GetActionList获取
#PlayActionResponse.isSuccess : 是否成功
#PlayActionResponse.resultCode : 返回码
"""
# action_name: 动作文件名, 可以通过GetActionList获取机器人支持的动作
block: PlayAction = PlayAction(action_name='018')
# response: PlayActionResponse
(resultType, response) = await block.execute()
print(f'test_play_action result:{response}')
# 测试, 控制机器人,向前/后/左/右 移动
async def test_move_robot():
"""控制机器人移动demo
控制机器人往左(LEFTWARD)移动10步,并等待执行结果
#MoveRobotResponse.isSuccess : 是否成功
#MoveRobotResponse.code : 返回码
"""
# step: 移动几步
# direction: 方向,枚举类型
block: MoveRobot = MoveRobot(step=10, direction=MoveRobotDirection.LEFTWARD)
# response : MoveRobotResponse
(resultType, response) = await block.execute()
print(f'test_move_robot result:{response}')
# 测试, 获取支持的动作文件列表
async def test_get_action_list():
"""获取动作列表demo
获取机器人内置的动作列表,等待回复结果
"""
# action_type: INNER 是指机器人内置的不可修改的动作文件, CUSTOM 是放置在sdcard/customize/action目录下可被开发者修改的动作
block: GetActionList = GetActionList(action_type=RobotActionType.INNER)
# response:GetActionListResponse
(resultType, response) = await block.execute()
print(f'test_get_action_list result:{response}')
2.2.6 事件监听类接口¶
2.2.6.1 触摸监听¶
# 测试, 触摸监听
async def test_ObserveHeadRacket():
# 创建监听
observer: ObserveHeadRacket = ObserveHeadRacket()
# 事件处理器
# ObserveHeadRacketResponse.type:
# @enum.unique
# class HeadRacketType(enum.Enum):
# SINGLE_CLICK = 1 # 单击
# LONG_PRESS = 2 # 长按
# DOUBLE_CLICK = 3 # 双击
def handler(msg: ObserveHeadRacketResponse):
# 监听到一个事件后,停止监听,
observer.stop()
print("{0}".format(str(msg.type)))
# 执行个舞动
asyncio.create_task(__dance())
observer.set_handler(handler)
#启动
observer.start()
await asyncio.sleep(0)
async def __dance():
await ControlBehavior(name="dance_0002").execute()
# 结束event_loop
asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)
# 程序入口
if __name__ == '__main__':
device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name())
if device:
asyncio.get_event_loop().run_until_complete(test_connect(device))
asyncio.get_event_loop().run_until_complete(test_start_run_program())
asyncio.get_event_loop().run_until_complete(test_ObserveHeadRacket())
asyncio.get_event_loop().run_forever() # 定义了事件监听对象,必须让event_loop.run_forver
asyncio.get_event_loop().run_until_complete(shutdown())
2.2.6.2 语音识别监听¶
async def __tts():
block: PlayTTS = PlayTTS(text="你好, 我是悟空, 啦里啦,啦里啦")
response = await block.execute()
print(f'tes_play_tts: {response}')
# 测试监听语音识别
async def test_speech_recognise():
# 语音监听对象
observe: ObserveSpeechRecognise = ObserveSpeechRecognise()
# 处理器
# SpeechRecogniseResponse.text
# SpeechRecogniseResponse.isSuccess
# SpeechRecogniseResponse.resultCode
def handler(msg: SpeechRecogniseResponse):
print(f'=======handle speech recognise:{msg}')
print("{0}".format(str(msg.text)))
if str(msg.text) == "悟空":
#监听到"悟空", tts打个招呼
asyncio.create_task(__tts())
elif str(msg.text) == "结束":
#监听到结束, 停止监听
observe.stop()
#结束event_loop
asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)
observe.set_handler(handler)
#启动
observe.start()
await asyncio.sleep(0)
if __name__ == '__main__':
device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name())
if device:
asyncio.get_event_loop().run_until_complete(test_connect(device))
asyncio.get_event_loop().run_until_complete(test_start_run_program())
asyncio.get_event_loop().run_until_complete(test_speech_recognise())
asyncio.get_event_loop().run_forever() # 定义了事件监听对象,必须让event_loop.run_forver
asyncio.get_event_loop().run_until_complete(shutdown())
2.2.6.3 红外监听¶
async def test_ObserveInfraredDistance():
#红外监听对象
observer: ObserveInfraredDistance = ObserveInfraredDistance()
# 定义处理器
# ObserveInfraredDistanceResponse.distance
def handler(msg: ObserveInfraredDistanceResponse):
print("distance = {0}".format(str(msg.distance)))
if msg.distance < 500:
observer.stop()
asyncio.create_task(__tts())
observer.set_handler(handler)
observer.start()
await asyncio.sleep(0)
async def __tts():
result = await PlayTTS(text="是不是有人在啊, 你是谁啊").execute()
print(f"tts over {result}")
asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)
if __name__ == '__main__':
device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name())
if device:
asyncio.get_event_loop().run_until_complete(test_connect(device))
asyncio.get_event_loop().run_until_complete(test_start_run_program())
asyncio.get_event_loop().run_until_complete(test_ObserveInfraredDistance())
asyncio.get_event_loop().run_forever() # 定义了事件监听对象,必须让event_loop.run_forver
asyncio.get_event_loop().run_until_complete(shutdown())
2.2.6.4 人脸识别/检测监听¶
#测试, 检测到注册的人脸,则上报事件, 如果陌生人,返回"stranger"
async def test_ObserveFaceRecognise():
observer: ObserveFaceRecognise = ObserveFaceRecognise()
# FaceRecogniseTaskResponse.faceInfos: [FaceInfoResponse]
# FaceInfoResponse.id, FaceInfoResponse.name,FaceInfoResponse.gender,FaceInfoResponse.age
# FaceRecogniseTaskResponse.isSuccess
# FaceRecogniseTaskResponse.resultCode
def handler(msg: FaceRecogniseTaskResponse):
print(f"{msg}")
if msg.isSuccess and msg.faceInfos:
observer.stop()
asyncio.create_task(__tts(msg.faceInfos[0].name))
observer.set_handler(handler)
observer.start()
await asyncio.sleep(0)
async def __tts(name):
await PlayTTS(text=f'你好, {name}').execute()
asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)
# 人脸检测,检测到人脸,则上报事件
async def test_ObserveFaceDetect():
observer: ObserveFaceDetect = ObserveFaceDetect()
# FaceDetectTaskResponse.count
# FaceDetectTaskResponse.isSuccess
# FaceDetectTaskResponse.resultCode
def handler(msg: FaceDetectTaskResponse):
print(f"{msg}")
if msg.isSuccess and msg.count:
observer.stop()
asyncio.create_task(__tts(msg.count))
observer.set_handler(handler)
observer.start()
await asyncio.sleep(0)
async def __tts(count):
await PlayTTS(text=f'在我面前好像有{count}个人').execute()
asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)
2.2.6.5 姿态检测监听¶
# 测试,姿态检测
async def test_ObserveRobotPosture():
#创建监听对象
observer: ObserveRobotPosture = ObserveRobotPosture()
# 事件处理器
# ObserveFallClimbResponse.status
# STAND = 1; //站立
# SPLITS_LEFT = 2; //左弓步
# SPLITS_RIGHT = 3; //右弓步
# SITDOWN = 4; //坐下
# SQUATDOWN = 5; //蹲下
# KNEELING = 6; //跪下
# LYING = 7; //侧躺
# LYINGDOWN = 8; //平躺
# SPLITS_LEFT_1 = 9; //左劈叉
# SPLITS_RIGHT_2 = 10;//右劈叉
# BEND = 11;//弯腰
def handler(msg: ObserveFallClimbResponse):
print("{0}".format(msg))
if msg.status == 8 or msg.status == 7:
observer.stop()
asyncio.create_task(__tts())
observer.set_handler(handler)
#start
observer.start()
await asyncio.sleep(0)
async def __tts():
await PlayTTS(text="我摔倒了").execute()
asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)
if __name__ == '__main__':
device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name())
if device:
asyncio.get_event_loop().run_until_complete(test_connect(device))
asyncio.get_event_loop().run_until_complete(test_start_run_program())
asyncio.get_event_loop().run_until_complete(test_ObserveRobotPosture())
asyncio.get_event_loop().run_forever() # 定义了事件监听对象,必须让event_loop.run_forver
asyncio.get_event_loop().run_until_complete(shutdown())
三. 命令行工具¶
因为机器人里内置了一个Python执行环境(机器人支持python3.8), 通过pip命令(Python包管理工具), 即可将开发者开发的库安装到机器人本地。
开发者可以在pc上调试完自己的Python程序后,利用sdk的命令行工具,将程序安装到机器人内并触发执行。命令行工具可以做什么:
利用命令行工具, 用户可以将编写好的py程序打包成一个pypi压缩包, 发送给机器人安装。
利用命令行工具, 用户可以
增,删,改,查
机器人内pypi安装包。利用命令行工具, 用户还触发py程序在机器人端执行。
另外, 这些工具有对应的Python接口支持, 通过import mini.pkg_tool as Tool
可导入Python接口。
支持命令行工具的AlphaMini
SDK版本,需要大于等于1.0.0, 同时机器人也需要满足版本要求:
悟空教育版需大于等于v1.4
悟空标准版需大于2.0.5
开发者将工程打包成.whl压缩文件, 需要在工程的根目录下编写setup.py配置文件, 关于如何配置setup.py, 可以参考demo中setup.py文件的配置。
3.1 打包¶
3.1.1 配置环境¶
首先确保pc已安装最新setuptools 和 wheel和twine ,下面是安装/更新命令
python -m pip install --user --upgrade setuptools wheel twine
3.1.2 setup.py¶
在你工程目录下编写setup.py, 参考demo:
import setuptools
setuptools.setup(
name="tts_demo", # 你的库(程序)名字
version="0.0.2", # 版本号
author='Gino Deng', # 开发者名,
author_email='jingjing.deng@ubtrobot.com', # 邮箱,可选
description="demo with mini_sdk", #短描述
long_description='demo with mini_sdk,xxxxxxx', #长描述, 可选
long_description_content_type="text/markdown", #长描述内容的格式, 可选
license="GPLv3", # 版权
packages=setuptools.find_packages(), #*注意*:这里默认取工程目录下所有包含__init__.py的子目录
classifiers=[
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
install_requires=[ # 依赖模块
'alphamini',
],
entry_points={
'console_scripts': [ # 命令行入口定义
'tts_demo = play_tts.test_playTTS:main'
],
},
)
setup.py各参数介绍如下:
--name 包名称
--version (-V) 包版本
--author 程序的作者
--author_email 程序的作者的邮箱地址
--maintainer 维护者
--maintainer_email 维护者的邮箱地址
--url 程序的官网地址
--license 程序的授权信息
--description 程序的简单描述
--long_description 程序的详细描述
--platforms 程序适用的软件平台列表
--classifiers 程序的所属分类列表
--keywords 程序的关键字列表
--packages 需要处理的包目录(包含__init__.py的文件夹)
--py_modules 需要打包的Python文件列表
--download_url 程序的下载地址
--cmdclass
--data_files 打包时需要打包的数据文件,如图片,配置文件等
--scripts 安装时需要执行的脚步列表
--package_dir 告诉setuptools哪些目录下的文件被映射到哪个源码包。一个例子:package_dir = {'': 'lib'},表示“root package”中的模块都在lib 目录中。
--requires 定义依赖哪些模块
--provides定义可以为哪些模块提供依赖
--find_packages() 对于简单工程来说,手动增加packages参数很容易,刚刚我们用到了这个函数,它默认在和setup.py同一目录下搜索各个含有 __init__.py的包。
其实我们可以将包统一放在一个src目录中,另外,这个包内可能还有aaa.txt文件和data数据文件夹。另外,也可以排除一些特定的包
find_packages(exclude=["*.tests", "*.tests.*", "tests.*", "tests"])
--install_requires = ["requests"] 需要安装的依赖包
--entry_points 动态发现服务和插件,下面详细讲
下列entry_points中: console_scripts 指明了命令行工具的名称;在tts_demo = play_tts.test_playTTS:main
中,等号前面指明了工具包的名称,等号后面的内容指明了程序的入口地址。
entry_points={
'console_scripts': [ # 命令行入口定义
'tts_demo = play_tts.test_playTTS:main'
],
}
这里可以有多条记录,这样一个项目就可以制作多个命令行工具了,比如:
#... 省略其他
entry_points = {
'console_scripts': [
'foo = demo:test',
'bar = demo:test',
]}
3.1.3 打包¶
在你的工程目录下编写好setup.py后, 就可以通过命令行工具来打包了
方式一: 通过Python命令,调用setup.py打包, 在工程目录下运行如下shell命令:
python setup.py sdist bdist_wheel
方式二: 通过
AlphaMini
内的命令行工具
setup_py_pkg "你的py工程目录"
当然,也可以使用AlphaMini
内Tool包
方式三:Python代码打包
import mini.pkg_tool as Tool
# 将当前目录下的tts_demo工程打包成py wheel, 返回值为打包生成后的whl路径
pkg_path = Tool.setup_py_pkg("tts_demo")
print(f'{pkg_path}')
以tts_demo为例, 打包成功后,生成了一个dist目录, *.whl文件就是可以安装的文件, 如下图:
3.2 安装¶
将一个打包好的*.whl程序安装到指定机器人里,需要确保机器人和PC在同一个局域网内, 因为需要将.whl文件上传到机器人, 并安装在机器人内(让机器人执行pip install xx.whl
指令), 所以要指定机器人序列号, 可以填机器人序列号后5位;
方式一:通过
AlphaMini
命令行安装, –type 指定机器人类型,mini
–表示悟空标准版机器人,dedu
–表示悟空国内教育版本机器人
install_py_pkg '打包好的.whl路径' "机器人序列号" --type "dedu"
方式二:通过Python代码
import mini.pkg_tool as Tool
from mini import mini_sdk as MiniSdk
# 设置机器人类型
MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
# 安装当前目录下simple_socket-0.0.2-py3-none-any.whl文件
Tool.install_py_pkg(package_path="simple_socket-0.0.2-py3-none-any.whl", robot_id="机器人序列号")
3.3 查询¶
可以查询安装到机器人内的py程序信息, 即发送pip list
命令给机器人执行, 并返回命令结果, 需要确保机器人和PC在同一个局域网内, 因为需要查询机器人内的安装包信息, 所以要指定机器人序列号, 可以填机器人序列号后5位
3.3.1 所有py模块¶
列出机器人内模块列表
方式一: 通过命令行列出
list_py_pkg "0090" --type "dedu"
方式二: 通过Python代码
import mini.pkg_tool as Tool
from mini import mini_sdk as MiniSdk
# 设置机器人类型
MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
# 列出机器人内py包
list_info = Tool.list_py_pkg(robot_id="0090")
print(f'{list_info}')
3.3.2 指定py模块¶
查询指定模块详细信息, 即发送pip show xx
命令给机器人执行, 并返回命令结果, 在下面的例子中, xx代表 tts_demo
; 需要确保机器人和PC在同一个局域网内, 因为需要查询机器人内的安装包信息, 所以要指定机器人序列号, 可以填机器人序列号后5位
方式一: 通过命令行
query_py_pkg "tts_demo" "0090" --type "dedu"
方式二: 通过Python代码
import mini.pkg_tool as Tool
from mini import mini_sdk as MiniSdk
# 设置机器人类型
MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
# 查询tts_demo包信息
info = Tool.query_py_pkg(pkg_name="tts_demo", robot_id="0090")
print(f'{info}')
3.4 卸载¶
可以卸载安装到机器人内的程序, 即发送pip uninstall -y xx
命令给机器人执行, 并返回命令结果, 在下面的例子中, xx代表 tts_demo
; 需要确保机器人和PC在同一个局域网内, 因为需要卸载机器人内的安装包信息, 所以要指定机器人序列号, 可以填机器人序列号后5位
方式一: 通过命令行
uninstall_py_pkg "tts_demo" "0090" --type "dedu"
方式二: 通过Python代码
import mini.pkg_tool as Tool
from mini import mini_sdk as MiniSdk
# 设置机器人类型
MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
# 卸载tts_demo
Tool.uninstall_py_pkg(pkg_name="tts_demo", robot_id="0090")
3.5 运行¶
可以在机器人内运行py程序, 即发送xx
命令给机器人执行, 并返回命令结果, 所以, 机器人内需要有xx
命令, 在下面的例子中, xx代表 tts_demo
; 需要确保机器人和PC在同一个局域网内, 因为需要让机器人执行命令, 所以要指定机器人序列号, 可以填机器人序列号后5位
方式一: 通过命令行
# 触发tts_demo程序运行
run_py_pkg 'tts_demo' '0090' --type "dedu"
方式二: 通过Python代码
import mini.pkg_tool as Tool
from mini import mini_sdk as MiniSdk
# 设置机器人类型
MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
# 触发tts_demo脱机执行
Tool.run_py_pkg("tts_demo", robot_id="0090")
3.6 执行shell命令¶
通过前面的描述, 可以理解为机器人其实内建了一个终端环境, Python是其下的一个可执行程序, 发送pip install alphamini
命令给机器人, 安装/卸载第三方Python库
方式一: 通过命令行, 安装/卸载
AlphaMini
run_cmd "pip install alphamini" "0090" --type "dedu"
run_cmd "pip uninstall -y alphamini" "0090" --type "dedu"
方式二: 通过Python代码, 安装/卸载 alphamini python sdk
import mini.pkg_tool as Tool
from mini import mini_sdk as MiniSdk
# 设置机器人类型
MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
# 执行一个shell命令
Tool.run_py_pkg("pip install alphamini", robot_id="0090")
Tool.run_py_pkg("pip uninstall -y alpahmini", robot_id="0090")
还可以发送命令pkg install -f xx
给机器人, 安装其他第三方可执行程序或库, 在下面的例子中xx
表示第三方库名, pkg
是机器人内置的包扩展命令
run_cmd "pkg install -f xx" "0090" --type "dedu"
run_cmd "pkg uninstall -f xx" "0090" --type "dedu"