3.4 Module Detailed Explanation
First Layer: Audio Acquisition Module
Module Files
- socket_connector.py - Socket connection management
- socket_audio_provider.py - Audio reception and parsing
- tk_audio_publisher.py - Publish audio to ROS
Data Flow
RK3588s Microphone Array
↓
Socket Server (IP: 10.42.0.127, Port: 9080)
↓
SocketConnector Establish Connection
↓
SocketAudioProvider Receive Raw Data
↓
SocketAudioPublisher Publish to ROS Topic
Key Concept: VAD (Voice Activity Detection)
When receiving audio data, the module must distinguish speech from silence:
VAD Status Code:
0 = Silence
1 = Start Speaking (New sentence begins)
2 = Continuous Speaking
3 = End Speaking (Sentence ends)
Implementation Logic:
def keep_receiving_publish_audio():
    while True:
        audio_data, vad = socket_provider.read()
        if vad == 1:    # Start speaking
            audio_buffer.clear()  # Clear old data
            audio_buffer.extend(audio_data)
        elif vad == 2:  # Continuous speaking
            audio_buffer.extend(audio_data)  # Continue appending
        elif vad == 3:  # End speaking
            audio_buffer.extend(audio_data)
            # Complete sentence audio ready, publish to topic
            publish_sentence(audio_buffer)
            audio_buffer.clear()
Code Explanation: SocketConnector
from socket import socket, AF_INET, SOCK_STREAM

class SocketConnector:
    def __init__(self, ip: str, port: int):
        # Create TCP Socket
        self.client_socket = socket(AF_INET, SOCK_STREAM)
        # Connect to server
        self.client_socket.connect((ip, port))
        # Set receive timeout to 3 seconds
        self.client_socket.settimeout(3.0)

    def receive_full_data(self, expected_length):
        """
        Receive a complete data block.

        Since TCP is a stream protocol, data may arrive in multiple chunks:
        e.g. to receive 1000 bytes, you may get 500 first, then another 500.
        """
        received_data = bytearray()
        while len(received_data) < expected_length:
            # Never read past the expected length, or we would consume
            # bytes belonging to the next packet
            remaining = expected_length - len(received_data)
            chunk = self.client_socket.recv(min(4096, remaining))
            if not chunk:  # Server closed connection
                return None
            received_data.extend(chunk)
        return bytes(received_data)
Code Explanation: SocketAudioProvider
This is the detailed code for receiving audio data from the RK3588s. Why this approach? TienKung is equipped with iFLYTEK's RK3588 AIUI multimodal development kit; its audio transmission protocol is described here for your reference.
The protocol specifies that bytes 0-8 are the request header, corresponding to header = self.receive_full_data(9) below. Byte 9 is the VAD field, corresponding to vad = body[0] in the code below. Bytes 17 to n carry the audio data itself, corresponding to audio_data = body[8:-1] below.
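As a sanity check, the 9-byte header layout can be exercised with Python's struct module. The field values below are hypothetical; only the field widths (1 + 1 + 1 + 4 + 2 = 9 bytes) and the 0xA5/0x01 validation constants come from the protocol handling described here:

```python
import struct

HEADER_FMT = '<BBBIH'  # little-endian: sync, user_id, msg_type, msg_length, msg_id
assert struct.calcsize(HEADER_FMT) == 9  # matches receive_full_data(9)

# Hypothetical packet values for illustration
header = struct.pack(HEADER_FMT, 0xA5, 0x01, 0x02, 320, 7)
sync_head, user_id, msg_type, msg_length, msg_id = struct.unpack(HEADER_FMT, header)
print(msg_length)  # → 320
```

The msg_length field then tells the receiver how many body bytes to read before the trailing CRC byte.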
class SocketAudioProvider(SocketConnector):
    def read(self):
        """Parse one audio packet according to the protocol"""
        # 1. Receive the 9-byte header
        header = self.receive_full_data(9)
        if header is None:
            return None  # Connection closed
        # Format: [Sync, UserID, MessageType, MessageLength, MessageID]
        # Binary layout: 1 byte + 1 byte + 1 byte + 4 bytes + 2 bytes
        sync_head, user_id, msg_type, msg_length, msg_id = struct.unpack(
            '<BBBIH',  # < = little-endian, B = byte, I = 4-byte int, H = 2-byte short
            header
        )
        # 2. Validate the header
        if sync_head != 0xa5 or user_id != 0x01:
            return None  # Invalid packet
        # 3. Receive the message body
        body = self.receive_full_data(msg_length + 1)
        if body is None:
            return None  # Connection closed
        # Format: [VAD, Channel, ...] + CRC checksum byte
        vad = body[0]            # VAD status
        audio_data = body[8:-1]  # Audio payload (trailing CRC byte stripped)
        return audio_data, vad
Code Explanation: SocketAudioPublisher
class SocketAudioPublisher(Node):
    def __init__(self):
        super().__init__('tk_audio_publisher')
        # Create two publishers
        self.publisher_ = self.create_publisher(
            AudioFrame,       # Message type
            'audio_frames',   # Topic name
            10                # Queue size
        )
        self.sentence_publisher_ = self.create_publisher(
            AudioFrame,
            'audio_sentence_frames',
            10
        )
        # Create Socket provider
        self.audio_provider = SocketAudioProvider('10.42.0.127', 9080)
        # Event used to signal the background thread to stop
        self.stop_event = threading.Event()
        # Start background receiving thread
        self.receive_pub_thread = threading.Thread(
            target=self.keep_receiving_publish_audio
        )
        self.receive_pub_thread.start()

    def keep_receiving_publish_audio(self):
        """Background thread: continuously receive and publish audio"""
        audio_buffer = bytearray()
        while not self.stop_event.is_set():
            audio_res = self.audio_provider.read()
            if audio_res is None:
                continue
            audio_data, vad = audio_res
            if vad == 1:    # Start speaking
                audio_buffer.clear()
                audio_buffer.extend(audio_data)
            elif vad == 2:  # Continuous speaking
                audio_buffer.extend(audio_data)
            elif vad == 3:  # End speaking
                audio_buffer.extend(audio_data)
                # Publish complete sentence audio
                msg = AudioFrame()
                msg.data = bytes(audio_buffer)
                self.sentence_publisher_.publish(msg)
                self.get_logger().info(
                    f"Published complete audio, length: {len(audio_buffer)}"
                )
                audio_buffer.clear()
Second Layer: Speech Recognition Module (ASR)
Module Files
- funasr_client.py - FunASR WebSocket client
- tk_asr_text_publisher.py - ASR text publishing
Data Flow
audio_sentence_frames Topic
↓
tk_asr_text_publisher Subscribe
↓
funasr_client WebSocket Send Audio
↓
x86 Funasr Service Recognition
↓
WebSocket Return Recognition Result
↓
asr_sentence Topic Publish
Core Concept: WebSocket Streaming Recognition
# 1. Connect to the WebSocket server
ws = websocket.connect("wss://192.168.41.1:10097")
# 2. Send a handshake message (tells the server the recognition parameters)
ws.send({
    "mode": "offline",        # Offline mode
    "sample_rate": 16000,     # Sample rate
    "chunk_size": [5, 10, 5]  # Audio chunk size for each send
})
# 3. Send audio in chunks (streaming)
for chunk in audio_chunks:
    ws.send_binary(chunk)
# 4. Receive recognition results (also streaming)
for result in ws.recv():
    text = result["text"]
    print(f"Recognition result: {text}")
Code Explanation: FunASRClient
class FunASRClient:
    def __init__(self, host="192.168.41.1", port=10097):
        self.host = host
        self.port = port
        self.ssl_enabled = True  # Use WSS encrypted connection
        # Recognition parameter configuration
        self.sample_rate = 16000
        self.chunk_size = [5, 10, 5]  # Audio chunk sizes (milliseconds)
        self.chunk_interval = 10      # Chunk interval
        # WebSocket connection (via global async loop)
        self.loop = GlobalAsyncLoop.get_loop()
        self.connect()

    async def _connect(self):
        """Establish the WebSocket connection"""
        # Construct the WebSocket URI
        uri = f"wss://{self.host}:{self.port}"
        # SSL context (certificate verification disabled)
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        # Connect
        return await websockets.connect(
            uri,
            subprotocols=["binary"],
            ping_interval=None,  # Disable heartbeat
            ssl=ssl_context
        )

    def to_text(self, audio_bytes: bytes) -> str:
        """
        Synchronous interface: audio bytes in, recognized text out
        """
        # Run the async function on the shared event loop
        future = asyncio.run_coroutine_threadsafe(
            self._async_to_text(audio_bytes),
            self.loop
        )
        return future.result(timeout=30)

    async def _async_to_text(self, audio_bytes: bytes) -> str:
        """Asynchronous recognition implementation"""
        ws = await self.try_init_connection()
        # 1. Send the handshake message
        start_signal = {
            "mode": "offline",
            "sample_rate": self.sample_rate,
            "chunk_size": self.chunk_size,
            "chunk_interval": self.chunk_interval,
        }
        await ws.send(json.dumps(start_signal))
        # 2. Send the audio in chunks
        chunk_bytes = (self.sample_rate / 1000) * 2  # Bytes per millisecond
        for i in range(0, len(audio_bytes), int(chunk_bytes * 10)):
            chunk = audio_bytes[i:i + int(chunk_bytes * 10)]
            await ws.send(chunk)  # Send binary data
            await asyncio.sleep(0.01)
        # 3. Send the end signal
        await ws.send(json.dumps({"mode": "offline"}))
        # 4. Receive the recognition result
        result_text = ""
        async for message in ws:
            data = json.loads(message)
            text = data.get("text", "")
            if text:
                result_text = text
            if data.get("done"):
                break
        return result_text
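The chunk-size arithmetic in _async_to_text works out to 320 bytes per WebSocket send, i.e. 10 ms of 16 kHz, 16-bit mono audio:

```python
# 16 kHz, 16-bit (2-byte) mono audio: (16000 / 1000) * 2 = 32 bytes per millisecond,
# so each 320-byte chunk sent over the WebSocket carries 10 ms of audio.
sample_rate = 16000
chunk_bytes = (sample_rate / 1000) * 2  # bytes per millisecond
step = int(chunk_bytes * 10)            # bytes per 10 ms chunk

audio = bytes(1000)                     # dummy 1000-byte buffer for illustration
chunks = [audio[i:i + step] for i in range(0, len(audio), step)]
print(step, len(chunks))                # → 320 4
```

The final chunk may be shorter than 320 bytes; the server accepts it as-is.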
Code Explanation: FunASRTextPublisher
class FunASRTextPublisher(Node):
    def __init__(self):
        super().__init__('tk_asr_text_publisher')
        # Subscribe to the audio topic
        self.subscription = self.create_subscription(
            AudioFrame,
            'audio_sentence_frames',
            self.audio_sentence_callback,
            10
        )
        # Create the publisher
        self.asr_sentence_publisher = self.create_publisher(
            String,
            '/asr_sentence',
            10
        )
        # Audio processing queue
        self.audio_queue = Queue(maxsize=1)
        # FunASR client
        self.asr_service = FunASRClient(
            host="192.168.41.1",
            port=10097
        )
        # Background processing thread
        self.process_thread = threading.Thread(
            target=self.keep_process_audio
        )
        self.process_thread.start()

    def audio_sentence_callback(self, msg: AudioFrame):
        """
        Subscription callback: invoked when a complete sentence of audio arrives
        """
        # Put the audio into the queue
        self.audio_queue.put(msg)

    def keep_process_audio(self):
        """Background thread: process audio and run recognition"""
        while True:
            try:
                # Get audio from the queue (wait at most 2 seconds)
                msg = self.audio_queue.get(timeout=2)
                # Convert to bytes
                audio_bytes = bytes(msg.data)
                self.get_logger().info(
                    f"Received {len(audio_bytes)} bytes of audio, starting recognition..."
                )
                # Call FunASR for recognition
                asr_sentence = self.asr_service.to_text(audio_bytes)
                # Filter out overly short recognition results
                if len(asr_sentence) <= 3:
                    self.get_logger().info(f"Ignoring short result: {asr_sentence}")
                    continue
                # Publish the recognition result
                text_msg = String()
                text_msg.data = asr_sentence
                self.asr_sentence_publisher.publish(text_msg)
                self.get_logger().info(f"Published recognition result: {asr_sentence}")
            except Empty:
                continue  # Timeout, keep waiting
            except Exception as e:
                self.get_logger().error(f"Recognition error: {e}")
Third Layer: Large Model Natural Language Understanding Module (LLMClient)
LLMClient is a client class that interacts with large language model (LLM) services, specifically designed to communicate with Ollama services. It employs a multi-process architecture for streaming output, automatically determines whether the LLM service runs on orin1 or orin2, and supports intelligent sentence splitting.
Core Architecture
Multi-Process Design
The module uses the spawn method of multiprocessing to create child processes, for the following reasons:
- Non-Blocking: HTTP streaming requests execute in child processes while the main process remains responsive
- Interruptibility: Users can interrupt the current request at any time and start a new conversation
- Cross-Platform Compatibility: spawn mode behaves consistently on Windows and Linux
┌─────────────────────────────────────────────────────────┐
│ Main Process │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Sentence │ ← │ Queue Read │ ← │ Result │ │
│ │ Concatenate │ │ │ │ Processing │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
↑ Queue
┌─────────────────────────────────────────────────────────┐
│ Child Process │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ OpenAI SDK │ → │ Stream │ → │ Queue Write │ │
│ │ │ │ Iteration │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
Class Attributes Explanation
Configuration Attributes
| Attribute | Default | Description |
|---|---|---|
| max_history | 1 | Number of historical conversation rounds (1 round = user + assistant = 2 messages) |
| history | deque(maxlen=2) | Double-ended queue that automatically limits history length |
| sentence_endings | "。!?.!?" | Forced sentence-ending punctuation |
| soft_endings | ",、;;," | Soft-split punctuation (works with max_len) |
| max_len | 25 | Soft-split trigger threshold (character count) |
Service Discovery Attributes
| Attribute | Description |
|---|---|
| active_llm_ip | Auto-detected Ollama service IP |
| llm_endpoint | Complete API endpoint URL |
| api_key | API key (Ollama default is "ollama") |
| llm_model | Model name in use; default qwen2.5:1.5b |
Execution Flow
1. Initialization Flow (__init__)
Start
↓
Set history queue (deque)
↓
Call test_llm_ip() to probe service
↓
Configure API endpoint, key, model
↓
Set system message
↓
Complete
2. Service Discovery Flow (test_llm_ip)
This method automatically detects the Ollama service location:
Iterate candidate IPs: [192.168.41.3, 192.168.41.2]
↓
Make GET /api/version request for each IP
↓
Successful? ─── Yes ───→ Return this IP
↓ No
Try next IP
↓
All failed? ───→ Return None, subsequent requests will be rejected
Design Points:
- Use urllib.request instead of httpx to reduce dependencies
- Log detailed information for troubleshooting network issues
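A minimal sketch of this probe, assuming Ollama's default port 11434 and a 2-second timeout (neither is stated above):

```python
import urllib.request
import urllib.error

def test_llm_ip(candidates=("192.168.41.3", "192.168.41.2"), port=11434):
    """Probe each candidate IP's /api/version endpoint; return the first that answers."""
    for ip in candidates:
        try:
            with urllib.request.urlopen(f"http://{ip}:{port}/api/version", timeout=2):
                return ip          # first reachable service wins
        except (urllib.error.URLError, OSError):
            continue               # unreachable, try the next candidate
    return None                    # all failed; subsequent requests will be rejected
```

In the real class the result is stored as active_llm_ip and used to build llm_endpoint.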
3. Streaming Request Flow (stream_sentence)
This is the core method with the complete flow below:
Startup Phase
- Interrupt Previous Child Process: Call set_interrupted(True) to terminate the previous child process
- Build Message Payload: Combine the system message, historical messages, and current user input
- Create Child Process: Start the _stream_worker function
Streaming Retrieval Phase
Loop reading Queue
↓
Received None? ─── Yes ───→ Child process ended, exit loop
↓ No
Append to buffer and assistant_response
↓
Attempt sentence splitting
↓
Queue timeout and child process dead? ─── Yes ───→ Exit loop
↓ No
Continue reading
Sentence Splitting Logic
Complete Sentence Split: Split immediately upon encountering 。!?.!?
buffer = "The sky is blue. That's because"
↓ Found period
yield "The sky is blue."
buffer = "That's because"
Forced Split: When buffer exceeds 25 characters, find soft split point
buffer = "This phenomenon is called Rayleigh scattering, when sunlight"
|←── If exceeds 25 characters ──→|
↓ Found comma
yield "This phenomenon is called Rayleigh scattering, "
buffer = "when sunlight"
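Both rules can be sketched in one helper. The punctuation sets and the 25-character threshold come from the attribute table above; the function shape is illustrative, not the actual stream_sentence code:

```python
SENTENCE_ENDINGS = "。!?.!?"   # forced sentence-ending punctuation
SOFT_ENDINGS = ",、;;,"        # soft-split punctuation
MAX_LEN = 25                   # soft-split trigger threshold

def split_sentences(buffer):
    """Emit complete sentences immediately; once the buffer exceeds MAX_LEN,
    force a split at the first soft boundary. Returns (sentences, remainder)."""
    sentences = []
    while True:
        cut = next((i for i, ch in enumerate(buffer) if ch in SENTENCE_ENDINGS), None)
        if cut is None and len(buffer) > MAX_LEN:
            cut = next((i for i, ch in enumerate(buffer) if ch in SOFT_ENDINGS), None)
        if cut is None:
            return sentences, buffer   # keep the incomplete remainder buffered
        sentences.append(buffer[:cut + 1])
        buffer = buffer[cut + 1:]
```

Each released sentence becomes one TTS work item, which is what keeps first-audio latency low while the LLM is still streaming.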
Child Process Function (_stream_worker)
Responsibilities:
- Create OpenAI client connection to Ollama-compatible API
- Initiate streaming request
- Put each chunk's text content into the cross-process queue for the main process to retrieve
- Put None as the end signal after the request completes
Parameter Explanation:
- base_url: API base URL
- model: Model name
- messages_payload: Complete message list
- queue: Cross-process communication queue
- api_key: API key
set_interrupted
Purpose: Force terminate a running child process and discard generated text when needed
Execution Steps:
- Check if process is alive
- Call terminate() to send SIGTERM
- Call join(timeout=1) to wait for exit
- Close and clean up the Queue
- Reset properties to None
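The steps above can be sketched as a standalone helper; the function shape and the (None, None) return convention are illustrative, not the actual method:

```python
import multiprocessing as mp

def set_interrupted(process, queue):
    """Terminate a running child process and clean up its queue.
    Returns (None, None) so the caller can reset its attributes."""
    if process is not None and process.is_alive():
        process.terminate()        # sends SIGTERM to the child
        process.join(timeout=1)    # wait briefly for it to exit
    if queue is not None:
        queue.close()              # release the queue's buffer and feeder thread
        queue.join_thread()
    return None, None
```

The join timeout keeps the interrupt path responsive even if the child ignores SIGTERM briefly.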
Environment Variables Configuration
| Variable | Default | Description |
|---|---|---|
| LLM_KEY | "ollama" | API key, required by the OpenAI SDK; can be ignored for a local LLM |
| LLM_MODEL | "qwen2.5:1.5b" | Model in use |
| SYS_MESSAGE | See code | System prompt |
Fourth Layer: Text-to-Speech Module (TTS)
PiperProvider is a text-to-speech (TTS) service provider based on Piper-TTS. This module uses ONNX Runtime for model inference, supports CUDA acceleration, and provides optional PyAudio real-time playback functionality.
License Statement: This file integrates Piper-TTS under GPL-3.0-or-later license.
Core Architecture
┌─────────────────────────────────────────────────────────┐
│ PiperProvider │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PiperVoice │ → │ ONNX Runtime│ → │ CUDA │ │
│ │ │ │ │ │ Acceleration│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ↓ │
│ ┌─────────────────┐ ┌─────────────┐ │
│ │ SynthesisConfig │ │ PyAudio │ │
│ │ │ │ Playback │ │
│ └─────────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
Data Flow
Input Text
↓
PiperVoice.synthesize()
↓
Generate audio chunks
↓
Extract 16-bit PCM byte stream
↓
Output/Playback
Initialization Flow
__init__ Execution Order
Start
↓
Read MODEL_DIR environment variable
↓
Build model and config paths
↓
Configure SynthesisConfig (voice parameters)
↓
Call _load_model() to load model
↓
Record loading time
↓
init_player=True?
├─ Yes → Initialize PyAudio player
└─ No → Skip
↓
Complete
Path Configuration
Default model path structure:
{MODEL_DIR}/
└── piper_voices/
└── zh/
├── zh_CN-huayan-medium.onnx ## Model file
└── zh_CN-huayan-medium.onnx.json ## Config file
Environment Variables:
| Variable | Default | Description |
|---|---|---|
| MODEL_DIR | /home/nvidia/ | Model root directory |
| TORCHDYNAMO_DISABLE | "1" | Disable Torch dynamic compilation |
| DISABLE_TORCH_COMPILE | "1" | Disable Torch compile optimization |
SynthesisConfig Parameters Explanation
Voice synthesis configuration controls output voice characteristics:
| Parameter | Current Value | Suggested Range | Description |
|---|---|---|---|
| length_scale | 1.1 | 0.8-1.5 | Speech rate control; <1 speeds up, >1 slows down |
| noise_scale | 0.4 | 0.3-1.0 | Random noise; low for clarity, high for richness |
| noise_w_scale | 0.5 | 0.3-1.2 | Phoneme duration variation; affects naturalness |
| normalize_audio | True | - | Audio volume normalization to prevent clipping |
| volume | 1.0 | 0.5-2.0 | Overall volume |
Tuning Guidelines
| Goal | Adjustment |
|---|---|
| More natural | ↑ noise_w_scale (0.8-1.2) |
| Clearer sound | ↓ noise_scale (0.3-0.6) |
| Faster speech | ↓ length_scale (0.8-1.0) |
| Low volume | ↑ volume or enable normalize_audio |
Key Methods
_load_model
Responsibility: Load ONNX model into memory
Execution Steps:
- Call PiperVoice.load() to load the model file
- Read the JSON config file
- Enable CUDA acceleration (use_cuda=True)
- Record the model sample rate
get_audio_param
Purpose: Synthesize a short audio segment from fixed test text, then extract the audio parameters (sample rate, channels, sample width) from the first chunk; these parameters are used to instantiate AudioPlayer.
Test text: "你好,我是天工形者,很高兴认识你。"
↓
Call synthesize()
↓
Extract from first chunk:
- sample_rate
- sample_channels
- sample_width
↓
Return (sample_rate, channels, sample_width)
Fallback Strategy: Return default values (21000, 1, 2) if synthesis fails
tts(text) -> bytes
Purpose: Convert text to PCM audio byte stream
Flow:
Input text
↓
piper_instance.synthesize(text, syn_config)
↓
Iterate chunks, extract audio_int16_bytes
↓
Merge all byte streams
↓
Return complete audio data (16-bit PCM)
Return Value: Playable 16-bit PCM byte stream, returns empty bytes b'' on failure
play(waveform: bytes)
Purpose: Play audio via PyAudio
Prerequisite: Initialized with init_player=True
Execution: Call configured audio_stream.write() to write audio data
Fifth Layer: Audio Playback Module AudioPlayer Class
Implements asynchronous queue-based audio playback, supporting multiple audio sources, playback status tracking, and thread-safe concurrent access.
Function Explanation
wait_for_audio_ready(max_wait=5)
Purpose: Block until the audio output device is ready. Connecting the microphone directly on orin1 usually fails at first, so the device is probed repeatedly until a connection succeeds.
AudioPlayer Class
Design Philosophy
Employs producer-consumer pattern for asynchronous playback:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Caller │ ──→ │ Queue │ ──→ │ Playback │
│ play(data) │ │ (audio_data) │ │ thread │
└──────────────┘ └──────────────┘ └──────────────┘
↓
┌──────────────┐
│ PyAudio │
│ stream │
└──────────────┘
Features:
- Caller doesn't need to wait for playback completion
- Audio queue automatically queues playback
Initialization Flow
__init__
↓
Call wait_for_audio_ready() to wait for device
↓
Create PyAudio instance, get default device info
↓
Initialize locks and state variables
↓
Configure PCM parameters (sample rate, channels, bit depth)
↓
Open audio output stream
↓
Start background playback thread
PCM Parameters Configuration
| Parameter | Default | Description |
|---|---|---|
| sample_rate | 21000 | Sample rate; matches the Piper-TTS model output |
| channels | 1 | Number of channels (mono) |
| sample_width | 2 | Sample bit depth (16-bit = 2 bytes) |
| frames_per_buffer | 1024 | Buffer frame count |
These parameters describe the audio data to be played. The player must be configured to match them; incorrect settings cause distorted sound or pure noise.
Key Methods
is_speaking() -> bool
Purpose: Determine whether audio is currently playing or queued. When each segment is enqueued, its estimated playback duration is accumulated into a playback-completion deadline; this method then checks whether the current time is still before that deadline.
Judgment Logic:
Check if queue has pending audio
↓
Yes? ─── Yes ───→ Return True
↓ No
Check if current time < playback_deadline
↓
Yes? ───→ Return True (hardware buffer still has data)
↓ No
Return False
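A minimal sketch of this check, assuming a monotonic clock (the actual time source used by the class is not specified above):

```python
import time
from queue import Queue

def is_speaking(audio_queue, playback_deadline):
    """True if audio is still queued, or the hardware buffer is still draining."""
    if not audio_queue.empty():    # pending audio still queued for playback
        return True
    # No queued audio, but previously written chunks may not have finished
    return time.monotonic() < playback_deadline
```

The orchestration node uses this to decide whether an incoming ASR sentence is an interruption attempt or a fresh question.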
set_audioid(text)
Purpose: Switch current audio source identifier and create corresponding queue
Execution Steps:
- Lock and update audioid
- Create a new Queue in audio_queues_map
play(audio_data: bytes)
Purpose: Add audio data to playback queue
Internal Call: try_put(self.get_audioid(), audio_data)
try_put(audioid, audio_data)
Purpose: Add audio data to specified queue, discard oldest data if queue is full
Flow:
Check if audioid exists in map
↓ No
Create new queue and add to map
↓
Try to put data (timeout 1 second)
↓
Queue full? ─── Yes ───→ Pop oldest data, put new data
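A sketch of this overflow policy; the queue size of 8 is an assumption for illustration:

```python
from queue import Queue, Full, Empty

def try_put(audio_queues_map, audioid, audio_data, maxsize=8):
    """Add audio to the queue for `audioid`; when full, drop the oldest chunk."""
    q = audio_queues_map.setdefault(audioid, Queue(maxsize=maxsize))
    try:
        q.put(audio_data, timeout=1)
    except Full:
        try:
            q.get_nowait()         # discard the oldest chunk
        except Empty:
            pass                   # consumer drained it in the meantime
        q.put(audio_data)
```

Dropping the oldest chunk keeps playback close to real time instead of letting the backlog grow unboundedly.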
keep_playing_audio() [Background Thread]
Purpose: Continuously read audio data from queue and play it
Main Loop Flow:
while stop_event not set:
↓
Ensure audio stream started
↓
Get queue for current audioid
↓
Queue doesn't exist or empty? ─── Yes ───→ sleep 0.01s, continue loop
↓ No
Get audio data
↓
Call _mark_audio_playing() to update deadline
↓
Write to audio stream for playback
_mark_audio_playing(audio_data)
Purpose: Calculate playback duration based on audio data size, update deadline
Calculation Formula:
chunk_duration = len(audio_data) / (sample_rate * channels * sample_width)
playback_deadline = max(current_time, previous_deadline) + chunk_duration + output_delay
Key Point: Use max() to ensure multiple audio chunks accumulate sequentially, not overlap
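The formula can be exercised directly with the default PCM parameters (output_delay is defaulted to 0 here for illustration):

```python
import time

def mark_audio_playing(audio_data, playback_deadline,
                       sample_rate=21000, channels=1, sample_width=2,
                       output_delay=0.0):
    """Advance the playback deadline by this chunk's duration.
    max() makes consecutive chunks stack sequentially rather than overlap."""
    chunk_duration = len(audio_data) / (sample_rate * channels * sample_width)
    return max(time.monotonic(), playback_deadline) + chunk_duration + output_delay
```

At 21000 Hz mono 16-bit, one second of audio is 21000 * 1 * 2 = 42000 bytes, so queuing two such chunks pushes the deadline roughly two seconds out.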
open_stream()
Purpose: Open PyAudio output stream
Retry Mechanism:
- Maximum 3 retries
- Only retry for Invalid sample rate error (-9997)
- 3-second interval between retries
close()
Purpose: Release all resources
Cleanup Order:
Set stop_event to stop playback thread
↓
Wait for playback thread to end (max 2 seconds)
↓
Reset playback_deadline
↓
Stop and close audio stream
↓
Terminate PyAudio instance
Sixth Layer: Complete Processing Flow
Module Files
tk_audio_process.py - Core orchestration node
AudioProcess is a ROS2 node that implements the complete speech conversation flow: ASR Text → LLM Inference → TTS Synthesis → Audio Playback. The overall design is a multi-threaded pipeline whose stages are decoupled via queues, supporting speech interruption and parallel TTS synthesis.
Overall Flow
ASR Text (ROS2 topic: asr_sentence)
│
▼
on_asr_sentence ← Interruption judgment, generate request_id, enqueue
│
▼
asr_sentence_queue
│
▼
[NLP Thread] ← LLM streaming inference, split by sentences, write to answer_text_queue
│
▼
answer_text_queue
│
▼
[TTS Thread × N] ← Parallel TTS synthesis, write to audio_segment_queue
│
▼
audio_segment_queue
│
▼
[PLAYBACK Thread] ← Deliver to AudioPlayer in order by sequence_index
│
▼
AudioPlayer ← Hardware playback
Thread Model
| Thread | Count | Entry Method |
|---|---|---|
| NLP | 1 | keep_question_to_answer_by_ollama |
| TTS | Controlled by TTS_WORKERS env var (default 5) | keep_answer_to_audio |
| PLAYBACK | 1 | keep_audio_segments_in_order |
All threads are daemon threads, coordinated exit via stop_event (threading.Event).
Request ID Mechanism
A UUID is generated as request_id for each new question, simultaneously written to AudioPlayer.audioid. Each pipeline stage calls _is_current_request(request_id) before and after processing to compare IDs; mismatches are discarded. This allows safe handling of interruptions and duplicate questions without explicit cancellation.
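The discard-stale-work check can be sketched as follows; the holder class and function names are illustrative stand-ins for AudioPlayer's audioid bookkeeping and _is_current_request:

```python
import uuid

class AudioIdHolder:
    """Stand-in for AudioPlayer's audioid attribute (illustrative)."""
    def __init__(self):
        self._audioid = None
    def set_audioid(self, audioid):
        self._audioid = audioid
    def get_audioid(self):
        return self._audioid

def is_current_request(player, request_id):
    """A pipeline stage's work is valid only if its request_id is still current."""
    return request_id == player.get_audioid()

player = AudioIdHolder()
first = str(uuid.uuid4())
player.set_audioid(first)
assert is_current_request(player, first)

player.set_audioid(str(uuid.uuid4()))    # user interrupts: new request supersedes
assert not is_current_request(player, first)  # stale work is silently discarded
```

Because every stage re-checks the ID, no stage needs an explicit cancellation channel.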
Key Methods
__init__
Initialization requires strict order: first initialize PiperProvider (TTS engine) to get audio parameters (sample rate, channels, bit depth), then use these to initialize AudioPlayer, finally initialize LLMClient and start three worker threads.
on_asr_sentence(msg)
ROS2 callback entry for ASR messages, handles three scenarios:
- Playing & No Interrupt Word: Ignore the message to avoid interfering with the current conversation.
- Playing & Contains Interrupt Word ("天工"/"天空"/"天宫"): Generate a new request_id, clear all queues, and stop current playback, but do not start a new round of Q&A.
- Idle State (or New Question): Generate a new request_id, clear the queues, and write (request_id, text) to asr_sentence_queue.
The two scenarios that change the request_id also call llm_client.set_interrupted(True) to terminate streaming inference early.
keep_question_to_answer_by_ollama (NLP Thread)
Fetch question from asr_sentence_queue, call llm_client.stream_sentence for streaming LLM response. Each text segment written to answer_text_queue as (request_id, sequence_index, chunk), with sequence_index incrementing from 0 for later sorting. Check _is_current_request before each write; abort current turn if request has expired.
keep_answer_to_audio (TTS Thread)
Fetch text segment from answer_text_queue, call tts_service.tts() to synthesize PCM data. Check _is_current_request before and after TTS execution; discard if expired to avoid wasted computation and playback. Write synthesis result to audio_segment_queue as (request_id, sequence_index, text, pcm_bytes). Parallel processing of multiple threads significantly reduces end-to-end latency.
keep_audio_segments_in_order (PLAYBACK Thread)
Since TTS executes concurrently, audio segments may arrive out of order. This thread uses pending_segments dict to cache synthesized but out-of-order segments, next_sequence_by_request records the next sequence number to play for each request. Only in-order segments call audio_player.try_put for playback. Clear cache immediately when audioid changes (new request or interrupt).
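The reordering step can be sketched as a pure function; names are illustrative, and the real thread reads arrivals from audio_segment_queue rather than a list:

```python
def drain_in_order(pending_segments, next_index, arrivals):
    """Cache out-of-order segments; release only runs starting at next_index.
    Returns (released_segments, updated_next_index)."""
    released = []
    for sequence_index, pcm in arrivals:
        pending_segments[sequence_index] = pcm
        while next_index in pending_segments:   # release any contiguous run
            released.append(pending_segments.pop(next_index))
            next_index += 1
    return released, next_index

released, nxt = drain_in_order({}, 0, [(1, "B"), (0, "A"), (2, "C")])
print(released, nxt)   # → ['A', 'B', 'C'] 3
```

Segment 1 arrives first but is held back until segment 0 appears; then both are released together, so playback order always matches sequence_index order.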
_is_current_request(request_id)
Compare input ID with AudioPlayer.get_audioid(), for pipeline stages to quickly judge if task is still valid.
_drain_queue(queue_obj)
Non-blocking queue clearing, quickly discard accumulated tasks in interrupt scenarios.
close
Ordered shutdown: set stop_event → interrupt LLM inference → sequentially join NLP, TTS, PLAYBACK threads (each with timeout) → close AudioPlayer. Protection against "thread joining itself" prevents deadlock.
Environment Variables
| Variable | Default | Description |
|---|---|---|
TTS_WORKERS | 5 | TTS concurrent thread count |
Dependent Modules
| Module | Responsibility |
|---|---|
PiperProvider | TTS engine wrapper (Piper-TTS, GPL-3.0) |
LLMClient | LLM inference client, supports streaming and interruption |
AudioPlayer | Audio player, maintains playback queue, audioid, and speaking state |