跳到主要内容
版本:V2.0.5.x

3.4.模块详解


第一层:音频采集模块

模块文件

  • socket_connector.py - Socket 连接管理
  • socket_audio_provider.py - 音频接收与解析
  • tk_audio_publisher.py - 音频发布到 ROS

数据流

RK3588s 麦克风阵列

Socket 服务器 (IP: 10.42.0.127, Port: 9080)

SocketConnector 建立连接

SocketAudioProvider 接收原始数据

SocketAudioPublisher 发布到 ROS 话题

关键概念:VAD(语音活动检测)

当接收音频数据时,需要识别什么时候是说话,什么时候是静音

VAD 状态码:
0 = 静音
1 = 开始说话(新的语句开始)
2 = 持续说话
3 = 结束说话(语句结束)

实现逻辑:

def keep_receiving_publish_audio():
while True:
audio_data, vad = socket_provider.read()

if vad == 1: # 开始说话
audio_buffer.clear() # 清空旧数据
audio_buffer.extend(audio_data)

elif vad == 2: # 持续说话
audio_buffer.extend(audio_data) # 继续追加

elif vad == 3: # 结束说话
audio_buffer.extend(audio_data)
# 整句音频已完成,发布到话题
publish_sentence(audio_buffer)
audio_buffer.clear()

代码详解:SocketConnector

class SocketConnector:
def __init__(self, ip: str, port: int):
# 创建 TCP Socket
self.client_socket = socket(AF_INET, SOCK_STREAM)

# 连接到服务器
self.client_socket.connect((ip, port))

# 设置接收超时 3 秒
self.client_socket.settimeout(3.0)

def receive_full_data(self, expected_length):
"""
接收完整数据块

由于 TCP 是流协议,可能分多次接收数据:
比如要接收 1000 字节,可能第一次收到 500,第二次收到 500
"""
received_data = bytearray()

while len(received_data) < expected_length:
# 一次最多接收 4096 字节
chunk = self.client_socket.recv(4096)

if not chunk: # 服务器关闭连接
return None

received_data.extend(chunk)

return bytes(received_data)

代码详解:SocketAudioProvider

这是从RK3588s接收音频数据的详细代码,为何这样接收?天工行者搭载的是讯飞的RK3588 AIUI多模态开发套件,其音频传输协议可参考, 其中明确说明了,0~8个字节是请求头,对应下面的 header = self.receive_full_data(9),第9个字节是vad字段,也就是下面代码中的vad = body[0],第17到n个字节就是详细的音频数据,对应下面代码中audio_data = body[8:-1]

class SocketAudioProvider(SocketConnector):
def read(self):
"""解析音频包协议"""

# 1. 接收 9 字节头部
header = self.receive_full_data(9)
# 格式: [同步字, 用户ID, 消息类型, 消息长度, 消息ID]
# 二进制格式: 1字节 + 1字节 + 1字节 + 2字节 + 4字节

sync_head, user_id, msg_type, msg_length, msg_id = struct.unpack(
'<BBBIH', # < 表示小端序,B=字节,I=整数,H=短整数
header
)

# 2. 校验头部
if sync_head != 0xa5 or user_id != 0x01:
return None # 无效包

# 3. 接收消息体
body = self.receive_full_data(msg_length + 1)
# 格式: [VAD, 通道号, ...] + CRC校验字节

vad = body[0] # VAD 状态
audio_data = body[8:-1] # 提取音频数据

return audio_data, vad

代码详解:SocketAudioPublisher

class SocketAudioPublisher(Node):
def __init__(self):
super().__init__('tk_audio_publisher')

# 创建两个发布者
self.publisher_ = self.create_publisher(
AudioFrame, # 消息类型
'audio_frames', # 话题名
10 # 队列大小
)

self.sentence_publisher_ = self.create_publisher(
AudioFrame,
'audio_sentence_frames',
10
)

# 创建 Socket 提供者
self.audio_provider = SocketAudioProvider('10.42.0.127', 9080)

# 启动后台接收线程
self.receive_pub_thread = threading.Thread(
target=self.keep_receiving_publish_audio
)
self.receive_pub_thread.start()

def keep_receiving_publish_audio(self):
"""后台线程:持续接收并发布音频"""
audio_buffer = bytearray()

while not self.stop_event.is_set():
audio_res = self.audio_provider.read()

if audio_res is None:
continue

audio_data, vad = audio_res

if vad == 1: # 开始说话
audio_buffer.clear()
audio_buffer.extend(audio_data)

elif vad == 2: # 持续说话
audio_buffer.extend(audio_data)

elif vad == 3: # 结束说话
audio_buffer.extend(audio_data)

# 发布整句音频
msg = AudioFrame()
msg.data = bytes(audio_buffer)
self.sentence_publisher_.publish(msg)

self.get_logger().info(
f"发布了一句完整音频,长度: {len(audio_buffer)}"
)

audio_buffer.clear()

第二层:语音识别模块(ASR)

模块文件

  • funasr_client.py - Funasr WebSocket 客户端
  • tk_asr_text_publisher.py - ASR 文本发布

数据流

audio_sentence_frames 话题

tk_asr_text_publisher 订阅

funasr_client WebSocket 发送音频

x86 Funasr 服务识别

WebSocket 返回识别结果

asr_sentence 话题发布

核心概念:WebSocket 流式识别

// 1. 连接到 WebSocket 服务器
ws = websocket.connect("wss://192.168.41.1:10097")

// 2. 发送握手消息(告诉服务器识别参数)
ws.send({
"mode": "offline", // 离线模式
"sample_rate": 16000, // 采样率
"chunk_size": [5,10,5] // 每次发送的音频块大小
})

// 3. 分块发送音频(流式)
for chunk in audio_chunks:
ws.send_binary(chunk)

// 4. 接收识别结果(也是流式)
for result in ws.recv():
text = result["text"]
print(f"识别结果: {text}")

代码详解:FunASRClient

class FunASRClient:
def __init__(self, host="192.168.41.1", port=10097):
self.host = host
self.port = port
self.ssl_enabled = True # 使用 WSS 加密连接

# 识别参数配置
self.sample_rate = 16000
self.chunk_size = [5, 10, 5] # 音频块大小(毫秒)
self.chunk_interval = 10 # 块间隔

# WebSocket 连接(通过全局异步循环)
self.loop = GlobalAsyncLoop.get_loop()
self.connect()

async def _connect(self):
"""建立 WebSocket 连接"""
# 构造 WebSocket URI
uri = f"wss://{self.host}:{self.port}"

# SSL 上下文(关闭证书验证)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# 连接
return await websockets.connect(
uri,
subprotocols=["binary"],
ping_interval=None, # 禁用心跳
ssl=ssl_context
)

def to_text(self, audio_bytes: bytes) -> str:
"""
同步接口:输入音频字节,输出识别文本
"""
# 在异步循环中执行异步函数
future = asyncio.run_coroutine_threadsafe(
self._async_to_text(audio_bytes),
self.loop
)

return future.result(timeout=30)

async def _async_to_text(self, audio_bytes: bytes) -> str:
"""异步识别实现"""
ws = await self.try_init_connection()

# 1. 发送握手消息
start_signal = {
"mode": "offline",
"sample_rate": self.sample_rate,
"chunk_size": self.chunk_size,
"chunk_interval": self.chunk_interval,
}
await ws.send(json.dumps(start_signal))

# 2. 分块发送音频
chunk_bytes = (self.sample_rate / 1000) * 2 # 每毫秒字节数

for i in range(0, len(audio_bytes), int(chunk_bytes * 10)):
chunk = audio_bytes[i:i+int(chunk_bytes*10)]
await ws.send(chunk) # 发送二进制数据
await asyncio.sleep(0.01)

# 3. 发送结束信号
await ws.send(json.dumps({"mode": "offline"}))

# 4. 接收识别结果
result_text = ""
async for message in ws:
data = json.loads(message)
text = data.get("text", "")
if text:
result_text = text
if data.get("done"):
break

return result_text

代码详解:FunASRTextPublisher

class FunASRTextPublisher(Node):
def __init__(self):
super().__init__('tk_asr_text_publisher')

# 订阅音频话题
self.subscription = self.create_subscription(
AudioFrame,
'audio_sentence_frames',
self.audio_sentence_callback,
10
)

# 创建发布者
self.asr_sentence_publisher = self.create_publisher(
String,
'/asr_sentence',
10
)

# 音频处理队列
self.audio_queue = Queue(maxsize=1)

# Funasr 客户端
self.asr_service = FunASRClient(
host="192.168.41.1",
port=10097
)

# 后台处理线程
self.process_thread = threading.Thread(
target=self.keep_process_audio
)
self.process_thread.start()

def audio_sentence_callback(self, msg: AudioFrame):
"""
订阅回调:每当收到一句完整音频时调用
"""
# 将音频放入队列
self.audio_queue.put(msg)

def keep_process_audio(self):
"""后台线程:处理音频并识别"""
while True:
try:
# 从队列取出音频(最多等待 2 秒)
msg = self.audio_queue.get(timeout=2)

# 转换为字节
audio_bytes = bytes(msg.data)

self.get_logger().info(
f"接收到 {len(audio_bytes)} 字节音频,开始识别..."
)

# 调用 Funasr 识别
asr_sentence = self.asr_service.to_text(audio_bytes)

# 过滤过短的识别结果
if len(asr_sentence) <= 3:
self.get_logger().info(f"忽略过短结果: {asr_sentence}")
continue

# 发布识别结果
text_msg = String()
text_msg.data = asr_sentence
self.asr_sentence_publisher.publish(text_msg)

self.get_logger().info(f"发布识别结果: {asr_sentence}")

except Empty:
continue # 超时继续等待
except Exception as e:
self.get_logger().error(f"识别出错: {e}")

第三层:大模型自然语言理解模块(LLMClient)

LLMClient 是一个与大语言模型(LLM)服务交互的客户端类,专门设计用于与 Ollama 服务进行通信。采用多进程架构实现流式输出,可自动判断LLM服务是运行在orin1还是orin2,以及智能句子切分。


核心架构

多进程设计

模块使用 multiprocessingspawn 方式创建子进程,原因如下:

  • 避免阻塞:HTTP 流式请求在子进程执行,主进程保持响应
  • 可中断性:用户可以随时中断当前请求,启动新对话
  • 跨平台兼容spawn 模式在 Windows 和 Linux 上行为一致
┌─────────────────────────────────────────────────────────┐
│ 主进程 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 句子拼接 │ ← │ Queue 读取 │ ← │ 结果处理 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
↑ Queue
┌─────────────────────────────────────────────────────────┐
│ 子进程 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ OpenAI SDK │ → │ 流式迭代 │ → │ Queue 写入 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘

类属性详解

配置属性

属性默认值说明
max_history1历史对话轮数(每轮=用户+助手=2条消息)
historydeque(maxlen=2)使用双端队列自动限制历史长度
sentence_endings"。!?.!?"强制断句标点
soft_endings",、;;,"软分割标点(需配合 max_len)
max_len25软分割触发阈值(字符数)

服务发现属性

属性说明
active_llm_ip自动检测到的 Ollama 服务 IP
llm_endpoint完整的 API 端点 URL
api_keyAPI 密钥(Ollama 默认为 "ollama")
llm_model使用的模型名称,默认 qwen2.5:1.5b

执行流程

1. 初始化流程 (__init__)

开始

设置历史队列 (deque)

调用 test_llm_ip() 探测服务

配置 API 端点、密钥、模型

设置系统消息

完成

2. 服务发现流程 (test_llm_ip)

该方法自动检测 Ollama 服务运行位置:

遍历候选 IP: [192.168.41.3, 192.168.41.2]

对每个 IP 发起 GET /api/version 请求

成功? ─── 是 ───→ 返回该 IP
↓ 否
继续下一个 IP

全部失败? ───→ 返回 None,后续请求将被拒绝

设计要点

  • 使用 urllib.request 而非 httpx,减少依赖
  • 记录详细日志便于排查网络问题

3. 流式请求流程 (stream_sentence)

这是核心方法,完整流程如下:

启动阶段
  1. 中断上一个子进程:调用 set_interrupted(True) 终止上一个子进程
  2. 构建消息负载:组合系统消息、历史消息、当前用户输入
  3. 创建子进程:启动 _stream_worker 函数
流式获取阶段
循环读取 Queue

收到 None? ─── 是 ───→ 子进程结束,退出循环
↓ 否
追加到 buffer 和 assistant_response

尝试断句处理

Queue 超时且子进程已死? ─── 是 ───→ 退出循环
↓ 否
继续读取
句子切分逻辑

整句断句:遇到 。!?.!? 立即切分

buffer = "天空是蓝色的。这是因为"
↓ 找到句号
yield "天空是蓝色的。"
buffer = "这是因为"

强行断句:buffer 超过 25 字符时,查找软分割点

buffer = "这种现象一般被称为瑞利散射,当阳光"
|←── 如果超过 25 字符 ──→|
↓ 找到逗号
yield "这种现象一般被称为瑞利散射,"
buffer = "当阳光"

子进程函数 (_stream_worker)

职责

  • 创建 OpenAI 客户端连接 Ollama 兼容 API
  • 发起流式请求
  • 将每个 chunk 的文本内容放入跨进程队列以便主进程获取生成的文本
  • 请求结束后放入 None 作为结束信号

参数说明

  • base_url: API 基础 URL
  • model: 模型名称
  • messages_payload: 完整消息列表
  • queue: 跨进程通信队列
  • api_key: API 密钥

set_interrupted

用途:在需要的时候可强制终止正在运行的子进程,丢弃正在生成的文本

执行步骤

  1. 检查 process 是否存活
  2. 调用 terminate() 发送 SIGTERM
  3. 调用 join(timeout=1) 等待退出
  4. 关闭并清理 Queue
  5. 重置属性为 None

环境变量配置

变量名默认值说明
LLM_KEY"ollama"API 密钥,OpenAI SDK要求这个参数,由于调用的是本地的LLM,所以可以忽略
LLM_MODEL"qwen2.5:1.5b"使用的模型
SYS_MESSAGE见代码系统提示词

第四层:文本转语音模块(TTS)

PiperProvider 是一个基于 Piper-TTS 的文本转语音(TTS)服务提供者。该模块使用 ONNX Runtime 进行模型推理,支持 CUDA 加速,并提供可选的 PyAudio 实时播放功能。

许可证声明:本文件集成 Piper-TTS,采用 GPL-3.0-or-later 许可证。


核心架构

┌─────────────────────────────────────────────────────────┐
│ PiperProvider │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PiperVoice │ → │ ONNX Runtime│ → │ CUDA 加速 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ↓ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ SynthesisConfig │ │ PyAudio 播放 │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘

数据流

输入文本

PiperVoice.synthesize()

生成音频块 (chunks)

提取 16-bit PCM 字节流

输出/播放

初始化流程

__init__ 执行顺序

开始

读取环境变量 MODEL_DIR

构建模型路径和配置路径

配置 SynthesisConfig(语音参数)

调用 _load_model() 加载模型

记录加载耗时

init_player=True?
├─ 是 → 初始化 PyAudio 播放器
└─ 否 → 跳过

完成

路径配置

默认模型路径结构:

{MODEL_DIR}/
└── piper_voices/
└── zh/
├── zh_CN-huayan-medium.onnx ## 模型文件
└── zh_CN-huayan-medium.onnx.json ## 配置文件

环境变量

变量名默认值说明
MODEL_DIR/home/nvidia/模型根目录
TORCHDYNAMO_DISABLE"1"禁用 Torch 动态编译
DISABLE_TORCH_COMPILE"1"禁用 Torch 编译优化

SynthesisConfig 参数详解

语音合成配置控制输出语音的特质:

参数当前值范围建议说明
length_scale1.10.8-1.5语速控制。小于1,加快,大于1,减慢
noise_scale0.40.3-1.0随机噪声。低值更清晰,高值更丰富
noise_w_scale0.50.3-1.2音素时长波动。影响自然度
normalize_audioTrue-音量归一化,防止爆音
volume1.00.5-2.0整体音量

调参建议

目标调整方向
更自然noise_w_scale (0.8-1.2)
更清晰noise_scale (0.3-0.6)
更快语速length_scale (0.8-1.0)
音量过小volume 或启用 normalize_audio

关键方法

_load_model

职责:加载 ONNX 模型到内存

执行步骤

  1. 调用 PiperVoice.load() 加载模型文件
  2. 读取 JSON 配置文件
  3. 启用 CUDA 加速(use_cuda=True
  4. 记录模型采样率

get_audio_param

用途:使用固定测试文本合成一小段音频,从第一个 chunk 提取生成音频参数(采样率、声道数、采样位数),用来实例化 AudioPlayer 对象。

测试文本: "你好,我是天工形者,很高兴认识你。"

调用 synthesize()

从第一个 chunk 获取:
- sample_rate
- sample_channels
- sample_width

返回 (sample_rate, channels, sample_width)

降级策略:合成失败时返回默认值 (21000, 1, 2)

tts(text) -> bytes

用途:将文本转换为 PCM 音频字节流

流程

输入文本

piper_instance.synthesize(text, syn_config)

遍历 chunks,提取 audio_int16_bytes

合并所有字节流

返回完整音频数据 (16-bit PCM)

返回值:可直接播放的 16-bit PCM 字节流,失败时返回空字节 b''

play(waveform: bytes)

用途:通过 PyAudio 播放音频

前置条件:初始化时 init_player=True

执行:调用已配置的 audio_stream.write() 写入音频数据


第五层:音频播放模块 AudioPlayer 类

实现了异步队列式音频播放,支持多音频源管理、播放状态追踪和线程安全的并发访问。


函数详解

wait_for_audio_ready(max_wait=5)

用途:阻塞等待音频输出设备就绪(因为orin1上直接连接麦克风通常为会报错,所以多次尝试连接,可成功连接上)


AudioPlayer 类

设计理念

采用生产者-消费者模式实现异步播放:

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ 调用方 │ ──→ │ Queue 队列 │ ──→ │ 播放线程 │
│ play(data) │ │ (audio_data) │ │ keep_playing │
└──────────────┘ └──────────────┘ └──────────────┘

┌──────────────┐
│ PyAudio 流 │
└──────────────┘

特点

  • 调用方无需等待播放完成
  • 音频队列自动排队播放

初始化流程

__init__

调用 wait_for_audio_ready() 等待设备

创建 PyAudio 实例,获取默认设备信息

初始化锁和状态变量

配置 PCM 参数 (采样率、声道、位深)

打开音频输出流

启动后台播放线程

PCM 参数配置

参数默认值说明
sample_rate21000采样率,与 Piper-TTS 模型匹配
channels1声道数(单声道)
sample_width2采样位数(16-bit = 2 字节)
frames_per_buffer1024缓冲区帧数

这几个参数主要是待播放音频数据的参数,播放器需要正确设置这些参数,如果设置不对,则播放的声音会失真,甚至会出现纯杂音。


关键方法

is_speaking() -> bool

用途:判断是否有音频正在播放或待播放。原理是在每段音频放入播放队列的时候,都计算该音频的预估播放时间,然后累加保存起来播放完成时间,调用本方法的时候判断当前时间和播放完成时间的先后。

判断逻辑

检查队列是否有待播放音频

有? ─── 是 ───→ 返回 True
↓ 否
检查当前时间是否小于 playback_deadline

是? ───→ 返回 True(硬件缓冲区仍有数据)
↓ 否
返回 False
set_audioid(text)

用途:切换当前音频源标识,并创建对应队列

执行步骤

  1. 加锁更新 audioid
  2. audio_queues_map 中创建新的 Queue
play(audio_data: bytes)

用途:将音频数据加入播放队列

内部调用try_put(self.get_audioid(), audio_data)

try_put(audioid, audio_data)

用途:向指定队列添加音频数据,队列满时丢弃最旧数据

流程

检查 audioid 是否在映射中
↓ 否
创建新队列并加入映射

尝试放入数据(超时 1 秒)

队列满? ─── 是 ───→ 弹出最旧数据,放入新数据
keep_playing_audio() [后台线程]

用途:持续从队列读取音频数据并播放

主循环流程

while stop_event 未设置:

确保音频流已启动

获取当前 audioid 对应的队列

队列不存在或为空? ─── 是 ───→ sleep 0.01s,继续循环
↓ 否
取出音频数据

调用 _mark_audio_playing() 更新截止时间

写入音频流播放
_mark_audio_playing(audio_data)

用途:根据音频数据大小计算播放持续时间,更新截止时间

计算公式

chunk_duration = len(audio_data) / (sample_rate * channels * sample_width)
playback_deadline = max(当前时间, 上一个截止时间) + chunk_duration + 输出延迟

关键点:使用 max() 确保多个音频块连续累加,而非重叠

open_stream()

用途:打开 PyAudio 输出流

重试机制

  • 最多重试 3 次
  • 仅对 Invalid sample rate 错误 (-9997) 重试
  • 每次重试间隔 3 秒
close()

用途:释放所有资源

清理顺序

设置 stop_event 停止播放线程

等待播放线程结束(最多 2 秒)

重置 playback_deadline

停止并关闭音频流

终止 PyAudio 实例

第六层:完整处理流程

模块文件

  • tk_audio_process.py - 核心编排节点

AudioProcess 是一个 ROS2 节点,实现完整的语音对话流程:ASR 文本 → LLM 推理 → TTS 合成 → 音频播放。整体采用多线程流水线架构,各阶段通过队列解耦,支持语音打断和并行 TTS 合成。


整体流程

ASR 文本 (ROS2 topic: asr_sentence)


on_asr_sentence ← 打断判断、生成 request_id、入队


asr_sentence_queue


[NLP 线程] ← LLM 流式推理,按句子拆分,写入 answer_text_queue


answer_text_queue


[TTS 线程 × N] ← 并行 TTS 合成,写入 audio_segment_queue


audio_segment_queue


[PLAYBACK 线程] ← 按 sequence_index 有序送入 AudioPlayer


AudioPlayer ← 硬件播放

线程模型

线程名数量入口方法
NLP1keep_question_to_answer_by_ollama
TTS(n个线程)TTS_WORKERS 环境变量控制(默认 5)keep_answer_to_audio
PLAYBACK1keep_audio_segments_in_order

所有线程均为 daemon 线程,通过 stop_eventthreading.Event)协调退出。


Request ID 机制

每次新问题到来时生成一个 UUID 作为 request_id,同时写入 AudioPlayer.audioid。流水线各阶段在处理前后均调用 _is_current_request(request_id) 比对 ID,不匹配则丢弃该任务。这样无需显式取消机制,即可安全处理打断和重复问题。


关键方法

__init__

初始化有严格顺序要求:先初始化 PiperProvider(TTS 引擎)以获取音频参数(采样率、声道、位深),再用这些参数初始化 AudioPlayer,最后初始化 LLMClient 并启动三类工作线程。

on_asr_sentence(msg)

ASR 消息的 ROS2 回调入口,分三种情况处理:

  • 正在播放 & 无打断词:直接忽略,防止干扰当前对话。
  • 正在播放 & 含打断词("天工"/"天空"/"天宫"):生成新 request_id,清空所有队列,停止当前播放,但不启动新一轮问答。
  • 空闲状态(或新问题):生成新 request_id,清空队列,将 (request_id, text) 写入 asr_sentence_queue

两种会改变 request_id 的场景都会调用 llm_client.set_interrupted(True),提前终止正在运行的流式推理。

keep_question_to_answer_by_ollama(NLP 线程)

asr_sentence_queue 取问题,调用 llm_client.stream_sentence 流式获取 LLM 回答。每个文本片段以 (request_id, sequence_index, chunk) 格式写入 answer_text_queuesequence_index 从 0 递增,用于后续排序。每次写入前检查 _is_current_request,若请求已过期则立即中止本轮推理。

keep_answer_to_audio(TTS 线程)

answer_text_queue 取文本片段,调用 tts_service.tts() 合成 PCM 数据。TTS 执行前后各检查一次 _is_current_request,过期则丢弃,避免无效计算和播放。合成结果以 (request_id, sequence_index, text, pcm_bytes) 写入 audio_segment_queue。多线程并行处理可显著降低端到端延迟。

keep_audio_segments_in_order(PLAYBACK 线程)

由于 TTS 并发执行,音频片段可能乱序到达。本线程用 pending_segments 字典缓存已合成但未按序到达的片段,next_sequence_by_request 记录每个请求下一个应播放的序号,只有满足顺序的片段才会调用 audio_player.try_put 送入播放。当 audioid 发生变更(新请求或打断)时,立即清空缓存。

_is_current_request(request_id)

将传入 ID 与 AudioPlayer.get_audioid() 比对,用于流水线各阶段快速判断任务是否仍有效。

_drain_queue(queue_obj)

非阻塞地清空指定队列,在打断场景中快速丢弃积压任务。

close

有序关闭:置位 stop_event → 中断 LLM 推理 → 依次 join NLP、TTS、PLAYBACK 线程(各设超时) → 关闭 AudioPlayer。对"线程 join 自身"的情况做了保护,防止死锁。


环境变量

变量默认值说明
TTS_WORKERS5TTS 并发线程数

依赖模块

模块职责
PiperProviderTTS 引擎封装(Piper-TTS,GPL-3.0)
LLMClientLLM 推理客户端,支持流式输出和中断
AudioPlayer音频播放器,维护播放队列、audioid 和 speaking 状态