import asyncio
from mini.apis.api_observe import ObserveFaceDetect
from mini.apis.api_sound import StartPlayTTS
from mini.dns.dns_browser import WiFiDevice
from mini.pb2.codemao_facedetecttask_pb2 import FaceDetectTaskResponse
from test.test_connect import test_connect, shutdown
from test.test_connect import test_get_device_by_name, test_start_run_program
[文档]async def test_ObserveFaceDetect():
"""人脸个数检测demo
人脸个数检测,检测到人脸,则上报事件
当检测到人脸个数大于等于1个时,停止监听,并播报"在我面前好像有xx个人脸"(xx为人脸个数)
"""
observer: ObserveFaceDetect = ObserveFaceDetect()
# FaceDetectTaskResponse.count
# FaceDetectTaskResponse.isSuccess
# FaceDetectTaskResponse.resultCode
def handler(msg: FaceDetectTaskResponse):
print(f"{msg}")
if msg.isSuccess and msg.count:
observer.stop()
asyncio.create_task(__tts(msg.count))
observer.set_handler(handler)
observer.start()
await asyncio.sleep(0)
async def __tts(count):
await StartPlayTTS(text=f'在我面前好像有{count}个人').execute()
asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop)
if __name__ == '__main__':
device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name())
if device:
asyncio.get_event_loop().run_until_complete(test_connect(device))
asyncio.get_event_loop().run_until_complete(test_start_run_program())
asyncio.get_event_loop().run_until_complete(test_ObserveFaceDetect())
# 定义了事件监听对象,必须让event_loop.run_forver()
asyncio.get_event_loop().run_forever()
asyncio.get_event_loop().run_until_complete(shutdown())