Part 2: LiveRequestQueue でメッセージを送る¶
Part 1 では、ADK Gemini Live API Toolkit アプリケーションの
4 フェーズライフサイクルを学びました。今回はアップストリームの流れ、
つまり LiveRequestQueue を使ってアプリケーションからエージェントへ
メッセージを送る方法に焦点を当てます。
従来 API のようにメッセージ種別ごとに別エンドポイントやチャネルを使うのではなく、
ADK は LiveRequestQueue と LiveRequest メッセージモデルで
単一の統合インターフェースを提供します。今回は次を扱います。
- メッセージタイプ:
send_content()によるテキスト送信、send_realtime()による 音声/画像/動画ストリーミング、activity シグナルによる会話ターン制御、 control シグナルによるセッションの正常終了 - 並行処理パターン: 非同期キュー管理とイベントループのスレッド安全性
- ベストプラクティス: 非同期コンテキストでのキュー作成、適切なリソース解放、 メッセージ順序保証の理解
- トラブルシューティング: メッセージ未処理やキューライフサイクル問題の診断
LiveRequestQueue の理解は、非同期イベントループ内でマルチモーダル入力を
シームレスに扱う高応答なストリーミングアプリを作るうえで重要です。
LiveRequestQueue と LiveRequest¶
LiveRequestQueue は、ストリーミング会話でエージェントへメッセージを送る
主要インターフェースです。テキスト、音声、制御シグナルで別チャネルを管理する代わりに、
ADK は単一の LiveRequest コンテナで全メッセージタイプを扱います。
class LiveRequest(BaseModel):
content: Optional[Content] = None # Text-based content and structured data
blob: Optional[Blob] = None # Audio/video data and binary streams
activity_start: Optional[ActivityStart] = None # Signal start of user activity
activity_end: Optional[ActivityEnd] = None # Signal end of user activity
close: bool = False # Graceful connection termination signal
このシンプルな設計で、一般的なストリーミングシナリオを網羅できます。
content と blob はデータタイプを扱い、activity_start と activity_end は
activity signaling を担い、close フラグは正常終了を表します。
content と blob は排他的で、1 つの LiveRequest にはどちらか一方のみ設定します。
ADK はクライアント側でこれを強制しないため、両方設定すると送信を試みますが、
Live API バックエンドでバリデーションエラーになります。
ADK の send_content() と send_realtime() は片方だけ設定するため、
LiveRequest を手動作成するよりこれらメソッド利用を推奨します。
次の図は、各メッセージタイプがアプリケーションから LiveRequestQueue メソッド、
LiveRequest コンテナを経由して Live API に届く流れを示します。
graph LR
subgraph "Application"
A1[User Text Input]
A2[Audio Stream]
A3[Activity Signals]
A4[Close Signal]
end
subgraph "LiveRequestQueue Methods"
B1[send_content<br/>Content]
B2[send_realtime<br/>Blob]
B3[send_activity_start<br/>ActivityStart]
B3b[send_activity_end<br/>ActivityEnd]
B4[close<br/>close=True]
end
subgraph "LiveRequest Container"
C1[content: Content]
C2[blob: Blob]
C3[activity_start/end]
C4[close: bool]
end
subgraph "Gemini Live API"
D[WebSocket Connection]
end
A1 --> B1 --> C1 --> D
A2 --> B2 --> C2 --> D
A3 --> B3 --> C3 --> D
A3 --> B3b --> C3
A4 --> B4 --> C4 --> D
さまざまなメッセージタイプを送る¶
LiveRequestQueue は、メッセージタイプごとの便利メソッドを提供します。
このセクションでは、テキスト送信、音声/動画ストリーミング、
手動ターン制御の activity シグナル、セッション終了を扱います。
send_content(): ターン単位のテキスト送信¶
send_content() は turn-by-turn モードでテキストを送ります。
各メッセージは独立した会話ターンとして扱われ、
モデルに「ターン完了」を知らせて即時応答生成を促します。
content = types.Content(parts=[types.Part(text=json_message["text"])])
live_request_queue.send_content(content)
ADK Gemini Live API Toolkit における Content / Part の使い方:
-
Content(google.genai.types.Content): 会話の 1 メッセージ(1 ターン)を表す コンテナ。複数のPartを保持します。 -
Part(google.genai.types.Part): メッセージ内の個別コンテンツ要素。 ADK Gemini Live API Toolkit + Live API では主に次を使います。 text: モデルへ送るテキスト(コード含む)
実運用では単一 text Part がほとんどです。 マルチパート構造は次のような用途を想定しています。 - テキストと function response の混在(ADK が自動処理) - テキスト説明と構造化データの組み合わせ - 新しいコンテンツ型への将来拡張
Live API ではマルチモーダル入力(音声/動画)は
マルチパート Content ではなく send_realtime() を使います。
ADK Gemini Live API Toolkit における Content / Part
Gemini API の Part は inline_data、file_data、function_call、
function_response など多くのフィールドを持ちますが、
多くは ADK が自動処理するか、Live API では別機構を使います。
- 関数呼び出し: ADK が function call の受信、関数実行、 結果返却を自動処理します。手動構築は不要です。
- 画像/動画:
inline_data付きsend_content()は使わないでください。 連続ストリーミングにはsend_realtime(Blob(mime_type="image/jpeg", data=...))を使います。 詳細は Part 5: How to Use Image and Video を参照。
send_realtime(): 音声・画像・動画をリアルタイム送信¶
send_realtime() は主に音声/画像/動画などのバイナリストリームを送ります。
Blob 型で扱われ、リアルタイムモード向けです。
テキストがターン単位処理なのに対し、Blob は連続チャンク処理向けです。
生バイトを渡すと、Pydantic が JSON 直列化時に Base64 エンコードを自動実施し、
安全にネットワーク送信できます(LiveRequest.model_config)。
MIME タイプはモデルが形式を理解するために使われます。
audio_blob = types.Blob(
mime_type="audio/pcm;rate=16000",
data=audio_data
)
live_request_queue.send_realtime(audio_blob)
Learn More
音声・画像・動画の仕様、形式、ベストプラクティスは Part 5: How to Use Audio, Image and Video を参照してください。
Activity Signals¶
activity シグナル(ActivityStart / ActivityEnd)は、
RunConfig で自動(サーバー側)VAD を明示的に無効化した場合のみ送信できます。
次のケースで使います。
- Push-to-talk UI: ボタン操作などで発話タイミングを明示制御
- 騒音環境: 自動 VAD が不安定なため、クライアント VAD や手動制御を利用
- クライアント側 VAD: 発話時のみ音声を送ってネットワーク負荷を削減
- カスタム UX: ジェスチャー起動、時間区切り音声など
activity シグナルがモデルに伝える内容:
ActivityStart: 「ユーザーが話し始めた。音声を蓄積して処理を開始」ActivityEnd: 「ユーザーが話し終えた。蓄積音声を処理して応答生成」
VAD 無効時にこれらが無いと、モデルは開始/終了境界を判断できないため、 ターン境界を明示する必要があります。
activity シグナル送信:
from google.genai import types
# 手動 activity signaling(例: push-to-talk)
live_request_queue.send_activity_start() # 発話開始
# ボタン押下中のみ音声チャンクを送信
while user_is_holding_button:
audio_blob = types.Blob(mime_type="audio/pcm;rate=16000", data=audio_chunk)
live_request_queue.send_realtime(audio_blob)
live_request_queue.send_activity_end() # 発話終了
デフォルト(自動 VAD): activity シグナルを送らない場合、
Live API の内蔵 VAD が send_realtime() の音声から発話境界を自動検出します。
多くのアプリではこの方式が推奨です。
Learn More
自動 VAD と手動 activity シグナルの比較、VAD 無効化の判断基準、 ベストプラクティスは Part 5: Voice Activity Detection を参照。
Control Signals¶
close シグナルはストリーミングセッションの正常終了を表します。
モデル接続をきれいに閉じ、Bidi-stream を終了します。
ADK Gemini Live API Toolkit ではアプリ側で close を明示送信する責任があります。
BIDI モードでの手動クローズ: StreamingMode.BIDI では、
セッション終了時やエラー時に close() を呼び出すべきです。
これによりセッションリソースを最小化できます。
SSE モードでの自動クローズ: レガシー StreamingMode.SSE(Bidi ではない)では、
モデルの turn_complete=True 受信時に ADK がキューに close() を自動実行します
(base_llm_flow.py:1150)。
詳細比較は Part 4: Understanding RunConfig を参照してください。
try:
logger.debug("Starting asyncio.gather for upstream and downstream tasks")
await asyncio.gather(
upstream_task(),
downstream_task()
)
logger.debug("asyncio.gather completed normally")
except WebSocketDisconnect:
logger.debug("Client disconnected normally")
except Exception as e:
logger.error(f"Unexpected error in streaming tasks: {e}", exc_info=True)
finally:
# 例外が発生しても必ずキューを閉じる
logger.debug("Closing live_request_queue")
live_request_queue.close()
close() を呼ばないとどうなるか
ADK はローカルリソースを自動解放しますが、
BIDI モードで close() を呼ばないと Live API に正常終了シグナルが送られません。
結果として一定時間後に突然の切断として扱われ、
クラウド側に "ゾンビ" セッションが残る可能性があります。
これらはタイムアウトまでクォータを消費するため、
同時セッション処理能力が下がる場合があります。
Learn More
ストリーミング中の包括的なエラー処理(break/continue、エラー種別対応)は
Part 3: Error Events を参照してください。
並行処理とスレッド安全性¶
LiveRequestQueue の並行動作を理解することは、信頼性の高いストリーミングアプリ構築に重要です。
このキューは asyncio.Queue ベースのため、
同一イベントループスレッド内の同時アクセスは安全ですが、
異なるスレッドから呼ぶ場合は特別な配慮が必要です。
このセクションでは設計上の意図、
追加対策なしで安全なケース、
loop.call_soon_threadsafe() が必要なケースを説明します。
非同期キュー管理¶
LiveRequestQueue は内部消費が非同期でも、
send_content() と send_realtime() は同期メソッドです。
これは await 不要の non-blocking 操作
asyncio.Queue.put_nowait() を使うためです。
なぜ同期 send メソッドか? 利便性と単純さのためです。
非同期コード内のどこからでも await なしで呼べます。
async def upstream_task() -> None:
"""Receives messages from WebSocket and sends to LiveRequestQueue."""
while True:
message = await websocket.receive()
if "bytes" in message:
audio_data = message["bytes"]
audio_blob = types.Blob(
mime_type="audio/pcm;rate=16000",
data=audio_data
)
live_request_queue.send_realtime(audio_blob)
elif "text" in message:
text_data = message["text"]
json_message = json.loads(text_data)
if json_message.get("type") == "text":
content = types.Content(parts=[types.Part(text=json_message["text"])])
live_request_queue.send_content(content)
このパターンは、非同期 I/O と同期 CPU 処理を自然に組み合わせられます。 send メソッドは即時復帰するため、アプリの応答性を保てます。
ベストプラクティス: 非同期コンテキストでキューを作成する¶
LiveRequestQueue は必ず非同期コンテキスト(async 関数/コルーチン)で作成し、
正しいイベントループを使うようにしてください。
# ✅ 推奨 - 非同期コンテキストで作成
async def main():
queue = LiveRequestQueue() # 既存イベントループを使用
# ストリーミング処理に使う正しいイベントループを確実に使える
# ❌ 非推奨 - イベントループを自動作成
queue = LiveRequestQueue() # 動作はするが ADK が新規ループを作成
# 安全機構により動作するが、複雑なアプリやマルチスレッドで
# ループ協調の問題を引き起こす可能性がある
なぜ重要か: LiveRequestQueue はインスタンス化時にイベントループを必要とします。
ADK はループ未存在時に自動作成する安全機構を持ちますが、
マルチスレッドやカスタムイベントループ構成では予期しない挙動につながる可能性があります。
メッセージ順序保証¶
LiveRequestQueue は予測可能なメッセージ配送動作を提供します。
| 保証 | 説明 | 影響 |
|---|---|---|
| FIFO 順序 | 送信順に処理(asyncio.Queue で保証) |
会話コンテキストと一貫性を維持 |
| 非結合配送 | 各メッセージを独立して配送 | 自動バッチ化なし。send 1 回につき request 1 件 |
| デフォルト無制限 | ブロックせず無制限に受け付け | 利点: クライアントコード簡素化 リスク: 処理より送信が速いとメモリ増加 対策: 本番でキュー深さ監視 |
Production Tip: 高スループットの音声/動画ストリーミングでは
live_request_queue._queue.qsize()を監視してバックプレッシャーを検知してください。 キュー深さが継続的に増えるなら送信レートを下げるかバッチングを実装してください。 なお_queueは内部属性であり、将来バージョンで変更される可能性があります。
まとめ¶
このパートでは、LiveRequestQueue が非同期イベントループ内で
ADK ストリーミングエージェントへメッセージを送る統合インターフェースであることを学びました。
LiveRequest モデルを理解し、
send_content() によるテキスト、
send_realtime() による音声/動画 blob、
手動ターン制御の activity シグナル、
close() による正常終了制御を扱いました。
また、非同期キュー管理、非同期コンテキストでの生成、リソース解放、
メッセージ順序保証のベストプラクティスも学習しました。
これで、ADK Gemini Live API Toolkit アプリにおけるアップストリーム通信チャネルとして
LiveRequestQueue を使う方法を理解できました。
次は、これらのメッセージに対してエージェントが生成するイベントを処理する
ダウンストリームフローを学びます。
← Previous: Part 1: Introduction to ADK Gemini Live API Toolkit | Next: Part 3: Event Handling with run_live() →