Warning
_run_async_impl
(또는 다른 언어의 동등한 메서드)을 직접 구현하여 사용자 정의 에이전트를 구축하는 것은 강력한 제어 기능을 제공하지만, 미리 정의된 LlmAgent
또는 표준 WorkflowAgent
유형을 사용하는 것보다 더 복잡합니다. 사용자 정의 오케스트레이션 로직을 다루기 전에 이러한 기본 에이전트 유형을 먼저 이해하는 것이 좋습니다.
사용자 정의 에이전트¶
사용자 정의 에이전트는 BaseAgent
에서 직접 상속받아 자체 제어 흐름을 구현함으로써 임의의 오케스트레이션 로직을 정의할 수 있는 ADK의 궁극적인 유연성을 제공합니다. 이는 SequentialAgent
, LoopAgent
, ParallelAgent
의 미리 정의된 패턴을 넘어, 매우 구체적이고 복잡한 에이전트 워크플로를 구축할 수 있게 해줍니다.
소개: 미리 정의된 워크플로를 넘어서¶
사용자 정의 에이전트란 무엇인가요?¶
사용자 정의 에이전트는 본질적으로 google.adk.agents.BaseAgent
를 상속받아 _run_async_impl
비동기 메서드 내에서 핵심 실행 로직을 구현하는 클래스입니다. 이 메서드가 다른 에이전트(하위 에이전트)를 호출하고, 상태를 관리하며, 이벤트를 처리하는 방법을 완전히 제어할 수 있습니다.
Note
에이전트의 핵심 비동기 로직을 구현하기 위한 특정 메서드 이름은 SDK 언어에 따라 약간 다를 수 있습니다 (예: Java의 runAsyncImpl
, Python의 _run_async_impl
). 자세한 내용은 언어별 API 문서를 참조하세요.
왜 사용해야 하나요?¶
표준 워크플로 에이전트(SequentialAgent
, LoopAgent
, ParallelAgent
)가 일반적인 오케스트레이션 패턴을 다루지만, 요구 사항에 다음이 포함된 경우 사용자 정의 에이전트가 필요합니다:
- 조건부 로직: 런타임 조건이나 이전 단계의 결과에 따라 다른 하위 에이전트를 실행하거나 다른 경로를 택합니다.
- 복잡한 상태 관리: 단순한 순차적 전달을 넘어 워크플로 전반에 걸쳐 상태를 유지하고 업데이트하기 위한 복잡한 로직을 구현합니다.
- 외부 통합: 오케스트레이션 흐름 제어 내에서 직접 외부 API, 데이터베이스 또는 사용자 정의 라이브러리 호출을 통합합니다.
- 동적 에이전트 선택: 상황이나 입력에 대한 동적 평가를 기반으로 다음에 실행할 하위 에이전트를 선택합니다.
- 고유한 워크플로 패턴: 표준 순차, 병렬 또는 루프 구조에 맞지 않는 오케스트레이션 로직을 구현합니다.
사용자 정의 로직 구현:¶
모든 사용자 정의 에이전트의 핵심은 고유한 비동기 동작을 정의하는 메서드입니다. 이 메서드를 사용하면 하위 에이전트를 조율하고 실행 흐름을 관리할 수 있습니다.
모든 사용자 정의 에이전트의 핵심은 _run_async_impl
메서드입니다. 여기서 고유한 동작을 정의합니다.
- 시그니처:
async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
- 비동기 생성기:
async def
함수여야 하며AsyncGenerator
를 반환해야 합니다. 이를 통해 하위 에이전트나 자체 로직에서 생성된 이벤트를 러너에게yield
할 수 있습니다. ctx
(InvocationContext): 중요한 런타임 정보, 특히 사용자 정의 에이전트가 조정하는 단계 간에 데이터를 공유하는 기본 방법인ctx.session.state
에 대한 접근을 제공합니다.
모든 사용자 정의 에이전트의 핵심은 BaseAgent
에서 재정의하는 runAsyncImpl
메서드입니다.
- 시그니처:
protected Flowable<Event> runAsyncImpl(InvocationContext ctx)
- 반응형 스트림 (
Flowable
):io.reactivex.rxjava3.core.Flowable<Event>
를 반환해야 합니다. 이Flowable
은 사용자 정의 에이전트의 로직에 의해 생성될 이벤트 스트림을 나타내며, 종종 하위 에이전트의 여러Flowable
을 결합하거나 변환하여 생성됩니다. ctx
(InvocationContext): 중요한 런타임 정보, 특히java.util.concurrent.ConcurrentMap<String, Object>
인ctx.session().state()
에 대한 접근을 제공합니다. 이것이 사용자 정의 에이전트가 조정하는 단계 간에 데이터를 공유하는 기본 방법입니다.
핵심 비동기 메서드 내의 주요 기능:
-
하위 에이전트 호출:
run_async
메서드를 사용하여 하위 에이전트(일반적으로self.my_llm_agent
와 같은 인스턴스 속성으로 저장됨)를 호출하고 해당 이벤트를 생성합니다: -
상태 관리: 세션 상태 사전(
ctx.session.state
)에서 읽고 써서 하위 에이전트 호출 간에 데이터를 전달하거나 결정을 내립니다: -
제어 흐름 구현: 표준 Python 구문(
if
/elif
/else
,for
/while
루프,try
/except
)을 사용하여 하위 에이전트를 포함하는 정교하고 조건부 또는 반복적인 워크플로를 만듭니다.
-
하위 에이전트 호출: 비동기 실행 메서드를 사용하여 하위 에이전트(일반적으로 인스턴스 속성 또는 객체로 저장됨)를 호출하고 해당 이벤트 스트림을 반환합니다:
일반적으로
concatWith
,flatMapPublisher
또는concatArray
와 같은 RxJava 연산자를 사용하여 하위 에이전트의Flowable
을 연결합니다.// 예제: 하나의 하위 에이전트 실행 // return someSubAgent.runAsync(ctx); // 예제: 하위 에이전트를 순차적으로 실행 Flowable<Event> firstAgentEvents = someSubAgent1.runAsync(ctx) .doOnNext(event -> System.out.println("에이전트 1의 이벤트: " + event.id())); Flowable<Event> secondAgentEvents = Flowable.defer(() -> someSubAgent2.runAsync(ctx) .doOnNext(event -> System.out.println("에이전트 2의 이벤트: " + event.id())) ); return firstAgentEvents.concatWith(secondAgentEvents);
Flowable.defer()
는 실행이 이전 단계의 완료 또는 상태에 따라 달라지는 후속 단계에 종종 사용됩니다. -
상태 관리: 세션 상태에서 읽고 써서 하위 에이전트 호출 간에 데이터를 전달하거나 결정을 내립니다. 세션 상태는
ctx.session().state()
를 통해 얻는java.util.concurrent.ConcurrentMap<String, Object>
입니다.// 이전 에이전트가 설정한 데이터 읽기 Object previousResult = ctx.session().state().get("some_key"); // 상태에 따라 결정 내리기 if ("some_value".equals(previousResult)) { // ... 특정 하위 에이전트의 Flowable을 포함하는 로직 ... } else { // ... 다른 하위 에이전트의 Flowable을 포함하는 로직 ... } // 나중 단계를 위해 결과 저장 (종종 하위 에이전트의 output_key를 통해 수행됨) // ctx.session().state().put("my_custom_result", "calculated_value");
-
제어 흐름 구현: 표준 언어 구문(
if
/else
, 루프,try
/catch
)을 반응형 연산자(RxJava)와 결합하여 정교한 워크플로를 만듭니다.- 조건부: 조건에 따라 구독할
Flowable
을 선택하기 위한Flowable.defer()
또는 스트림 내에서 이벤트를 필터링하는 경우filter()
. - 반복:
repeat()
,retry()
와 같은 연산자 또는 조건에 따라 자체의 일부를 재귀적으로 호출하는 반응형 체인을 구조화하여(종종flatMapPublisher
또는concatMap
으로 관리됨).
- 조건부: 조건에 따라 구독할
하위 에이전트 및 상태 관리¶
일반적으로 사용자 정의 에이전트는 다른 에이전트(예: LlmAgent
, LoopAgent
등)를 조율합니다.
- 초기화: 일반적으로 이러한 하위 에이전트의 인스턴스를 사용자 정의 에이전트의 생성자에 전달하고 인스턴스 필드/속성(예:
this.story_generator = story_generator_instance
또는self.story_generator = story_generator_instance
)으로 저장합니다. 이렇게 하면 사용자 정의 에이전트의 핵심 비동기 실행 로직(예:_run_async_impl
메서드) 내에서 접근할 수 있습니다. - 하위 에이전트 목록:
super()
생성자를 사용하여BaseAgent
를 초기화할 때sub agents
목록을 전달해야 합니다. 이 목록은 이 사용자 정의 에이전트의 즉각적인 계층 구조의 일부인 에이전트에 대해 ADK 프레임워크에 알립니다. 핵심 실행 로직(_run_async_impl
)이self.xxx_agent
를 통해 에이전트를 직접 호출하더라도 수명 주기 관리, 내부 검사 및 잠재적인 향후 라우팅 기능과 같은 프레임워크 기능에 중요합니다. 사용자 정의 로직이 최상위 수준에서 직접 호출하는 에이전트를 포함하세요. - 상태: 언급했듯이,
ctx.session.state
는 하위 에이전트(특히output key
를 사용하는LlmAgent
s)가 오케스트레이터에게 결과를 전달하고 오케스트레이터가 필요한 입력을 아래로 전달하는 표준 방법입니다.
디자인 패턴 예제: StoryFlowAgent
¶
사용자 정의 에이전트의 힘을 조건부 로직을 사용한 다단계 콘텐츠 생성 워크플로 예제 패턴으로 설명해 보겠습니다.
목표: 이야기를 생성하고, 비평과 수정을 통해 반복적으로 개선하며, 최종 확인을 수행하고, 결정적으로 최종 톤 확인에 실패하면 이야기를 다시 생성하는 시스템을 만듭니다.
왜 사용자 정의인가? 여기서 사용자 정의 에이전트가 필요한 핵심 요구 사항은 톤 확인 결과에 따른 조건부 재생성입니다. 표준 워크플로 에이전트에는 하위 에이전트 작업의 결과에 따른 내장된 조건부 분기 기능이 없습니다. 오케스트레이터 내에 사용자 정의 로직(if tone == "negative": ...
)이 필요합니다.
1부: 간소화된 사용자 정의 에이전트 초기화¶
BaseAgent
를 상속하는 StoryFlowAgent
를 정의합니다. __init__
에서 필요한 하위 에이전트(전달됨)를 인스턴스 속성으로 저장하고 이 사용자 정의 에이전트가 직접 조율할 최상위 에이전트에 대해 BaseAgent
프레임워크에 알립니다.
class StoryFlowAgent(BaseAgent):
"""
Custom agent for a story generation and refinement workflow.
This agent orchestrates a sequence of LLM agents to generate a story,
critique it, revise it, check grammar and tone, and potentially
regenerate the story if the tone is negative.
"""
# --- Field Declarations for Pydantic ---
# Declare the agents passed during initialization as class attributes with type hints
story_generator: LlmAgent
critic: LlmAgent
reviser: LlmAgent
grammar_check: LlmAgent
tone_check: LlmAgent
loop_agent: LoopAgent
sequential_agent: SequentialAgent
# model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
model_config = {"arbitrary_types_allowed": True}
def __init__(
self,
name: str,
story_generator: LlmAgent,
critic: LlmAgent,
reviser: LlmAgent,
grammar_check: LlmAgent,
tone_check: LlmAgent,
):
"""
Initializes the StoryFlowAgent.
Args:
name: The name of the agent.
story_generator: An LlmAgent to generate the initial story.
critic: An LlmAgent to critique the story.
reviser: An LlmAgent to revise the story based on criticism.
grammar_check: An LlmAgent to check the grammar.
tone_check: An LlmAgent to analyze the tone.
"""
# Create internal agents *before* calling super().__init__
loop_agent = LoopAgent(
name="CriticReviserLoop", sub_agents=[critic, reviser], max_iterations=2
)
sequential_agent = SequentialAgent(
name="PostProcessing", sub_agents=[grammar_check, tone_check]
)
# Define the sub_agents list for the framework
sub_agents_list = [
story_generator,
loop_agent,
sequential_agent,
]
# Pydantic will validate and assign them based on the class annotations.
super().__init__(
name=name,
story_generator=story_generator,
critic=critic,
reviser=reviser,
grammar_check=grammar_check,
tone_check=tone_check,
loop_agent=loop_agent,
sequential_agent=sequential_agent,
sub_agents=sub_agents_list, # Pass the sub_agents list directly
)
BaseAgent
를 확장하여 StoryFlowAgentExample
을 정의합니다. 생성자에서 필요한 하위 에이전트 인스턴스(매개변수로 전달됨)를 인스턴스 필드로 저장합니다. 이 사용자 정의 에이전트가 직접 조율할 이러한 최상위 하위 에이전트는 목록으로 BaseAgent
의 super
생성자에도 전달됩니다.
private final LlmAgent storyGenerator;
private final LoopAgent loopAgent;
private final SequentialAgent sequentialAgent;
public StoryFlowAgentExample(
String name, LlmAgent storyGenerator, LoopAgent loopAgent, SequentialAgent sequentialAgent) {
super(
name,
"Orchestrates story generation, critique, revision, and checks.",
List.of(storyGenerator, loopAgent, sequentialAgent),
null,
null);
this.storyGenerator = storyGenerator;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
2부: 사용자 정의 실행 로직 정의¶
이 메서드는 표준 Python async/await 및 제어 흐름을 사용하여 하위 에이전트를 조율합니다.
@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""
Implements the custom orchestration logic for the story workflow.
Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
"""
logger.info(f"[{self.name}] Starting story generation workflow.")
# 1. Initial Story Generation
logger.info(f"[{self.name}] Running StoryGenerator...")
async for event in self.story_generator.run_async(ctx):
logger.info(f"[{self.name}] Event from StoryGenerator: {event.model_dump_json(indent=2, exclude_none=True)}")
yield event
# Check if story was generated before proceeding
if "current_story" not in ctx.session.state or not ctx.session.state["current_story"]:
logger.error(f"[{self.name}] Failed to generate initial story. Aborting workflow.")
return # Stop processing if initial story failed
logger.info(f"[{self.name}] Story state after generator: {ctx.session.state.get('current_story')}")
# 2. Critic-Reviser Loop
logger.info(f"[{self.name}] Running CriticReviserLoop...")
# Use the loop_agent instance attribute assigned during init
async for event in self.loop_agent.run_async(ctx):
logger.info(f"[{self.name}] Event from CriticReviserLoop: {event.model_dump_json(indent=2, exclude_none=True)}")
yield event
logger.info(f"[{self.name}] Story state after loop: {ctx.session.state.get('current_story')}")
# 3. Sequential Post-Processing (Grammar and Tone Check)
logger.info(f"[{self.name}] Running PostProcessing...")
# Use the sequential_agent instance attribute assigned during init
async for event in self.sequential_agent.run_async(ctx):
logger.info(f"[{self.name}] Event from PostProcessing: {event.model_dump_json(indent=2, exclude_none=True)}")
yield event
# 4. Tone-Based Conditional Logic
tone_check_result = ctx.session.state.get("tone_check_result")
logger.info(f"[{self.name}] Tone check result: {tone_check_result}")
if tone_check_result == "negative":
logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
async for event in self.story_generator.run_async(ctx):
logger.info(f"[{self.name}] Event from StoryGenerator (Regen): {event.model_dump_json(indent=2, exclude_none=True)}")
yield event
else:
logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")
pass
logger.info(f"[{self.name}] Workflow finished.")
- 초기
story_generator
가 실행됩니다. 출력은ctx.session.state["current_story"]
에 있을 것으로 예상됩니다. loop_agent
가 실행되어 내부적으로critic
과reviser
를max_iterations
횟수만큼 순차적으로 호출합니다. 이들은 상태에서current_story
와criticism
을 읽고 씁니다.sequential_agent
가 실행되어grammar_check
를 호출한 다음tone_check
를 호출하여current_story
를 읽고grammar_suggestions
와tone_check_result
를 상태에 씁니다.- 사용자 정의 부분:
if
문은 상태에서tone_check_result
를 확인합니다. "negative"이면story_generator
가 다시 호출되어 상태의current_story
를 덮어씁니다. 그렇지 않으면 흐름이 종료됩니다.
runAsyncImpl
메서드는 비동기 제어 흐름을 위해 RxJava의 Flowable 스트림과 연산자를 사용하여 하위 에이전트를 조율합니다.
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
// Implements the custom orchestration logic for the story workflow.
// Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
logger.log(Level.INFO, () -> String.format("[%s] Starting story generation workflow.", name()));
// Stage 1. Initial Story Generation
Flowable<Event> storyGenFlow = runStage(storyGenerator, invocationContext, "StoryGenerator");
// Stage 2: Critic-Reviser Loop (runs after story generation completes)
Flowable<Event> criticReviserFlow = Flowable.defer(() -> {
if (!isStoryGenerated(invocationContext)) {
logger.log(Level.SEVERE,() ->
String.format("[%s] Failed to generate initial story. Aborting after StoryGenerator.",
name()));
return Flowable.empty(); // Stop further processing if no story
}
logger.log(Level.INFO, () ->
String.format("[%s] Story state after generator: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(loopAgent, invocationContext, "CriticReviserLoop");
});
// Stage 3: Post-Processing (runs after critic-reviser loop completes)
Flowable<Event> postProcessingFlow = Flowable.defer(() -> {
logger.log(Level.INFO, () ->
String.format("[%s] Story state after loop: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(sequentialAgent, invocationContext, "PostProcessing");
});
// Stage 4: Conditional Regeneration (runs after post-processing completes)
Flowable<Event> conditionalRegenFlow = Flowable.defer(() -> {
String toneCheckResult = (String) invocationContext.session().state().get("tone_check_result");
logger.log(Level.INFO, () -> String.format("[%s] Tone check result: %s", name(), toneCheckResult));
if ("negative".equalsIgnoreCase(toneCheckResult)) {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is negative. Regenerating story...", name()));
return runStage(storyGenerator, invocationContext, "StoryGenerator (Regen)");
} else {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is not negative. Keeping current story.", name()));
return Flowable.empty(); // No regeneration needed
}
});
return Flowable.concatArray(storyGenFlow, criticReviserFlow, postProcessingFlow, conditionalRegenFlow)
.doOnComplete(() -> logger.log(Level.INFO, () -> String.format("[%s] Workflow finished.", name())));
}
// Helper method for a single agent run stage with logging
private Flowable<Event> runStage(BaseAgent agentToRun, InvocationContext ctx, String stageName) {
logger.log(Level.INFO, () -> String.format("[%s] Running %s...", name(), stageName));
return agentToRun
.runAsync(ctx)
.doOnNext(event ->
logger.log(Level.INFO,() ->
String.format("[%s] Event from %s: %s", name(), stageName, event.toJson())))
.doOnError(err ->
logger.log(Level.SEVERE,
String.format("[%s] Error in %s", name(), stageName), err))
.doOnComplete(() ->
logger.log(Level.INFO, () ->
String.format("[%s] %s finished.", name(), stageName)));
}
- 초기
storyGenerator.runAsync(invocationContext)
Flowable이 실행됩니다. 출력은invocationContext.session().state().get("current_story")
에 있을 것으로 예상됩니다. loopAgent
의 Flowable이 다음에 실행됩니다(Flowable.concatArray
및Flowable.defer
로 인해). LoopAgent는critic
및reviser
하위 에이전트를maxIterations
까지 순차적으로 호출합니다. 이들은 상태에서current_story
와criticism
을 읽고 씁니다.- 그런 다음
sequentialAgent
의 Flowable이 실행됩니다.grammar_check
를 호출한 다음tone_check
를 호출하여current_story
를 읽고grammar_suggestions
와tone_check_result
를 상태에 씁니다. - 사용자 정의 부분: sequentialAgent가 완료된 후
Flowable.defer
내의 로직이invocationContext.session().state()
에서 "tone_check_result"를 확인합니다. "negative"이면storyGenerator
Flowable이 조건부로 연결되어 다시 실행되어 "current_story"를 덮어씁니다. 그렇지 않으면 빈 Flowable이 사용되고 전체 워크플로가 완료로 진행됩니다.
3부: LLM 하위 에이전트 정의하기¶
이들은 특정 작업을 담당하는 표준 LlmAgent
정의입니다. output key
매개변수는 결과를 다른 에이전트나 사용자 정의 오케스트레이터가 접근할 수 있는 session.state
에 배치하는 데 중요합니다.
GEMINI_2_FLASH = "gemini-2.0-flash" # 모델 상수 정의
# --- Define the individual LLM agents ---
story_generator = LlmAgent(
name="StoryGenerator",
model=GEMINI_2_FLASH,
instruction="""You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic provided in session state with key 'topic'""",
input_schema=None,
output_key="current_story", # Key for storing output in session state
)
critic = LlmAgent(
name="Critic",
model=GEMINI_2_FLASH,
instruction="""You are a story critic. Review the story provided in
session state with key 'current_story'. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.""",
input_schema=None,
output_key="criticism", # Key for storing criticism in session state
)
reviser = LlmAgent(
name="Reviser",
model=GEMINI_2_FLASH,
instruction="""You are a story reviser. Revise the story provided in
session state with key 'current_story', based on the criticism in
session state with key 'criticism'. Output only the revised story.""",
input_schema=None,
output_key="current_story", # Overwrites the original story
)
grammar_check = LlmAgent(
name="GrammarCheck",
model=GEMINI_2_FLASH,
instruction="""You are a grammar checker. Check the grammar of the story
provided in session state with key 'current_story'. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.""",
input_schema=None,
output_key="grammar_suggestions",
)
tone_check = LlmAgent(
name="ToneCheck",
model=GEMINI_2_FLASH,
instruction="""You are a tone analyzer. Analyze the tone of the story
provided in session state with key 'current_story'. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.""",
input_schema=None,
output_key="tone_check_result", # This agent's output determines the conditional flow
)
// --- Define the individual LLM agents ---
LlmAgent storyGenerator =
LlmAgent.builder()
.name("StoryGenerator")
.model(MODEL_NAME)
.description("Generates the initial story.")
.instruction(
"""
You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic provided in session state with key 'topic'
""")
.inputSchema(null)
.outputKey("current_story") // Key for storing output in session state
.build();
LlmAgent critic =
LlmAgent.builder()
.name("Critic")
.model(MODEL_NAME)
.description("Critiques the story.")
.instruction(
"""
You are a story critic. Review the story provided in
session state with key 'current_story'. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.
""")
.inputSchema(null)
.outputKey("criticism") // Key for storing criticism in session state
.build();
LlmAgent reviser =
LlmAgent.builder()
.name("Reviser")
.model(MODEL_NAME)
.description("Revises the story based on criticism.")
.instruction(
"""
You are a story reviser. Revise the story provided in
session state with key 'current_story', based on the criticism in
session state with key 'criticism'. Output only the revised story.
""")
.inputSchema(null)
.outputKey("current_story") // Overwrites the original story
.build();
LlmAgent grammarCheck =
LlmAgent.builder()
.name("GrammarCheck")
.model(MODEL_NAME)
.description("Checks grammar and suggests corrections.")
.instruction(
"""
You are a grammar checker. Check the grammar of the story
provided in session state with key 'current_story'. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.
""")
.outputKey("grammar_suggestions")
.build();
LlmAgent toneCheck =
LlmAgent.builder()
.name("ToneCheck")
.model(MODEL_NAME)
.description("Analyzes the tone of the story.")
.instruction(
"""
You are a tone analyzer. Analyze the tone of the story
provided in session state with key 'current_story'. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.
""")
.outputKey("tone_check_result") // This agent's output determines the conditional flow
.build();
LoopAgent loopAgent =
LoopAgent.builder()
.name("CriticReviserLoop")
.description("Iteratively critiques and revises the story.")
.subAgents(critic, reviser)
.maxIterations(2)
.build();
SequentialAgent sequentialAgent =
SequentialAgent.builder()
.name("PostProcessing")
.description("Performs grammar and tone checks sequentially.")
.subAgents(grammarCheck, toneCheck)
.build();
4부: 사용자 정의 에이전트 인스턴스화 및 실행¶
마지막으로, StoryFlowAgent
를 인스턴스화하고 평소와 같이 Runner
를 사용합니다.
# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
name="StoryFlowAgent",
story_generator=story_generator,
critic=critic,
reviser=reviser,
grammar_check=grammar_check,
tone_check=tone_check,
)
# --- Setup Runner and Session ---
session_service = InMemorySessionService()
initial_state = {"topic": "a brave kitten exploring a haunted house"}
session = session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID,
state=initial_state # Pass initial state here
)
logger.info(f"Initial session state: {session.state}")
runner = Runner(
agent=story_flow_agent, # Pass the custom orchestrator agent
app_name=APP_NAME,
session_service=session_service
)
# --- Function to Interact with the Agent ---
def call_agent(user_input_topic: str):
"""
Sends a new topic to the agent (overwriting the initial one if needed)
and runs the workflow.
"""
current_session = session_service.get_session(app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID)
if not current_session:
logger.error("Session not found!")
return
current_session.state["topic"] = user_input_topic
logger.info(f"Updated session state topic to: {user_input_topic}")
content = types.Content(role='user', parts=[types.Part(text=f"Generate a story about: {user_input_topic}")])
events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
final_response = "No final response captured."
for event in events:
if event.is_final_response() and event.content and event.content.parts:
logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
final_response = event.content.parts[0].text
print("\n--- Agent Interaction Result ---")
print("Agent Final Response: ", final_response)
final_session = session_service.get_session(app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID)
print("Final Session State:")
import json
print(json.dumps(final_session.state, indent=2))
print("-------------------------------\n")
# --- Run the Agent ---
call_agent("a lonely robot finding a friend in a junkyard")
// --- Function to Interact with the Agent ---
// Sends a new topic to the agent (overwriting the initial one if needed)
// and runs the workflow.
public static void runAgent(StoryFlowAgentExample agent, String userTopic) {
// --- Setup Runner and Session ---
InMemoryRunner runner = new InMemoryRunner(agent);
Map<String, Object> initialState = new HashMap<>();
initialState.put("topic", "a brave kitten exploring a haunted house");
Session session =
runner
.sessionService()
.createSession(APP_NAME, USER_ID, new ConcurrentHashMap<>(initialState), SESSION_ID)
.blockingGet();
logger.log(Level.INFO, () -> String.format("Initial session state: %s", session.state()));
session.state().put("topic", userTopic); // Update the state in the retrieved session
logger.log(Level.INFO, () -> String.format("Updated session state topic to: %s", userTopic));
Content userMessage = Content.fromParts(Part.fromText("Generate a story about: " + userTopic));
// Use the modified session object for the run
Flowable<Event> eventStream = runner.runAsync(USER_ID, session.id(), userMessage);
final String[] finalResponse = {"No final response captured."};
eventStream.blockingForEach(
event -> {
if (event.finalResponse() && event.content().isPresent()) {
String author = event.author() != null ? event.author() : "UNKNOWN_AUTHOR";
Optional<String> textOpt =
event
.content()
.flatMap(Content::parts)
.filter(parts -> !parts.isEmpty())
.map(parts -> parts.get(0).text().orElse(""));
logger.log(Level.INFO, () ->
String.format("Potential final response from [%s]: %s", author, textOpt.orElse("N/A")));
textOpt.ifPresent(text -> finalResponse[0] = text);
}
});
System.out.println("\n--- Agent Interaction Result ---");
System.out.println("Agent Final Response: " + finalResponse[0]);
// Retrieve session again to see the final state after the run
Session finalSession =
runner
.sessionService()
.getSession(APP_NAME, USER_ID, SESSION_ID, Optional.empty())
.blockingGet();
assert finalSession != null;
System.out.println("Final Session State:" + finalSession.state());
System.out.println("-------------------------------\n");
}
(참고: 가져오기 및 실행 로직을 포함한 전체 실행 가능한 코드는 아래에 링크되어 있습니다.)
전체 코드 예제¶
Storyflow 에이전트
# StoryFlowAgent 예제의 전체 실행 가능한 코드
import logging
from typing import AsyncGenerator
from typing_extensions import override
from google.adk.agents import LlmAgent, BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.invocation_context import InvocationContext
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from pydantic import BaseModel, Field
# --- Constants ---
APP_NAME = "story_app"
USER_ID = "12345"
SESSION_ID = "123344"
GEMINI_2_FLASH = "gemini-2.0-flash"
# --- Configure Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Custom Orchestrator Agent ---
class StoryFlowAgent(BaseAgent):
"""
Custom agent for a story generation and refinement workflow.
This agent orchestrates a sequence of LLM agents to generate a story,
critique it, revise it, check grammar and tone, and potentially
regenerate the story if the tone is negative.
"""
# --- Field Declarations for Pydantic ---
# Declare the agents passed during initialization as class attributes with type hints
story_generator: LlmAgent
critic: LlmAgent
reviser: LlmAgent
grammar_check: LlmAgent
tone_check: LlmAgent
loop_agent: LoopAgent
sequential_agent: SequentialAgent
# model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
model_config = {"arbitrary_types_allowed": True}
def __init__(
self,
name: str,
story_generator: LlmAgent,
critic: LlmAgent,
reviser: LlmAgent,
grammar_check: LlmAgent,
tone_check: LlmAgent,
):
"""
Initializes the StoryFlowAgent.
Args:
name: The name of the agent.
story_generator: An LlmAgent to generate the initial story.
critic: An LlmAgent to critique the story.
reviser: An LlmAgent to revise the story based on criticism.
grammar_check: An LlmAgent to check the grammar.
tone_check: An LlmAgent to analyze the tone.
"""
# Create internal agents *before* calling super().__init__
loop_agent = LoopAgent(
name="CriticReviserLoop", sub_agents=[critic, reviser], max_iterations=2
)
sequential_agent = SequentialAgent(
name="PostProcessing", sub_agents=[grammar_check, tone_check]
)
# Define the sub_agents list for the framework
sub_agents_list = [
story_generator,
loop_agent,
sequential_agent,
]
# Pydantic will validate and assign them based on the class annotations.
super().__init__(
name=name,
story_generator=story_generator,
critic=critic,
reviser=reviser,
grammar_check=grammar_check,
tone_check=tone_check,
loop_agent=loop_agent,
sequential_agent=sequential_agent,
sub_agents=sub_agents_list, # Pass the sub_agents list directly
)
@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""
Implements the custom orchestration logic for the story workflow.
Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
"""
logger.info(f"[{self.name}] Starting story generation workflow.")
# 1. Initial Story Generation
logger.info(f"[{self.name}] Running StoryGenerator...")
async for event in self.story_generator.run_async(ctx):
logger.info(f"[{self.name}] Event from StoryGenerator: {event.model_dump_json(indent=2, exclude_none=True)}")
yield event
# Check if story was generated before proceeding
if "current_story" not in ctx.session.state or not ctx.session.state["current_story"]:
logger.error(f"[{self.name}] Failed to generate initial story. Aborting workflow.")
return # Stop processing if initial story failed
logger.info(f"[{self.name}] Story state after generator: {ctx.session.state.get('current_story')}")
# 2. Critic-Reviser Loop
logger.info(f"[{self.name}] Running CriticReviserLoop...")
# Use the loop_agent instance attribute assigned during init
async for event in self.loop_agent.run_async(ctx):
logger.info(f"[{self.name}] Event from CriticReviserLoop: {event.model_dump_json(indent=2, exclude_none=True)}")
yield event
logger.info(f"[{self.name}] Story state after loop: {ctx.session.state.get('current_story')}")
# 3. Sequential Post-Processing (Grammar and Tone Check)
logger.info(f"[{self.name}] Running PostProcessing...")
# Use the sequential_agent instance attribute assigned during init
async for event in self.sequential_agent.run_async(ctx):
logger.info(f"[{self.name}] Event from PostProcessing: {event.model_dump_json(indent=2, exclude_none=True)}")
yield event
# 4. Tone-Based Conditional Logic
tone_check_result = ctx.session.state.get("tone_check_result")
logger.info(f"[{self.name}] Tone check result: {tone_check_result}")
if tone_check_result == "negative":
logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
async for event in self.story_generator.run_async(ctx):
logger.info(f"[{self.name}] Event from StoryGenerator (Regen): {event.model_dump_json(indent=2, exclude_none=True)}")
yield event
else:
logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")
pass
logger.info(f"[{self.name}] Workflow finished.")
# --- Define the individual LLM agents ---
story_generator = LlmAgent(
name="StoryGenerator",
model=GEMINI_2_FLASH,
instruction="""You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic provided in session state with key 'topic'""",
input_schema=None,
output_key="current_story", # Key for storing output in session state
)
critic = LlmAgent(
name="Critic",
model=GEMINI_2_FLASH,
instruction="""You are a story critic. Review the story provided in
session state with key 'current_story'. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.""",
input_schema=None,
output_key="criticism", # Key for storing criticism in session state
)
reviser = LlmAgent(
name="Reviser",
model=GEMINI_2_FLASH,
instruction="""You are a story reviser. Revise the story provided in
session state with key 'current_story', based on the criticism in
session state with key 'criticism'. Output only the revised story.""",
input_schema=None,
output_key="current_story", # Overwrites the original story
)
grammar_check = LlmAgent(
name="GrammarCheck",
model=GEMINI_2_FLASH,
instruction="""You are a grammar checker. Check the grammar of the story
provided in session state with key 'current_story'. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.""",
input_schema=None,
output_key="grammar_suggestions",
)
tone_check = LlmAgent(
name="ToneCheck",
model=GEMINI_2_FLASH,
instruction="""You are a tone analyzer. Analyze the tone of the story
provided in session state with key 'current_story'. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.""",
input_schema=None,
output_key="tone_check_result", # This agent's output determines the conditional flow
)
# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
name="StoryFlowAgent",
story_generator=story_generator,
critic=critic,
reviser=reviser,
grammar_check=grammar_check,
tone_check=tone_check,
)
# --- Setup Runner and Session ---
session_service = InMemorySessionService()
initial_state = {"topic": "a brave kitten exploring a haunted house"}
session = session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID,
state=initial_state # Pass initial state here
)
logger.info(f"Initial session state: {session.state}")
runner = Runner(
agent=story_flow_agent, # Pass the custom orchestrator agent
app_name=APP_NAME,
session_service=session_service
)
# --- Function to Interact with the Agent ---
def call_agent(user_input_topic: str):
"""
Sends a new topic to the agent (overwriting the initial one if needed)
and runs the workflow.
"""
current_session = session_service.get_session(app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID)
if not current_session:
logger.error("Session not found!")
return
current_session.state["topic"] = user_input_topic
logger.info(f"Updated session state topic to: {user_input_topic}")
content = types.Content(role='user', parts=[types.Part(text=f"Generate a story about: {user_input_topic}")])
events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
final_response = "No final response captured."
for event in events:
if event.is_final_response() and event.content and event.content.parts:
logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
final_response = event.content.parts[0].text
print("\n--- Agent Interaction Result ---")
print("Agent Final Response: ", final_response)
final_session = session_service.get_session(app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID)
print("Final Session State:")
import json
print(json.dumps(final_session.state, indent=2))
print("-------------------------------\n")
# --- Run the Agent ---
call_agent("a lonely robot finding a friend in a junkyard")
# StoryFlowAgent 예제의 전체 실행 가능한 코드
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.InvocationContext;
import com.google.adk.agents.LoopAgent;
import com.google.adk.agents.SequentialAgent;
import com.google.adk.events.Event;
import com.google.adk.runner.InMemoryRunner;
import com.google.adk.sessions.Session;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Flowable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
public class StoryFlowAgentExample extends BaseAgent {
// --- Constants ---
private static final String APP_NAME = "story_app";
private static final String USER_ID = "user_12345";
private static final String SESSION_ID = "session_123344";
private static final String MODEL_NAME = "gemini-2.0-flash"; // Ensure this model is available
private static final Logger logger = Logger.getLogger(StoryFlowAgentExample.class.getName());
private final LlmAgent storyGenerator;
private final LoopAgent loopAgent;
private final SequentialAgent sequentialAgent;
public StoryFlowAgentExample(
String name, LlmAgent storyGenerator, LoopAgent loopAgent, SequentialAgent sequentialAgent) {
super(
name,
"Orchestrates story generation, critique, revision, and checks.",
List.of(storyGenerator, loopAgent, sequentialAgent),
null,
null);
this.storyGenerator = storyGenerator;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
public static void main(String[] args) {
// --- Define the individual LLM agents ---
LlmAgent storyGenerator =
LlmAgent.builder()
.name("StoryGenerator")
.model(MODEL_NAME)
.description("Generates the initial story.")
.instruction(
"""
You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic provided in session state with key 'topic'
""")
.inputSchema(null)
.outputKey("current_story") // Key for storing output in session state
.build();
LlmAgent critic =
LlmAgent.builder()
.name("Critic")
.model(MODEL_NAME)
.description("Critiques the story.")
.instruction(
"""
You are a story critic. Review the story provided in
session state with key 'current_story'. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.
""")
.inputSchema(null)
.outputKey("criticism") // Key for storing criticism in session state
.build();
LlmAgent reviser =
LlmAgent.builder()
.name("Reviser")
.model(MODEL_NAME)
.description("Revises the story based on criticism.")
.instruction(
"""
You are a story reviser. Revise the story provided in
session state with key 'current_story', based on the criticism in
session state with key 'criticism'. Output only the revised story.
""")
.inputSchema(null)
.outputKey("current_story") // Overwrites the original story
.build();
LlmAgent grammarCheck =
LlmAgent.builder()
.name("GrammarCheck")
.model(MODEL_NAME)
.description("Checks grammar and suggests corrections.")
.instruction(
"""
You are a grammar checker. Check the grammar of the story
provided in session state with key 'current_story'. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.
""")
.outputKey("grammar_suggestions")
.build();
LlmAgent toneCheck =
LlmAgent.builder()
.name("ToneCheck")
.model(MODEL_NAME)
.description("Analyzes the tone of the story.")
.instruction(
"""
You are a tone analyzer. Analyze the tone of the story
provided in session state with key 'current_story'. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.
""")
.outputKey("tone_check_result") // This agent's output determines the conditional flow
.build();
LoopAgent loopAgent =
LoopAgent.builder()
.name("CriticReviserLoop")
.description("Iteratively critiques and revises the story.")
.subAgents(critic, reviser)
.maxIterations(2)
.build();
SequentialAgent sequentialAgent =
SequentialAgent.builder()
.name("PostProcessing")
.description("Performs grammar and tone checks sequentially.")
.subAgents(grammarCheck, toneCheck)
.build();
StoryFlowAgentExample storyFlowAgentExample =
new StoryFlowAgentExample(APP_NAME, storyGenerator, loopAgent, sequentialAgent);
// --- Run the Agent ---
runAgent(storyFlowAgentExample, "a lonely robot finding a friend in a junkyard");
}
// --- Function to Interact with the Agent ---
// Sends a new topic to the agent (overwriting the initial one if needed)
// and runs the workflow.
public static void runAgent(StoryFlowAgentExample agent, String userTopic) {
// --- Setup Runner and Session ---
InMemoryRunner runner = new InMemoryRunner(agent);
Map<String, Object> initialState = new HashMap<>();
initialState.put("topic", "a brave kitten exploring a haunted house");
Session session =
runner
.sessionService()
.createSession(APP_NAME, USER_ID, new ConcurrentHashMap<>(initialState), SESSION_ID)
.blockingGet();
logger.log(Level.INFO, () -> String.format("Initial session state: %s", session.state()));
session.state().put("topic", userTopic); // Update the state in the retrieved session
logger.log(Level.INFO, () -> String.format("Updated session state topic to: %s", userTopic));
Content userMessage = Content.fromParts(Part.fromText("Generate a story about: " + userTopic));
// Use the modified session object for the run
Flowable<Event> eventStream = runner.runAsync(USER_ID, session.id(), userMessage);
final String[] finalResponse = {"No final response captured."};
eventStream.blockingForEach(
event -> {
if (event.finalResponse() && event.content().isPresent()) {
String author = event.author() != null ? event.author() : "UNKNOWN_AUTHOR";
Optional<String> textOpt =
event
.content()
.flatMap(Content::parts)
.filter(parts -> !parts.isEmpty())
.map(parts -> parts.get(0).text().orElse(""));
logger.log(Level.INFO, () ->
String.format("Potential final response from [%s]: %s", author, textOpt.orElse("N/A")));
textOpt.ifPresent(text -> finalResponse[0] = text);
}
});
System.out.println("\n--- Agent Interaction Result ---");
System.out.println("Agent Final Response: " + finalResponse[0]);
// Retrieve session again to see the final state after the run
Session finalSession =
runner
.sessionService()
.getSession(APP_NAME, USER_ID, SESSION_ID, Optional.empty())
.blockingGet();
assert finalSession != null;
System.out.println("Final Session State:" + finalSession.state());
System.out.println("-------------------------------\n");
}
private boolean isStoryGenerated(InvocationContext ctx) {
Object currentStoryObj = ctx.session().state().get("current_story");
return currentStoryObj != null && !String.valueOf(currentStoryObj).isEmpty();
}
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
// Implements the custom orchestration logic for the story workflow.
// Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
logger.log(Level.INFO, () -> String.format("[%s] Starting story generation workflow.", name()));
// Stage 1. Initial Story Generation
Flowable<Event> storyGenFlow = runStage(storyGenerator, invocationContext, "StoryGenerator");
// Stage 2: Critic-Reviser Loop (runs after story generation completes)
Flowable<Event> criticReviserFlow = Flowable.defer(() -> {
if (!isStoryGenerated(invocationContext)) {
logger.log(Level.SEVERE,() ->
String.format("[%s] Failed to generate initial story. Aborting after StoryGenerator.",
name()));
return Flowable.empty(); // Stop further processing if no story
}
logger.log(Level.INFO, () ->
String.format("[%s] Story state after generator: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(loopAgent, invocationContext, "CriticReviserLoop");
});
// Stage 3: Post-Processing (runs after critic-reviser loop completes)
Flowable<Event> postProcessingFlow = Flowable.defer(() -> {
logger.log(Level.INFO, () ->
String.format("[%s] Story state after loop: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(sequentialAgent, invocationContext, "PostProcessing");
});
// Stage 4: Conditional Regeneration (runs after post-processing completes)
Flowable<Event> conditionalRegenFlow = Flowable.defer(() -> {
String toneCheckResult = (String) invocationContext.session().state().get("tone_check_result");
logger.log(Level.INFO, () -> String.format("[%s] Tone check result: %s", name(), toneCheckResult));
if ("negative".equalsIgnoreCase(toneCheckResult)) {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is negative. Regenerating story...", name()));
return runStage(storyGenerator, invocationContext, "StoryGenerator (Regen)");
} else {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is not negative. Keeping current story.", name()));
return Flowable.empty(); // No regeneration needed
}
});
return Flowable.concatArray(storyGenFlow, criticReviserFlow, postProcessingFlow, conditionalRegenFlow)
.doOnComplete(() -> logger.log(Level.INFO, () -> String.format("[%s] Workflow finished.", name())));
}
// Helper method for a single agent run stage with logging
private Flowable<Event> runStage(BaseAgent agentToRun, InvocationContext ctx, String stageName) {
logger.log(Level.INFO, () -> String.format("[%s] Running %s...", name(), stageName));
return agentToRun
.runAsync(ctx)
.doOnNext(event ->
logger.log(Level.INFO,() ->
String.format("[%s] Event from %s: %s", name(), stageName, event.toJson())))
.doOnError(err ->
logger.log(Level.SEVERE,
String.format("[%s] Error in %s", name(), stageName), err))
.doOnComplete(() ->
logger.log(Level.INFO, () ->
String.format("[%s] %s finished.", name(), stageName)));
}
@Override
protected Flowable<Event> runLiveImpl(InvocationContext invocationContext) {
return Flowable.error(new UnsupportedOperationException("runLive not implemented."));
}
}