test.demo_touch 源代码

import asyncio

from mini.apis.api_behavior import StartBehavior
from mini.apis.api_observe import ObserveHeadRacket, HeadRacketType
from mini.dns.dns_browser import WiFiDevice
from mini.pb2.codemao_observeheadracket_pb2 import ObserveHeadRacketResponse
from test.test_connect import test_connect, shutdown
from test.test_connect import test_get_device_by_name, test_start_run_program


# 测试, 触摸监听
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references could be
# garbage-collected before it finishes.
_background_tasks: set = set()


async def test_ObserveHeadRacket():
    """Demo: listen for head-tap events.

    Registers a handler that prints the type of every head-tap event that is
    reported. When a double tap is reported, the observer is stopped and a
    dance is started as a background task.

    ``ObserveHeadRacketResponse.type`` carries the value of one of::

        class HeadRacketType(enum.Enum):
            SINGLE_CLICK = 1  # single tap
            LONG_PRESS = 2    # long press
            DOUBLE_CLICK = 3  # double tap
    """
    # Create the observer.
    observer: ObserveHeadRacket = ObserveHeadRacket()

    def handler(msg: ObserveHeadRacketResponse):
        """Print the event type; on a double tap stop observing and dance."""
        print("{0}".format(str(msg.type)))

        if msg.type == HeadRacketType.DOUBLE_CLICK.value:
            observer.stop()
            # Start the dance without blocking the handler. Hold a reference
            # until the task completes so it cannot be collected mid-run.
            task = asyncio.create_task(__dance())
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)

    observer.set_handler(handler)
    # Start observing.
    observer.start()
    # Yield control to the event loop once before returning.
    await asyncio.sleep(0)
async def __dance():
    """Run the ``dance_0002`` behavior, then stop the running event loop."""
    await StartBehavior(name="dance_0002").execute()
    # Stop the event loop so that run_forever() in the script section returns.
    # The stop is scheduled on the loop's own thread: loop methods are not
    # thread-safe, so it must not be invoked from an executor worker thread.
    loop = asyncio.get_running_loop()
    loop.call_soon(loop.stop)


if __name__ == '__main__':
    # Every step below runs on the same event loop, so fetch it once.
    loop = asyncio.get_event_loop()
    device: WiFiDevice = loop.run_until_complete(test_get_device_by_name())
    if device:
        loop.run_until_complete(test_connect(device))
        loop.run_until_complete(test_start_run_program())
        loop.run_until_complete(test_ObserveHeadRacket())
        # An event observer has been registered, so the loop has to keep
        # running (run_forever) until __dance() stops it.
        loop.run_forever()
        loop.run_until_complete(shutdown())