简介

这是优必选 AlphaMini机器人 Python SDK 使用文档, 文档介绍SDK安装环境, 机器人支持Python的软件版本, 以及接口简要说明.

一. 安装环境

1.1 下载并安装Python

1.2 安装/卸载AlphaMini sdk

  • pip install alphamini

  • pip uninstall alphamini

1.3 安装PyCharm

1.4 准备一台悟空版机器人

  • 如果是悟空教育版, 其版本号需要大于1.3.0; 如果是悟空标准版, 其版本号需要大于2.0.5

  • 给机器人配上网,确保机器人和PC在同一个局域网内

1.5 下载demo并运行

二. API介绍

2.1 公共接口

  • 导入包 :

    import mini.mini_sdk as MiniSdk
    

2.1.1. 扫描局域网内机器人

 
  # 搜索指定序列号(在机器人屁股后面)的机器人,可以只输入序列号尾部字符即可,长度任意, 建议5个字符以上可以准确匹配,10秒超时
  # 搜索的结果WiFiDevice, 包含机器人名称,ip,port等信息
  async def test_get_device_by_name():  
    result: WiFiDevice = await MiniSdk.get_device_by_name("00018", 10) 
    print(f"test_get_device_by_name result:{result}")
    return result

  # 搜索指定序列号(在机器人屁股后面)的机器人,
  async def test_get_device_list():
    results = await MiniSdk.get_device_list(10)
    print(f"test_get_device_list results = {results}")
    return results  

2.1.2. 连接机器人

根据扫描到的机器人结果,调用MiniSdk.connect可连接机器人


# MiniSdk.connect 返回值为bool, 这里忽略返回值
async def test_connect(dev: WiFiDevice):
    await MiniSdk.connect(dev)

2.1.3. 让机器人进入编程模式

机器人进入编程模式,可防止机器人在执行命令时被其他技能中断

# 进入编程模式,机器人有个tts播报,这里通过asyncio.sleep 让当前协程等6秒返回,让机器人播完 
async def test_start_run_program():
    await StartRunProgram().execute()
    await asyncio.sleep(6)

2.1.4. 断开连接并释放资源

async def shutdown():
    await asyncio.sleep(1)
    await MiniSdk.release()

2.1.5. sdk日志开关

# 默认的日志级别是Warning, 设置为INFO
MiniSdk.set_log_level(logging.INFO)

2.2 API简介

  • 导入api

    from mini.apis import *
    
  • 每个api返回是个元组,元组第一个元素是bool类型,第二个元素是response(protobuf值), 如果第一个元素是False,则第二个元素是None值

  • 每个api有个execute()方法, 这是一个async方法, 触发执行

  • 每个api都有个is_serial参数,默认为True,表示接口可以串行执行, 可以await获取执行结果, is_serial=False,表示只需将指令发送给机器人,await不需要等机器人执行完结果再返回

2.2.1 声音控制


# 测试text合成声音
async def test_play_tts():
    """测试播放tts

    使机器人开始播放一段tts,内容为"你好, 我是悟空, 啦啦啦",并等待结果

    #ControlTTSResponse.isSuccess : 是否成功

    #ControlTTSResponse.resultCode : 返回码

    """
    # is_serial:串行执行
    # text:要合成的文本
    block: StartPlayTTS = StartPlayTTS(text="你好, 我是悟空, 啦啦啦")
    # 返回元组, response是个ControlTTSResponse
    (resultType, response) = await block.execute()

    print(f'test_play_tts result: {response}')
    # StartPlayTTS block的response包含resultCode和isSuccess
    # 如果resultCode !=0 可以通过errors.get_speech_error_str(response.resultCode)) 查询错误描述信息
    print('resultCode = {0}, error = {1}'.format(response.resultCode, errors.get_speech_error_str(response.resultCode)))

# 测试播放音效(在线)
async def test_play_online_audio():
    """测试播放在线音效

    使机器人播放一段在线音效,例如:"http://hao.haolingsheng.com/ring/000/995/52513bb6a4546b8822c89034afb8bacb.mp3"

    支持格式有mp3,amr,wav 等

    并等待结果

    #PlayAudioResponse.isSuccess : 是否成功

    #PlayAudioResponse.resultCode : 返回码

    """
    # 播放音效, url表示要播放的音效列表
    block: PlayAudio = PlayAudio(
        url="http://hao.haolingsheng.com/ring/000/995/52513bb6a4546b8822c89034afb8bacb.mp3",
        storage_type=AudioStorageType.NET_PUBLIC)
    # response是个PlayAudioResponse
    (resultType, response) = await block.execute()

    print(f'test_play_online_audio result: {response}')
    print('resultCode = {0}, error = {1}'.format(response.resultCode, errors.get_speech_error_str(response.resultCode)))

# 测试获取机器人的音效资源
async def test_get_audio_list():
    """测试播放本地音效

    使机器人播放一段本地内置音效,音效名称为"read_016",并等待结果

    #PlayAudioResponse.isSuccess : 是否成功

    #PlayAudioResponse.resultCode : 返回码

    """

    block: PlayAudio = PlayAudio(
        url="read_016",
        storage_type=AudioStorageType.PRESET_LOCAL)
    # response是个PlayAudioResponse
    (resultType, response) = await block.execute()

    print(f'test_play_local_audio result: {response}')
    print('resultCode = {0}, error = {1}'.format(response.resultCode, errors.get_speech_error_str(response.resultCode)))

# 测试停止正在播放的声音
async def test_stop_audio():
    """测试停止所有正在播放的音频

    先播放一段tts,3s后,停止所有所有音效,并等待结果

    #StopAudioResponse.isSuccess : 是否成功 

    #StopAudioResponse.resultCode : 返回码

    """
    # 设置is_serial=False, 表示只需将指令发送给机器人,await不需要等机器人执行完结果再返回
    block: StartPlayTTS = StartPlayTTS(is_serial=False, text="你让我说,让我说,不要打断我,不要打断我,不要打断我")
    response = await block.execute()
    print(f'test_stop_audio.play_tts: {response}')
    await asyncio.sleep(3)

    # 停止所有声音
    block: StopAllAudio = StopAllAudio()
    (resultType, response) = await block.execute()

    print(f'test_stop_audio:{response}')

    block: StartPlayTTS = StartPlayTTS(text="第二次, 你让我说,让我说,不要打断我,不要打断我,不要打断我")
    asyncio.create_task(block.execute())
    print(f'test_stop_audio.play_tts: {response}')
    await asyncio.sleep(3)

2.2.2 红外/视觉/摄像头等传感器接口

  • 通过这些传感器接口,获取一次传感器结果

# 测试人脸侦测
async def test_face_detect():
    """测试人脸个数侦测

    侦测人脸个数,10s超时,并等待回复结果

    #FaceDetectResponse.count : 人脸个数

    #FaceDetectResponse.isSuccess : 是否成功

    #FaceDetectResponse.resultCode : 返回码

    """
    # timeout: 指定侦测时长
    block: FaceDetect = FaceDetect(timeout=10)
    # response: FaceDetectResponse
    (resultType, response) = await block.execute()

    print(f'test_face_detect result: {response}')


# 测试人脸分析(性别)
async def test_face_analysis():
    """测试人脸分析(性别)

    侦测人脸信息(性别、年龄),超时时间10s,并等待回复结果

    当多人存在摄像头前时,返回占画面比例最大的那个人脸信息

    返回值:示例 {"age": 24, "gender": 99, "height": 238, "width": 238}

    age: 年龄

    gender:[1, 100], 小于50为女性,大于50为男性

    height:人脸在摄像头画面中的高度

    width:人脸在摄像头画面中的宽度

    """
    block: FaceAnalysis = FaceAnalysis(timeout=10)
    # response: FaceAnalyzeResponse
    (resultType, response) = await block.execute()

    print(f'test_face_analysis result: {response}')
    print('code = {0}, error={1}'.format(response.resultCode, errors.get_vision_error_str(response.resultCode)))


# 测试物体识别:识别花,10s超时
async def test_object_recognise_flower():
    """测试物体(花)识别

    让机器人识别花(需手动把花或花的照片放到机器人面前),超时10s,并等待结果

    #RecogniseObjectResponse.objects : 物体名数组[str]

    #RecogniseObjectResponse.isSuccess : 是否成功

    #RecogniseObjectResponse.resultCode : 返回码

    """
    # object_type: 支持FLOWER, FRUIT, GESTURE 三类物体
    block: ObjectRecognise = ObjectRecognise(object_type=ObjectRecogniseType.FLOWER, timeout=10)
    # response : RecogniseObjectResponse
    (resultType, response) = await block.execute()

    print(f'test_object_recognise_flower result: {response}')


# 测试物体识别:识别水果,10s超时
async def test_object_recognise_fruit():
    """测试物体(水果)识别

    让机器人识别花(需手动把水果或水果的照片放到机器人面前),超时10s,并等待结果

    #RecogniseObjectResponse.objects : 物体名数组[str]

    #RecogniseObjectResponse.isSuccess : 是否成功

    #RecogniseObjectResponse.resultCode : 返回码

    """
    # object_type: 支持FLOWER, FRUIT, GESTURE 三类物体
    block: ObjectRecognise = ObjectRecognise(object_type=ObjectRecogniseType.FRUIT, timeout=10)
    # response : RecogniseObjectResponse
    (resultType, response) = await block.execute()

    print(f'test_object_recognise_fruit result: {response}')


# 测试物体识别:识别手势,10s超时
async def test_object_recognise_gesture():
    """测试物体(手势)识别

    让机器人识别花(需手动在机器人面前作出手势),超时10s,并等待结果

    #RecogniseObjectResponse.objects : 物体名数组[str]

    #RecogniseObjectResponse.isSuccess : 是否成功

    #RecogniseObjectResponse.resultCode : 返回码

    """
    # object_type: 支持FLOWER, FRUIT, GESTURE 三类物体
    block: ObjectRecognise = ObjectRecognise(object_type=ObjectRecogniseType.GESTURE, timeout=10)
    # response : RecogniseObjectResponse
    (resultType, response) = await block.execute()

    print(f'test_object_recognise_gesture result: {response}')


# 测试拍照
async def test_take_picture():
    """测试拍照

    让机器人立即拍照,并等待结果

    #TakePictureResponse.isSuccess : 是否成功

    #TakePictureResponse.code : 返回码

    #TakePictureResponse.picPath : 照片在机器人里的存储路径

    """
    # response: TakePictureResponse
    # take_picture_type: IMMEDIATELY-立即拍照, FINDFACE-找到人脸再拍照 两种拍照效果
    (resultType, response) = await TakePicture(take_picture_type=TakePictureType.IMMEDIATELY).execute()

    print(f'test_take_picture result: {response}')


# 测试人脸识别
async def test_face_recognise():
    """测试人脸识别

    让机器人进行人脸识别检测,超时10s,并等待结果


    #FaceRecogniseResponse.faceInfos : [FaceInfoResponse] 人脸信息数组

        FaceInfoResponse.id : 人脸id

        FaceInfoResponse.name : 姓名,如果是陌生人,则默认name为"stranger"

        FaceInfoResponse.gender : 性别

        FaceInfoResponse.age : 年龄

    #FaceRecogniseResponse.isSuccess : 是否成功

    #FaceRecogniseResponse.resultCode : 返回码

    Returns:

    """
    # response : FaceRecogniseResponse
    (resultType, response) = await FaceRecognise(timeout=10).execute()

    print(f'test_face_recognise result: {response}')


# 测试获取红外探测距离
async def test_get_infrared_distance():
    """测试红外距离检测

    获取当前机器人检测到的红外距离,并等待结果

    #GetInfraredDistanceResponse.distance : 红外距离

    """
    # response: GetInfraredDistanceResponse
    (resultType, response) = await GetInfraredDistance().execute()

    print(f'test_get_infrared_distance result: {response}')


# 测试获取目前机器人内注册的人脸个数
async def test_get_register_faces():
    """测试获取已注册的人脸信息

    获取在机器人中已注册的所有人脸信息,并等待结果

    #GetRegisterFacesResponse.faceInfos : [FaceInfoResponse] 人脸信息数组

        #FaceInfoResponse.id : 人脸id

        #FaceInfoResponse.name : 姓名

        #FaceInfoResponse.gender : 性别

        #FaceInfoResponse.age : 年龄

    #GetRegisterFacesResponse.isSuccess : 是否成功

    #GetRegisterFacesResponse.resultCode : 返回码

    Returns:

    """
    # reponse : GetRegisterFacesResponse
    (resultType, response) = await GetRegisterFaces().execute()

    print(f'test_get_register_faces result: {response}')

2.2.3 机器人表现力

# 测试让眼睛演示个表情
async def test_play_expression():
    """测试播放表情

    让机器人播放一个名为"codemao1"的内置表情,并等待回复结果

    #PlayExpressionResponse.isSuccess : 是否成功

    #PlayExpressionResponse.resultCode : 返回码

    """
    block: PlayExpression = PlayExpression(express_name="codemao1")
    # response: PlayExpressionResponse
    (resultType, response) = await block.execute()

    print(f'test_play_expression result: {response}')


# 测试, 让机器人跳舞/停止跳舞
async def test_control_behavior():
    """测试控制表现力

    让机器人开始跳一个名为"dance_0004"的舞蹈,并等待回复结果

    """
    # control_type: START, STOP
    block: StartBehavior = StartBehavior(name="dance_0004")
    # response ControlBehaviorResponse
    (resultType, response) = await block.execute()

    print(f'test_control_behavior result: {response}')
    print(
        'resultCode = {0}, error = {1}'.format(response.resultCode, errors.get_express_error_str(response.resultCode)))


# 测试, 设置嘴巴灯颜色为绿色 常亮
async def test_set_mouth_lamp():
    # mode: 嘴巴灯模式,0:普通模式,1:呼吸模式

    # color: 嘴巴灯颜色,1:红色,2:绿色 

    # duration: 持续时间,单位为毫秒,-1表示常亮

    # breath_duration: 闪烁一次时长,单位为毫秒

    """测试设置嘴巴灯

    设置机器人嘴巴灯正常模式、绿色、常亮3s,并等待回复结果

    当mode=NORMAL时,duration参数起作用,表示常亮多久时间

    当mode=BREATH,breath_duration参数起作用,表示多久呼吸一次

    #SetMouthLampResponse.isSuccess : 是否成功

    #SetMouthLampResponse.resultCode : 返回码

    """

    block: SetMouthLamp = SetMouthLamp(color=MouthLampColor.GREEN, mode=MouthLampMode.NORMAL,
                                       duration=3000, breath_duration=1000)
    # response:SetMouthLampResponse
    (resultType, response) = await block.execute()

    print(f'test_set_mouth_lamp result: {response}')


# 测试,开关嘴巴灯
async def test_control_mouth_lamp():
    """测试控制嘴巴灯

    让机器人嘴巴灯关闭,并等待结果

    #ControlMouthResponse.isSuccess : 是否成功

    #ControlMouthResponse.resultCode : 返回码

    """
    # is_open: True,False
    # response :ControlMouthResponse
    (resultType, response) = await ControlMouthLamp(is_open=False).execute()

    print(f'test_control_mouth_lamp result: {response}')

2.2.4 第三方内容接口

  • 查询百科, 翻译

# 测试, 查询wiki
async def test_query_wiki():
    """查询百科demo

    查询百科,查询内容"优必选",并等待结果,机器人播报查询结果

    #WikiResponse.isSuccess : 是否成功

    #WikiResponse.resultCode : 返回码

    """
    # query:查询关键字
    block: QueryWiKi = QueryWiKi(query='优必选')
    # response : WikiResponse
    (resultType, response) = await block.execute()

    print(f'test_query_wiki result: {response}')


# 测试翻译接口
async def test_start_translate():
    """翻译demo

    使用百度翻译,把"张学友",从中文翻译成英文,并等待结果,机器人播报翻译结果

    #TranslateResponse.isSuccess : 是否成功

    #TranslateResponse.resultCode : 返回码

    # query: 关键字

    # from_lan: 源语言

    # to_lan: 目标语言
    
    # platform: BAIDU, GOOGLE, TENCENT

    """

    block: StartTranslate = StartTranslate(query="张学友", from_lan=LanType.CN, to_lan=LanType.EN)
    # response: TranslateResponse
    (resultType, response) = await block.execute()

    print(f'test_start_translate result: {response}')

2.2.5 运动控制

# 测试, 执行一个动作文件
async def test_play_action():
    """执行一个动作demo

    控制机器人执行一个指定名称的本地(内置/自定义)动作,并等待执行结果回复

    动作名称可用GetActionList获取

    #PlayActionResponse.isSuccess : 是否成功

    #PlayActionResponse.resultCode : 返回码

    """
    # action_name: 动作文件名, 可以通过GetActionList获取机器人支持的动作
    block: PlayAction = PlayAction(action_name='018')
    # response: PlayActionResponse
    (resultType, response) = await block.execute()

    print(f'test_play_action result:{response}')


# 测试, 控制机器人,向前/后/左/右 移动
async def test_move_robot():
    """控制机器人移动demo

    控制机器人往左(LEFTWARD)移动10步,并等待执行结果

    #MoveRobotResponse.isSuccess : 是否成功 

    #MoveRobotResponse.code : 返回码

    """
    # step: 移动几步
    # direction: 方向,枚举类型
    block: MoveRobot = MoveRobot(step=10, direction=MoveRobotDirection.LEFTWARD)
    # response : MoveRobotResponse
    (resultType, response) = await block.execute()

    print(f'test_move_robot result:{response}')


# 测试, 获取支持的动作文件列表
async def test_get_action_list():
    """获取动作列表demo

    获取机器人内置的动作列表,等待回复结果

    """
    # action_type: INNER 是指机器人内置的不可修改的动作文件, CUSTOM 是放置在sdcard/customize/action目录下可被开发者修改的动作
    block: GetActionList = GetActionList(action_type=RobotActionType.INNER)
    # response:GetActionListResponse
    (resultType, response) = await block.execute()

    print(f'test_get_action_list result:{response}')

2.2.6 事件监听类接口

2.2.6.1 触摸监听
# 测试, 触摸监听
async def test_ObserveHeadRacket():
    # 创建监听
    observer: ObserveHeadRacket = ObserveHeadRacket()

    # 事件处理器
    # ObserveHeadRacketResponse.type:
    # @enum.unique
    # class HeadRacketType(enum.Enum):
    #     SINGLE_CLICK = 1  # 单击
    #     LONG_PRESS = 2  # 长按
    #     DOUBLE_CLICK = 3  # 双击
    def handler(msg: ObserveHeadRacketResponse):
        # 监听到一个事件后,停止监听,
        observer.stop()
        print("{0}".format(str(msg.type)))
        # 执行个舞动
        asyncio.create_task(__dance())

    observer.set_handler(handler)
    #启动
    observer.start()
    await asyncio.sleep(0)


async def __dance():
    await ControlBehavior(name="dance_0002").execute()
    # 结束event_loop
    asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)

# 程序入口
if __name__ == '__main__':
    device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name())
    if device:
        asyncio.get_event_loop().run_until_complete(test_connect(device))
        asyncio.get_event_loop().run_until_complete(test_start_run_program())
        asyncio.get_event_loop().run_until_complete(test_ObserveHeadRacket())
        asyncio.get_event_loop().run_forever() # 定义了事件监听对象,必须让event_loop.run_forver
        asyncio.get_event_loop().run_until_complete(shutdown())
2.2.6.2 语音识别监听
async def __tts():
    block: PlayTTS = PlayTTS(text="你好, 我是悟空, 啦里啦,啦里啦")
    response = await block.execute()
    print(f'tes_play_tts: {response}')

# 测试监听语音识别
async def test_speech_recognise():
    # 语音监听对象
    observe: ObserveSpeechRecognise = ObserveSpeechRecognise()

    # 处理器
    # SpeechRecogniseResponse.text
    # SpeechRecogniseResponse.isSuccess
    # SpeechRecogniseResponse.resultCode
    def handler(msg: SpeechRecogniseResponse):
        print(f'=======handle speech recognise:{msg}')
        print("{0}".format(str(msg.text)))
        if str(msg.text) == "悟空":
            #监听到"悟空", tts打个招呼
            asyncio.create_task(__tts())

        elif str(msg.text) == "结束":
            #监听到结束, 停止监听
            observe.stop()
            #结束event_loop
            asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)

    observe.set_handler(handler)
    #启动
    observe.start()
    await asyncio.sleep(0)


if __name__ == '__main__':
    device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name())
    if device:
        asyncio.get_event_loop().run_until_complete(test_connect(device))
        asyncio.get_event_loop().run_until_complete(test_start_run_program())
        asyncio.get_event_loop().run_until_complete(test_speech_recognise())
        asyncio.get_event_loop().run_forever()  # 定义了事件监听对象,必须让event_loop.run_forver
        asyncio.get_event_loop().run_until_complete(shutdown())
2.2.6.3 红外监听
async def test_ObserveInfraredDistance():
    #红外监听对象
    observer: ObserveInfraredDistance = ObserveInfraredDistance()

    # 定义处理器
    # ObserveInfraredDistanceResponse.distance
    def handler(msg: ObserveInfraredDistanceResponse):
        print("distance = {0}".format(str(msg.distance)))
        if msg.distance < 500:
            observer.stop()
            asyncio.create_task(__tts())

    observer.set_handler(handler)
    observer.start()
    await asyncio.sleep(0)


async def __tts():
    result = await PlayTTS(text="是不是有人在啊, 你是谁啊").execute()
    print(f"tts over {result}")
    asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)


if __name__ == '__main__':
    device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name())
    if device:
        asyncio.get_event_loop().run_until_complete(test_connect(device))
        asyncio.get_event_loop().run_until_complete(test_start_run_program())
        asyncio.get_event_loop().run_until_complete(test_ObserveInfraredDistance())
        asyncio.get_event_loop().run_forever()  # 定义了事件监听对象,必须让event_loop.run_forver
        asyncio.get_event_loop().run_until_complete(shutdown())
2.2.6.4 人脸识别/检测监听
#测试, 检测到注册的人脸,则上报事件, 如果陌生人,返回"stranger"
async def test_ObserveFaceRecognise():
    observer: ObserveFaceRecognise = ObserveFaceRecognise()

    
    # FaceRecogniseTaskResponse.faceInfos: [FaceInfoResponse]
    # FaceInfoResponse.id, FaceInfoResponse.name,FaceInfoResponse.gender,FaceInfoResponse.age
    # FaceRecogniseTaskResponse.isSuccess
    # FaceRecogniseTaskResponse.resultCode
    def handler(msg: FaceRecogniseTaskResponse):
        print(f"{msg}")
        if msg.isSuccess and msg.faceInfos:
            observer.stop()
            asyncio.create_task(__tts(msg.faceInfos[0].name))

    observer.set_handler(handler)
    observer.start()
    await asyncio.sleep(0)


async def __tts(name):
    await PlayTTS(text=f'你好, {name}').execute()
    asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)

# 人脸检测,检测到人脸,则上报事件
async def test_ObserveFaceDetect():
    observer: ObserveFaceDetect = ObserveFaceDetect()

    # FaceDetectTaskResponse.count
    # FaceDetectTaskResponse.isSuccess
    # FaceDetectTaskResponse.resultCode
    def handler(msg: FaceDetectTaskResponse):
        print(f"{msg}")
        if msg.isSuccess and msg.count:
            observer.stop()
            asyncio.create_task(__tts(msg.count))

    observer.set_handler(handler)
    observer.start()
    await asyncio.sleep(0)


async def __tts(count):
    await PlayTTS(text=f'在我面前好像有{count}个人').execute()
    asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)    
2.2.6.5 姿态检测监听
# 测试,姿态检测
async def test_ObserveRobotPosture():
    #创建监听对象
    observer: ObserveRobotPosture = ObserveRobotPosture()
    # 事件处理器
    # ObserveFallClimbResponse.status
    #     STAND = 1; //站立
    #     SPLITS_LEFT = 2; //左弓步
    #     SPLITS_RIGHT = 3; //右弓步
    #     SITDOWN = 4; //坐下
    #     SQUATDOWN = 5; //蹲下
    #     KNEELING = 6; //跪下
    #     LYING = 7; //侧躺
    #     LYINGDOWN = 8; //平躺
    #     SPLITS_LEFT_1 = 9; //左劈叉
    #     SPLITS_RIGHT_2 = 10;//右劈叉
    #     BEND = 11;//弯腰
    def handler(msg: ObserveFallClimbResponse):
        print("{0}".format(msg))
        if msg.status == 8 or msg.status == 7:
            observer.stop()
            asyncio.create_task(__tts())

    observer.set_handler(handler)
    #start
    observer.start()
    await asyncio.sleep(0)


async def __tts():
    await PlayTTS(text="我摔倒了").execute()
    asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)


if __name__ == '__main__':
    device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name())
    if device:
        asyncio.get_event_loop().run_until_complete(test_connect(device))
        asyncio.get_event_loop().run_until_complete(test_start_run_program())
        asyncio.get_event_loop().run_until_complete(test_ObserveRobotPosture())
        asyncio.get_event_loop().run_forever() # 定义了事件监听对象,必须让event_loop.run_forver
        asyncio.get_event_loop().run_until_complete(shutdown())

三. 命令行工具

因为机器人里内置了一个Python执行环境(机器人支持python3.8), 通过pip命令(Python包管理工具), 即可将开发者开发的库安装到机器人本地。

开发者可以在pc上调试完自己的Python程序后,利用sdk的命令行工具,将程序安装到机器人内并触发执行。命令行工具可以做什么:

  • 利用命令行工具, 用户可以将编写好的py程序打包成一个pypi压缩包, 发送给机器人安装。

  • 利用命令行工具, 用户可以增,删,改,查机器人内pypi安装包。

  • 利用命令行工具, 用户还触发py程序在机器人端执行。

另外, 这些工具有对应的Python接口支持, 通过import mini.pkg_tool as Tool 可导入Python接口。

支持命令行工具的AlphaMiniSDK版本,需要大于等于1.0.0, 同时机器人也需要满足版本要求:

  • 悟空教育版需大于等于v1.4

  • 悟空标准版需大于2.0.5

开发者将工程打包成.whl压缩文件, 需要在工程的根目录下编写setup.py配置文件, 关于如何配置setup.py, 可以参考demo中setup.py文件的配置。

3.1 打包

3.1.1 配置环境

首先确保pc已安装最新setuptools 和 wheel和twine ,下面是安装/更新命令

    python -m pip install --user --upgrade setuptools wheel twine

3.1.2 setup.py

在你工程目录下编写setup.py, 参考demo:

import setuptools
setuptools.setup(
    name="tts_demo", # 你的库(程序)名字
    version="0.0.2", # 版本号
    author='Gino Deng', # 开发者名,
    author_email='jingjing.deng@ubtrobot.com', # 邮箱,可选
    description="demo with mini_sdk", #短描述
    long_description='demo with mini_sdk,xxxxxxx', #长描述, 可选
    long_description_content_type="text/markdown",  #长描述内容的格式, 可选
    license="GPLv3", # 版权
    packages=setuptools.find_packages(), #*注意*:这里默认取工程目录下所有包含__init__.py的子目录
    classifiers=[  
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.6",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    install_requires=[ # 依赖模块
        'alphamini',
    ],
    entry_points={ 
        'console_scripts': [ # 命令行入口定义
            'tts_demo = play_tts.test_playTTS:main'
        ],
    },
)

setup.py各参数介绍如下:

--name 包名称
--version (-V) 包版本
--author 程序的作者
--author_email 程序的作者的邮箱地址
--maintainer 维护者
--maintainer_email 维护者的邮箱地址
--url 程序的官网地址
--license 程序的授权信息
--description 程序的简单描述
--long_description 程序的详细描述
--platforms 程序适用的软件平台列表
--classifiers 程序的所属分类列表
--keywords 程序的关键字列表
--packages 需要处理的包目录(包含__init__.py的文件夹) 
--py_modules 需要打包的Python文件列表
--download_url 程序的下载地址
--cmdclass 
--data_files 打包时需要打包的数据文件,如图片,配置文件等
--scripts 安装时需要执行的脚步列表
--package_dir 告诉setuptools哪些目录下的文件被映射到哪个源码包。一个例子:package_dir = {'': 'lib'},表示“root package”中的模块都在lib 目录中。
--requires 定义依赖哪些模块 
--provides定义可以为哪些模块提供依赖 
--find_packages() 对于简单工程来说,手动增加packages参数很容易,刚刚我们用到了这个函数,它默认在和setup.py同一目录下搜索各个含有 __init__.py的包。

                          其实我们可以将包统一放在一个src目录中,另外,这个包内可能还有aaa.txt文件和data数据文件夹。另外,也可以排除一些特定的包

                          find_packages(exclude=["*.tests", "*.tests.*", "tests.*", "tests"])

--install_requires = ["requests"] 需要安装的依赖包
--entry_points 动态发现服务和插件,下面详细讲

下列entry_points中: console_scripts 指明了命令行工具的名称;在tts_demo = play_tts.test_playTTS:main中,等号前面指明了工具包的名称,等号后面的内容指明了程序的入口地址。

entry_points={ 
        'console_scripts': [ # 命令行入口定义
            'tts_demo = play_tts.test_playTTS:main'
        ],
    }

这里可以有多条记录,这样一个项目就可以制作多个命令行工具了,比如:

#... 省略其他
entry_points = {
         'console_scripts': [
             'foo = demo:test',
             'bar = demo:test',
         ]}

3.1.3 打包

在你的工程目录下编写好setup.py后, 就可以通过命令行工具来打包了

  • 方式一: 通过Python命令,调用setup.py打包, 在工程目录下运行如下shell命令:

        python setup.py sdist bdist_wheel
  • 方式二: 通过AlphaMini内的命令行工具

        setup_py_pkg "你的py工程目录"

当然,也可以使用AlphaMini内Tool包

  • 方式三:Python代码打包

        import mini.pkg_tool as Tool
        # 将当前目录下的tts_demo工程打包成py wheel, 返回值为打包生成后的whl路径
        pkg_path = Tool.setup_py_pkg("tts_demo")
        print(f'{pkg_path}')
  • 以tts_demo为例, 打包成功后,生成了一个dist目录, *.whl文件就是可以安装的文件, 如下图:

https://github.com/marklogg/images/blob/master/%E4%BC%81%E4%B8%9A%E5%BE%AE%E4%BF%A1%E6%88%AA%E5%9B%BE_15934215208073.png?raw=true

3.2 安装

将一个打包好的*.whl程序安装到指定机器人里,需要确保机器人和PC在同一个局域网内, 因为需要将.whl文件上传到机器人, 并安装在机器人内(让机器人执行pip install xx.whl指令), 所以要指定机器人序列号, 可以填机器人序列号后5位;

  • 方式一:通过AlphaMini命令行安装, –type 指定机器人类型, mini–表示悟空标准版机器人, dedu–表示悟空国内教育版本机器人

        install_py_pkg '打包好的.whl路径' "机器人序列号" --type "dedu"
  • 方式二:通过Python代码

        import mini.pkg_tool as Tool
        from mini import mini_sdk as MiniSdk
        # 设置机器人类型
        MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
        # 安装当前目录下simple_socket-0.0.2-py3-none-any.whl文件
        Tool.install_py_pkg(package_path="simple_socket-0.0.2-py3-none-any.whl", robot_id="机器人序列号")

3.3 查询

可以查询安装到机器人内的py程序信息, 即发送pip list 命令给机器人执行, 并返回命令结果, 需要确保机器人和PC在同一个局域网内, 因为需要查询机器人内的安装包信息, 所以要指定机器人序列号, 可以填机器人序列号后5位

3.3.1 所有py模块

列出机器人内模块列表

  • 方式一: 通过命令行列出

        list_py_pkg "0090" --type "dedu"
  • 方式二: 通过Python代码

        import mini.pkg_tool as Tool
        from mini import mini_sdk as MiniSdk
        # 设置机器人类型
        MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
        # 列出机器人内py包
        list_info = Tool.list_py_pkg(robot_id="0090")
        print(f'{list_info}')

3.3.2 指定py模块

查询指定模块详细信息, 即发送pip show xx 命令给机器人执行, 并返回命令结果, 在下面的例子中, xx代表 tts_demo; 需要确保机器人和PC在同一个局域网内, 因为需要查询机器人内的安装包信息, 所以要指定机器人序列号, 可以填机器人序列号后5位

  • 方式一: 通过命令行

         query_py_pkg "tts_demo" "0090" --type "dedu"
  • 方式二: 通过Python代码

        import mini.pkg_tool as Tool
        from mini import mini_sdk as MiniSdk
        # 设置机器人类型
        MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
        # 查询tts_demo包信息
        info = Tool.query_py_pkg(pkg_name="tts_demo", robot_id="0090")
        print(f'{info}')

3.4 卸载

可以卸载安装到机器人内的程序, 即发送pip uninstall -y xx 命令给机器人执行, 并返回命令结果, 在下面的例子中, xx代表 tts_demo; 需要确保机器人和PC在同一个局域网内, 因为需要卸载机器人内的安装包信息, 所以要指定机器人序列号, 可以填机器人序列号后5位

  • 方式一: 通过命令行

         uninstall_py_pkg "tts_demo" "0090" --type "dedu"
  • 方式二: 通过Python代码

        import mini.pkg_tool as Tool
        from mini import mini_sdk as MiniSdk
        # 设置机器人类型
        MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
        # 卸载tts_demo
        Tool.uninstall_py_pkg(pkg_name="tts_demo", robot_id="0090")

3.5 运行

可以在机器人内运行py程序, 即发送xx 命令给机器人执行, 并返回命令结果, 所以, 机器人内需要有xx命令, 在下面的例子中, xx代表 tts_demo; 需要确保机器人和PC在同一个局域网内, 因为需要让机器人执行命令, 所以要指定机器人序列号, 可以填机器人序列号后5位

  • 方式一: 通过命令行

         # 触发tts_demo程序运行
         run_py_pkg 'tts_demo' '0090' --type "dedu"
  • 方式二: 通过Python代码

        import mini.pkg_tool as Tool
        from mini import mini_sdk as MiniSdk
        # 设置机器人类型
        MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
        # 触发tts_demo脱机执行
        Tool.run_py_pkg("tts_demo", robot_id="0090")

3.6 执行shell命令

通过前面的描述, 可以理解为机器人其实内建了一个终端环境, Python是其下的一个可执行程序, 发送pip install alphamini 命令给机器人, 安装/卸载第三方Python库

  • 方式一: 通过命令行, 安装/卸载 AlphaMini

        run_cmd "pip install alphamini" "0090" --type "dedu"
        run_cmd "pip uninstall -y alphamini" "0090" --type "dedu"
  • 方式二: 通过Python代码, 安装/卸载 alphamini python sdk

        import mini.pkg_tool as Tool
        from mini import mini_sdk as MiniSdk
        # 设置机器人类型
        MiniSdk.set_robot_type(MiniSdk.RobotType.DEDU)
        # 执行一个shell命令
        Tool.run_py_pkg("pip install alphamini", robot_id="0090") 
        Tool.run_py_pkg("pip uninstall -y alpahmini", robot_id="0090")

还可以发送命令pkg install -f xx给机器人, 安装其他第三方可执行程序或库, 在下面的例子中xx表示第三方库名, pkg是机器人内置的包扩展命令

    run_cmd "pkg install -f xx" "0090" --type "dedu"
    run_cmd "pkg uninstall -f xx" "0090" --type "dedu"