3.4.模块详解
第一层:音频采集模块
模块文件
socket_connector.py- Socket 连接管理socket_audio_provider.py- 音频接收与解析tk_audio_publisher.py- 音频发布到 ROS
数据流
RK3588s 麦克风阵列
↓
Socket 服务器 (IP: 10.42.0.127, Port: 9080)
↓
SocketConnector 建立连接
↓
SocketAudioProvider 接收原始数据
↓
SocketAudioPublisher 发布到 ROS 话题
关键概念:VAD(语音活动检测)
当接收音频数据时,需要识别什么时候是说话,什么时候是静音:
VAD 状态码:
0 = 静音
1 = 开始说话(新的语句开始)
2 = 持续说话
3 = 结束说话(语句结束)
实现逻辑:
def keep_receiving_publish_audio():
while True:
audio_data, vad = socket_provider.read()
if vad == 1: # 开始说话
audio_buffer.clear() # 清空旧数据
audio_buffer.extend(audio_data)
elif vad == 2: # 持续说话
audio_buffer.extend(audio_data) # 继续追加
elif vad == 3: # 结束说话
audio_buffer.extend(audio_data)
# 整句音频已完成,发布到话题
publish_sentence(audio_buffer)
audio_buffer.clear()
代码详解:SocketConnector
class SocketConnector:
def __init__(self, ip: str, port: int):
# 创建 TCP Socket
self.client_socket = socket(AF_INET, SOCK_STREAM)
# 连接到服务器
self.client_socket.connect((ip, port))
# 设置接收超时 3 秒
self.client_socket.settimeout(3.0)
def receive_full_data(self, expected_length):
"""
接收完整数据块
由于 TCP 是流协议,可能分多次接收数据:
比如要接收 1000 字节,可能第一次收到 500,第二次收到 500
"""
received_data = bytearray()
while len(received_data) < expected_length:
# 一次最多接收 4096 字节
chunk = self.client_socket.recv(4096)
if not chunk: # 服务器关闭连接
return None
received_data.extend(chunk)
return bytes(received_data)
代码详解:SocketAudioProvider
这是从RK3588s接收音频数据的详细代码,为何这样接收?天工行者搭载的是讯飞的RK3588 AIUI多模态开发套件,其音频传输协议可参考,
其中明确说明了,0~8个字节是请求头,对应下面的 header = self.receive_full_data(9),第9个字节是vad字段,也就是下面代码中的vad = body[0],第17到n个字节就是详细的音频数据,对应下面代码中audio_data = body[8:-1]。
class SocketAudioProvider(SocketConnector):
def read(self):
"""解析音频包协议"""
# 1. 接收 9 字节头部
header = self.receive_full_data(9)
# 格式: [同步字, 用户ID, 消息类型, 消息长度, 消息ID]
# 二进制格式: 1字节 + 1字节 + 1字节 + 2字节 + 4字节
sync_head, user_id, msg_type, msg_length, msg_id = struct.unpack(
'<BBBIH', # < 表示小端序,B=字节,I=整数,H=短整数
header
)
# 2. 校验头部
if sync_head != 0xa5 or user_id != 0x01:
return None # 无效包
# 3. 接收消息体
body = self.receive_full_data(msg_length + 1)
# 格式: [VAD, 通道号, ...] + CRC校验字节
vad = body[0] # VAD 状态
audio_data = body[8:-1] # 提取音频数据
return audio_data, vad
代码详解:SocketAudioPublisher
class SocketAudioPublisher(Node):
def __init__(self):
super().__init__('tk_audio_publisher')
# 创建两个发布者
self.publisher_ = self.create_publisher(
AudioFrame, # 消息类型
'audio_frames', # 话题名
10 # 队列大小
)
self.sentence_publisher_ = self.create_publisher(
AudioFrame,
'audio_sentence_frames',
10
)
# 创建 Socket 提供者
self.audio_provider = SocketAudioProvider('10.42.0.127', 9080)
# 启动后台接收线程
self.receive_pub_thread = threading.Thread(
target=self.keep_receiving_publish_audio
)
self.receive_pub_thread.start()
def keep_receiving_publish_audio(self):
"""后台线程:持续接收并发布音频"""
audio_buffer = bytearray()
while not self.stop_event.is_set():
audio_res = self.audio_provider.read()
if audio_res is None:
continue
audio_data, vad = audio_res
if vad == 1: # 开始说话
audio_buffer.clear()
audio_buffer.extend(audio_data)
elif vad == 2: # 持续说话
audio_buffer.extend(audio_data)
elif vad == 3: # 结束说话
audio_buffer.extend(audio_data)
# 发布整句音频
msg = AudioFrame()
msg.data = bytes(audio_buffer)
self.sentence_publisher_.publish(msg)
self.get_logger().info(
f"发布了一句完整音频,长度: {len(audio_buffer)}"
)
audio_buffer.clear()
第二层:语音识别模块(ASR)
模块文件
funasr_client.py- Funasr WebSocket 客户端tk_asr_text_publisher.py- ASR 文本发布
数据流
audio_sentence_frames 话题
↓
tk_asr_text_publisher 订阅
↓
funasr_client WebSocket 发送音频
↓
x86 Funasr 服务识别
↓
WebSocket 返回识别结果
↓
asr_sentence 话题发布
核心概念:WebSocket 流式识别
// 1. 连接到 WebSocket 服务器
ws = websocket.connect("wss://192.168.41.1:10097")
// 2. 发送握手消息(告诉服务器识别参数)
ws.send({
"mode": "offline", // 离线模式
"sample_rate": 16000, // 采样率
"chunk_size": [5,10,5] // 每次发送的音频块大小
})
// 3. 分块发送音频(流式)
for chunk in audio_chunks:
ws.send_binary(chunk)
// 4. 接收识别结果(也是流式)
for result in ws.recv():
text = result["text"]
print(f"识别结果: {text}")
代码详解:FunASRClient
class FunASRClient:
def __init__(self, host="192.168.41.1", port=10097):
self.host = host
self.port = port
self.ssl_enabled = True # 使用 WSS 加密连接
# 识别参数配置
self.sample_rate = 16000
self.chunk_size = [5, 10, 5] # 音频块大小(毫秒)
self.chunk_interval = 10 # 块间隔
# WebSocket 连接(通过全局异步循环)
self.loop = GlobalAsyncLoop.get_loop()
self.connect()
async def _connect(self):
"""建立 WebSocket 连接"""
# 构造 WebSocket URI
uri = f"wss://{self.host}:{self.port}"
# SSL 上下文(关闭证书验证)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# 连接
return await websockets.connect(
uri,
subprotocols=["binary"],
ping_interval=None, # 禁用心跳
ssl=ssl_context
)
def to_text(self, audio_bytes: bytes) -> str:
"""
同步接口:输入音频字节,输出识别文本
"""
# 在异步循环中执行异步函数
future = asyncio.run_coroutine_threadsafe(
self._async_to_text(audio_bytes),
self.loop
)
return future.result(timeout=30)
async def _async_to_text(self, audio_bytes: bytes) -> str:
"""异步识别实现"""
ws = await self.try_init_connection()
# 1. 发送握手消息
start_signal = {
"mode": "offline",
"sample_rate": self.sample_rate,
"chunk_size": self.chunk_size,
"chunk_interval": self.chunk_interval,
}
await ws.send(json.dumps(start_signal))
# 2. 分块发送音频
chunk_bytes = (self.sample_rate / 1000) * 2 # 每毫秒字节数
for i in range(0, len(audio_bytes), int(chunk_bytes * 10)):
chunk = audio_bytes[i:i+int(chunk_bytes*10)]
await ws.send(chunk) # 发送二进制数据
await asyncio.sleep(0.01)
# 3. 发送结束信号
await ws.send(json.dumps({"mode": "offline"}))
# 4. 接收识别结果
result_text = ""
async for message in ws:
data = json.loads(message)
text = data.get("text", "")
if text:
result_text = text
if data.get("done"):
break
return result_text
代码详解:FunASRTextPublisher
class FunASRTextPublisher(Node):
def __init__(self):
super().__init__('tk_asr_text_publisher')
# 订阅音频话题
self.subscription = self.create_subscription(
AudioFrame,
'audio_sentence_frames',
self.audio_sentence_callback,
10
)
# 创建发布者
self.asr_sentence_publisher = self.create_publisher(
String,
'/asr_sentence',
10
)
# 音频处理队列
self.audio_queue = Queue(maxsize=1)
# Funasr 客户端
self.asr_service = FunASRClient(
host="192.168.41.1",
port=10097
)
# 后台处理线程
self.process_thread = threading.Thread(
target=self.keep_process_audio
)
self.process_thread.start()
def audio_sentence_callback(self, msg: AudioFrame):
"""
订阅回调:每当收到一句完整音频时调用
"""
# 将音频放入队列
self.audio_queue.put(msg)
def keep_process_audio(self):
"""后台线程:处理音频并识别"""
while True:
try:
# 从队列取出音频(最多等待 2 秒)
msg = self.audio_queue.get(timeout=2)
# 转换为字节
audio_bytes = bytes(msg.data)
self.get_logger().info(
f"接收到 {len(audio_bytes)} 字节音频,开始识别..."
)
# 调用 Funasr 识别
asr_sentence = self.asr_service.to_text(audio_bytes)
# 过滤过短的识别结果
if len(asr_sentence) <= 3:
self.get_logger().info(f"忽略过短结果: {asr_sentence}")
continue
# 发布识别结果
text_msg = String()
text_msg.data = asr_sentence
self.asr_sentence_publisher.publish(text_msg)
self.get_logger().info(f"发布识别结果: {asr_sentence}")
except Empty:
continue # 超时继续等待
except Exception as e:
self.get_logger().error(f"识别出错: {e}")
第三层:大模型自然语言理解模块(LLMClient)
LLMClient 是一个与大语言模型(LLM)服务交互的客户端类,专门设计用于与 Ollama 服务进行通信。采用多进程架构实现流式输出,可自动判断LLM服务是运行在orin1还是orin2,以及智能句子切分。
核心架构
多进程设计
模块使用 multiprocessing 的 spawn 方式创建子进程,原因如下:
- 避免阻塞:HTTP 流式请求在子进程执行,主进程保持响应
- 可中断性:用户可以随时中断当前请求,启动新对话
- 跨平台兼容:
spawn模式在 Windows 和 Linux 上行为一致
┌─────────────────────────────────────────────────────────┐
│ 主进程 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 句子拼接 │ ← │ Queue 读取 │ ← │ 结果处理 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
↑ Queue
┌─────────────────────────────────────────────────────────┐
│ 子进程 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ OpenAI SDK │ → │ 流式迭代 │ → │ Queue 写入 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
类属性详解
配置属性
| 属性 | 默认值 | 说明 |
|---|---|---|
max_history | 1 | 历史对话轮数(每轮=用户+助手=2条消息) |
history | deque(maxlen=2) | 使用双端队列自动限制历史长度 |
sentence_endings | "。!?.!?" | 强制断句标点 |
soft_endings | ",、;;," | 软分割标点(需配合 max_len) |
max_len | 25 | 软分割触发阈值(字符数) |
服务发现属性
| 属性 | 说明 |
|---|---|
active_llm_ip | 自动检测到的 Ollama 服务 IP |
llm_endpoint | 完整的 API 端点 URL |
api_key | API 密钥(Ollama 默认为 "ollama") |
llm_model | 使用的模型名称,默认 qwen2.5:1.5b |
执行流程
1. 初始化流程 (__init__)
开始
↓
设置历史队列 (deque)
↓
调用 test_llm_ip() 探测服务
↓
配置 API 端点、密钥、模型
↓
设置系统消息
↓
完成
2. 服务发现流程 (test_llm_ip)
该方法自动检测 Ollama 服务运行位置:
遍历候选 IP: [192.168.41.3, 192.168.41.2]
↓
对每个 IP 发起 GET /api/version 请求
↓
成功? ─── 是 ───→ 返回该 IP
↓ 否
继续下一个 IP
↓
全部失败? ───→ 返回 None,后续请求将被拒绝
设计要点:
- 使用
urllib.request而非httpx,减少依赖 - 记录详细日志便于排查网络问题
3. 流式请求流程 (stream_sentence)
这是核心方法,完整流程如下:
启动阶段
- 中断上一个子进程:调用
set_interrupted(True)终止上一个子进程 - 构建消息负载:组合系统消息、历史消息、当前用户输入
- 创建子进程:启动
_stream_worker函数
流式获取阶段
循环读取 Queue
↓
收到 None? ─── 是 ───→ 子进程结束,退出循环
↓ 否
追加到 buffer 和 assistant_response
↓
尝试断句处理
↓
Queue 超时且子进程已死? ─── 是 ───→ 退出循环
↓ 否
继续读取
句子切分逻辑
整句断句:遇到 。!?.!? 立即切分
buffer = "天空是蓝色的。这是因为"
↓ 找到句号
yield "天空是蓝色的。"
buffer = "这是因为"
强行断句:buffer 超过 25 字符时,查找软分割点
buffer = "这种现象一般被称为瑞利散射,当阳光"
|←── 如果超过 25 字符 ──→|
↓ 找到逗号
yield "这种现象一般被称为瑞利散射,"
buffer = "当阳光"
子进程函数 (_stream_worker)
职责:
- 创建 OpenAI 客户端连接 Ollama 兼容 API
- 发起流式请求
- 将每个 chunk 的文本内容放入跨进程队列以便主进程获取生成的文本
- 请求结束后放入
None作为结束信号
参数说明:
base_url: API 基础 URLmodel: 模型名称messages_payload: 完整消息列表queue: 跨进程通信队列api_key: API 密钥
set_interrupted
用途:在需要的时候可强制终止正在运行的子进程,丢弃正在生成的文本
执行步骤:
- 检查
process是否存活 - 调用
terminate()发送 SIGTERM - 调用
join(timeout=1)等待退出 - 关闭并清理 Queue
- 重置属性为 None
环境变量配置
| 变量名 | 默认值 | 说明 |
|---|---|---|
LLM_KEY | "ollama" | API 密钥,OpenAI SDK要求这个参数,由于调用的是本地的LLM,所以可以忽略 |
LLM_MODEL | "qwen2.5:1.5b" | 使用的模型 |
SYS_MESSAGE | 见代码 | 系统提示词 |
第四层:文本转语音模块(TTS)
PiperProvider 是一个基于 Piper-TTS 的文本转语音(TTS)服务提供者。该模块使用 ONNX Runtime 进行模型推理,支持 CUDA 加速,并提供可选的 PyAudio 实时播放功能。
许可证声明:本文件集成 Piper-TTS,采用 GPL-3.0-or-later 许可证。
核心架构
┌─────────────────────────────────────────────────────────┐
│ PiperProvider │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PiperVoice │ → │ ONNX Runtime│ → │ CUDA 加速 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ↓ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ SynthesisConfig │ │ PyAudio 播放 │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
数据流
输入文本
↓
PiperVoice.synthesize()
↓
生成音频块 (chunks)
↓
提取 16-bit PCM 字节流
↓
输出/播放
初始化流程
__init__ 执行顺序
开始
↓
读取环境变量 MODEL_DIR
↓
构建模型路径和配置路径
↓
配置 SynthesisConfig(语音参数)
↓
调用 _load_model() 加载模型
↓
记录加载耗时
↓
init_player=True?
├─ 是 → 初始化 PyAudio 播放器
└─ 否 → 跳过
↓
完成
路径配置
默认模型路径结构:
{MODEL_DIR}/
└── piper_voices/
└── zh/
├── zh_CN-huayan-medium.onnx ## 模型文件
└── zh_CN-huayan-medium.onnx.json ## 配置文件
环境变量:
| 变量名 | 默认值 | 说明 |
|---|---|---|
MODEL_DIR | /home/nvidia/ | 模型根目录 |
TORCHDYNAMO_DISABLE | "1" | 禁用 Torch 动态编译 |
DISABLE_TORCH_COMPILE | "1" | 禁用 Torch 编译优化 |
SynthesisConfig 参数详解
语音合成配置控制输出语音的特质:
| 参数 | 当前值 | 范围建议 | 说明 |
|---|---|---|---|
length_scale | 1.1 | 0.8-1.5 | 语速控制。小于1,加快,大于1,减慢 |
noise_scale | 0.4 | 0.3-1.0 | 随机噪声。低值更清晰,高值更丰富 |
noise_w_scale | 0.5 | 0.3-1.2 | 音素时长波动。影响自然度 |
normalize_audio | True | - | 音量归一化,防止爆音 |
volume | 1.0 | 0.5-2.0 | 整体音量 |
调参建议
| 目标 | 调整方向 |
|---|---|
| 更自然 | ↑ noise_w_scale (0.8-1.2) |
| 更清晰 | ↓ noise_scale (0.3-0.6) |
| 更快语速 | ↓ length_scale (0.8-1.0) |
| 音量过小 | ↑ volume 或启用 normalize_audio |
关键方法
_load_model
职责:加载 ONNX 模型到内存
执行步骤:
- 调用
PiperVoice.load()加载模型文件 - 读取 JSON 配置文件
- 启用 CUDA 加速(
use_cuda=True) - 记录模型采样率
get_audio_param
用途:使用固定测试文本合成一小段音频,从第一个 chunk 提取生成音频参数(采样率、声道数、采样位数),用来实例化 AudioPlayer 对象。
测试文本: "你好,我是天工形者,很高兴认识你。"
↓
调用 synthesize()
↓
从第一个 chunk 获取:
- sample_rate
- sample_channels
- sample_width
↓
返回 (sample_rate, channels, sample_width)
降级策略:合成失败时返回默认值 (21000, 1, 2)
tts(text) -> bytes
用途:将文本转换为 PCM 音频字节流
流程:
输入文本
↓
piper_instance.synthesize(text, syn_config)
↓
遍历 chunks,提取 audio_int16_bytes
↓
合并所有字节流
↓
返回完整音频数据 (16-bit PCM)
返回值:可直接播放的 16-bit PCM 字节流,失败时返回空字节 b''
play(waveform: bytes)
用途:通过 PyAudio 播放音频
前置条件:初始化时 init_player=True
执行:调用已配置的 audio_stream.write() 写入音频数据
第五层:音频播放模块 AudioPlayer 类
实现了异步队列式音频播放,支持多音频源管理、播放状态追踪和线程安全的并发访问。
函数详解
wait_for_audio_ready(max_wait=5)
用途:阻塞等待音频输出设备就绪(因为orin1上直接连接麦克风通常为会报错,所以多次尝试连接,可成功连接上)
AudioPlayer 类
设计理念
采用生产者-消费者模式实现异步播放:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 调用方 │ ──→ │ Queue 队列 │ ──→ │ 播放线程 │
│ play(data) │ │ (audio_data) │ │ keep_playing │
└──────────────┘ └──────────────┘ └──────────────┘
↓
┌──────────────┐
│ PyAudio 流 │
└──────────────┘
特点:
- 调用方无需等待播放完成
- 音频队列自动排队播放
初始化流程
__init__
↓
调用 wait_for_audio_ready() 等待设备
↓
创建 PyAudio 实例,获取默认设备信息
↓
初始化锁和状态变量
↓
配置 PCM 参数 (采样率、声道、位深)
↓
打开音频输出流
↓
启动后台播放线程
PCM 参数配置
| 参数 | 默认值 | 说明 |
|---|---|---|
sample_rate | 21000 | 采样率,与 Piper-TTS 模型匹配 |
channels | 1 | 声道数(单声道) |
sample_width | 2 | 采样位数(16-bit = 2 字节) |
frames_per_buffer | 1024 | 缓冲区帧数 |
这几个参数主要是待播放音频数据的参数,播放器需要正确设置这些参数,如果设置不对,则播放的声音会失真,甚至会出现纯杂音。
关键方法
is_speaking() -> bool
用途:判断是否有音频正在播放或待播放。原理是在每段音频放入播放队列的时候,都计算该音频的预估播放时间,然后累加保存起来播放完成时间,调用本方法的时候判断当前时间和播放完成时间的先后。
判断逻辑:
检查队列是否有待播放音频
↓
有? ─── 是 ───→ 返回 True
↓ 否
检查当前时间是否小于 playback_deadline
↓
是? ───→ 返回 True(硬件缓冲区仍有数据)
↓ 否
返回 False
set_audioid(text)
用途:切换当前音频源标识,并创建对应队列
执行步骤:
- 加锁更新
audioid - 在
audio_queues_map中创建新的 Queue
play(audio_data: bytes)
用途:将音频数据加入播放队列
内部调用:try_put(self.get_audioid(), audio_data)
try_put(audioid, audio_data)
用途:向指定队列添加音频数据,队列满时丢弃最旧数据
流程:
检查 audioid 是否在映射中
↓ 否
创建新队列并加入映射
↓
尝试放入数据(超时 1 秒)
↓
队列满? ─── 是 ───→ 弹出最旧数据,放入新数据
keep_playing_audio() [后台线程]
用途:持续从队列读取音频数据并播放
主循环流程:
while stop_event 未设置:
↓
确保音频流已启动
↓
获取当前 audioid 对应的队列
↓
队列不存在或为空? ─── 是 ───→ sleep 0.01s,继续循环
↓ 否
取出音频数据
↓
调用 _mark_audio_playing() 更新截止时间
↓
写入音频流播放
_mark_audio_playing(audio_data)
用途:根据音频数据大小计算播放持续时间,更新截止时间
计算公式:
chunk_duration = len(audio_data) / (sample_rate * channels * sample_width)
playback_deadline = max(当前时间, 上一个截止时间) + chunk_duration + 输出延迟
关键点:使用 max() 确保多个音频块连续累加,而非重叠
open_stream()
用途:打开 PyAudio 输出流
重试机制:
- 最多重试 3 次
- 仅对 Invalid sample rate 错误 (-9997) 重试
- 每次重试间隔 3 秒
close()
用途:释放所有资源
清理顺序:
设置 stop_event 停止播放线程
↓
等待播放线程结束(最多 2 秒)
↓
重置 playback_deadline
↓
停止并关闭音频流
↓
终止 PyAudio 实例
第六层:完整处理流程
模块文件
tk_audio_process.py- 核心编排节点
AudioProcess 是一个 ROS2 节点,实现完整的语音对话流程:ASR 文本 → LLM 推理 → TTS 合成 → 音频播放。整体采用多线程流水线架构,各阶段通过队列解耦,支持语音打断和并行 TTS 合成。
整体流程
ASR 文本 (ROS2 topic: asr_sentence)
│
▼
on_asr_sentence ← 打断判断、生成 request_id、入队
│
▼
asr_sentence_queue
│
▼
[NLP 线程] ← LLM 流式推理,按句子拆分,写入 answer_text_queue
│
▼
answer_text_queue
│
▼
[TTS 线程 × N] ← 并行 TTS 合成,写入 audio_segment_queue
│
▼
audio_segment_queue
│
▼
[PLAYBACK 线程] ← 按 sequence_index 有序送入 AudioPlayer
│
▼
AudioPlayer ← 硬件播放
线程模型
| 线程名 | 数量 | 入口方法 |
|---|---|---|
| NLP | 1 | keep_question_to_answer_by_ollama |
| TTS(n个线程) | 由 TTS_WORKERS 环境变量控制(默认 5) | keep_answer_to_audio |
| PLAYBACK | 1 | keep_audio_segments_in_order |
所有线程均为 daemon 线程,通过 stop_event(threading.Event)协调退出。
Request ID 机制
每次新问题到来时生成一个 UUID 作为 request_id,同时写入 AudioPlayer.audioid。流水线各阶段在处理前后均调用 _is_current_request(request_id) 比对 ID,不匹配则丢弃该任务。这样无需显式取消机制,即可安全处理打断和重复问题。
关键方法
__init__
初始化有严格顺序要求:先初始化 PiperProvider(TTS 引擎)以获取音频参数(采样率、声道、位深),再用这些参数初始化 AudioPlayer,最后初始化 LLMClient 并启动三类工作线程。
on_asr_sentence(msg)
ASR 消息的 ROS2 回调入口,分三种情况处理:
- 正在播放 & 无打断词:直接忽略,防止干扰当前对话。
- 正在播放 & 含打断词("天工"/"天空"/"天宫"):生成新 request_id,清空所有队列,停止当前播放,但不启动新一轮问答。
- 空闲状态(或新问题):生成新 request_id,清空队列,将
(request_id, text)写入asr_sentence_queue。
两种会改变 request_id 的场景都会调用 llm_client.set_interrupted(True),提前终止正在运行的流式推理。
keep_question_to_answer_by_ollama(NLP 线程)
从 asr_sentence_queue 取问题,调用 llm_client.stream_sentence 流式获取 LLM 回答。每个文本片段以 (request_id, sequence_index, chunk) 格式写入 answer_text_queue,sequence_index 从 0 递增,用于后续排序。每次写入前检查 _is_current_request,若请求已过期则立即中止本轮推理。
keep_answer_to_audio(TTS 线程)
从 answer_text_queue 取文本片段,调用 tts_service.tts() 合成 PCM 数据。TTS 执行前后各检查一次 _is_current_request,过期则丢弃,避免无效计算和播放。合成结果以 (request_id, sequence_index, text, pcm_bytes) 写入 audio_segment_queue。多线程并行处理可显著降低端到端延迟。
keep_audio_segments_in_order(PLAYBACK 线程)
由于 TTS 并发执行,音频片段可能乱序到达。本线程用 pending_segments 字典缓存已合成但未按序到达的片段,next_sequence_by_request 记录每个请求下一个应播放的序号,只有满足顺序的片段才会调用 audio_player.try_put 送入播放。当 audioid 发生变更(新请求或打断)时,立即清空缓存。
_is_current_request(request_id)
将传入 ID 与 AudioPlayer.get_audioid() 比对,用于流水线各阶段快速判断任务是否仍有效。
_drain_queue(queue_obj)
非阻塞地清空指定队列,在打断场景中快速丢弃积压任务。
close
有序关闭:置位 stop_event → 中断 LLM 推理 → 依次 join NLP、TTS、PLAYBACK 线程(各设超时) → 关闭 AudioPlayer。对"线程 join 自身"的情况做了保护,防止死锁。
环境变量
| 变量 | 默认值 | 说明 |
|---|---|---|
TTS_WORKERS | 5 | TTS 并发线程数 |
依赖模块
| 模块 | 职责 |
|---|---|
PiperProvider | TTS 引擎封装(Piper-TTS,GPL-3.0) |
LLMClient | LLM 推理客户端,支持流式输出和中断 |
AudioPlayer | 音频播放器,维护播放队列、audioid 和 speaking 状态 |