コンテンツにスキップ

Runtime Event Loop

ADKでサポートPython v0.1.0Typescript v0.2.0Go v0.1.0Java v0.1.0Kotlin v0.1.0

ADK Runtime は、ユーザー対話中にエージェントアプリを動かす基盤エンジンです。 定義したエージェント、ツール、コールバックを連携させ、 入力に応じた情報フロー・状態変更・LLM/ストレージ連携を管理します。

Runtime はエージェントアプリの エンジン と考えるとわかりやすいです。 開発者は部品(Agent/Tool)を定義し、 Runtime がそれらを接続して実行します。

コアアイデア: Event Loop

ADK Runtime の中心は Event Loop です。 Runner と実行ロジック(Agent、LLM 呼び出し、Callback、Tool)の 協調を回す仕組みです。

intro_components.png

シンプルに言うと:

  1. Runner がユーザー問い合わせを受け取りメイン Agent を開始
  2. Agent が応答/ツール要求/状態変更などを Event として yield
  3. Runner が受け取って処理(状態保存など)し上流へ転送
  4. AgentRunner 処理後にのみ再開
  5. 現在の問い合わせでイベントが尽きるまで繰り返し

このイベント駆動ループが ADK 実行モデルの基本です。

Event Loop の内部

Note

メソッド名や引数名は SDK 言語ごとに異なります。

Runner の役割(オーケストレータ)

Runner は 1 回の invocation の中央調整役です。

  1. 開始: ユーザークエリを受け、SessionService に履歴追加
  2. Kick-off: メインエージェント実行開始
  3. 受信/処理: yield された Event を即時処理
  4. state_delta / artifact_delta などを Services でコミット
  5. 上流転送: UI/呼び出し元へ転送
  6. 反復: エージェントを再開し次イベント生成へ

実行ロジックの役割(Agent/Tool/Callback)

実際の判断・計算は実行ロジック側が担います。

  1. 現在の InvocationContext で実行
  2. 伝える内容を Event にして yield
  3. yield 直後に一時停止
  4. Runner 処理後に再開
  5. 再開時は前イベントでコミット済み状態を参照可能

この yield / pause / resume サイクルが Runtime の中核です。

Runtime の主要コンポーネント

  1. Runner

    • 役割: 1 ユーザークエリ実行の入口 (run_async)
    • 機能: Event Loop 管理、イベント処理/コミット、上流転送
  2. 実行ロジック

    • AgentBaseAgent, LlmAgent など)
    • ToolsBaseTool, FunctionTool, AgentTool など)
    • Callbacks(フック関数)
    • 機能: 判断/計算/外部連携を行い Eventyield
  3. Event

    • 役割: Runner と実行ロジック間のメッセージ
    • 機能: コンテンツ + 副作用意図(actions)を運ぶ
  4. Services

    • SessionService: Session 保存/読込、state 適用、履歴管理
    • ArtifactService: バイナリアーティファクト管理
    • MemoryService: (任意)長期意味記憶
    • 役割: Runner がイベント処理時に使うバックエンド層です。event.actions に含まれる state_deltaartifact_delta を反映し、会話やアーティファクトの永続化を担います。
  5. Session

    • 役割: 1 会話の状態/履歴コンテナ
  6. Invocation

    • 役割: 1 ユーザークエリに対する処理単位
    • 機能: 複数の agent run / LLM call / tool 実行を invocation_id で束ねる
    • 補足: temp: プレフィックスの状態は 1 回の invocation にだけスコープされ、次のターンには引き継がれません。

シンプルな invocation フロー

intro_components.png

典型例(ツール呼び出しを含む場合):

  1. ユーザー入力受信
  2. Runner が Session を読み込み、ユーザーイベント記録
  3. root agent 実行
  4. LLM がツール呼び出しを判断
  5. FunctionCall イベント yield
  6. Agent 一時停止
  7. Runner 処理/転送
  8. Agent 再開
  9. ツール実行
  10. ツール結果取得
  11. FunctionResponse イベント yield
  12. Agent 一時停止
  13. Runner が状態/アーティファクト差分をコミット
  14. Agent 再開
  15. 最終応答生成
  16. テキストイベント yield
  17. Agent 一時停止
  18. Runner 転送
  19. Agent 終了
  20. Runner ループ完了

重要な Runtime 振る舞い

状態更新コミットのタイミング

状態変更は、state_delta を含むイベントが yield され、 Runner が処理した後に永続化が保証されます。

したがって yield 後に再開したコードは、 直前イベントのコミット済み状態を前提にできます。

Session State の "Dirty Read"

同一 invocation 内では、コミット前のローカル状態が読める場合があります。

  • 利点: 同一ステップ内コンポーネント連携がしやすい
  • 注意: コミット前に失敗すると変更が失われる可能性

重要な状態遷移は、必ずイベント経由でコミットされる設計にしてください。

Streaming vs Non-Streaming (partial=True)

  • ストリーミング時は partial=True を即時転送
  • 多くの場合、partial イベントの actions はコミットしない
  • 最終イベント(partial=False など)で状態差分をコミット
  • 非ストリーミングは単一イベントで処理

ストリーミングでは、UI への逐次表示を優先しつつ、状態の永続化は最終イベントで一度だけ行うため、 partial=True のイベントは通常そのまま上流へ流し、actions の反映は抑制されます。 非ストリーミングでは、1 つのイベントをまとめて処理するため、状態更新がより単純になります。

これにより、UI の逐次表示と状態一貫性を両立できます。

Async が基本 (run_async)

ADK Runtime は非同期実行を前提に設計されています。 LLM 応答待ちやツール実行を効率よく扱うためです。

  • 主要入口: Runner.run_async
  • 同期 run は便宜用ラッパーで、内部で run_async を使うことが多い
  • Python は asyncio
  • TypeScript は Promise / AsyncGenerator
  • Java は RxJava

同期コールバック/ツール利用時の注意

  • ブロッキング I/O はイベントループ停滞の原因
  • Python は asyncio.to_thread などで回避可能
  • TypeScript は Promise ベース I/O を推奨
  • CPU-bound 同期処理はスレッドを占有

これらを理解すると、状態整合性・ストリーミング更新・非同期実行に関する 設計/デバッグが容易になります。

言語別の補足

Runner.run_async の入口

async for でイベントを逐次処理します。

AsyncGeneratorfor await で消費します。

チャネルまたはコールバックでイベントを受け取ります。

RxJava のストリームとして扱います。

状態コミットのタイミング

yield 後に Runner が state_delta を反映します。

イベント受信後に状態を反映し、UI 更新を行います。

イベントの順序を保ちながら永続化します。

同期処理は避け、非同期の完了後に状態を確定します。

Streaming と Non-Streaming

partial=True は逐次表示、partial=False は確定更新です。

途中チャンクと最終チャンクを分けて表示します。

部分イベントは UI へ、最終イベントは保存へ回します。

ストリーム中のイベントは表示用、完了イベントは永続化用に扱います。

Async を前提にする理由

asyncio で LLM / I/O 待ちを止めません。

Promise ベースでイベントループを塞ぎません。

Goroutine とチャネルで並行性を保ちます。

RxJava でバックプレッシャーを意識して扱います。

Kotlin の実行ループ例

英語原文に追加された Kotlin の実行ループ例を以下にも掲載します。

概念的なループ

/**
 * Simplified view of Runner's main loop logic in Kotlin
 */
fun runAsync(
    userId: String,
    sessionId: String,
    newMessage: Content,
    runner: InMemoryRunner,
    sessionService: InMemorySessionService,
): Flow<Event> {
    // 1. Append newMessage to session event history (via SessionService)
    // 2. Kick off event loop by calling the agent
    // 3. Process generated events, commit changes, and yield upstream
    return runner
        .runAsync(
            userId = userId,
            sessionId = sessionId,
            newMessage = newMessage,
        ).onEach { event ->
            // Process the event and commit changes to services (done internally by Runner)
            // sessionService.appendEvent(...)
        }
}

実行ロジック

/**
 * Simplified view of logic inside Agent.runAsync, callbacks, or tools in Kotlin
 */
suspend fun executionLogic(ctx: InvocationContext) {
    // ... previous code runs based on current state ...

    // 1. Determine a change or output is needed, construct the event
    val updateData = mapOf("field_1" to "value_2")
    val eventWithStateChange =
        Event(
            author = "my_agent",
            actions = EventActions(stateDelta = updateData.toMutableMap()),
            content = Content.fromText(Role.MODEL, "State updated."),
        )

    // 2. Yield the event to the Runner for processing & commit
    // In Kotlin, this is done by emitting to the Flow
    // emit(eventWithStateChange)

    // <<<<<<<<<<<< EXECUTION PAUSES HERE >>>>>>>>>>>>
    // (Implicitly, when the Flow consumer collects the event and processes it)

    // <<<<<<<<<<<< RUNNER PROCESSES & COMMITS THE EVENT >>>>>>>>>>>>

    // 3. Resume execution ONLY after Runner is done processing.
    // Now, the state committed by the Runner is reliably reflected.
    val val1 = ctx.session.state["field_1"]
    println("Resumed execution. Value of field_1 is now: $val1")
}

状態更新のタイミング

/**
 * Conceptual view of state update timing in Kotlin
 */
suspend fun stateUpdateTiming(ctx: InvocationContext) {
    // 1. Modify state
    ctx.session.state["status"] = "processing"
    val event1 =
        Event(
            author = "my_agent",
            actions = EventActions(stateDelta = mutableMapOf("status" to "processing")),
        )

    // 2. Yield event with the delta (emit to flow)
    // emit(event1)

    // --- PAUSE --- Runner processes event1, SessionService commits 'status' = 'processing' ---

    // 3. Resume execution
    // Now it's safe to rely on the committed state
    val currentStatus = ctx.session.state["status"] // Guaranteed to be 'processing'
    println("Status after resuming: $currentStatus")
}

Dirty Read の例

/**
 * Conceptual view of dirty reads in Kotlin
 */
fun dirtyRead(ctx: InvocationContext) {
    // Code in a callback
    ctx.session.state["field_1"] = "value_1"
    // State is locally set to 'value_1', but not yet committed by Runner

    // ... agent runs ...

    // Code in a tool called later *within the same invocation*
    // Readable (dirty read), but 'value_1' isn't guaranteed persistent yet.
    val val1 = ctx.session.state["field_1"] // 'val' will likely be 'value_1' here
    println("Dirty read value in tool: $val1")

    // Assume the event carrying the state_delta={'field_1': 'value_1'}
    // is yielded *after* this tool runs and is processed by the Runner.
}