コンテンツにスキップ

ADK 向け BigQuery Agent Analytics プラグイン

ADKでサポートPython v1.21.0

バージョン要件

auto-schema-upgrade、tool provenance 追跡、HITL イベントトレーシングを含む このドキュメントの機能を完全に利用するには、ADK Python バージョン 1.26.0 以上を使用してください。

BigQuery Agent Analytics Plugin は、詳細なエージェント動作分析のための堅牢な ソリューションを提供し、Agent Development Kit (ADK) を大きく拡張します。ADK Plugin アーキテクチャと BigQuery Storage Write API を使って重要な運用 イベントを Google BigQuery テーブルへ直接キャプチャして記録し、デバッグ、 リアルタイム監視、包括的なオフライン性能評価のための高度な機能を提供します。

バージョン 1.26.0 では、Auto Schema Upgrade(既存テーブルへ新しい列を 安全に追加)、Tool Provenance 追跡(LOCAL、MCP、SUB_AGENT、A2A、 TRANSFER_AGENT、TRANSFER_A2A)、Human-in-the-Loop 連携のための HITL Event Tracing が追加されました。バージョン 1.27.0 では、Automatic View Creation(フラットでクエリしやすいイベントビューの生成)が追加されました。

BigQuery Storage Write API

この機能は有料サービスである BigQuery Storage Write API を使用します。 コストについては BigQuery ドキュメント を参照してください。

ユースケース

  • エージェントワークフローのデバッグと分析: 幅広い プラグインライフサイクル イベント(LLM 呼び出し、ツール使用)と エージェントが生成したイベント (ユーザー入力、モデル応答)を、明確に定義されたスキーマへキャプチャします。
  • 高ボリューム分析とデバッグ: ロギング処理は Storage Write API を使って 非同期に実行されるため、高スループットと低レイテンシを両立できます。
  • マルチモーダル分析: テキスト、画像、その他の modality を記録して分析します。 大きなファイルは GCS にオフロードされ、Object Table 経由で BigQuery ML から 利用できます。
  • 分散トレーシング: trace_idspan_id を使った OpenTelemetry スタイルの トレーシングを標準サポートし、エージェント実行フローを可視化できます。
  • Tool Provenance: 各ツール呼び出しの出自(ローカル関数、MCP サーバー、 サブエージェント、A2A リモートエージェント、transfer agent)を追跡します。
  • Human-in-the-Loop (HITL) トレーシング: 資格情報要求、確認プロンプト、 ユーザー入力要求のための専用イベントタイプを提供します。
  • クエリ可能なイベントビュー: フラットなイベントタイプ別 BigQuery ビュー (例: v_llm_request, v_tool_completed)を自動作成し、JSON ペイロード データを展開して下流分析を簡素化します。

キャプチャされるイベント概要

次の表は、プラグインが記録するすべてのイベントタイプを示します。詳しい ペイロード例は イベントタイプとペイロード を参照してください。 View 列は、create_views が有効な場合(デフォルト) に任意で作成される BigQuery ビューを示します。

イベントタイプ キャプチャされるタイミング 主なペイロードフィールド ビュー
USER_MESSAGE_RECEIVED ユーザーメッセージが invocation に入るとき テキスト要約 / content parts v_user_message_received
INVOCATION_STARTING invocation が開始するとき (共通列のみ) v_invocation_starting
INVOCATION_COMPLETED invocation が終了するとき (共通列のみ) v_invocation_completed
AGENT_STARTING エージェント実行が開始するとき instruction 要約 v_agent_starting
AGENT_COMPLETED エージェント実行が終了するとき レイテンシ v_agent_completed
LLM_REQUEST モデルリクエストが送信されるとき モデル、プロンプト、設定、ツール v_llm_request
LLM_RESPONSE モデルレスポンスを受信するとき 応答、使用トークン、キャッシュメタデータ、レイテンシ、TTFT v_llm_response
LLM_ERROR モデル呼び出しが失敗するとき エラーメッセージ、レイテンシ v_llm_error
TOOL_STARTING ツール実行が開始するとき ツール名、引数、出自 v_tool_starting
TOOL_COMPLETED ツールが成功するとき ツール名、結果、出自、レイテンシ v_tool_completed
TOOL_ERROR ツールが失敗するとき ツール名、引数、出自、エラー、レイテンシ v_tool_error
STATE_DELTA セッション状態が変化するとき 状態 delta v_state_delta
HITL_CREDENTIAL_REQUEST 資格情報要求が発行されるとき synthetic ツール名、引数 v_hitl_credential_request
HITL_CONFIRMATION_REQUEST 確認要求が発行されるとき synthetic ツール名、引数 v_hitl_confirmation_request
HITL_INPUT_REQUEST ユーザー入力要求が発行されるとき synthetic ツール名、引数 v_hitl_input_request
HITL_CREDENTIAL_REQUEST_COMPLETED ユーザーが資格情報応答を提供するとき synthetic ツール名、結果 (ベーステーブルのみ)
HITL_CONFIRMATION_REQUEST_COMPLETED ユーザーが確認応答を提供するとき synthetic ツール名、結果 (ベーステーブルのみ)
HITL_INPUT_REQUEST_COMPLETED ユーザーが入力応答を提供するとき synthetic ツール名、結果 (ベーステーブルのみ)
A2A_INTERACTION リモート A2A 呼び出しが完了するとき 応答、task ID、context ID、request/response v_a2a_interaction
AGENT_RESPONSE 最終エージェントレスポンスが yield されるとき レスポンス(content)、source event ID/author/branch(attributes) v_agent_response

クイックスタート

プラグインをエージェントの App オブジェクトに追加します。前提条件については 前提条件 を参照してください。

agent.py
import os
from google.adk.agents import Agent
from google.adk.apps import App
from google.adk.models.google_llm import Gemini
from google.adk.plugins.bigquery_agent_analytics_plugin import BigQueryAgentAnalyticsPlugin

os.environ['GOOGLE_CLOUD_PROJECT'] = 'your-gcp-project-id'
os.environ['GOOGLE_CLOUD_LOCATION'] = 'us-central1'
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'True'

plugin = BigQueryAgentAnalyticsPlugin(
    project_id="your-gcp-project-id",
    dataset_id="your-big-query-dataset-id",
)

root_agent = Agent(
    model=Gemini(model="gemini-flash-latest"),
    name='my_agent',
    instruction="You are a helpful assistant.",
)

app = App(
    name="my_agent",
    root_agent=root_agent,
    plugins=[plugin],
)

エージェントの実行とテスト

チャットインターフェースでエージェントを実行し、"tell me what you can do""List datasets in my cloud project <your-gcp-project-id>" のようなリクエストを いくつか送ってプラグインをテストします。これらの操作により、Google Cloud プロジェクトの BigQuery インスタンスに記録されるイベントが作成されます。 イベント処理後、BigQuery Console で 次のクエリを使ってデータを確認できます。

SELECT timestamp, event_type, content
FROM `your-gcp-project-id.your-big-query-dataset-id.agent_events`
ORDER BY timestamp DESC
LIMIT 20;
GCS オフロード、OpenTelemetry、BigQuery ツールを含む完全な例
my_bq_agent/agent.py
# my_bq_agent/agent.py
import os
import google.auth
from google.adk.apps import App
from google.adk.plugins.bigquery_agent_analytics_plugin import BigQueryAgentAnalyticsPlugin, BigQueryLoggerConfig
from google.adk.agents import Agent
from google.adk.models.google_llm import Gemini
from google.adk.tools.bigquery import BigQueryToolset, BigQueryCredentialsConfig


# --- OpenTelemetry TracerProvider Setup (Optional) ---
# ADK includes OpenTelemetry as a core dependency.
# Configuring a TracerProvider enables full distributed tracing
# (populates trace_id, span_id with standard OTel identifiers).
# If no TracerProvider is configured, the plugin falls back to internal
# UUIDs for span correlation while still preserving the parent-child hierarchy.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
trace.set_tracer_provider(TracerProvider())

# --- Configuration ---
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
DATASET_ID = os.environ.get("BIG_QUERY_DATASET_ID", "your-big-query-dataset-id")
# GOOGLE_CLOUD_LOCATION must be a valid Agent Platform region (e.g., "us-central1").
# BQ_LOCATION is the BigQuery dataset location, which can be a multi-region
# like "US" or "EU", or a single region like "us-central1".
VERTEX_LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
BQ_LOCATION = os.environ.get("BQ_LOCATION", "US")
GCS_BUCKET = os.environ.get("GCS_BUCKET_NAME", "your-gcs-bucket-name") # Optional

if PROJECT_ID == "your-gcp-project-id":
    raise ValueError("Please set GOOGLE_CLOUD_PROJECT or update the code.")

# --- CRITICAL: Set environment variables BEFORE Gemini instantiation ---
os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID
os.environ['GOOGLE_CLOUD_LOCATION'] = VERTEX_LOCATION
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'True'

# --- Initialize the Plugin with Config ---
bq_config = BigQueryLoggerConfig(
    enabled=True,
    gcs_bucket_name=GCS_BUCKET, # Enable GCS offloading for multimodal content
    log_multi_modal_content=True,
    max_content_length=500 * 1024, # 500 KB limit for inline text
    batch_size=1, # Default is 1 for low latency, increase for high throughput
    shutdown_timeout=10.0
)

bq_logging_plugin = BigQueryAgentAnalyticsPlugin(
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    table_id="agent_events", # default table name is agent_events
    config=bq_config,
    location=BQ_LOCATION
)

# --- Initialize Tools and Model ---
credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bigquery_toolset = BigQueryToolset(
    credentials_config=BigQueryCredentialsConfig(credentials=credentials)
)

llm = Gemini(model="gemini-flash-latest")

root_agent = Agent(
    model=llm,
    name='my_bq_agent',
    instruction="You are a helpful assistant with access to BigQuery tools.",
    tools=[bigquery_toolset]
)

# --- Create the App ---
app = App(
    name="my_bq_agent",
    root_agent=root_agent,
    plugins=[bq_logging_plugin],
)

Agent Runtime にデプロイしますか?

Agent Runtime へのデプロイ を参照してください。

前提条件

  • BigQuery API が有効な Google Cloud プロジェクト
  • BigQuery データセット: プラグインを使用する前に、ロギングテーブルを保存する データセットを作成します。テーブルが存在しない場合、プラグインは必要なイベント テーブルをデータセット内に自動作成します。
  • Google Cloud Storage バケット(任意): マルチモーダルコンテンツ(画像、 音声など)を記録する場合は、大きなファイルをオフロードするために GCS バケットを 作成しておくことを推奨します。
  • 認証:
    • ローカル: gcloud auth application-default login を実行します。
    • クラウド: サービスアカウントに必要な権限があることを確認します。
注: Gemini モデルセレクタ gemini-flash-latest

ADK ドキュメントの多くのコード例では、gemini-flash-latest を使って 利用可能な最新の Gemini Flash バージョンを選択します。ただし、us-central1 のようなリージョン エンドポイントから Gemini にアクセスする場合、この選択文字列が動作しない ことがあります。その場合は、Gemini モデル ページまたは Google Cloud の Gemini モデル 一覧から具体的なモデルバージョン文字列を使用してください。

IAM 権限

エージェントが正しく動作するには、エージェントを実行する principal(サービス アカウント、ユーザーアカウントなど)に次の Google Cloud ロールが必要です。

  • roles/bigquery.jobUser: プロジェクトレベルで BigQuery クエリを実行するため
  • roles/bigquery.dataEditor: テーブルレベルでログ/イベントデータを書き込むため
  • GCS オフロードを使う場合: 対象バケットの roles/storage.objectCreatorroles/storage.objectViewer

構成オプション

コンストラクタパラメータ

BigQueryAgentAnalyticsPlugin コンストラクタは次のパラメータを受け取ります。 また、以下の BigQueryLoggerConfig に直接転送される **kwargs も受け取れます。

パラメータ デフォルト 使用する場面
project_id str (必須) Google Cloud プロジェクトを選択します
dataset_id str (必須) BigQuery データセットを選択します
table_id Optional[str] None カスタムテーブル名を使います(config の table_id を上書き)
config Optional[BigQueryLoggerConfig] None 詳細調整用の config オブジェクトを渡します
location str "US" BigQuery データセットのロケーションに合わせます(例: "US", "EU", "us-central1"
credentials Optional[google.auth.credentials.Credentials] None ADC の代わりに明示的なサービスアカウント、impersonated、cross-project 資格情報を使います
plugin = BigQueryAgentAnalyticsPlugin(
    project_id="my-project",
    dataset_id="my_dataset",
    batch_size=10,           # forwarded to BigQueryLoggerConfig
    shutdown_timeout=5.0,    # forwarded to BigQueryLoggerConfig
)

BigQueryLoggerConfig オプション

以下のオプションはすべて任意で、適切なデフォルト値があります。 BigQueryLoggerConfig に渡すか、プラグインコンストラクタの **kwargs として 渡してください。

オプション デフォルト 使用する場面
enabled bool True ロギングを一時的に無効にします
table_id str "agent_events" カスタムテーブル名を使います(コンストラクタ値が優先)
clustering_fields List[str] ["event_type", "agent", "user_id"] テーブル作成時のクラスタリングをカスタマイズします
gcs_bucket_name Optional[str] None 大きなテキストとマルチモーダルコンテンツを GCS にオフロードします
connection_id Optional[str] None BigQuery ObjectRef / object table を使います(例: us.my-connection
max_content_length int 500 * 1024 オフロード/切り詰め前のインラインペイロードサイズを制御します
batch_size int 1 書き込みスループットとレイテンシを調整します
batch_flush_interval float 1.0 部分 batch を定期的に flush します(秒)
shutdown_timeout float 10.0 終了時の最終 flush を待ちます(秒)
event_allowlist Optional[List[str]] None 選択した イベントタイプ だけを記録します
event_denylist Optional[List[str]] None 機密性が高い、またはノイズの多い イベントタイプ をスキップします
content_formatter Optional[Callable] None イベントごとのカスタムマスキング/フォーマットを適用します((content, event_type) を受け取る)
log_multi_modal_content bool True GCS 参照を含む content_parts 詳細をキャプチャします
queue_max_size int 10000 メモリ内イベントキューの上限を設定します
retry_config RetryConfig RetryConfig() リトライ動作を調整します(max_retries=3, initial_delay=1.0, multiplier=2.0, max_delay=10.0
log_session_metadata bool True attributes にセッション情報(session_id, app_name, user_id, state)を追加します。temp: または secret: 接頭辞のキーは マスク されます。
custom_tags Dict[str, Any] {} すべてのイベントの attributes に静的タグ(例: {"env": "prod"})を追加します
auto_schema_upgrade bool True 既存テーブルへ新しい列を自動追加します(追加変更のみ)
create_views bool True イベントタイプ別 BigQuery ビューを作成します(1.27.0+)
view_prefix str "v" 複数のプラグインが同じデータセットを共有するとき、ビュー名の衝突を避けます(例: "v_staging"

次のコード例は、BigQuery Agent Analytics プラグインの構成を定義する方法を示します。

import json
import re

from google.adk.plugins.bigquery_agent_analytics_plugin import BigQueryLoggerConfig

def redact_dollar_amounts(event_content: Any, event_type: str) -> str:
    """
    Custom formatter to redact dollar amounts (e.g., $600, $12.50)
    and ensure JSON output if the input is a dict.

    Args:
        event_content: The raw content of the event.
        event_type: The event type string (e.g., "LLM_REQUEST", "LLM_RESPONSE").
    """
    text_content = ""
    if isinstance(event_content, dict):
        text_content = json.dumps(event_content)
    else:
        text_content = str(event_content)

    # Regex to find dollar amounts: $ followed by digits, optionally with commas or decimals.
    # Examples: $600, $1,200.50, $0.99
    redacted_content = re.sub(r'\$\d+(?:,\d{3})*(?:\.\d+)?', 'xxx', text_content)

    return redacted_content

config = BigQueryLoggerConfig(
    enabled=True,
    event_allowlist=["LLM_REQUEST", "LLM_RESPONSE"], # Only log these events
    # event_denylist=["TOOL_STARTING"], # Skip these events
    shutdown_timeout=10.0, # Wait up to 10s for logs to flush on exit
    max_content_length=500, # Truncate content to 500 chars
    content_formatter=redact_dollar_amounts, # Redact the dollar amounts in the logging content
    queue_max_size=10000, # Max events to hold in memory
    auto_schema_upgrade=True, # Automatically add new columns to existing tables
    create_views=True, # Automatically create per-event-type views
    # retry_config=RetryConfig(max_retries=3), # Optional: Configure retries
)

plugin = BigQueryAgentAnalyticsPlugin(
    project_id="my-project",
    dataset_id="my_dataset",
    config=config,
)

スキーマと本番環境セットアップ

スキーマ参照

イベントテーブル(agent_events)は柔軟なスキーマを使用します。次の表は、例の 値を含む包括的なリファレンスです。

フィールド名 モード 説明
timestamp TIMESTAMP REQUIRED イベント作成時の UTC タイムスタンプです。主要な並び替えキーであり、日次パーティションキーです。精度はマイクロ秒です。 2026-02-03 20:52:17 UTC
event_type STRING NULLABLE 正規化されたイベントカテゴリです。標準値には LLM_REQUEST, LLM_RESPONSE, LLM_ERROR, TOOL_STARTING, TOOL_COMPLETED, TOOL_ERROR, AGENT_STARTING, AGENT_COMPLETED, STATE_DELTA, INVOCATION_STARTING, INVOCATION_COMPLETED, USER_MESSAGE_RECEIVED、HITL イベント(HITL イベント 参照)が含まれます。上位レベルのフィルタリングに使います。 LLM_REQUEST
agent STRING NULLABLE このイベントを担当するエージェント名です。エージェント初期化時、または root_agent_name コンテキストで定義されます。 my_bq_agent
session_id STRING NULLABLE 会話スレッド全体の永続識別子です。複数ターンやサブエージェント呼び出しを通じて一定です。 04275a01-1649-4a30-b6a7-5b443c69a7bc
invocation_id STRING NULLABLE 単一の実行ターンまたはリクエストサイクルの一意識別子です。多くの文脈では trace_id に対応します。 e-b55b2000-68c6-4e8b-b3b3-ffb454a92e40
user_id STRING NULLABLE セッションを開始したユーザー(人間またはシステム)の識別子です。User オブジェクトまたはメタデータから抽出されます。 test_user
trace_id STRING NULLABLE OpenTelemetry Trace ID(32 文字 hex)です。単一の分散リクエストライフサイクル内のすべての操作をつなぎます。 e-b55b2000-68c6-4e8b-b3b3-ffb454a92e40
span_id STRING NULLABLE OpenTelemetry Span ID(16 文字 hex)です。この特定の原子的操作を一意に識別します。 69867a836cd94798be2759d8e0d70215
parent_span_id STRING NULLABLE 直前の呼び出し元の Span ID です。親子実行ツリー(DAG)を再構築するために使います。 ef5843fe40764b4b8afec44e78044205
content JSON NULLABLE 主なイベントペイロードです。構造は event_type によって多態的です。 {"system_prompt": "You are...", "prompt": [{"role": "user", "content": "hello"}], "response": "Hi", "usage": {"total": 15}}
attributes JSON NULLABLE メタデータ/エンリッチメント(使用量統計、モデル情報、ツール出自、カスタムタグ)です。 {"model": "gemini-flash-latest", "usage_metadata": {"total_token_count": 15}, "session_metadata": {"session_id": "...", "app_name": "...", "user_id": "...", "state": {}}, "custom_tags": {"env": "prod"}}
latency_ms JSON NULLABLE 性能指標です。標準キーは total_ms(wall-clock duration)と time_to_first_token_ms(ストリーミングレイテンシ)です。 {"total_ms": 1250, "time_to_first_token_ms": 450}
status STRING NULLABLE 上位レベルの結果です。値は OK(成功)または ERROR(失敗)です。 OK
error_message STRING NULLABLE 人間が読める例外メッセージまたはスタックトレース断片です。statusERROR のときだけ設定されます。 Error 404: Dataset not found
is_truncated BOOLEAN NULLABLE content または attributes が BigQuery セルサイズ制限(デフォルト 10MB)を超えて一部削除された場合は true です。 false
content_parts RECORD REPEATED マルチモーダルセグメント(Text、Image、Blob)の配列です。大きなバイナリや GCS 参照など、content を単純な JSON としてシリアライズできない場合に使います。 [{"mime_type": "text/plain", "text": "hello"}]

テーブルが存在しない場合、プラグインが自動的に作成します。本番環境では、必要に 応じて次の DDL でテーブルを手動作成できます。

本番環境セットアップ用の手動 DDL
CREATE TABLE `your-gcp-project-id.adk_agent_logs.agent_events`
(
  timestamp TIMESTAMP NOT NULL OPTIONS(description="The UTC time at which the event was logged."),
  event_type STRING OPTIONS(description="Indicates the type of event being logged (e.g., 'LLM_REQUEST', 'TOOL_COMPLETED')."),
  agent STRING OPTIONS(description="The name of the ADK agent or author associated with the event."),
  session_id STRING OPTIONS(description="A unique identifier to group events within a single conversation or user session."),
  invocation_id STRING OPTIONS(description="A unique identifier for each individual agent execution or turn within a session."),
  user_id STRING OPTIONS(description="The identifier of the user associated with the current session."),
  trace_id STRING OPTIONS(description="OpenTelemetry trace ID for distributed tracing."),
  span_id STRING OPTIONS(description="OpenTelemetry span ID for this specific operation."),
  parent_span_id STRING OPTIONS(description="OpenTelemetry parent span ID to reconstruct hierarchy."),
  content JSON OPTIONS(description="The event-specific data (payload) stored as JSON."),
  content_parts ARRAY<STRUCT<
    mime_type STRING,
    uri STRING,
    object_ref STRUCT<
      uri STRING,
      version STRING,
      authorizer STRING,
      details JSON
    >,
    text STRING,
    part_index INT64,
    part_attributes STRING,
    storage_mode STRING
  >> OPTIONS(description="Detailed content parts for multi-modal data."),
  attributes JSON OPTIONS(description="Arbitrary key-value pairs for additional metadata (e.g., 'root_agent_name', 'model_version', 'usage_metadata', 'session_metadata', 'custom_tags')."),
  latency_ms JSON OPTIONS(description="Latency measurements (e.g., total_ms)."),
  status STRING OPTIONS(description="The outcome of the event, typically 'OK' or 'ERROR'."),
  error_message STRING OPTIONS(description="Populated if an error occurs."),
  is_truncated BOOLEAN OPTIONS(description="Flag indicates if content was truncated.")
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, agent, user_id;

自動作成されるビュー(1.27.0+)

create_views=True(1.27.0 以降のデフォルト)にすると、プラグインは各イベント タイプについて、共通 JSON 構造をフラットで型付きの列に展開するビューを自動作成 します。これにより SQL が大幅に簡素化され、複雑な JSON_VALUEJSON_QUERY 関数を明示的に書く必要がなくなります。

ビュー名は {view_prefix}_{event_type_lowercase} の規則に従います。たとえば デフォルト接頭辞 "v" では、LLM_REQUESTv_llm_request になります。同じ データセット内で複数のプラグインインスタンスが別テーブルへ書き込む場合は、 BigQueryLoggerConfigview_prefix に別の値を設定してビュー名の衝突を 防いでください。

# Two plugins in the same dataset with distinct view prefixes
plugin_prod = BigQueryAgentAnalyticsPlugin(
    project_id=PROJECT_ID, dataset_id=DATASET_ID,
    table_id="agent_events_prod",
    config=BigQueryLoggerConfig(view_prefix="v_prod"),
)
# Creates views: v_prod_llm_request, v_prod_tool_completed, ...

plugin_staging = BigQueryAgentAnalyticsPlugin(
    project_id=PROJECT_ID, dataset_id=DATASET_ID,
    table_id="agent_events_staging",
    config=BigQueryLoggerConfig(view_prefix="v_staging"),
)
# Creates views: v_staging_llm_request, v_staging_tool_completed, ...

スキーマアップグレード後にビューを更新する場合などは、public async メソッド await plugin.create_analytics_views() を手動で呼び出すこともできます。

すべてのビューには次の 共通列 が含まれます: timestamp, event_type, agent, session_id, invocation_id, user_id, trace_id, span_id, parent_span_id, status, error_message, is_truncated

次の表は、自動作成されるすべてのビューとイベント固有列を示します。

ビュー名 イベント固有列
v_user_message_received (共通列のみ)
v_llm_request model (STRING), request_content (JSON), llm_config (JSON), tools (JSON)
v_llm_response response (JSON), usage_prompt_tokens (INT64), usage_completion_tokens (INT64), usage_total_tokens (INT64), usage_cached_tokens (INT64), total_ms (INT64), ttft_ms (INT64), model_version (STRING), usage_metadata (JSON), cache_metadata (JSON), context_cache_hit_rate (FLOAT64)
v_llm_error total_ms (INT64)
v_tool_starting tool_name (STRING), tool_args (JSON), tool_origin (STRING)
v_tool_completed tool_name (STRING), tool_result (JSON), tool_origin (STRING), total_ms (INT64)
v_tool_error tool_name (STRING), tool_args (JSON), tool_origin (STRING), total_ms (INT64)
v_agent_starting agent_instruction (STRING)
v_agent_completed total_ms (INT64)
v_invocation_starting (共通列のみ)
v_invocation_completed (共通列のみ)
v_state_delta state_delta (JSON)
v_hitl_credential_request tool_name (STRING), tool_args (JSON)
v_hitl_confirmation_request tool_name (STRING), tool_args (JSON)
v_hitl_input_request tool_name (STRING), tool_args (JSON)
v_a2a_interaction response_content (JSON), a2a_task_id (STRING), a2a_context_id (STRING), a2a_request (JSON), a2a_response (JSON)
v_agent_response response_text (STRING), source_event_id (STRING), source_event_author (STRING), source_event_branch (STRING)

イベントタイプとペイロード

content 列には、event_type ごとの JSON オブジェクトが含まれます。 content_parts 列はコンテンツの構造化ビューを提供し、画像やオフロードされた データに特に便利です。

コンテンツの切り詰め

  • 可変コンテンツフィールドは max_content_lengthBigQueryLoggerConfig で 構成、デフォルト 500KB)で切り詰められます。
  • gcs_bucket_name が構成されている場合、大きなコンテンツは切り詰められず GCS にオフロードされ、参照が content_parts.object_ref に保存されます。

LLM インタラクション(プラグインライフサイクル)

これらのイベントは、LLM に送信される生のリクエストと LLM から受け取るレスポンスを 追跡します。

1. LLM_REQUEST

会話履歴とシステム指示を含む、モデルへ送信されたプロンプトをキャプチャします。

{
  "event_type": "LLM_REQUEST",
  "content": {
    "system_prompt": "You are a helpful assistant...",
    "prompt": [
      {
        "role": "user",
        "content": "hello how are you today"
      }
    ]
  },
  "attributes": {
    "root_agent_name": "my_bq_agent",
    "model": "gemini-flash-latest",
    "tools": ["list_dataset_ids", "execute_sql"],
    "llm_config": {
      "temperature": 0.5,
      "top_p": 0.9
    }
  }
}

2. LLM_RESPONSE

モデル出力とトークン使用量統計をキャプチャします。

{
  "event_type": "LLM_RESPONSE",
  "content": {
    "response": "text: 'Hello! I'm doing well...'",
    "usage": {
      "completion": 19,
      "prompt": 10129,
      "total": 10148
    }
  },
  "attributes": {
    "root_agent_name": "my_bq_agent",
    "model_version": "gemini-flash-latest",
    "usage_metadata": {
      "prompt_token_count": 10129,
      "candidates_token_count": 19,
      "total_token_count": 10148
    }
  },
  "latency_ms": {
    "time_to_first_token_ms": 2579,
    "total_ms": 2579
  }
}

3. LLM_ERROR

LLM 呼び出しが例外で失敗したときに記録されます。エラーメッセージがキャプチャ され、span が閉じられます。

{
  "event_type": "LLM_ERROR",
  "content": null,
  "attributes": {
    "root_agent_name": "my_bq_agent"
  },
  "error_message": "Error 429: Resource exhausted",
  "latency_ms": {
    "total_ms": 350
  }
}

ツール使用(プラグインライフサイクル)

これらのイベントは、エージェントによるツール実行を追跡します。各ツールイベント には、ツールの出自を分類する tool_origin フィールドが含まれます。

ツール出自 説明
LOCAL FunctionTool インスタンス(ローカル Python 関数)
MCP Model Context Protocol ツール(McpTool インスタンス)
SUB_AGENT AgentTool インスタンス(サブエージェント)
A2A リモート Agent2Agent インスタンス(RemoteA2aAgent
TRANSFER_AGENT TransferToAgentTool インスタンス(一般的なエージェント transfer)
TRANSFER_A2A RemoteA2aAgent へ transfer する TransferToAgentTool インスタンス(呼び出しレベルで分類)
UNKNOWN 未分類のツール

4. TOOL_STARTING

エージェントがツール実行を開始したときに記録されます。

{
  "event_type": "TOOL_STARTING",
  "content": {
    "tool": "list_dataset_ids",
    "args": {
      "project_id": "bigquery-public-data"
    },
    "tool_origin": "LOCAL"
  }
}

5. TOOL_COMPLETED

ツール実行が完了したときに記録されます。

{
  "event_type": "TOOL_COMPLETED",
  "content": {
    "tool": "list_dataset_ids",
    "result": [
      "austin_311",
      "austin_bikeshare"
    ],
    "tool_origin": "LOCAL"
  },
  "latency_ms": {
    "total_ms": 467
  }
}

6. TOOL_ERROR

ツール実行が例外で失敗したときに記録されます。ツール名、引数、ツール出自、 エラーメッセージをキャプチャします。

{
  "event_type": "TOOL_ERROR",
  "content": {
    "tool": "list_dataset_ids",
    "args": {
      "project_id": "nonexistent-project"
    },
    "tool_origin": "LOCAL"
  },
  "error_message": "Error 404: Dataset not found",
  "latency_ms": {
    "total_ms": 150
  }
}

状態管理

これらのイベントは、通常はツールによってトリガーされるエージェント状態の変更を 追跡します。

7. STATE_DELTA

エージェントの内部状態変更(例: ツールによって更新されたカスタムアプリケーション 状態)を追跡します。

組み込みマスキング

temp: または secret: 接頭辞の状態キーは、記録される state_delta 内で 自動的に [REDACTED] にマスクされます。詳しくは 組み込み マスキング を参照してください。

{
  "event_type": "STATE_DELTA",
  "attributes": {
    "state_delta": {
      "customer_tier": "enterprise",
      "last_query_dataset": "bigquery-public-data.samples"
    }
  }
}

エージェントライフサイクルと汎用イベント

イベントタイプ コンテンツ(JSON)構造
INVOCATION_STARTING {}
INVOCATION_COMPLETED {}
AGENT_STARTING "You are a helpful agent..."
AGENT_COMPLETED {}
USER_MESSAGE_RECEIVED {"text_summary": "Help me book a flight."}
AGENT_RESPONSE {"response": "Here are the flights..."}

AGENT_RESPONSE

エージェントがユーザーへの最終レスポンスを yield したときに記録されます。レスポンス テキストは content に保存され、source event metadata は attributes に保存されます。

{
  "event_type": "AGENT_RESPONSE",
  "content": {
    "response": "Here are the available flights..."
  },
  "attributes": {
    "source_event_id": "evt-abc123",
    "source_event_author": "flight_agent",
    "source_event_branch": "main"
  }
}

Human-in-the-Loop (HITL) イベント

プラグインは ADK の synthetic HITL ツール呼び出しを自動検出し、専用のイベント タイプを出力します。これらのイベントは通常の TOOL_STARTING / TOOL_COMPLETED イベントに 加えて 記録されます。

認識される HITL ツール名は次のとおりです。

  • adk_request_credential: ユーザー資格情報の要求(例: OAuth トークン)
  • adk_request_confirmation: 続行前のユーザー確認要求
  • adk_request_input: 自由形式のユーザー入力要求
イベントタイプ トリガー コンテンツ(JSON)構造
HITL_CREDENTIAL_REQUEST エージェントが adk_request_credential を呼び出す {"tool": "adk_request_credential", "args": {...}}
HITL_CREDENTIAL_REQUEST_COMPLETED ユーザーが資格情報応答を提供する {"tool": "adk_request_credential", "result": {...}}
HITL_CONFIRMATION_REQUEST エージェントが adk_request_confirmation を呼び出す {"tool": "adk_request_confirmation", "args": {...}}
HITL_CONFIRMATION_REQUEST_COMPLETED ユーザーが確認応答を提供する {"tool": "adk_request_confirmation", "result": {...}}
HITL_INPUT_REQUEST エージェントが adk_request_input を呼び出す {"tool": "adk_request_input", "args": {...}}
HITL_INPUT_REQUEST_COMPLETED ユーザーが入力応答を提供する {"tool": "adk_request_input", "result": {...}}

HITL リクエストイベントは on_event_callbackfunction_call パートから検出 されます。HITL 完了イベントは on_event_callbackon_user_message_callback の両方にある function_response パートから検出されます。

HITL イベント用ビュー

自動作成ビューは、3 つの リクエスト イベントタイプ (v_hitl_credential_request, v_hitl_confirmation_request, v_hitl_input_request)に対してのみ存在します。3 つの *_COMPLETED イベントタイプはベーステーブルには記録されますが、専用ビューはありません。 agent_events テーブルで WHERE event_type LIKE 'HITL_%_COMPLETED' を使って 直接照会してください。

A2A インタラクションイベント

エージェントが Agent2Agent (A2A) プロトコルでリモートエージェントと通信すると、 プラグインはリクエストとレスポンスの詳細をキャプチャする A2A_INTERACTION イベントを記録します。

A2A_INTERACTION

A2A リモートエージェント呼び出しが完了したときに記録されます。

{
  "event_type": "A2A_INTERACTION",
  "content": {
    "response_content": "The remote agent's response...",
    "a2a_task_id": "task-abc123",
    "a2a_context_id": "ctx-def456",
    "a2a_request": { ... },
    "a2a_response": { ... }
  }
}

ストレージ動作: GCS オフロード

BigQueryLoggerConfiggcs_bucket_name が構成されている場合、プラグインは 大きなテキストやマルチモーダルコンテンツ(画像、音声など)を Google Cloud Storage へ自動的にオフロードします。content 列には要約または placeholder が 入り、content_parts には GCS URI を指す object_ref が保存されます。 構成オプションconnection_idmax_content_length も参照してください。

オフロードされたテキスト例

{
  "event_type": "LLM_REQUEST",
  "content_parts": [
    {
      "part_index": 1,
      "mime_type": "text/plain",
      "storage_mode": "GCS_REFERENCE",
      "text": "AAAA... [OFFLOADED]",
      "object_ref": {
        "uri": "gs://sample-bucket-name/2025-12-10/e-f9545d6d/ae5235e6_p1.txt",
        "authorizer": "us.bqml_connection",
        "details": {"gcs_metadata": {"content_type": "text/plain"}}
      }
    }
  ]
}

オフロードされた画像例

{
  "event_type": "LLM_REQUEST",
  "content_parts": [
    {
      "part_index": 2,
      "mime_type": "image/png",
      "storage_mode": "GCS_REFERENCE",
      "text": "[MEDIA OFFLOADED]",
      "object_ref": {
        "uri": "gs://sample-bucket-name/2025-12-10/e-f9545d6d/ae5235e6_p2.png",
        "authorizer": "us.bqml_connection",
        "details": {"gcs_metadata": {"content_type": "image/png"}}
      }
    }
  ]
}

オフロードされたコンテンツの照会(署名付き URL の取得)

SELECT
  timestamp,
  event_type,
  part.mime_type,
  part.storage_mode,
  part.object_ref.uri AS gcs_uri,
  -- Generate a signed URL to read the content directly (requires connection_id configuration)
  STRING(OBJ.GET_ACCESS_URL(part.object_ref, 'r').access_urls.read_url) AS signed_url
FROM `your-gcp-project-id.your-dataset-id.agent_events`,
UNNEST(content_parts) AS part
WHERE part.storage_mode = 'GCS_REFERENCE'
ORDER BY timestamp DESC
LIMIT 10;

クエリレシピ

実行をデバッグする

trace_id を使って特定の会話ターンを追跡

SELECT timestamp, event_type, agent, JSON_VALUE(content, '$.response') as summary
FROM `your-gcp-project-id.your-dataset-id.agent_events`
WHERE trace_id = 'your-trace-id'
ORDER BY timestamp ASC;

Span 階層と duration 分析

SELECT
  span_id,
  parent_span_id,
  event_type,
  timestamp,
  -- Extract duration from latency_ms for completed operations
  CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) as duration_ms,
  -- Identify the specific tool or operation
  COALESCE(
    JSON_VALUE(content, '$.tool'),
    'LLM_CALL'
  ) as operation
FROM `your-gcp-project-id.your-dataset-id.agent_events`
WHERE trace_id = 'your-trace-id'
  AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED')
ORDER BY timestamp ASC;

エラー分析(LLM とツールエラー)

ビューを使う方法(推奨):

-- Tool errors with provenance
SELECT timestamp, agent, tool_name, tool_origin, error_message, total_ms
FROM `your-gcp-project-id.your-dataset-id.v_tool_error`
ORDER BY timestamp DESC
LIMIT 20;

-- LLM errors
SELECT timestamp, agent, error_message, total_ms
FROM `your-gcp-project-id.your-dataset-id.v_llm_error`
ORDER BY timestamp DESC
LIMIT 20;

コストと性能を監視する

トークン使用量分析

v_llm_response ビューを使う方法(推奨):

SELECT
  AVG(usage_total_tokens) as avg_tokens,
  AVG(usage_prompt_tokens) as avg_prompt_tokens,
  AVG(usage_completion_tokens) as avg_completion_tokens
FROM `your-gcp-project-id.your-dataset-id.v_llm_response`;

または JSON 抽出とともにベーステーブルを使います。

SELECT
  AVG(CAST(JSON_VALUE(content, '$.usage.total') AS INT64)) as avg_tokens
FROM `your-gcp-project-id.your-dataset-id.agent_events`
WHERE event_type = 'LLM_RESPONSE';

レイテンシ分析(LLM とツール)

ビューを使う方法(推奨):

-- LLM latency
SELECT AVG(total_ms) as avg_llm_ms, AVG(ttft_ms) as avg_ttft_ms
FROM `your-gcp-project-id.your-dataset-id.v_llm_response`;

-- Tool latency by tool name
SELECT tool_name, tool_origin, AVG(total_ms) as avg_tool_ms
FROM `your-gcp-project-id.your-dataset-id.v_tool_completed`
GROUP BY tool_name, tool_origin
ORDER BY avg_tool_ms DESC;

またはベーステーブルを使います。

SELECT
  event_type,
  AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64)) as avg_latency_ms
FROM `your-gcp-project-id.your-dataset-id.agent_events`
WHERE event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED')
GROUP BY event_type;

ツールとインタラクションを調べる

Tool Provenance 分析

v_tool_completed ビューを使う方法(推奨):

SELECT
  tool_origin,
  tool_name,
  COUNT(*) as call_count,
  AVG(total_ms) as avg_latency_ms
FROM `your-gcp-project-id.your-dataset-id.v_tool_completed`
GROUP BY tool_origin, tool_name
ORDER BY call_count DESC;

HITL インタラクション分析

SELECT
  timestamp,
  event_type,
  session_id,
  JSON_VALUE(content, '$.tool') as hitl_tool,
  content
FROM `your-gcp-project-id.your-dataset-id.agent_events`
WHERE event_type LIKE 'HITL_%'
ORDER BY timestamp DESC
LIMIT 20;

マルチモーダルコンテンツを分析する

マルチモーダルコンテンツの照会(content_parts と ObjectRef を使用)

SELECT
  timestamp,
  part.mime_type,
  part.object_ref.uri as gcs_uri
FROM `your-gcp-project-id.your-dataset-id.agent_events`,
UNNEST(content_parts) as part
WHERE part.mime_type LIKE 'image/%'
ORDER BY timestamp DESC;

BigQuery リモートモデル(Gemini)でマルチモーダルコンテンツを分析

SELECT
  logs.session_id,
  -- Get a signed URL for the image
  STRING(OBJ.GET_ACCESS_URL(parts.object_ref, "r").access_urls.read_url) as signed_url,
  -- Analyze the image using a remote model (e.g., gemini-pro-vision)
  AI.GENERATE(
    ('Describe this image briefly. What company logo?', parts.object_ref)
  ) AS generated_result
FROM
  `your-gcp-project-id.your-dataset-id.agent_events` logs,
  UNNEST(logs.content_parts) AS parts
WHERE
  parts.mime_type LIKE 'image/%'
ORDER BY logs.timestamp DESC
LIMIT 1;

AI による根本原因分析

BigQuery ML と Gemini を使い、失敗したセッションを自動分析してエラーの根本原因を 特定します。

DECLARE failed_session_id STRING;
-- Find a recent failed session
SET failed_session_id = (
    SELECT session_id
    FROM `your-gcp-project-id.your-dataset-id.agent_events`
    WHERE error_message IS NOT NULL
    ORDER BY timestamp DESC
    LIMIT 1
);

-- Reconstruct the full conversation context
WITH SessionContext AS (
    SELECT
        session_id,
        STRING_AGG(CONCAT(event_type, ': ', COALESCE(TO_JSON_STRING(content), '')), '\n' ORDER BY timestamp) as full_history
    FROM `your-gcp-project-id.your-dataset-id.agent_events`
    WHERE session_id = failed_session_id
    GROUP BY session_id
)
-- Ask Gemini to diagnose the issue
SELECT
    session_id,
    AI.GENERATE(
        ('Analyze this conversation log and explain the root cause of the failure. Log: ', full_history),
        endpoint => 'gemini-flash-latest'
    ).result AS root_cause_explanation
FROM SessionContext;

会話型分析

BigQuery Conversational Analytics を 使って、自然言語でエージェントログを分析することもできます。agent_events テーブルに接続された会話型分析エージェントを BigQuery Agents Hub で作成し、次のように 質問できます。

  • "Show me the error rate over time"
  • "What are the most common tool calls?"
  • "Identify sessions with high token usage"

プラグイン付きで Agent Runtime にデプロイ

BigQuery Agent Analytics プラグインを含むエージェントを Agent Runtime にデプロイできます。このセクションでは、ADK CLI を使ったデプロイ手順と、 代替として Agent Platform SDK を使うプログラム的な方法を説明します。

バージョン要件

このプラグインを Agent Runtime にデプロイするには、ADK Python バージョン 1.24.0 以上を使用してください。以前のバージョンでは、プラグインの 非同期ログ writer がサーバーレスランタイムによって終了される前に、保留中の イベントを flush できない問題がありました。1.24.0 以降では、各 invocation の終了時に同期 flush を行い、すべてのイベントが書き込まれるようにします。

前提条件

デプロイ前に、次の項目を含む一般的な Agent Runtime セットアップ を完了して ください。

  1. Agent Platform APICloud Resource Manager API が有効な Google Cloud プロジェクト
  2. 対象プロジェクトの BigQuery データセット(または適切な権限を持つ cross-project データセット)
  3. デプロイアーティファクト用の Cloud Storage staging バケット
  4. デプロイ用サービスアカウントに IAM 権限 に記載されたロールがあること
  5. コーディング環境が gcloud auth logingcloud auth application-default login認証されていること

ステップ 1: エージェントとプラグインを定義

プラグインを含む App オブジェクトを持つエージェントプロジェクトフォルダを 作成します。App オブジェクトは、プラグイン付き Agent Runtime デプロイに必要です。

my_bq_agent/
├── __init__.py
├── agent.py
└── requirements.txt
my_bq_agent/__init__.py
from . import agent
my_bq_agent/agent.py
import os
import google.auth
from google.adk.agents import Agent
from google.adk.apps import App
from google.adk.models.google_llm import Gemini
from google.adk.plugins.bigquery_agent_analytics_plugin import (
    BigQueryAgentAnalyticsPlugin,
    BigQueryLoggerConfig,
)
from google.adk.tools.bigquery import BigQueryToolset, BigQueryCredentialsConfig

# --- Configuration ---
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
DATASET_ID = os.environ.get("BQ_DATASET", "agent_analytics")
# BQ_LOCATION is the BigQuery dataset location (multi-region "US"/"EU" or
# a single region like "us-central1"). This is separate from the Agent Platform
# region used by GOOGLE_CLOUD_LOCATION.
BQ_LOCATION = os.environ.get("BQ_LOCATION", "US")

os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"

# --- Plugin ---
bq_analytics_plugin = BigQueryAgentAnalyticsPlugin(
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    location=BQ_LOCATION,
    config=BigQueryLoggerConfig(
        batch_size=1,
        batch_flush_interval=0.5,
        log_session_metadata=True,
    ),
)

# --- Tools ---
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
bigquery_toolset = BigQueryToolset(
    credentials_config=BigQueryCredentialsConfig(credentials=credentials)
)

# --- Agent ---
root_agent = Agent(
    model=Gemini(model="gemini-flash-latest"),
    name="my_bq_agent",
    instruction="You are a helpful assistant with access to BigQuery tools.",
    tools=[bigquery_toolset],
)

# --- App (required for Agent Runtime with plugins) ---
app = App(
    name="my_bq_agent",
    root_agent=root_agent,
    plugins=[bq_analytics_plugin],
)
my_bq_agent/requirements.txt
google-adk[bigquery]
google-cloud-bigquery-storage
pyarrow
opentelemetry-api
opentelemetry-sdk

ステップ 2: ADK CLI でデプロイ

adk deploy agent_engine コマンドを使ってエージェントをデプロイします。 --adk_app フラグは、どの App オブジェクトを使うかを CLI に伝えます。

PROJECT_ID=your-gcp-project-id
LOCATION=us-central1

adk deploy agent_engine \
    --project=$PROJECT_ID \
    --region=$LOCATION \
    --staging_bucket=gs://your-staging-bucket \
    --display_name="My BQ Analytics Agent" \
    --adk_app=agent.app \
    my_bq_agent

--adk_app フラグ

--adk_app フラグは、App オブジェクトのモジュールパスと変数名を module.variable 形式で指定します。この例では、agent.appagent.pyapp 変数を指します。これにより、デプロイはプラグイン構成を正しく拾います。

デプロイに成功すると、次のような出力が表示されます。

AgentEngine created. Resource name: projects/123456789/locations/us-central1/reasoningEngines/751619551677906944

次のステップで使う Resource name を控えておいてください。

ステップ 3: デプロイしたエージェントをテスト

デプロイ後、Agent Platform SDK を使ってエージェントへクエリできます。

test_deployed_agent.py
import uuid
import vertexai

PROJECT_ID = "your-gcp-project-id"
LOCATION = "us-central1"
AGENT_ID = "751619551677906944"  # from deployment output

vertexai.init(project=PROJECT_ID, location=LOCATION)
client = vertexai.Client(project=PROJECT_ID, location=LOCATION)

agent = client.agent_engines.get(
    name=f"projects/{PROJECT_ID}/locations/{LOCATION}/reasoningEngines/{AGENT_ID}"
)

user_id = f"test_user_{uuid.uuid4().hex[:8]}"
for chunk in agent.stream_query(
    message="List datasets in my project", user_id=user_id
):
    print(chunk, end="", flush=True)

ステップ 4: BigQuery でイベントを確認

デプロイしたエージェントにいくつかクエリを送信した後、BigQuery テーブルを クエリしてイベントが記録されていることを確認します。

SELECT timestamp, event_type, agent, content
FROM `your-gcp-project-id.agent_analytics.agent_events`
ORDER BY timestamp DESC
LIMIT 20;

INVOCATION_STARTING, LLM_REQUEST, LLM_RESPONSE, TOOL_STARTING, TOOL_COMPLETED, INVOCATION_COMPLETED などのイベントが表示されるはずです。

代替: Agent Platform SDK でデプロイ

Agent Platform SDK を直接使ってプログラム的にデプロイすることもできます。これは CI/CD パイプラインやカスタムデプロイワークフローに便利です。

deploy.py
import vertexai
from my_bq_agent.agent import app

PROJECT_ID = "your-gcp-project-id"
LOCATION = "us-central1"
STAGING_BUCKET = "gs://your-staging-bucket"

vertexai.init(
    project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET
)
client = vertexai.Client(project=PROJECT_ID, location=LOCATION)

remote_app = client.agent_engines.create(
    agent=app,
    config={
        "display_name": "My BQ Analytics Agent",
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-adk[bigquery]",
            "google-cloud-aiplatform[agent_engines]",
            "google-cloud-bigquery-storage",
            "pyarrow",
            "opentelemetry-api",
            "opentelemetry-sdk",
        ],
    },
)
print(f"Deployed agent: {remote_app.api_resource.name}")

トラブルシューティング

デプロイ後に BigQuery テーブルへイベントが表示されない場合は、次を確認します。

  1. ADK バージョンを確認: requirements に google-adk>=1.24.0 が含まれている ことを確認します。以前のバージョンは、サーバーレスランタイムがプロセスを 停止する前に保留中イベントを flush しません。

  2. デバッグロギングを有効化: 静かなエラーを見つけるには、agent.py の先頭に 次を追加します。

    import logging
    logging.basicConfig(level=logging.INFO)
    logging.getLogger("google_adk").setLevel(logging.DEBUG)
    
  3. IAM 権限を確認: Agent Runtime サービスアカウントには、対象テーブルの roles/bigquery.dataEditor とプロジェクトの roles/bigquery.jobUser が必要です。 cross-project ロギングでは、ソースプロジェクトで BigQuery API が有効であり、 サービスアカウントに宛先テーブルの bigquery.tables.updateData があることも 確認してください。

  4. プラグイン初期化を確認: Cloud Logging で resource.type="reasoning_engine" を フィルタし、プラグイン起動メッセージやエラーログを探します。

  5. デバッグ用に即時 flush を使用: バッファリング問題を切り分けるには、 BigQueryLoggerConfigbatch_size=1batch_flush_interval=0.1 を設定します。

セキュリティ: 機密資格情報のロギングを避ける

OAuth トークン、API キー、クライアントシークレットを記録しないでください

BigQuery Agent Analytics プラグインは、ツール引数、LLM プロンプト、認証関連 イベント(HITL 資格情報要求など)を含む詳細なイベントペイロードを キャプチャします。エージェントが 認証済みツール(OAuth2 を使う AuthenticatedFunctionTool など)を使用する場合、プラグインは client_secret, access_token, API キーなどの機密値を BigQuery テーブルの content 列に記録する可能性があります。

これは既知の懸念 (google/adk-python#3845) であり、 分析データ内で資格情報が露出する可能性があります。

プラグインには、一般的なシークレットを自動保護する 組み込みマスキング が 含まれています。追加の制御が必要な場合は、その上にカスタムマスキングを重ねられます。

組み込みマスキング

プラグインは、content または attributes JSON のどこに現れても、次の既知の キー名の値を自動的にマスクします(大文字小文字を区別しません)。

client_secret, access_token, refresh_token, id_token, api_key, password

さらに、temp: または secret: 接頭辞の state key は、記録される state_delta 内で自動的に [REDACTED] に置き換えられます。つまり、credential service によってキャッシュされた OAuth トークンのように secret: scope に保存 された ADK セッション状態は、BigQuery に永続化されません。

構成不要

組み込みマスキングは、構造化された attributes と state ロギングに常に有効で、 ネストされた辞書や attribute 値内の JSON エンコード文字列にも再帰的に適用 されます。カスタム content_formatter は生の content に 最初に 実行される ため、自由形式ペイロードに現れる可能性のあるシークレットを追加マスクする際に 使ってください。

content_formatter で追加シークレットをマスクする

BigQueryLoggerConfig にカスタム content_formatter 関数を指定し、書き込み前に 機密フィールドを削除またはマスクできます。

import json
import re
from typing import Any

SENSITIVE_KEYS = {"client_secret", "access_token", "refresh_token", "api_key", "secret"}

def redact_credentials(event_content: Any, event_type: str) -> str:
    """Redact OAuth secrets and tokens from logged content."""
    if isinstance(event_content, dict):
        text = json.dumps(event_content)
    else:
        text = str(event_content)

    for key in SENSITIVE_KEYS:
        # Redact values in JSON-like strings: "client_secret": "GOCSPX-xxx"
        text = re.sub(
            rf'("{key}"\s*:\s*)"[^"]*"',
            rf'\1"[REDACTED]"',
            text,
            flags=re.IGNORECASE,
        )
    return text

config = BigQueryLoggerConfig(
    content_formatter=redact_credentials,
    # ... other options
)

event_denylist で資格情報イベントを除外する

認証関連イベントを記録する必要がない場合は、完全に除外します。

config = BigQueryLoggerConfig(
    event_denylist=[
        "HITL_CREDENTIAL_REQUEST",
        "HITL_CREDENTIAL_REQUEST_COMPLETED",
    ],
    # ... other options
)

一般的なベストプラクティス

  • エージェントソースコードにシークレットをハードコードしないでください。OAuth クライアントシークレットや API キーには、環境変数または Google Cloud Secret Manager などの secret manager を使用します。
  • IAM を使って BigQuery テーブルアクセスを制限し、記録されたイベントデータを 読める人を限定してください。
  • 予期しない機密データがキャプチャされていないことを確認するため、ログを定期的に 監査してください。

運用

トレーシングと可観測性

プラグインは trace_idspan_idparent_span_id の各列を、発行されるすべての行に入力するため、親子実行ツリー(Agent → LLM 呼び出し / ツール呼び出し)は BigQuery からきれいに再構築されます。

  • 内部での span 追跡、OTel span のエクスポートなし: プラグインは、16 桁の hex 値の span_id からなる独自の内部スタックで親子階層を追跡します。ルート呼び出しの span は、アクティブな周囲の OTel span がある場合はその ID を再利用します(これにより runner の呼び出し span と一致します)。子 BQAA span は内部で生成されます。構成された OpenTelemetry TracerProvidertracer.start_span(...) を呼び出さないため、そのインスツルメンテーションが構成されたエクスプローラーに到達することはありません。これにより、Agent Engine のテレメトリが有効になっている場合(GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY=true)や、他の Cloud Trace エクスポートツールをホストプロセスに接続した場合に、Cloud Trace で重複する span が発生するのを防ぎます。
  • 周囲の OTel Span がある場合の trace_id 継承: 周囲のランタイムで既に OTel span(Agent Engine の呼び出し span、ADK Runner の呼び出し span、またはエージェント実行前に開始されたその他の span)が生成されている場合、プラグインはその trace_id を読み取ってすべての BigQuery 行に記録します。これにより、BigQuery の行は共有された trace_id を介して既存の Cloud Trace トレースにきれいに接続されます。
  • 周囲の Span がない場合のフォールバック: アクティブな周囲の OTel span がない場合(例:ホスト側のトレーサーが構成されていない非 Agent Engine デプロイ)、プラグインは呼び出しごとに 32 桁の hex 値である trace_id を新しく生成し、外部にトレーサー設定がなくても BigQuery 内で親子階層構造が常に保存されるようにします。
  • TracerProvider が不要: ホストプロセスに OpenTelemetry TracerProvider を構成することは任意です。これは、プラグインの trace_id が既に存在するユーザーの周囲の span から取得されることを望む場合にのみ有用です(例:非 ADK サービスのテレメトリデータと関連付ける場合)。プラグインは、独自の記録管理のためにプロバイダーを必要としなくなりました。

プラグインが OTel エクスポートツールにデータを供給することに依存していた場合

一部の古い構成では、OpenTelemetry span をエクスポートするために BQAA プラグインを補助チャネルとして使用していましたが、このパスは意図的に削除されました。代わりに、ホストアプリケーションで OTel インスツルメンテーションを構成してください。(Agent Engine はこれを自動的に接続します。ローカルデプロイの場合は、ADK 自体のフレームワークインスツルメンテーションまたは明示的な TracerProvider を使用してください。)プラグインの BigQuery 行は、引き続き trace_id を介してトレースに接続されます。

公開メソッド

プラグインはライフサイクル管理のためにいくつかの公開メソッドを提供しています。

  • await plugin.flush(): 保留中のすべてのイベントを BigQuery に flush します。データ損失を避けるため、終了前に呼び出してください。
  • await plugin.shutdown(timeout=None): プラグインを正常終了し、保留中のイベントを flush してリソースを解放します。任意の timeout パラメータは config の shutdown_timeout を上書きします。
  • await plugin.create_analytics_views(): イベントタイプ別 analytics view をすべて手動で再作成します。スキーマアップグレード後やビュー更新が必要なときに便利です。
  • plugin.get_drop_stats(): drop_reason ごとにドロップされたイベント数のスナップショットを返します。以下のドロップされたイベントの可観測性を参照してください。
  • 非同期コンテキストマネージャー: プラグインは自動起動と終了のために async with をサポートします。

    async with BigQueryAgentAnalyticsPlugin(
        project_id=PROJECT_ID, dataset_id=DATASET_ID
    ) as plugin:
        # plugin is initialized and ready to use
        ...
    # plugin.shutdown() is called automatically on exit
    

Java では、プラグインのライフサイクルは Plugin から継承された close() メソッド(RxJava の Completable を返す)を介して管理されます。

  • plugin.close(): プラグインを正常に終了し、保留中のイベントを flush し、リソース(BigQuery の書き込みクライアントおよびエグゼキュータを含む)を解放します。
  • 自動クローズ: InMemoryRunner を使用している場合、runner.close() を呼び出すと、登録されているすべてのプラグイン(BigQuery Agent Analytics プラグインを含む)が自動的にクローズされます。
// 手動シャットダウン
plugin.close().blockingAwait();

ドロップされたイベントの可観測性

BigQuery へのロギングはベストエフォート(best-effort)方式です。メモリ内キューがオーバーフローするか、書き込みが最終的に失敗した場合、イベントがドロップされる可能性があります。プラグインは drop_reason ごとにドロップされた行を追跡し、ポーリング API を公開しているため、ホストはこれを検出してアラートを発信し、カウントを独自のモニタリングシステムに送信できます。

ドロップ理由:

理由 (Reason) 原因 (Cause)
queue_full メモリ内バッチキューがオーバーフローしました(ホストがドレイナー(drainer)が送信できる速度よりも速くイベントを生成しています)。BigQueryLoggerConfigqueue_max_size を増やすか、より大きなチャンクでドレインするために batch_size を上げるか、コンシューマー側をスケール(より多くの同時呼び出しをより高速に完了)させてください。
arrow_prep_failed 行を Arrow 表現に変換できませんでした(通常、スキーマ/タイプの不一致)。ログを調査して問題のあるフィールドを確認してください。
retry_exhausted Storage Write API の呼び出しが、再試行バジェットを使い果たすまで再試行可能なエラー(例:一時的な gRPC エラー)を返し続けました。
non_retryable Storage Write API が再試行不可能なエラー(権限、割り当て、スキーマ拒否)を返しました。通常、管理者の介入が必要です。
unexpected_error バッチの準備または書き込み中に捕捉されたその他の例外です。

カウントの読み取り:

# プラグイン起動後の {drop_reason: count} のスナップショット。
stats = plugin.get_drop_stats()
# 例: {"queue_full": 12, "retry_exhausted": 0, ...}

total_dropped = sum(stats.values())

モニタリングシステムへのエクスポート — 定期的にポーリングして差分(delta)を送信します:

import asyncio

async def export_loop(plugin):
    last = {k: 0 for k in (
        "queue_full", "arrow_prep_failed",
        "retry_exhausted", "non_retryable", "unexpected_error",
    )}
    while True:
        current = plugin.get_drop_stats()
        for reason, count in current.items():
            delta = count - last.get(reason, 0)
            if delta:
                # 例: metric_client.write_point(
                #         metric="bqaa_dropped_events",
                #         labels={"reason": reason}, value=delta)
                ...
        last = current
        await asyncio.sleep(60)

継続的に queue_full または retry_exhausted カウントが 0 以外になっている場合は、BQAA がデータ損失のリスクにさらされていることを示す最も明確なシグナルです。ダッシュボードまたはアラートに表示することをお勧めします。

Multiprocessing と fork 安全性

プラグインは fork-aware です。gRPC C-core ライブラリをロードする前に GRPC_ENABLE_FORK_SUPPORT=1 を設定し、子プロセスで継承されたランタイム状態 (gRPC channel、write stream、event loop)をリセットする os.register_at_fork handler を登録します。これにより、プラグインは os.fork() 後も file descriptor をリークしたり、親の接続へデータを送ったりせずに動作できます。

ただし、本番デプロイでは spawn multiprocessing start method が推奨されます。 fork は進行中の gRPC 状態を含む親アドレス空間をコピーし、fork 後のリセットは 各子プロセスの最初の write にレイテンシを追加します。spawn では各 worker が プラグインをクリーンに初期化します。

Gunicorn デプロイでは特に次を推奨します。

  • lazy plugin initialization と組み合わせて --preload を使用します(プラグインは 最初のイベントが記録されるまでセットアップを遅延します)。
  • または post_fork hook 内でプラグインを初期化し、各 worker が独自の client を 持つようにします。

Note

fork-safety メカニズムはランタイム状態のみをリセットします。fork 時点で親 プロセスの queue にあり、まだ flush されていなかったイベントを replay する ことはありません。配信保証が必要な場合は、fork 前に await plugin.flush() を 呼び出してください。

コンテキストグラフ

行レベルの agent_events に加えて、BigQuery Agent Analytics SDK を使用してコンテキストグラフを実体化(materialize)できます。これは、エージェントの意思決定(処理したリクエスト、検討したオプション、選択した結果)をクエリ可能な BigQuery の プロパティグラフ です。単にイベントがログに記録されたという事実だけでなく、Graph Query Language(GQL)を使用して意思決定が何故行われたかの 理由 を追跡できます。

Context graph flow: an ADK agent's events flow through the BigQuery Agent Analytics plugin into the agent_events table; the SDK's bqaa context-graph command materializes a structured decision graph that auditors, operators, and executives consume through GQL in BigQuery Studio and Conversational Analytics — with no external graph database.

グラフは、テーブルの DDL と CREATE PROPERTY GRAPH スキーマの 2 つの宣言的なアーティファクトで定義されます。SDK の bqaa context-graph --property-graph コマンドは、これらと実際のテーブルスキーマから抽出対象(取得するエンティティと関係性、およびその列の型)を導き出します。一般的なケースでは別個のオントロジーやバインディングファイルは不要で、AI プロンプトのガイドとして説明が必要な場合や、エンティティの継承、派生プロパティ、または列名の変更が必要な場合にのみ明示的な ontology.yaml / binding.yaml を使用します。

ローカルで 1 回実行するか、Cloud Scheduler でトリガーされる Cloud Run ジョブとしてスケジュール実行します。これには、読み取り専用イベント/書き込み可能グラフのデータセット分割、最小権限のサービスアカウント、構造化された JSON ログ、および Cloud Monitoring アラートが含まれます。運用リファレンス(前提条件、IAM マトリックス、推奨スケジュール、JSON ログの形状、監視、およびクリーンアップ)は SDK リポジトリにあります。

プラグインを使用して Agent Runtime にデプロイする

BigQuery Agent Analytics プラグインを含むエージェントを Agent Runtime にデプロイできます。このセクションでは、ADK CLI を使用してデプロイする手順と、Agent Platform SDK をプログラムから使用してデプロイする代替手順を説明します。

バージョン要件

このプラグインを使用して Agent Runtime にデプロイするには、ADK Python バージョン 1.24.0 以上 を使用してください。以前のバージョンでは、サーバーレスランタイムが保留中のイベントをフラッシュする前にプラグインの非同期ログライターを強制終了してしまう問題がありました。1.24.0 以降、プラグインは各呼び出しの最後に同期フラッシュを実行して、すべてのイベントが確実に書き込まれるようにします。

前提条件

デプロイする前に、以下を含む一般的な Agent Runtime のセットアップ を完了していることを確認してください。

  1. Agent Platform APICloud Resource Manager API が有効化された Google Cloud プロジェクト。
  2. 対象プロジェクト의 BigQuery データセット(または正しい権限を持つクロスプロジェクトデータセット)。
  3. デプロイアーティファクト用の Cloud Storage ステージングバケット
  4. デプロイするサービスアカウントが IAM 権限 にリストされている IAM ロールを持っていること。
  5. 開発環境が gcloud auth login および gcloud auth application-default login認証 されていること。

ステップ 1: エージェントとプラグインを定義する

プラグインを含む App オブジェクトを持つエージェントプロジェクトフォルダを作成します。プラグインを含む Agent Runtime デプロイには App オブジェクトが必要です。

my_bq_agent/
├── __init__.py
├── agent.py
└── requirements.txt
my_bq_agent/__init__.py
from . import agent
my_bq_agent/agent.py
import os
import google.auth
from google.adk.agents import Agent
from google.adk.apps import App
from google.adk.models.google_llm import Gemini
from google.adk.plugins.bigquery_agent_analytics_plugin import (
    BigQueryAgentAnalyticsPlugin,
    BigQueryLoggerConfig,
)
from google.adk.tools.bigquery import BigQueryToolset, BigQueryCredentialsConfig

# --- 設定 ---
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
DATASET_ID = os.environ.get("BQ_DATASET", "agent_analytics")
# BQ_LOCATION は BigQuery データセットのロケーションです(マルチリージョン "US"/"EU" 또는
# "us-central1" などのシングルリージョン)。これは GOOGLE_CLOUD_LOCATION で使用される
# Agent Platform のリージョンとは別個です。
BQ_LOCATION = os.environ.get("BQ_LOCATION", "US")

os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"

# --- プラグイン ---
bq_analytics_plugin = BigQueryAgentAnalyticsPlugin(
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    location=BQ_LOCATION,
    config=BigQueryLoggerConfig(
        batch_size=1,
        batch_flush_interval=0.5,
        log_session_metadata=True,
    ),
)

# --- ツール ---
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
bigquery_toolset = BigQueryToolset(
    credentials_config=BigQueryCredentialsConfig(credentials=credentials)
)

# --- エージェント ---
root_agent = Agent(
    model=Gemini(model="gemini-flash-latest"),
    name="my_bq_agent",
    instruction="You are a helpful assistant with access to BigQuery tools.",
    tools=[bigquery_toolset],
)

# --- アプリ (プラグインを伴う Agent Runtime のデプロイで必要) ---
app = App(
    name="my_bq_agent",
    root_agent=root_agent,
    plugins=[bq_analytics_plugin],
)
my_bq_agent/requirements.txt
google-adk[bigquery]
google-cloud-bigquery-storage
pyarrow
opentelemetry-api
opentelemetry-sdk

ステップ 2: ADK CLI を使用してデプロイする

エージェントをデプロイするために adk deploy agent_engine コマンドを使用します。--adk_app フラグは使用する App オブジェクトを CLI に伝えます。

PROJECT_ID=your-gcp-project-id
LOCATION=us-central1

adk deploy agent_engine \
    --project=$PROJECT_ID \
    --region=$LOCATION \
    --staging_bucket=gs://your-staging-bucket \
    --display_name="My BQ Analytics Agent" \
    --adk_app=agent.app \
    my_bq_agent

--adk_app フラグ

--adk_app フラグは App オブジェクトのモジュールパスと変数名を module.variable の形式で指定します。この例では、agent.appagent.py 内の app 変数を参照します。これにより、デプロイ時にプラグイン設定が正しく反映されます。

正常にデプロイされると、以下のような出力が表示されます。

AgentEngine created. Resource name: projects/123456789/locations/us-central1/reasoningEngines/751619551677906944

次のステップのために Resource name をメモしておいてください。

ステップ 3: デプロイされたエージェントをテストする

デプロイ後、Agent Platform SDK を使用してエージェントにクエリを実行できます。

test_deployed_agent.py
import uuid
import vertexai

PROJECT_ID = "your-gcp-project-id"
LOCATION = "us-central1"
AGENT_ID = "751619551677906944"  # デプロイ出力から取得した ID

vertexai.init(project=PROJECT_ID, location=LOCATION)
client = vertexai.Client(project=PROJECT_ID, location=LOCATION)

agent = client.agent_engines.get(
    name=f"projects/{PROJECT_ID}/locations/{LOCATION}/reasoningEngines/{AGENT_ID}"
)

user_id = f"test_user_{uuid.uuid4().hex[:8]}"
for chunk in agent.stream_query(
    message="List datasets in my project", user_id=user_id
):
    print(chunk, end="", flush=True)

ステップ 4: BigQuery でイベントを確認する

デプロイされたエージェントにいくつかクエリを送信した後、BigQuery テーブルをクエリしてイベントがログに記録されていることを確認します。

SELECT timestamp, event_type, agent, content
FROM `your-gcp-project-id.agent_analytics.agent_events`
ORDER BY timestamp DESC
LIMIT 20;

INVOCATION_STARTING, LLM_REQUEST, LLM_RESPONSE, TOOL_STARTING, TOOL_COMPLETED, INVOCATION_COMPLETED などのイベントが表示されるはずです。

代替案: Agent Platform SDK を使用してデプロイする

プログラムから直接デプロイすることもできます。これは CI/CD パイプラインやカスタムデプロイワークフローに便利です。

deploy.py
import vertexai
from my_bq_agent.agent import app

PROJECT_ID = "your-gcp-project-id"
LOCATION = "us-central1"
STAGING_BUCKET = "gs://your-staging-bucket"

vertexai.init(
    project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET
)
client = vertexai.Client(project=PROJECT_ID, location=LOCATION)

remote_app = client.agent_engines.create(
    agent=app,
    config={
        "display_name": "My BQ Analytics Agent",
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-adk[bigquery]",
            "google-cloud-aiplatform[agent_engines]",
            "google-cloud-bigquery-storage",
            "pyarrow",
            "opentelemetry-api",
            "opentelemetry-sdk",
        ],
    },
)
print(f"Deployed agent: {remote_app.api_resource.name}")

トラブルシューティング

デプロイ後に BigQuery テーブルにイベントが表示されない場合:

  1. ADK バージョンの確認: 要件に google-adk>=1.24.0 が含まれていることを確認してください。以前のバージョンでは、サーバーレスランタイムがプロセスを一時停止する前に保留中のイベントがフラッシュされません。
  2. デバッグログの有効化: agent.py の上部に以下を追加して、サイレントエラーを表示させます。

    import logging
    logging.basicConfig(level=logging.INFO)
    logging.getLogger("google_adk").setLevel(logging.DEBUG)
    
  3. IAM 権限の確認: Agent Runtime のサービスアカウントには、ターゲットテーブルに対する roles/bigquery.dataEditor と、プロジェクトに対する roles/bigquery.jobUser ロールが必要です。クロスプロジェクト ログ記録の場合、ソースプロジェクトで BigQuery API が有効になっており、サービスアカウントが宛先テーブルに対する bigquery.tables.updateData を持っていることも確認してください。

  4. プラグイン初期化の検証: Cloud Logging で resource.type="reasoning_engine" でフィルタリングし、プラグインの起動メッセージやエラーログを探します。
  5. デバッグのための即時フラッシュの使用: バッファリングの問題を排除するために、BigQueryLoggerConfigbatch_size=1 および batch_flush_interval=0.1 に設定します。

セキュリティ: 機密性の高い資格情報のログ記録を避ける

OAuth トークン、API キー、またはクライアントシークレットをログに記録しないでください

BigQuery Agent Analytics プラグインは、ツールの引数、LLM プロンプト、認証関連イベント(HITL 資格情報リクエストなど)を含む詳細なイベントペイロードをキャプチャします。エージェントが認証済みツール(例:OAuth2 を使用した AuthenticatedFunctionTool)を使用している場合、プラグインは client_secretaccess_token、または API キーなどの機密性の高い値を BigQuery テーブルの content 列に記録する可能性があります。

これは既知の懸念事項(google/adk-python#3845)であり、分析データでの資格情報漏洩につながる可能性があります。

プラグインには、一般的なシークレットを自動的に保護する組み込みの秘匿化(redaction)が含まれています。追加の制御のために、カスタム秘匿化を上に重ねることができます。

組み込みの秘匿化

プラグインは、content または attributes JSON のどこに現れても、次のよく知られたキー名(大文字小文字を区別しない)の値を自動的に秘匿化します。

client_secret, access_token, refresh_token, id_token, api_key, password

さらに、temp: または secret: で始まる状態キーは、ログに記録された state_delta で自動的に [REDACTED] に置き換えられます。これは、secret: スコープ内に保存された ADK セッション状態(資格情報サービスによってキャッシュされた OAuth トークンなど)が BigQuery に永続化されないことを意味します。

設定不要

組み込みの秘匿化は、構造化された属性と状態のログ記録に対して常に有効であり、属性値内のネストされたディクショナリや JSON エンコードされた文字列に再帰的に適用されます。カスタム content_formatter は未加工コンテンツに対して最初に実行されるため、自由形式のペイロードに現れる可能性のあるシークレットのマスキングを追加するために使用してください。

content_formatter を使用して追加のシークレットを秘匿化する

書き込まれる前に機密フィールドを削除またはマスクするには、BigQueryLoggerConfig にカスタム content_formatter 関数を提供します。

import json
import re
from typing import Any

SENSITIVE_KEYS = {"client_secret", "access_token", "refresh_token", "api_key", "secret"}

def redact_credentials(event_content: Any, event_type: str) -> str:
    """ログに記録されたコンテンツから OAuth シークレットとトークンを秘匿化します。"""
    if isinstance(event_content, dict):
        text = json.dumps(event_content)
    else:
        text = str(event_content)

    for key in SENSITIVE_KEYS:
        # JSON 風の文字列の値を秘匿化: "client_secret": "GOCSPX-xxx"
        text = re.sub(
            rf'("{key}"\s*:\s*)"[^"]*"',
            rf'\1"[REDACTED]"',
            text,
            flags=re.IGNORECASE,
        )
    return text

config = BigQueryLoggerConfig(
    content_formatter=redact_credentials,
    # ... 他のオプション
)
import com.google.adk.plugins.agentanalytics.BigQueryLoggerConfig;
import java.util.function.BiFunction;
import java.util.Set;

Set<String> SENSITIVE_KEYS = Set.of("client_secret", "access_token", "refresh_token", "api_key", "secret");

BiFunction<Object, String, Object> redactCredentials = (content, eventType) -> {
  String text = content.toString();
  for (String key : SENSITIVE_KEYS) {
    // JSON 風の文字列の値を秘匿化: "client_secret": "GOCSPX-xxx"
    text = text.replaceAll(
        "(?i)(\"" + key + "\"\\s*:\\s*)\"[^\"]*\"",
        "$1\"[REDACTED]\""
    );
  }
  return text;
};

BigQueryLoggerConfig config = BigQueryLoggerConfig.builder()
    .contentFormatter(redactCredentials)
    // ... 他のオプション
    .build();

event_denylist を使用して資格情報イベントをスキップする

認証関連イベントをログに記録する必要がない場合は、それらを完全に除外します。

config = BigQueryLoggerConfig(
    event_denylist=[
        "HITL_CREDENTIAL_REQUEST",
        "HITL_CREDENTIAL_REQUEST_COMPLETED",
    ],
    # ... 他のオプション
)
import com.google.common.collect.ImmutableList;

BigQueryLoggerConfig config = BigQueryLoggerConfig.builder()
    .eventDenylist(ImmutableList.of(
        "HITL_CREDENTIAL_REQUEST",
        "HITL_CREDENTIAL_REQUEST_COMPLETED"
    ))
    // ... 他のオプション
    .build();

一般的なベストプラクティス

  • エージェントのソースコードにシークレットをハードコードしないでください。OAuth クライアントシークレットや API キーには、環境変数やシークレットマネージャー(例:Google Cloud Secret Manager)を使用してください。
  • ログに記録されたイベントデータを読み取れるユーザーを制限するには、IAM を使用して BigQuery 테이블へのアクセスを制限 してください。
  • 予期しない機密データがキャプチャされていないことを確認するために、定期的に ログを監査 してください。

運用 (Operations)

トレーシングと可観測性

プラグインは、出力されるすべての行に trace_idspan_id、および parent_span_id 列を入力するため、親子実行ツリー(Agent → LLM 呼び出し / Tool 呼び出し)が BigQuery から綺麗に再構築されます。

  • 内部スパン追跡、OTel スパンのエクスポートはなし。 プラグインは、自身の内部スタックにある 16桁の span_id 値で親子階層を追跡します。ルート呼び出しスパンは、有効な場合に周囲の OTel スパンの ID を再利用します(Runner の呼び出しスパンに合わせるため)。子 BQAA スパンは内部的に生成されます。設定された OpenTelemetry TracerProvidertracer.start_span(...) を呼び出すことはないため、その計測が設定されたエクスポートツールに到達することはありません。これにより、Agent Engine テレメトリが有効になっている場合(GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY=true)や、ホストプロセスに他の Cloud Trace エクスポートツールを接続した場合に、Cloud Trace で重複するスパンが発生するのを防ぎます。
  • 周囲の OTel スパンから trace_id を継承(存在する場合)。 周囲のランタイムが OTel スパン(Agent Engine の呼び出しスパン、ADK Runner の呼び出しスパン、またはエージェント実行前に開いたスパン)をすでに開始している場合、プラグインはその trace_id を読み取ってすべての BigQuery 行にスタンプします。したがって、BigQuery の行は、共有された trace_id を介して既存の Cloud Trace トレースと綺麗に結合されます。
  • 周囲にスパンが存在しない場合のフォールバック。 周囲の OTel スパンがアクティブでない場合(ホスト側にトレーサーが構成されていない非 Agent Engine デプロイなど)、プラグインは呼び出しごとに 32桁の trace_id を生成するため、外部トレーサーのセットアップがなくても親子階層は BigQuery で常に保存されます。
  • TracerProvider は不要です。 ホストプロセスでの OpenTelemetry TracerProvider の構成はオプションです。これは、プラグインの trace_id を既存の周囲のスパンから取得したい場合(非 ADK サービスからのテレメトリと関連付けるため)にのみ重要です。プラグインは、自身の記録管理にプロバイダーを必要としなくなりました。

プラグインが OTel エクスポートツールにデータを供給することに依存していた場合

一部の古い構成では、OpenTelemetry span をエクスポートするために BQAA プラグインを補助チャネルとして使用していましたが、このパスは意図的に削除されました。代わりに、ホストアプリケーションで OTel インスツルメンテーションを構成してください(Agent Engine はこれを自動的に接続します。ローカルデプロイの場合は、ADK 自体のフレームワークインスツルメンテーションまたは明示的な TracerProvider を使用してください)。プラグインの BigQuery 行は、引き続き trace_id を介してトレースに接続されます。

公開メソッド

プラグインはライフサイクル管理のためにいくつかの public メソッドを公開しています。

  • await plugin.flush(): 保留中のすべてのイベントを BigQuery に flush します。データ損失を避けるため、終了前に呼び出してください。
  • await plugin.shutdown(timeout=None): プラグインを正常終了し、保留中イベントを flush してリソースを解放します。任意の timeout パラメータは config の shutdown_timeout を上書きします。
  • await plugin.create_analytics_views(): イベントタイプ別 analytics view をすべて手動で再作成します。スキーマアップグレード後やビュー更新が必要なときに便利です。
  • plugin.get_drop_stats(): drop_reason ごとにドロップされたイベント数のスナップショットを返します。以下のドロップされたイベントの可観測性を参照してください。
  • 非同期コンテキストマネージャー: プラグインは自動起動と終了のために async with をサポートします。

    async with BigQueryAgentAnalyticsPlugin(
        project_id=PROJECT_ID, dataset_id=DATASET_ID
    ) as plugin:
        # plugin is initialized and ready to use
        ...
    # plugin.shutdown() is called automatically on exit
    

Java では、プラグインの ライフサイクルは Plugin から継承された close() メソッド(RxJava の Completable を返す)を介して管理されます。

  • plugin.close(): プラグインを正常に終了し、保留中のイベントを flush し、リソース(BigQuery の書き込みクライアントおよびエグゼキュータを含む)を解放します。
  • 自動クローズ: InMemoryRunner を使用している場合、runner.close() を呼び出すと、登録されているすべてのプラグイン(BigQuery Agent Analytics プラグインを含む)が自動的にクローズされます。
// 手動シャットダウン
plugin.close().blockingAwait();

ドロップされたイベントの可観測性

BigQuery へのロギングはベストエフォート(best-effort)方式です。メモリ内キューがオーバーフローするか、書き込みが最終的に失敗した場合、イベントがドロップされる可能性があります。プラグインは drop_reason ごとにドロップされた行を追跡し、ポーリング API を公開しているため、ホストはこれを検出してアラートを発信し、カウントを独自のモニ타リングシステムに送信できます。

ドロップ理由:

理由 (Reason) 原因 (Cause)
queue_full メモリ内バッチキューがオーバーフローしました(ホストがドレイナー(drainer)が送信できる速度よりも速くイベントを生成しています)。BigQueryLoggerConfigqueue_max_size を増やすか、より大きなチャンクでドレインするために batch_size を上げるか、コンシューマー側をスケール(より多くの同時呼び出しをより高速に完了)させてください。
arrow_prep_failed 行を Arrow 表現に変換できませんでした(通常、スキーマ/タイプの不一致)。ログを調査して問題のあるフィールドを確認してください。
retry_exhausted Storage Write API の呼び出しが、再試行バジェットを使い果たすまで再試行可能なエラー(例:一時的な gRPC エラー)を返し続けました。
non_retryable Storage Write API が再試行不可能なエラー(権限、割り当て、スキーマ拒否)を返しました。通常、管理者の介入が必要です。
unexpected_error バッチの準備または書き込み中に捕捉されたその他の例外です。

カウントの読み取り:

# プラグイン起動後の {drop_reason: count} のスナップショット。
stats = plugin.get_drop_stats()
# 例: {"queue_full": 12, "retry_exhausted": 0, ...}

total_dropped = sum(stats.values())

モニタリングシステムへのエクスポート — 定期的にポーリングして差分(delta)を送信します:

import asyncio

async def export_loop(plugin):
    last = {k: 0 for k in (
        "queue_full", "arrow_prep_failed",
        "retry_exhausted", "non_retryable", "unexpected_error",
    )}
    while True:
        current = plugin.get_drop_stats()
        for reason, count in current.items():
            delta = count - last.get(reason, 0)
            if delta:
                # 例: metric_client.write_point(
                #         metric="bqaa_dropped_events",
                #         labels={"reason": reason}, value=delta)
                ...
        last = current
        await asyncio.sleep(60)

継続的に queue_full または retry_exhausted カウントが 0 以外になっている場合は、BQAA がデータ損失のリスクにさらされていることを示す最も明確なシグナルです。ダッシュボードまたはアラートに表示することをお勧めします。

Multiprocessing と fork 安全性

プラグインは fork-aware です。gRPC C-core ライブラリをロードする前に GRPC_ENABLE_FORK_SUPPORT=1 を設定し、子プロセスで継承されたランタイム状態(gRPC チャネル、書き込みストリーム、イベントループ)をリセットする os.register_at_fork ハンドラーを登録します。これは、プラグインがファイル記述子をリークしたり、親の接続でデータを送信したりすることなく、os.fork() 後も生き残ることができることを意味します。

ただし、spawn は本番デプロイに推奨されるマルチプロセッシング開始メソッドですfork は親のアドレス空間(処理中の gRPC 状態を含む)をコピーし、フォーク後のリセットによって各子の最初の書き込みにレイテンシが追加されます。spawn を使用すると、各ワーカーはプラグインをクリーンに初期化します。

特に Gunicorn デプロイの場合:

  • 最初のイベントがログに記録されるまでプラグインがセットアップを遅延させる lazy 初期化と --preload を組み合わせるか、
  • post_fork フック内でプラグインを初期化し、各ワーカーが独自のクライアントを取得するようにします。

Note

フォークセーフティメカニズムはランタイム状態のみをリセットします。フォークの時点で親プロセスでキューに入れられていたがまだフラッシュされていなかったイベントを再再生(replay)することはありません。確実に配信する必要がある場合は、フォークする前に await plugin.flush() を呼び出してください。

記録データを利用する追加方法

BigQuery Agent Analytics SDK

BigQuery Agent Analytics SDK は、 プラグインによって記録されたデータをプログラムから利用、分析する方法を提供します。 SDK は次の用途に使えます。

  • エージェント評価: エージェント実行を期待結果と比較します。
  • Golden trajectory matching: エージェント実行パスが承認済みシーケンスと 一致するか検証します。
  • Trace 可視化: 記録された span からエージェント実行フローを再構築して 可視化します。

ダッシュボードを構築する

BigQuery Agent Analytics SDK には、エージェント性能データをクエリして可視化する 方法を示す Jupyter notebook 例 が含まれています。これを出発点に、BigQuery Agent Analytics データセットに合わせた 独自ダッシュボードを構築できます。Colab Data Apps を使って notebook を interactive dashboard として公開することもできます。

フィードバック

BigQuery Agent Analytics に関するフィードバックを歓迎します。質問、提案、問題が ある場合は bqaa-feedback@google.com までご連絡ください。

追加リソース