Version: V2.0.5.1

3.4 Module Detailed Explanation


First Layer: Audio Acquisition Module

Module Files

  • socket_connector.py - Socket connection management
  • socket_audio_provider.py - Audio reception and parsing
  • tk_audio_publisher.py - Publish audio to ROS

Data Flow

RK3588s Microphone Array

Socket Server (IP: 10.42.0.127, Port: 9080)

SocketConnector Establish Connection

SocketAudioProvider Receive Raw Data

SocketAudioPublisher Publish to ROS Topic

Key Concept: VAD (Voice Activity Detection)

When receiving audio data, the system needs to identify when the user is speaking versus silent:

VAD Status Code:
0 = Silence
1 = Start Speaking (New sentence begins)
2 = Continuous Speaking
3 = End Speaking (Sentence ends)

Implementation Logic:

def keep_receiving_publish_audio():
    while True:
        audio_data, vad = socket_provider.read()

        if vad == 1:  # Start speaking
            audio_buffer.clear()  # Clear old data
            audio_buffer.extend(audio_data)

        elif vad == 2:  # Continuous speaking
            audio_buffer.extend(audio_data)  # Continue appending

        elif vad == 3:  # End speaking
            audio_buffer.extend(audio_data)
            # Complete sentence audio ready, publish to topic
            publish_sentence(audio_buffer)
            audio_buffer.clear()

Code Explanation: SocketConnector

from socket import socket, AF_INET, SOCK_STREAM


class SocketConnector:
    def __init__(self, ip: str, port: int):
        # Create TCP socket
        self.client_socket = socket(AF_INET, SOCK_STREAM)

        # Connect to server
        self.client_socket.connect((ip, port))

        # Set receive timeout to 3 seconds
        self.client_socket.settimeout(3.0)

    def receive_full_data(self, expected_length):
        """
        Receive a complete data block.

        Since TCP is a stream protocol, data may arrive in multiple chunks:
        to receive 1000 bytes, we may get 500 first, then another 500.
        """
        received_data = bytearray()

        while len(received_data) < expected_length:
            # Receive at most the bytes still missing (up to 4096 at a time),
            # so we never read past the end of this block
            chunk = self.client_socket.recv(
                min(4096, expected_length - len(received_data))
            )

            if not chunk:  # Server closed the connection
                return None

            received_data.extend(chunk)

        return bytes(received_data)
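The reassembly loop above can be exercised without the robot hardware. This sketch uses a local socket pair to simulate a server that delivers a 1000-byte block in two 500-byte sends; the standalone receive_full_data function mirrors the method above.

```python
import socket

def receive_full_data(sock, expected_length):
    """Loop until exactly expected_length bytes have arrived."""
    received = bytearray()
    while len(received) < expected_length:
        chunk = sock.recv(min(4096, expected_length - len(received)))
        if not chunk:  # peer closed the connection
            return None
        received.extend(chunk)
    return bytes(received)

# Simulate a server that sends 1000 bytes as two 500-byte chunks.
server, client = socket.socketpair()
server.sendall(b"a" * 500)
server.sendall(b"b" * 500)

data = receive_full_data(client, 1000)
print(len(data))  # 1000

server.close()
client.close()
```

Even though the sender made two separate calls, the receiver sees one contiguous byte stream, which is exactly why the length-bounded loop is needed.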

Code Explanation: SocketAudioProvider

This is the code that receives audio data from the RK3588s. Why this approach? TienKung is equipped with iFLYTEK's RK3588 AIUI multimodal development kit, and the parsing below follows its audio transmission protocol.

The protocol specifies that bytes 0-8 are the request header, corresponding to header = self.receive_full_data(9) below. Byte 9 is the VAD field, corresponding to vad = body[0]. Bytes 17 to n carry the audio payload, corresponding to audio_data = body[8:-1].
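The 9-byte header layout can be sanity-checked with a struct round-trip. The field values below are made up for illustration; only the sync head (0xa5) and user id (0x01) are the constants the parser actually validates.

```python
import struct

# Hypothetical field values; 0xa5 and 0x01 are the constants the parser checks.
header = struct.pack('<BBBIH', 0xa5, 0x01, 0x02, 1024, 7)
print(len(header))  # 9 bytes: 1 + 1 + 1 + 4 + 2

sync_head, user_id, msg_type, msg_length, msg_id = struct.unpack('<BBBIH', header)
print(sync_head == 0xa5, msg_length)  # True 1024
```

The '<' prefix disables alignment padding, so the packed size is exactly the sum of the field widths.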

import struct


class SocketAudioProvider(SocketConnector):
    def read(self):
        """Parse one audio packet according to the protocol"""

        # 1. Receive 9-byte header
        header = self.receive_full_data(9)
        # Format: [Sync, UserID, MessageType, MessageLength, MessageID]
        # Binary layout: 1 byte + 1 byte + 1 byte + 4 bytes + 2 bytes

        sync_head, user_id, msg_type, msg_length, msg_id = struct.unpack(
            '<BBBIH',  # < = little-endian, B = byte, I = 4-byte int, H = 2-byte short
            header
        )

        # 2. Validate header
        if sync_head != 0xa5 or user_id != 0x01:
            return None  # Invalid packet

        # 3. Receive message body
        body = self.receive_full_data(msg_length + 1)
        # Format: [VAD, Channel, ...] + CRC checksum byte

        vad = body[0]  # VAD status
        audio_data = body[8:-1]  # Extract audio payload

        return audio_data, vad

Code Explanation: SocketAudioPublisher

import threading

from rclpy.node import Node
# AudioFrame is this project's custom audio message type


class SocketAudioPublisher(Node):
    def __init__(self):
        super().__init__('tk_audio_publisher')

        # Event used to stop the background thread on shutdown
        self.stop_event = threading.Event()

        # Create two publishers
        self.publisher_ = self.create_publisher(
            AudioFrame,       # Message type
            'audio_frames',   # Topic name
            10                # Queue size
        )

        self.sentence_publisher_ = self.create_publisher(
            AudioFrame,
            'audio_sentence_frames',
            10
        )

        # Create Socket provider
        self.audio_provider = SocketAudioProvider('10.42.0.127', 9080)

        # Start background receiving thread
        self.receive_pub_thread = threading.Thread(
            target=self.keep_receiving_publish_audio
        )
        self.receive_pub_thread.start()

    def keep_receiving_publish_audio(self):
        """Background thread: continuously receive and publish audio"""
        audio_buffer = bytearray()

        while not self.stop_event.is_set():
            audio_res = self.audio_provider.read()

            if audio_res is None:
                continue

            audio_data, vad = audio_res

            if vad == 1:  # Start speaking
                audio_buffer.clear()
                audio_buffer.extend(audio_data)

            elif vad == 2:  # Continuous speaking
                audio_buffer.extend(audio_data)

            elif vad == 3:  # End speaking
                audio_buffer.extend(audio_data)

                # Publish complete sentence audio
                msg = AudioFrame()
                msg.data = bytes(audio_buffer)
                self.sentence_publisher_.publish(msg)

                self.get_logger().info(
                    f"Published complete audio, length: {len(audio_buffer)}"
                )

                audio_buffer.clear()

Second Layer: Speech Recognition Module (ASR)

Module Files

  • funasr_client.py - Funasr WebSocket client
  • tk_asr_text_publisher.py - ASR text publishing

Data Flow

audio_sentence_frames Topic

tk_asr_text_publisher Subscribe

funasr_client WebSocket Send Audio

x86 Funasr Service Recognition

WebSocket Return Recognition Result

asr_sentence Topic Publish

Core Concept: WebSocket Streaming Recognition

# 1. Connect to WebSocket server
ws = websocket.connect("wss://192.168.41.1:10097")

# 2. Send handshake message (tell server the recognition parameters)
ws.send({
    "mode": "offline",        # Offline mode
    "sample_rate": 16000,     # Sample rate
    "chunk_size": [5, 10, 5]  # Audio chunk size for each send
})

# 3. Send audio in chunks (streaming)
for chunk in audio_chunks:
    ws.send_binary(chunk)

# 4. Receive recognition results (also streaming)
for result in ws.recv():
    text = result["text"]
    print(f"Recognition result: {text}")

Code Explanation: FunASRClient

class FunASRClient:
    def __init__(self, host="192.168.41.1", port=10097):
        self.host = host
        self.port = port
        self.ssl_enabled = True  # Use WSS encrypted connection

        # Recognition parameter configuration
        self.sample_rate = 16000
        self.chunk_size = [5, 10, 5]  # Audio chunk size (milliseconds)
        self.chunk_interval = 10      # Chunk interval

        # WebSocket connection (via global async loop)
        self.loop = GlobalAsyncLoop.get_loop()
        self.connect()

    async def _connect(self):
        """Establish the WebSocket connection"""
        # Construct WebSocket URI
        uri = f"wss://{self.host}:{self.port}"

        # SSL context (certificate verification disabled)
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        # Connect
        return await websockets.connect(
            uri,
            subprotocols=["binary"],
            ping_interval=None,  # Disable heartbeat
            ssl=ssl_context
        )

    def to_text(self, audio_bytes: bytes) -> str:
        """
        Synchronous interface: audio bytes in, recognized text out
        """
        # Run the async implementation on the shared event loop
        future = asyncio.run_coroutine_threadsafe(
            self._async_to_text(audio_bytes),
            self.loop
        )

        return future.result(timeout=30)

    async def _async_to_text(self, audio_bytes: bytes) -> str:
        """Asynchronous recognition implementation"""
        ws = await self.try_init_connection()

        # 1. Send handshake message
        start_signal = {
            "mode": "offline",
            "sample_rate": self.sample_rate,
            "chunk_size": self.chunk_size,
            "chunk_interval": self.chunk_interval,
        }
        await ws.send(json.dumps(start_signal))

        # 2. Send audio in chunks
        chunk_bytes = (self.sample_rate / 1000) * 2  # Bytes per millisecond (16-bit mono)

        for i in range(0, len(audio_bytes), int(chunk_bytes * 10)):
            chunk = audio_bytes[i:i + int(chunk_bytes * 10)]
            await ws.send(chunk)  # Send binary data
            await asyncio.sleep(0.01)

        # 3. Send end signal
        await ws.send(json.dumps({"mode": "offline"}))

        # 4. Receive recognition result
        result_text = ""
        async for message in ws:
            data = json.loads(message)
            text = data.get("text", "")
            if text:
                result_text = text
            if data.get("done"):
                break

        return result_text

Code Explanation: FunASRTextPublisher

import threading
from queue import Queue, Empty

from rclpy.node import Node
from std_msgs.msg import String
# AudioFrame is this project's custom audio message type


class FunASRTextPublisher(Node):
    def __init__(self):
        super().__init__('tk_asr_text_publisher')

        # Subscribe to the audio topic
        self.subscription = self.create_subscription(
            AudioFrame,
            'audio_sentence_frames',
            self.audio_sentence_callback,
            10
        )

        # Create publisher
        self.asr_sentence_publisher = self.create_publisher(
            String,
            '/asr_sentence',
            10
        )

        # Audio processing queue
        self.audio_queue = Queue(maxsize=1)

        # Funasr client
        self.asr_service = FunASRClient(
            host="192.168.41.1",
            port=10097
        )

        # Background processing thread
        self.process_thread = threading.Thread(
            target=self.keep_process_audio
        )
        self.process_thread.start()

    def audio_sentence_callback(self, msg: AudioFrame):
        """
        Subscription callback: invoked when a complete sentence of audio arrives
        """
        # Put the audio into the queue
        self.audio_queue.put(msg)

    def keep_process_audio(self):
        """Background thread: take audio from the queue and recognize it"""
        while True:
            try:
                # Get audio from the queue (wait at most 2 seconds)
                msg = self.audio_queue.get(timeout=2)

                # Convert to bytes
                audio_bytes = bytes(msg.data)

                self.get_logger().info(
                    f"Received {len(audio_bytes)} bytes of audio, starting recognition..."
                )

                # Call Funasr for recognition
                asr_sentence = self.asr_service.to_text(audio_bytes)

                # Filter out recognition results that are too short
                if len(asr_sentence) <= 3:
                    self.get_logger().info(f"Ignoring short result: {asr_sentence}")
                    continue

                # Publish the recognition result
                text_msg = String()
                text_msg.data = asr_sentence
                self.asr_sentence_publisher.publish(text_msg)

                self.get_logger().info(f"Published recognition result: {asr_sentence}")

            except Empty:
                continue  # Timeout, keep waiting
            except Exception as e:
                self.get_logger().error(f"Recognition error: {e}")

Third Layer: Large Model Natural Language Understanding Module (LLMClient)

LLMClient is a client class that interacts with large language model (LLM) services, specifically designed to communicate with Ollama services. It employs a multi-process architecture for streaming output, automatically determines whether the LLM service runs on orin1 or orin2, and supports intelligent sentence splitting.


Core Architecture

Multi-Process Design

The module uses the spawn method of multiprocessing to create child processes for the following reasons:

  • Non-Blocking: HTTP streaming requests execute in child processes while the main process remains responsive
  • Interruptibility: Users can interrupt the current request at any time and start a new conversation
  • Cross-Platform Compatibility: The spawn mode behaves consistently on Windows and Linux
┌─────────────────────────────────────────────────────────┐
│ Main Process │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Sentence │ ← │ Queue Read │ ← │ Result │ │
│ │ Concatenate │ │ │ │ Processing │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
↑ Queue
┌─────────────────────────────────────────────────────────┐
│ Child Process │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ OpenAI SDK │ → │ Stream │ → │ Queue Write │ │
│ │ │ │ Iteration │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘

Class Attributes Explanation

Configuration Attributes

Attribute | Default | Description
max_history | 1 | Number of historical conversation rounds (1 round = user + assistant = 2 messages)
history | deque(maxlen=2) | Double-ended queue that automatically limits history length
sentence_endings | "。!?.!?" | Forced sentence-ending punctuation
soft_endings | ",、;;," | Soft-split punctuation (works with max_len)
max_len | 25 | Soft-split trigger threshold (character count)
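The two splitting rules these attributes drive can be sketched as a small pure function (names here are illustrative, not the module's actual API): split at any hard sentence ending, and once the buffer grows past max_len, split at the last soft ending instead.

```python
SENTENCE_ENDINGS = "。!?.!?"
SOFT_ENDINGS = ",、;;,"
MAX_LEN = 25

def split_ready_sentences(buffer):
    """Return (complete_sentences, leftover_text) for the current buffer."""
    sentences = []
    while True:
        # Hard split: cut at the first sentence-ending punctuation mark
        hard = next((i for i, ch in enumerate(buffer) if ch in SENTENCE_ENDINGS), -1)
        if hard != -1:
            sentences.append(buffer[:hard + 1])
            buffer = buffer[hard + 1:]
            continue
        # Forced split: buffer too long, cut at the last soft ending
        if len(buffer) > MAX_LEN:
            soft = max((i for i, ch in enumerate(buffer) if ch in SOFT_ENDINGS), default=-1)
            if soft != -1:
                sentences.append(buffer[:soft + 1])
                buffer = buffer[soft + 1:]
                continue
        return sentences, buffer

sentences, rest = split_ready_sentences("The sky is blue. That's because")
print(sentences, repr(rest))  # ['The sky is blue.'] " That's because"
```

Each completed sentence can be handed to TTS immediately while the leftover text waits for the next streamed chunk.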

Service Discovery Attributes

Attribute | Description
active_llm_ip | Auto-detected Ollama service IP
llm_endpoint | Complete API endpoint URL
api_key | API key (Ollama default is "ollama")
llm_model | Model name in use, default qwen2.5:1.5b

Execution Flow

1. Initialization Flow (__init__)

Start

Set history queue (deque)

Call test_llm_ip() to probe service

Configure API endpoint, key, model

Set system message

Complete

2. Service Discovery Flow (test_llm_ip)

This method automatically detects the Ollama service location:

Iterate candidate IPs: [192.168.41.3, 192.168.41.2]

Make GET /api/version request for each IP

Successful? ─── Yes ───→ Return this IP
↓ No
Try next IP

All failed? ───→ Return None, subsequent requests will be rejected

Design Points:

  • Use urllib.request instead of httpx to reduce dependencies
  • Log detailed information for troubleshooting network issues
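The probe idea sketched above, using only urllib.request: try each candidate, return the first IP whose /api/version endpoint answers, else None. The port and timeout values here are illustrative, not the ones the real module uses.

```python
import urllib.request
import urllib.error

def probe_ollama_ip(candidate_ips, port=11434, timeout=0.5):
    """Return the first IP whose /api/version endpoint responds, else None."""
    for ip in candidate_ips:
        url = f"http://{ip}:{port}/api/version"
        try:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                if resp.status == 200:
                    return ip
        except (urllib.error.URLError, OSError):
            continue  # unreachable or refused, try the next candidate
    return None

# With nothing listening, every candidate fails and None comes back,
# matching the "all failed" branch above.
print(probe_ollama_ip(["127.0.0.1"], port=9, timeout=0.2))  # None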

3. Streaming Request Flow (stream_sentence)

This is the core method with the complete flow below:

Startup Phase
  1. Interrupt Previous Child Process: Call set_interrupted(True) to terminate the previous child process
  2. Build Message Payload: Combine system message, historical messages, and current user input
  3. Create Child Process: Start _stream_worker function
Streaming Retrieval Phase
Loop reading Queue

Received None? ─── Yes ───→ Child process ended, exit loop
↓ No
Append to buffer and assistant_response

Attempt sentence splitting

Queue timeout and child process dead? ─── Yes ───→ Exit loop
↓ No
Continue reading
Sentence Splitting Logic

Complete Sentence Split: Split immediately upon encountering 。!?.!?

buffer = "The sky is blue. That's because"
↓ Found period
yield "The sky is blue."
buffer = "That's because"

Forced Split: When buffer exceeds 25 characters, find soft split point

buffer = "This phenomenon is called Rayleigh scattering, when sunlight"
|←── If exceeds 25 characters ──→|
↓ Found comma
yield "This phenomenon is called Rayleigh scattering, "
buffer = "when sunlight"

Child Process Function (_stream_worker)

Responsibilities:

  • Create OpenAI client connection to Ollama-compatible API
  • Initiate streaming request
  • Put each chunk's text content into cross-process queue for main process to retrieve generated text
  • Put None as end signal after request completion

Parameter Explanation:

  • base_url: API base URL
  • model: Model name
  • messages_payload: Complete message list
  • queue: Cross-process communication queue
  • api_key: API key

set_interrupted

Purpose: Force terminate a running child process and discard generated text when needed

Execution Steps:

  1. Check if process is alive
  2. Call terminate() to send SIGTERM
  3. Call join(timeout=1) to wait for exit
  4. Close and clean up Queue
  5. Reset properties to None

Environment Variables Configuration

Variable | Default | Description
LLM_KEY | "ollama" | API key, required by the OpenAI SDK; can be ignored for a local LLM
LLM_MODEL | "qwen2.5:1.5b" | Model in use
SYS_MESSAGE | See code | System prompt

Fourth Layer: Text-to-Speech Module (TTS)

PiperProvider is a text-to-speech (TTS) service provider based on Piper-TTS. This module uses ONNX Runtime for model inference, supports CUDA acceleration, and provides optional PyAudio real-time playback functionality.

License Statement: This file integrates Piper-TTS under GPL-3.0-or-later license.


Core Architecture

┌─────────────────────────────────────────────────────────┐
│ PiperProvider │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PiperVoice │ → │ ONNX Runtime│ → │ CUDA │ │
│ │ │ │ │ │ Acceleration│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ↓ │
│ ┌─────────────────┐ ┌─────────────┐ │
│ │ SynthesisConfig │ │ PyAudio │ │
│ │ │ │ Playback │ │
│ └─────────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘

Data Flow

Input Text

PiperVoice.synthesize()

Generate audio chunks

Extract 16-bit PCM byte stream

Output/Playback

Initialization Flow

__init__ Execution Order

Start

Read MODEL_DIR environment variable

Build model and config paths

Configure SynthesisConfig (voice parameters)

Call _load_model() to load model

Record loading time

init_player=True?
├─ Yes → Initialize PyAudio player
└─ No → Skip

Complete

Path Configuration

Default model path structure:

{MODEL_DIR}/
└── piper_voices/
└── zh/
├── zh_CN-huayan-medium.onnx ## Model file
└── zh_CN-huayan-medium.onnx.json ## Config file

Environment Variables:

Variable | Default | Description
MODEL_DIR | /home/nvidia/ | Model root directory
TORCHDYNAMO_DISABLE | "1" | Disable Torch dynamic compilation
DISABLE_TORCH_COMPILE | "1" | Disable Torch compile optimization

SynthesisConfig Parameters Explanation

Voice synthesis configuration controls output voice characteristics:

Parameter | Current Value | Suggested Range | Description
length_scale | 1.1 | 0.8-1.5 | Speech rate control: < 1 speeds up, > 1 slows down
noise_scale | 0.4 | 0.3-1.0 | Random noise: low for clarity, high for richness
noise_w_scale | 0.5 | 0.3-1.2 | Phoneme duration variation; affects naturalness
normalize_audio | True | - | Audio volume normalization to prevent clipping
volume | 1.0 | 0.5-2.0 | Overall volume

Tuning Guidelines

Goal | Adjustment
More natural | Increase noise_w_scale (0.8-1.2)
Clearer sound | Decrease noise_scale (0.3-0.6)
Faster speech | Decrease length_scale (0.8-1.0)
Low volume | Increase volume or enable normalize_audio

Key Methods

_load_model

Responsibility: Load ONNX model into memory

Execution Steps:

  1. Call PiperVoice.load() to load model file
  2. Read JSON config file
  3. Enable CUDA acceleration (use_cuda=True)
  4. Record model sample rate

get_audio_param

Purpose: Synthesize a short audio segment using fixed test text, extract audio parameters (sample rate, channels, sample width) from the first chunk for instantiating AudioPlayer.

Test text: "你好,我是天工形者,很高兴认识你。" (roughly, "Hello, I am TienKung Xingzhe, nice to meet you.")

Call synthesize()

Extract from first chunk:
- sample_rate
- sample_channels
- sample_width

Return (sample_rate, channels, sample_width)

Fallback Strategy: Return default values (21000, 1, 2) if synthesis fails

tts(text) -> bytes

Purpose: Convert text to PCM audio byte stream

Flow:

Input text

piper_instance.synthesize(text, syn_config)

Iterate chunks, extract audio_int16_bytes

Merge all byte streams

Return complete audio data (16-bit PCM)

Return Value: Playable 16-bit PCM byte stream, returns empty bytes b'' on failure

play(waveform: bytes)

Purpose: Play audio via PyAudio

Prerequisite: Initialized with init_player=True

Execution: Call configured audio_stream.write() to write audio data


Fifth Layer: Audio Playback Module AudioPlayer Class

Implements asynchronous queue-based audio playback, supporting multiple audio sources, playback status tracking, and thread-safe concurrent access.


Function Explanation

wait_for_audio_ready(max_wait=5)

Purpose: Block until the audio output device is ready (opening the audio device directly on orin1 usually fails at first, so multiple attempts are made until the connection succeeds)


AudioPlayer Class

Design Philosophy

Employs producer-consumer pattern for asynchronous playback:

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Caller │ ──→ │ Queue │ ──→ │ Playback │
│ play(data) │ │ (audio_data) │ │ thread │
└──────────────┘ └──────────────┘ └──────────────┘

┌──────────────┐
│ PyAudio │
│ stream │
└──────────────┘

Features:

  • Caller doesn't need to wait for playback completion
  • Audio queue automatically queues playback

Initialization Flow

__init__

Call wait_for_audio_ready() to wait for device

Create PyAudio instance, get default device info

Initialize locks and state variables

Configure PCM parameters (sample rate, channels, bit depth)

Open audio output stream

Start background playback thread

PCM Parameters Configuration

Parameter | Default | Description
sample_rate | 21000 | Sample rate, matches the Piper-TTS model
channels | 1 | Number of channels (mono)
sample_width | 2 | Sample bit depth (16-bit = 2 bytes)
frames_per_buffer | 1024 | Buffer frame count

These parameters are for the audio data to be played. The player must configure them correctly; incorrect settings will cause sound distortion or pure noise.


Key Methods

is_speaking() -> bool

Purpose: Determine whether audio is currently playing or queued. The principle: when each audio segment is placed into the playback queue, its estimated playback duration is computed and accumulated into a playback-completion deadline; this method then checks whether the current time is still before that deadline.

Judgment Logic:

Check if queue has pending audio

Yes? ─── Yes ───→ Return True
↓ No
Check if current time < playback_deadline

Yes? ───→ Return True (hardware buffer still has data)
↓ No
Return False
set_audioid(text)

Purpose: Switch current audio source identifier and create corresponding queue

Execution Steps:

  1. Lock and update audioid
  2. Create new Queue in audio_queues_map
play(audio_data: bytes)

Purpose: Add audio data to playback queue

Internal Call: try_put(self.get_audioid(), audio_data)

try_put(audioid, audio_data)

Purpose: Add audio data to specified queue, discard oldest data if queue is full

Flow:

Check if audioid exists in map
↓ No
Create new queue and add to map

Try to put data (timeout 1 second)

Queue full? ─── Yes ───→ Pop oldest data, put new data
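The drop-oldest policy above can be sketched with a standard queue.Queue; the helper name mirrors the method described but is a standalone illustration, not the class's actual code.

```python
from queue import Queue, Full, Empty

def try_put(q, item):
    """Put item; if the queue is full, drop the oldest entry first."""
    try:
        q.put(item, timeout=0.1)
    except Full:
        try:
            q.get_nowait()  # discard the oldest audio chunk
        except Empty:
            pass
        q.put(item, timeout=0.1)

q = Queue(maxsize=2)
for chunk in [b"one", b"two", b"three"]:
    try_put(q, chunk)

remaining = [q.get_nowait(), q.get_nowait()]
print(remaining)  # [b'two', b'three']
```

Dropping the oldest chunk keeps playback close to real time under backpressure, at the cost of losing the stalest audio.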
keep_playing_audio() [Background Thread]

Purpose: Continuously read audio data from queue and play it

Main Loop Flow:

while stop_event not set:

Ensure audio stream started

Get queue for current audioid

Queue doesn't exist or empty? ─── Yes ───→ sleep 0.01s, continue loop
↓ No
Get audio data

Call _mark_audio_playing() to update deadline

Write to audio stream for playback
_mark_audio_playing(audio_data)

Purpose: Calculate playback duration based on audio data size, update deadline

Calculation Formula:

chunk_duration = len(audio_data) / (sample_rate * channels * sample_width)
playback_deadline = max(current_time, previous_deadline) + chunk_duration + output_delay

Key Point: Use max() to ensure multiple audio chunks accumulate sequentially, not overlap
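The formula is easy to check with concrete numbers. At 21000 Hz mono 16-bit, one second of audio is 42000 bytes; queuing two such chunks at the same instant should push the deadline out by two durations plus two output delays, not one. The OUTPUT_DELAY value below is an illustrative assumption.

```python
import time

SAMPLE_RATE, CHANNELS, SAMPLE_WIDTH = 21000, 1, 2
OUTPUT_DELAY = 0.05  # assumed device output latency, illustrative

playback_deadline = 0.0

def mark_audio_playing(audio_data, now):
    """Accumulate the playback deadline as each chunk is queued."""
    global playback_deadline
    chunk_duration = len(audio_data) / (SAMPLE_RATE * CHANNELS * SAMPLE_WIDTH)
    # max() makes back-to-back chunks queue after one another instead of overlapping
    playback_deadline = max(now, playback_deadline) + chunk_duration + OUTPUT_DELAY

now = time.monotonic()
mark_audio_playing(b"\x00" * 42000, now)  # 1 second of audio
mark_audio_playing(b"\x00" * 42000, now)  # queued behind the first chunk
print(round(playback_deadline - now, 2))  # 2.1
```

is_speaking() then reduces to checking `time.monotonic() < playback_deadline`.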

open_stream()

Purpose: Open PyAudio output stream

Retry Mechanism:

  • Maximum 3 retries
  • Only retry for Invalid sample rate error (-9997)
  • 3-second interval between retries
close()

Purpose: Release all resources

Cleanup Order:

Set stop_event to stop playback thread

Wait for playback thread to end (max 2 seconds)

Reset playback_deadline

Stop and close audio stream

Terminate PyAudio instance

Sixth Layer: Complete Processing Flow

Module Files

  • tk_audio_process.py - Core orchestration node

AudioProcess is a ROS2 node that implements complete speech conversation flow: ASR Text → LLM Inference → TTS Synthesis → Audio Playback. Overall adopts multi-threaded pipeline architecture with stages decoupled via queues, supporting speech interruption and parallel TTS synthesis.


Overall Flow

ASR Text (ROS2 topic: asr_sentence)


on_asr_sentence ← Interruption judgment, generate request_id, enqueue


asr_sentence_queue


[NLP Thread] ← LLM streaming inference, split by sentences, write to answer_text_queue


answer_text_queue


[TTS Thread × N] ← Parallel TTS synthesis, write to audio_segment_queue


audio_segment_queue


[PLAYBACK Thread] ← Deliver to AudioPlayer in order by sequence_index


AudioPlayer ← Hardware playback

Thread Model

Thread | Count | Entry Method
NLP | 1 | keep_question_to_answer_by_ollama
TTS | TTS_WORKERS env var (default 5) | keep_answer_to_audio
PLAYBACK | 1 | keep_audio_segments_in_order

All threads are daemon threads, coordinated exit via stop_event (threading.Event).


Request ID Mechanism

A UUID is generated as request_id for each new question, simultaneously written to AudioPlayer.audioid. Each pipeline stage calls _is_current_request(request_id) before and after processing to compare IDs; mismatches are discarded. This allows safe handling of interruptions and duplicate questions without explicit cancellation.
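The mechanism can be sketched in a few lines; PlayerStub is a hypothetical stand-in for AudioPlayer's audioid bookkeeping.

```python
import uuid

class PlayerStub:
    """Minimal stand-in for AudioPlayer's audioid bookkeeping."""
    def __init__(self):
        self._audioid = None
    def set_audioid(self, audioid):
        self._audioid = audioid
    def get_audioid(self):
        return self._audioid

player = PlayerStub()

def is_current_request(request_id):
    # A stage keeps working only while its id matches the active one
    return request_id == player.get_audioid()

first = str(uuid.uuid4())
player.set_audioid(first)

# A new question (or interrupt) rotates the id; stale pipeline work
# now fails the check and is silently discarded.
second = str(uuid.uuid4())
player.set_audioid(second)
print(is_current_request(first), is_current_request(second))  # False True
```

No thread is ever told to cancel; each one simply notices its id has expired at the next checkpoint.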


Key Methods

__init__

Initialization requires strict order: first initialize PiperProvider (TTS engine) to get audio parameters (sample rate, channels, bit depth), then use these to initialize AudioPlayer, finally initialize LLMClient and start three worker threads.

on_asr_sentence(msg)

ROS2 callback entry for ASR messages, handles three scenarios:

  • Playing & No Interrupt Word: Ignore directly, prevent interference with current conversation.
  • Playing & Contains Interrupt Word ("天工"/"天空"/"天宫", the robot's name "TienKung" and its homophones): Generate new request_id, clear all queues, stop current playback, but don't start a new round of Q&A.
  • Idle State (or New Question): Generate new request_id, clear queue, write (request_id, text) to asr_sentence_queue.

Both scenarios that change request_id call llm_client.set_interrupted(True) to terminate streaming inference early.

keep_question_to_answer_by_ollama (NLP Thread)

Fetch question from asr_sentence_queue, call llm_client.stream_sentence for streaming LLM response. Each text segment written to answer_text_queue as (request_id, sequence_index, chunk), with sequence_index incrementing from 0 for later sorting. Check _is_current_request before each write; abort current turn if request has expired.

keep_answer_to_audio (TTS Thread)

Fetch text segment from answer_text_queue, call tts_service.tts() to synthesize PCM data. Check _is_current_request before and after TTS execution; discard if expired to avoid wasted computation and playback. Write synthesis result to audio_segment_queue as (request_id, sequence_index, text, pcm_bytes). Parallel processing of multiple threads significantly reduces end-to-end latency.

keep_audio_segments_in_order (PLAYBACK Thread)

Since TTS executes concurrently, audio segments may arrive out of order. This thread uses pending_segments dict to cache synthesized but out-of-order segments, next_sequence_by_request records the next sequence number to play for each request. Only in-order segments call audio_player.try_put for playback. Clear cache immediately when audioid changes (new request or interrupt).
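The reordering logic reduces to a dict of parked segments plus a counter of the next expected sequence number; this standalone sketch shows the idea without the queues and request ids.

```python
def reorder_segments(segments):
    """Yield TTS segments in sequence order, however they arrive.

    Out-of-order segments are parked in a dict until all of their
    predecessors have shown up, mirroring pending_segments above.
    """
    pending = {}
    next_seq = 0
    for seq, audio in segments:
        pending[seq] = audio
        # Flush every segment that is now in order
        while next_seq in pending:
            yield pending.pop(next_seq)
            next_seq += 1

# TTS workers finish out of order; playback still goes 0, 1, 2, 3.
arrivals = [(1, "seg1"), (0, "seg0"), (3, "seg3"), (2, "seg2")]
ordered = list(reorder_segments(arrivals))
print(ordered)  # ['seg0', 'seg1', 'seg2', 'seg3']
```

Because only in-order segments are released, the playback thread never has to rewind or gap-fill audio.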

_is_current_request(request_id)

Compare input ID with AudioPlayer.get_audioid(), for pipeline stages to quickly judge if task is still valid.

_drain_queue(queue_obj)

Non-blocking queue clearing, quickly discard accumulated tasks in interrupt scenarios.

close

Ordered shutdown: set stop_event → interrupt LLM inference → sequentially join NLP, TTS, PLAYBACK threads (each with timeout) → close AudioPlayer. Protection against "thread joining itself" prevents deadlock.


Environment Variables

Variable | Default | Description
TTS_WORKERS | 5 | TTS concurrent thread count

Dependent Modules

Module | Responsibility
PiperProvider | TTS engine wrapper (Piper-TTS, GPL-3.0)
LLMClient | LLM inference client, supports streaming and interruption
AudioPlayer | Audio player; maintains the playback queue, audioid, and speaking state