Source code for test.demo_infrared

import asyncio

from mini.apis.api_observe import ObserveInfraredDistance
from mini.apis.api_sound import StartPlayTTS
from mini.dns.dns_browser import WiFiDevice
from mini.pb2.codemao_observeinfrareddistance_pb2 import ObserveInfraredDistanceResponse
from test.test_connect import test_connect, shutdown
from test.test_connect import test_get_device_by_name, test_start_run_program


[docs]async def test_ObserveInfraredDistance(): """Monitor infrared distance demo Monitor infrared distance events, the robot reports the detected infrared distance to the nearest obstacle in front of you When the returned infrared distance is less than 500, stop monitoring and broadcast "Detected infrared distance xxx" (xxx is the infrared distance value) """ # 红外监听对象 observer: ObserveInfraredDistance = ObserveInfraredDistance() # 定义处理器 # ObserveInfraredDistanceResponse.distance def handler(msg: ObserveInfraredDistanceResponse): print("distance = {0}".format(str(msg.distance))) if msg.distance < 500: observer.stop() asyncio.create_task(__tts(msg.distance)) observer.set_handler(handler) observer.start() await asyncio.sleep(0)
async def __tts(distance: int): result = await StartPlayTTS(text=f"Detected infrared distance {distance}").execute() print(f"tts over {result}") asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop) if __name__ == '__main__': device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name()) if device: asyncio.get_event_loop().run_until_complete(test_connect(device)) asyncio.get_event_loop().run_until_complete(test_start_run_program()) asyncio.get_event_loop().run_until_complete(test_ObserveInfraredDistance()) # The event listener object is defined, and event_loop.run_forver() must be asyncio.get_event_loop().run_forever() asyncio.get_event_loop().run_until_complete(shutdown())