Source code for test.demo_touch

import asyncio

from mini.apis.api_behavior import StartBehavior
from mini.apis.api_observe import ObserveHeadRacket, HeadRacketType
from mini.dns.dns_browser import WiFiDevice
from mini.pb2.codemao_observeheadracket_pb2 import ObserveHeadRacketResponse
from test.test_connect import test_connect, shutdown
from test.test_connect import test_get_device_by_name, test_start_run_program


# 测试, 触摸监听
[docs]async def test_ObserveHeadRacket(): """Monitor head event demo Monitor the head event, and report the head type when the robot's head is tapped When the head of the robot is double-clicked, stop monitoring and dance a dance # ObserveHeadRacketResponse.type: # class HeadRacketType(enum.Enum): # SINGLE_CLICK = 1 # Click # LONG_PRESS = 2 # Long press # DOUBLE_CLICK = 3 # Double click """ # 创建监听 observer: ObserveHeadRacket = ObserveHeadRacket() # 事件处理器 # ObserveHeadRacketResponse.type: # @enum.unique # class HeadRacketType(enum.Enum): # SINGLE_CLICK = 1 # Click # LONG_PRESS = 2 # Long press # DOUBLE_CLICK = 3 # Double click def handler(msg: ObserveHeadRacketResponse): # After listening to an event, stop listening, print("{0}".format(str(msg.type))) if msg.type == HeadRacketType.DOUBLE_CLICK.value: observer.stop() # 执行个舞动 asyncio.create_task(__dance()) observer.set_handler(handler) # 启动 observer.start() await asyncio.sleep(0)
async def __dance(): await StartBehavior(name="dance_0002").execute() # 结束event_loop asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop) if __name__ == '__main__': device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name()) if device: asyncio.get_event_loop().run_until_complete(test_connect(device)) asyncio.get_event_loop().run_until_complete(test_start_run_program()) asyncio.get_event_loop().run_until_complete(test_ObserveHeadRacket()) # The event listener object is defined, and event_loop.run_forver() must be asyncio.get_event_loop().run_forever() asyncio.get_event_loop().run_until_complete(shutdown())