ADK向けのAerospike連携¶
adk-aerospike 連携は、ADK エージェントを分散リアルタイムキー値データベースである Aerospike に接続します。アプリケーションプロセス内でネイティブの Aerospike クライアントを使用し、単一のクラスタ上で3つすべての ADK Python ストレージインターフェースを実装します。aerospike:// URI スキーマを一度登録すれば、adk CLI はセッション、アーティファクト、メモリに Aerospike を使用できるようになります。
この連携を利用するには、いくつかの方法があります:
| アプローチ | 説明 |
|---|---|
| セッションサービス | AerospikeSessionService: スコープ付き状態(app:, user:, session)、チャンク化ストレージ付きイベント履歴、アトミックな append_event。 |
| メモリサービス | AerospikeMemoryService: トークンごとのポスティングリスト(posting-list)キーによる語彙적単語重複検索。 InMemoryMemoryService と同じセマンティクス。 |
| アーティファクトサービス | AerospikeArtifactService: セッションまたは user: ネームスペースごとのバージョン管理された Blob の保存。 |
| フルスタック | 3つすべてのサービスを1つの Runner に組み込むか、対応する aerospike:// URI を adk web / adk run に渡します。 |
主なユースケース¶
- 本番エージェントの永続化 (Production agent persistence): 別個のメモリサービスを運用することなく、会話状態、ツール出力、およびユーザー範囲のデータを再起動やレプリカ間で保持します。
- 高スループットエージェント (High-throughput agents): セッションの追加レイテンシが重要となるチャット、音声、リアルタイムオーケストレーションにおいて、ミリ秒未満の読み書きを実現します。
- 語彙的長期メモリ (Lexical long-term memory): 書き込み時にテキストをトークン化し、ポスティングリストキー (
app:user:kw:<token>) へのポイント読み取りによってメモリ行を復元します。埋め込みモデルは不要です。 - マルチモーダルアーティファクト (Multimodal artifacts): 画像、ファイル、生成された出力をバージョン履歴付きで保存します。
user:ファイル名はセッション間で可視化されます (ADK 仕様)。 - セルフホストおよびマルチテナント (Self-hosted and multi-tenant): 単一のネームスペースで、テナント範囲のアーティファクトおよびメモリ操作用の複合セカンダリインデックスを提供します。 Community または Enterprise、オンプレミスまたはクラウドで利用できます。
前提条件¶
- Python 3.11 以降
- Python用ADK (
google-adk) - Aerospike Database 7.x または 8.x (Community または Enterprise)
- 接続可能なクラスタ (以下のローカル Docker の例を参照)
開発用のローカル Aerospike の起動:
実行可能な例におけるモデル呼び出しのために、GOOGLE_API_KEY(またはモデルプロバイダの資格情報)を設定します。
インストール¶
エージェントでの使用¶
永続化されたセッションを持つ完全なマルチターンエージェントを構成するために、AerospikeSessionService を ADK Runner に組み込みます。
import asyncio
from adk_aerospike import AerospikeSessionService
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.genai import types
async def main() -> None:
session_service = AerospikeSessionService.from_uri(
"aerospike://localhost:3000/adk"
)
agent = LlmAgent(
name="assistant",
model="gemini-flash-latest",
instruction="Be helpful. Keep replies under 30 words.",
)
runner = Runner(
agent=agent,
app_name="myapp",
session_service=session_service,
)
session = await session_service.create_session(
app_name="myapp", user_id="user-1"
)
async for event in runner.run_async(
user_id="user-1",
session_id=session.id,
new_message=types.Content(
role="user", parts=[types.Part(text="Hello")]
),
):
if event.content:
for part in event.content.parts or []:
if part.text:
print(part.text)
session_service.close()
asyncio.run(main())
状態、イベント、一覧表示のために、セッションサービスを直接使用します。スコープ付きのキーは ADK の規則(app:, user:, temp:)に従います。
import asyncio
from adk_aerospike import AerospikeSessionService
from google.adk.events import Event, EventActions
from google.genai import types
async def main() -> None:
svc = AerospikeSessionService.from_uri("aerospike://localhost:3000/adk")
session = await svc.create_session(
app_name="support_bot",
user_id="alice",
state={
"topic": "billing",
"app:tenant": "acme-corp",
"user:nickname": "Allie",
"temp:scratch": "throwaway",
},
)
await svc.append_event(
session,
Event(
invocation_id="i1",
author="user",
content=types.Content(
role="user",
parts=[types.Part(text="Where is my invoice?")],
),
actions=EventActions(state_delta={"turn": 1}),
),
)
fetched = await svc.get_session(
app_name="support_bot",
user_id="alice",
session_id=session.id,
)
print(fetched.state)
# topic, turn, app:tenant, user:nickname — temp: キーは永続化されません。
svc.close()
asyncio.run(main())
テキストを含むセッションイベントを永続化し、ベクトルインデックスなしで単語重複で検索します。
import asyncio
from adk_aerospike import AerospikeMemoryService
from google.adk.events import Event, EventActions
from google.adk.sessions import Session
from google.genai import types
async def main() -> None:
memory = AerospikeMemoryService.from_uri(
"aerospike://localhost:3000/adk", top_k=10
)
session = Session(
id="s-1",
app_name="support_bot",
user_id="alice",
events=[
Event(
invocation_id="i",
author="user",
content=types.Content(
role="user",
parts=[types.Part(text="Python uses duck typing.")],
),
actions=EventActions(),
),
],
)
await memory.add_session_to_memory(session)
resp = await memory.search_memory(
app_name="support_bot",
user_id="alice",
query="python duck typing",
)
for m in resp.memories:
print(m.content.parts[0].text)
memory.close()
asyncio.run(main())
セッションごとにバージョン管理されたアーティファクトを保存します。セッション間での可視性を高めるために、user: ファイル名プレフィックスを使用してください。
import asyncio
from adk_aerospike import AerospikeArtifactService
from google.genai import types
async def main() -> None:
svc = AerospikeArtifactService.from_uri(
"aerospike://localhost:3000/adk"
)
await svc.save_artifact(
app_name="support_bot",
user_id="alice",
session_id="s-1",
filename="report.pdf",
artifact=types.Part(
inline_data=types.Blob(
mime_type="application/pdf", data=b"%PDF-1.4..."
),
),
)
latest = await svc.load_artifact(
app_name="support_bot",
user_id="alice",
session_id="s-1",
filename="report.pdf",
)
print(latest.inline_data.mime_type)
svc.close()
asyncio.run(main())
from adk_aerospike import (
AerospikeArtifactService,
AerospikeMemoryService,
AerospikeSessionService,
)
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
uri = "aerospike://localhost:3000/adk"
session_service = AerospikeSessionService.from_uri(uri)
artifact_service = AerospikeArtifactService.from_uri(uri)
memory_service = AerospikeMemoryService.from_uri(uri)
agent = LlmAgent(name="assistant", model="gemini-flash-latest")
runner = Runner(
agent=agent,
app_name="myapp",
session_service=session_service,
artifact_service=artifact_service,
memory_service=memory_service,
)
URI スキーマを一度登録します(たとえば、エージェントの隣の services.py 内):
次に、各ストレージの役割について同一のネームスペースを向くように CLI に指定します:
adk web \
--session_service_uri=aerospike://localhost:3000/adk \
--artifact_service_uri=aerospike://localhost:3000/adk \
--memory_service_uri=aerospike://localhost:3000/adk
Note
register() は aerospike:// を ADK のサービスレジストリに組み込むため、開発 UI と CLI がカスタムファクトリコードなしでこれらの URL を解決できるようになります。
構成¶
接続 URI¶
3つのサービスすべてで1つの URI 形式が共有されます:
例:
aerospike://localhost:3000/adk
aerospike://user:pass@node1:3000,node2:3000/prod?set_prefix=prod_&tls=true
| クエリパラメータ | 説明 |
|---|---|
set_prefix |
Aerospike セット名のプレフィックス (デフォルトは adk_)。複数のアプリで1つのネームスペースを共有可能です。 |
tls=true |
TLSを有効にします。 mTLSの詳細については、from_uri に tls_config={...} を渡します。 |
auth_mode |
INTERNAL (デフォルト)、EXTERNAL、EXTERNAL_INSECURE、または PKI。 |
サービス間で共有接続プールを利用するために、既存の aerospike.Client および Schema を使用してサービスを構築することもできます。
状態のスコープ指定¶
セッションの state は、プレフィックスキーを使用します (google.adk.sessions.state.State と同様):
| プレフィックス | 保存先 | 可視性 |
|---|---|---|
app:foo |
adk_app_state |
アプリの全ユーザー |
user:foo |
adk_user_state |
セッションを超えた現在のユーザー |
temp:foo |
永続化されない | 現在の呼び出し(invocation)のみ |
| (プレフィックスなし) | 세션 레코드 | 現在のセッションのみ |
get_session は、ADK 互換性のためにプレフィックスが復元された1つの dict にすべてのスコープをマージします。
利用可能なサービス¶
サービス¶
| サービス | ADK インターフェース | 説明 |
|---|---|---|
AerospikeSessionService |
BaseSessionService |
セッション、イベント、スコープ付き状態。セッションレコード上のホットイベントテール。256KiBでのクローズドチャンク。ほとんどの追加は1回のアトミックな operate()。 get_session は 1 RTT でセッション+アプリ+ユーザー状態を取得するために batch_read を使用します。 |
AerospikeArtifactService |
BaseArtifactService |
(app, user, session, filename) ごとのバージョン管理されたアーティファクト。バージョンあたり最大 8MiB までのインラインペイロード。 user: ファイル名は ADK ユーザーネームスペースセンチネルを使用します。 |
AerospikeMemoryService |
BaseMemoryService |
テキストを含むイベントごとに1つのメモリ行。トークンごとのポスティングリスト PK。 search_memory はクエリトークンの重複でランク付けします。 |
URI 登録¶
| 関数 | 説明 |
|---|---|
adk_aerospike.register() |
CLI および adk web 用に aerospike:// を ADK のサービスレジ스트リに登録します。 |
ストレージレイアウト¶
ネームスペース内のデフォルトのセットプレフィックス adk_:
| セット | キーパターン | 用途 |
|---|---|---|
adk_sessions |
app:user:session |
セッションレコード(状態 + ホットイベントテール) |
adk_sessions |
app:user:session:c:NNNNNNNN |
チャンク化されたイベント |
adk_sessions |
app:user:sl |
list_sessions 用のセッションリストマニフェスト |
adk_app_state |
app |
アプリスコープの状態 |
adk_user_state |
app:user |
ユーザースコープの状態 |
adk_artifacts |
app:user:session:fname:ver |
アーティファクトのバージョン |
adk_memory |
app:user:session:event_id |
メモリ行 |
adk_memory |
app:user:kw:token |
語彙検索用のポスティングリスト |
インデックス、チャンキング不変条件、および運用上の注意事項については、リポジトリの データモデル を参照してください。