# Source code for test.demo_face_detect

import asyncio

from mini.apis.api_observe import ObserveFaceDetect
from mini.apis.api_sound import StartPlayTTS
from mini.dns.dns_browser import WiFiDevice
from mini.pb2.codemao_facedetecttask_pb2 import FaceDetectTaskResponse
from test.test_connect import test_connect, shutdown
from test.test_connect import test_get_device_by_name, test_start_run_program


# Strong references to in-flight TTS tasks. The event loop only keeps weak
# references to tasks, so a task whose result is discarded may be
# garbage-collected before it finishes running.
_pending_tts_tasks = set()


async def test_ObserveFaceDetect():
    """Face-count detection demo.

    Creates an ``ObserveFaceDetect`` observer, installs a handler on it and
    starts it. Whenever the handler receives a message whose ``isSuccess`` is
    truthy and whose ``count`` is non-zero, it stops the observer and
    schedules ``__tts`` to speak the number of detected faces.
    """
    observer: ObserveFaceDetect = ObserveFaceDetect()

    # Fields of FaceDetectTaskResponse listed by the original author:
    #   FaceDetectTaskResponse.count
    #   FaceDetectTaskResponse.isSuccess
    #   FaceDetectTaskResponse.resultCode
    def handler(msg: FaceDetectTaskResponse):
        print(f"{msg}")
        if msg.isSuccess and msg.count:
            observer.stop()
            # Keep a reference until the task is done so it cannot be
            # collected mid-execution; drop it again on completion.
            task = asyncio.create_task(__tts(msg.count))
            _pending_tts_tasks.add(task)
            task.add_done_callback(_pending_tts_tasks.discard)

    observer.set_handler(handler)
    observer.start()
    # Yield control once to the event loop before returning.
    await asyncio.sleep(0)
async def __tts(count):
    """Speak the detected face count, then stop the running event loop.

    Args:
        count: Number of faces, interpolated into the spoken sentence.
    """
    await StartPlayTTS(text=f'在我面前好像有{count}个人').execute()
    # This coroutine already runs on the loop's own thread, so stop() can be
    # called directly. Event loop methods are not thread-safe, so stop() must
    # not be invoked from an executor worker thread.
    asyncio.get_running_loop().stop()


if __name__ == '__main__':
    # Reuse the current event loop for every step of the demo.
    loop = asyncio.get_event_loop()
    device: WiFiDevice = loop.run_until_complete(test_get_device_by_name())
    if device:
        loop.run_until_complete(test_connect(device))
        loop.run_until_complete(test_start_run_program())
        loop.run_until_complete(test_ObserveFaceDetect())
        # An event observer has been set up, so the event loop must be kept
        # running with run_forever() for its handler to be invoked.
        loop.run_forever()
        loop.run_until_complete(shutdown())