Using LlamaIndex Workflow to Implement an Agent Handoff Feature Like OpenAI Swarm

Example: a customer service chatbot project

Using LlamaIndex Workflow to Implement an Agent Handoff Feature Like OpenAI Swarm. Image by DALL-E-3

Happy Lunar New Year, my friends!

In the last article, I introduced the Workflow framework of LlamaIndex.

Deep Dive into LlamaIndex Workflow: Event-driven LLM architecture
What I think about the progress and shortcomings after practice

Today, I will show you how to use LlamaIndex Workflow to implement a multi-agent orchestration feature similar to OpenAI Swarm, using a customer service chatbot project as an example.

Introduction

Remember the Swarm framework released by OpenAI not long ago? Its biggest feature is agents and handoffs.

The agents are straightforward: they use a set of specific commands and tools to get tasks done. It's like putting an LLM function call into a neat package.

And handoffs are different. They allow an agent to pass the work to another agent seamlessly based on the context of the current conversation, making agents work together without any hiccups.

Why this is important

Let's look at a diagram explaining the whole process of a ReactAgent.

The ReactAgent needs at least three calls to the LLM to complete. Image by Author

Even a single, simple agent call needs at least three round trips to the LLM to complete.

Traditional agent applications work this way: they keep conversation context and user state, and the agent call chain is usually fixed. For every user request, the agents have to call the LLM multiple times to check state, and honestly, some of those calls are unnecessary.

Here's an example: imagine we have an e-commerce website, and we need a customer service team to answer users' questions.

In an agent chain, agents are invoked sequentially. Image by Author

In a chained agent application, every question from a user goes to the front desk first. The front desk then asks the pre-sales service; if pre-sales can't answer, it asks the after-sales service. Finally, the front desk reorganizes the answers coming back from the backend and replies to the customer.

Isn't that silly? Look at all the unnecessary delays and call costs it causes!

How Swarm does it

Swarm uses a handoff approach that fits the real world better. Let me use that customer service example again:

Agent handoff allows you to interact directly with the corresponding customer service. Image by Author

Imagine a store called Swarm. When a customer asks the front desk a question, the front desk figures out what kind of question it is (pre-sale or after-sale) and passes the customer to the corresponding service. Then, the customer talks to that service directly.

Sounds reasonable, right? So why don't we just use Swarm?

Why not just use Swarm

Because Swarm is still just an experimental framework. According to the official statement:

Swarm is currently an experimental sample framework intended to explore ergonomic interfaces for multi-agent systems. It is not intended to be used in production and therefore has no official support. (This also means we will not be reviewing PRs or issues!)

So, we can't use Swarm directly in production systems.

But what we need is the agent handoff capability, right? That being the case, why not build a similar framework ourselves?

Today's article is written for this purpose. We will develop a project using a customer service system as an example, which will use Workflow to implement agent orchestration and handoff capabilities. Let's get started.


Project in Practice: A Customer Service Chatbot with Agent Handoff Capability

This project is quite complex. To help you understand my implementation, I have put the entire project code at the end of the article. You can freely read and modify it without my permission.

💡
Want to know more about my work in LLM applications or the field of data science? Feel free to subscribe to my personal blog, everything is free!

Step one, set up an interactive interface

Whether you use an agent or not, you always need to adjust your prompts and code logic. At this point, a what-you-see-is-what-you-get chat UI becomes very important.

In this section, I'll use chainlit to quickly implement a super cool web-based chat window.

Chainlit is a Python library similar in spirit to Streamlit. This means you don't need any frontend skills to quickly build a chatbot prototype. (Hooray)

Let's get moving.

The scaffold of our project. Image by Author

First, we create a .env file in the project's root directory, which stores important environmental variables like OPENAI_API_KEY and OPENAI_BASE_URL. Later, I will use dotenv to read it.

This is important: with a .env file you can strip the API key out of your code, and then you can publish the code freely.
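As a minimal sketch (the key and URL values are placeholders, not real ones), the .env file looks like this:

# .env -- never commit this file to version control
OPENAI_API_KEY=sk-your-key-here
OPENAI_BASE_URL=https://your-llm-endpoint.example.com/v1

And in Python, before creating any LLM client, we load it with python-dotenv:

from dotenv import load_dotenv

load_dotenv()  # reads .env and puts the variables into os.environ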

Next, we need to set up a simple project scaffold. Our project will contain two folders: src and data. Our Python source code files will be placed in the src folder, while text source files for RAG use will be placed in the data folder.

In the src directory, first create an app.py file, which will act as the view to launch the chainlit interface. This file consists of three parts:

  1. Code to prepare the Workflow program.
  2. Code to respond to the user lifecycle, outputting intermediate processes.
  3. Actual code to call the Workflow agent and conduct the conversation.

The code flowchart is shown below:

Flowchart of the project UI interface. Image by Author

In a production-ready system, we often need to connect to an enterprise's privately deployed LLM endpoint. You can refer to this article on how to connect to a private LLM:

How to Connect LlamaIndex with Private LLM API Deployments
When your enterprise doesn’t use public models like OpenAI

To make our customer service less rigid, we can set the temperature a bit higher. Here is the code for initializing the system environment; I will cover the implementation of CustomerService later:

from llama_index.core import Settings
from llama_index.llms.openai_like import OpenAILike

llm = OpenAILike(
    model="qwen-max-latest",
    is_chat_model=True,
    is_function_calling_model=True,
    temperature=0.35
)
Settings.llm = llm

Imagine that the next customer service agent takes over your question. What does she do first? Right, she checks the conversation history.

So for each user session we need to create a dedicated workflow that preserves the conversation context and user state:

GREETINGS = "Hello, what can I do for you?"


def ready_my_workflow() -> CustomerService:
    memory = ChatMemoryBuffer(
        llm=llm,
        token_limit=5000
    )

    agent = CustomerService(
        memory=memory,
        timeout=None,
        user_state=initialize_user_state()
    )
    return agent


def initialize_user_state() -> dict[str, str | None]:
    return {
        "name": None
    }


@cl.on_chat_start
async def start():
    workflow = ready_my_workflow()
    cl.user_session.set("workflow", workflow)

    await cl.Message(
        author="assistant", content=GREETINGS
    ).send()

At the same time, I will also use chainlit's cl.step decorator to implement a simple logging method, which can help us output some process logs on the page, letting users know where we are now:

@cl.step(type="run", show_input=False)
async def on_progress(message: str):
    return message

Then there is the main method, which is called on every round of conversation.

@cl.on_message
async def main(message: cl.Message):
    workflow: CustomerService = cl.user_session.get("workflow")
    context = cl.user_session.get("context")
    msg = cl.Message(content="", author="assistant")
    user_msg = message.content
    handler = workflow.run(
        msg=user_msg,
        ctx=context
    )
    async for event in handler.stream_events():
        if isinstance(event, ProgressEvent):
            await on_progress(event.msg)

    await msg.send()
    result = await handler
    msg.content = result
    await msg.update()
    cl.user_session.set("context", handler.ctx)

In this method, we first get the user's input, then call the workflow's run method to start agent routing, iterating over the events streamed from the workflow pipeline and calling on_progress to print them on the page. Finally, we render the result of the dialogue on the page and update the Context.

To match the construction of the chainlit interface, we can first write a simple workflow:

class CustomerService(Workflow):
    def __init__(
            self,
            llm: OpenAILike | None = None,
            memory: ChatMemoryBuffer = None,
            user_state: dict[str, str | None] = None,
            *args,
            **kwargs
    ):
        self.llm = llm or Settings.llm
        self.memory = memory or ChatMemoryBuffer()
        self.user_state = user_state
        super().__init__(*args, **kwargs)

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="We're making some progress."))
        return StopEvent(result="Hello World")

Tada, our interactive interface is out:

Our UI interface for this project. Image by Author

Next, we can start preparing today's ingredients: the text source files for RAG.

Step two, generate text files

This project simulates the customer support team of an online drone e-commerce site, so that is the background I will use.

I need two files: one that introduces the drones being sold in the store and their details, and another that contains common FAQs about drone use and the after-sales terms.

To avoid business and data licensing issues, I plan to use an LLM to generate the text I want. I specifically instructed the LLM not to include any brands or real product information.

Here is a screenshot of my file generation:

Screenshot of data file generated using LLM. Image by Author

You can take my prompt as a reference:

SKUS_TEMPLATE_EN = """
    You are the owner of an online drone store, please generate a description in English of all the drones for sale.
    Include the drone model number, selling price, detailed specifications, and a detailed description in more than 400 words.
    Do not include brand names.
    No less than 20 types of drones, ranging from consumer to industrial use.
"""
TERMS_TEMPLATE_EN = """
    You are the head of a brand's back office department, and you are asked to generate a standardized response to after-sales FAQs in English that is greater than 25,000 words.
    The text should include common usage questions, as well as questions related to returns and repairs after the sale.
    This text will be used as a reference for the customer service team when answering customer questions about after-sales issues.
    Only the body text is generated, no preamble or explanation is added.
"""

Step three, handle indexing and retrieve privatized data

A foundation LLM does not contain a company's internal data, so for enterprise applications it is almost inevitable to use RAG to give the LLM access to private corporate data.

Our drone store is no exception. Before letting the agent staff start work, we need to provide them with some tools to access the product catalog and after-sales policy.

LlamaIndex provides many indexes suitable for different occasions. If used in a real system, I would prefer to use KnowledgeGraphIndex for product information text.

However, to make the sample project easy to understand, I still choose to use chromadb and VectorStoreIndex:

import chromadb
from llama_index.core import SimpleDirectoryReader, StorageContext, VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.chroma import ChromaVectorStore


def get_index(collection_name: str,
              files: list[str]) -> VectorStoreIndex:
    chroma_client = chromadb.PersistentClient(path="temp/.chroma")

    collection = chroma_client.get_or_create_collection(collection_name)
    vector_store = ChromaVectorStore(chroma_collection=collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)

    ready = collection.count()
    if ready > 0:
        print("File already loaded")
        index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    else:
        print("File not loaded.")
        docs = SimpleDirectoryReader(input_files=files).load_data()
        # embed_model is assumed to be configured elsewhere (e.g. Settings.embed_model)
        index = VectorStoreIndex.from_documents(
            docs, storage_context=storage_context, embed_model=embed_model,
            transformations=[SentenceSplitter(chunk_size=512, chunk_overlap=20)]
        )

    return index


INDEXES = {
    "SKUS": get_index("skus_docs", ["data/skus_en.txt"]),
    "TERMS": get_index("terms_docs", ["data/terms_en.txt"])
}

The running flowchart of this code is as follows:

The running flowchart of the code. Image by Author

If vector data already exists, return the index directly. If the data has not been loaded yet, first load the data into the vector store, then return the index.

Then we add a utility method that helps the agents query the corresponding index:

async def query_docs(
        index: VectorStoreIndex, query: str,
        similarity_top_k: int = 1
) -> str:
    retriever = index.as_retriever(similarity_top_k=similarity_top_k)
    nodes = await retriever.aretrieve(query)
    result = ""
    for node in nodes:
        result += node.get_content() + "\n\n"
    return result
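For example, a pre-sales tool can later call it like this (the query text is just an illustration):

sku_info = await query_docs(INDEXES["SKUS"], "entry-level camera drone under $500")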

Step four, hire a few agents

Since we are building a smart customer service project, it is necessary to hire a few customer service agents.

We also need a data class for agents which, just like Swarm agents, includes instructions and a set of tools. Here we use AgentConfig to describe it, inheriting from Pydantic's BaseModel.

class AgentConfig(BaseModel):
    """
    Detailed configuration for an agent
    """
    model_config = ConfigDict(arbitrary_types_allowed=True)
    name: str = Field(description="agent name")
    description: str = Field(
        description="agent description, which describes what the agent does"
    )
    system_prompt: str | None = None
    tools: list[BaseTool] | None = Field(
        description="function tools available for this agent"
    )

We also need to hire a lobby manager agent; it marks which agent the user will be handed over to, using the TransferToAgent model:

class TransferToAgent(BaseModel):
    """Used to explain which agent to transfer to next."""
    agent_name: str = Field(description="The name of the agent to transfer to.")

We also need a RequestTransfer tool, which an agent uses to notify the workflow when it cannot answer the user's question.

class RequestTransfer(BaseModel):
    """
    Used to indicate that you don't have the necessary permission to complete the user's request,
    or that you've already completed the user's request and want to transfer to another agent.
    """
    pass

Then we need to prepare a few tools for the agents to use:

First is a login tool, which is only used to register the user's name. If you need to handle a real login flow, you can implement the details in this method. I use a closure to return a tool list.

def get_authentication_tools() -> list[BaseTool]:
    async def login(ctx: Context, username: str) -> bool:
        """When the user provides their name, you can use this method to update their status.。
        :param username The user's title or name.
        """
        if not username:
            return False

        user_state = await ctx.get("user_state", None)
        user_state["name"] = username.strip()
        await ctx.set("user_state", user_state)
        return True

    return [FunctionToolWithContext.from_defaults(async_fn=login)]

One detail here: since the tool needs to work with the user state saved in the workflow Context, it must receive the ctx object. But when the tool is called by the agent, the LLM should not see the ctx parameter, so we need to make the agent ignore it.

Here I modified the behavior of LlamaIndex's FunctionTool and wrote a FunctionToolWithContext class. To save time, I referred to an example on the official website; you can find it here. Of course, the source is also included in the project code at the end of the article.
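As a rough conceptual sketch only (not the full implementation; the real version in the official example and in the project source also strips ctx from the auto-generated tool schema so the LLM never sees that parameter), the core idea looks roughly like this:

from llama_index.core.tools import FunctionTool, ToolOutput
from llama_index.core.workflow import Context


class FunctionToolWithContext(FunctionTool):
    """A FunctionTool whose wrapped function receives the workflow Context
    as its first argument, while the LLM-facing schema hides that parameter."""

    async def acall(self, ctx: Context, *args, **kwargs) -> ToolOutput:
        # Pass the workflow Context through to the wrapped async function.
        raw_output = await self._async_fn(ctx, *args, **kwargs)
        return ToolOutput(
            content=str(raw_output),
            tool_name=self.metadata.get_name(),
            raw_input={"args": args, "kwargs": kwargs},
            raw_output=raw_output,
        )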

We also need tools to look up the product catalog and the after-sales terms. These two tools simply call the retriever, so they are quite straightforward.

def get_pre_sales_tools() -> list[BaseTool]:
    async def skus_info_retrieve(ctx: Context, query: str) -> str:
        """
        When the user asks about a product, you can use this tool to look it up.
        :param query: The user's request.
        :return: The information found.
        """
        sku_info = await query_docs(INDEXES["SKUS"], query)
        return sku_info

    return [FunctionToolWithContext.from_defaults(async_fn=skus_info_retrieve)]


def get_after_sales_tools() -> list[BaseTool]:
    async def terms_info_retrieve(ctx: Context, query: str) -> str:
        """
        When the user asks about how to use a product, or about after-sales and repair options, you can use this tool to look it up.
        :param query: The user's request.
        :return: The information found.
        """
        terms_info = await query_docs(INDEXES["TERMS"], query)
        return terms_info
    return [FunctionToolWithContext.from_defaults(async_fn=terms_info_retrieve)]

To sum up, we need three professional customer service agents:

  1. The first one is the front desk, used to register the user's visit.
  2. The second one is the pre-sales service, used to recommend various products to users.
  3. The third one is the after-sales service, used to answer various usage questions and after-sales terms.

def _get_agent_configs() -> list[AgentConfig]:
    return [
        AgentConfig(
            name="Authentication Agent",
            description="Record the user's name. If there's no name, you need to ask this from the customer.",
            system_prompt="""
            You are a front desk customer service agent for registration.
            If the user hasn't provided their name, you need to ask them.
            When the user has other requests, transfer the user's request.
            """,
            tools=get_authentication_tools()
        ),
        AgentConfig(
            name="Pre Sales Agent",
            description="When the user asks about product information, you need to consult this customer service agent.",
            system_prompt="""
            You are a customer service agent answering pre-sales questions for customers.
            You will respond to users' inquiries based on the context of the conversation.
            
            When the context is not enough, you will use tools to supplement the information.
            You can only handle user inquiries related to product pre-sales. 
            Please use the RequestTransfer tool to transfer other user requests.
            """,
            tools=get_pre_sales_tools()
        ),
        AgentConfig(
            name="After Sales Agent",
            description="When the user asks about after-sales information, you need to consult this customer service agent.",
            system_prompt="""
            You are a customer service agent answering after-sales questions for customers, including how to use the product, return and exchange policies, and repair solutions.
            You respond to users' inquiries based on the context of the conversation.
            When the context is not enough, you will use tools to supplement the information.
            You can only handle user inquiries related to product after-sales. 
            Please use the RequestTransfer tool to transfer other user requests.
            """,
            tools=get_after_sales_tools()
        )
    ]

To meet the workflow's needs, we also write two helper methods that expose the agent configs to the workflow.

def get_agent_config_pair() -> dict[str, AgentConfig]:
    agent_configs = _get_agent_configs()
    return {agent.name: agent for agent in agent_configs}


def get_agent_configs_str() -> str:
    agent_configs = _get_agent_configs()
    pair_list = [f"{agent.name}: {agent.description}" for agent in agent_configs]
    return "\n".join(pair_list)

Finally, we also need a workflow system_prompt for orchestration, which will contain all agent information and user status, and hand over the user to the correct customer service agent when needed.

This prompt is used directly by the workflow; no separate agent is needed, so we just put the prompt here:

ORCHESTRATION_PROMPT = """  
    You are a customer service manager for a drone store.
    Based on the user's current status, latest request, and the available customer service agents, you help the user decide which agent to consult next.

    You don't focus on the dependencies between agents; the agents will handle those themselves.
    If the user asks about something unrelated to drones, you should politely and briefly decline to answer.

    Here is the list of available customer service agents:
    {agent_configs_str}

    Here is the user's current status:
    {user_state_str}
"""

Step five, build the core workflow

After so much preparation, we can finally get to the main course, and I'm sure everyone is eager to get started haha.

Since the workflow is an event-driven framework, we need to define several events like before:

class OrchestrationEvent(Event):
    query: str


class ActiveSpeakerEvent(Event):
    query: str


class ToolCallEvent(Event):
    tool_call: ToolSelection
    tools: list[BaseTool]


class ToolCallResultEvent(Event):
    chat_message: ChatMessage


class ProgressEvent(Event):
    msg: str

  1. OrchestrationEvent indicates that the workflow needs to transfer to another agent.
  2. After the agent transfer is completed, ActiveSpeakerEvent will tell the workflow to use the new agent to answer the user.
  3. If the agent needs to make a Function call, ToolCallEvent will be thrown to execute concurrently.
  4. The results of concurrent execution will be thrown out with ToolCallResultEvent, and summarized into the final result.
  5. Finally, we also need a ProgressEvent to stream the intermediate steps, making it easy for users to know where the process currently is. To avoid flooding the user with information, we only output agent-transfer information here.

After defining various events, we need to start writing the workflow. The workflow this time is a bit complex, so to make it easier for everyone to understand, I still drew a flowchart:

The flowchart of our workflow. Image by Author

First, let's look at the start method. As the entry point for user dialogue, it is relatively simple: it stores the user's message in ChatMemory, then checks whether an agent is currently active. If there is one, it emits an ActiveSpeakerEvent and moves to the next step; if not, it emits an OrchestrationEvent and enters agent orchestration.

The flowchart of the start method. Image by Author

class CustomerService(Workflow):
    ...
    
    @step
    async def start(
            self, ctx: Context, ev: StartEvent
    ) -> ActiveSpeakerEvent | OrchestrationEvent:
        self.memory.put(ChatMessage(
            role="user",
            content=ev.msg
        ))
        user_state = await ctx.get("user_state", None)
        if not user_state:
            await ctx.set("user_state", self.user_state)

        user_msg = ev.msg
        active_speaker = await ctx.get("active_speaker", default=None)

        if active_speaker:
            return ActiveSpeakerEvent(query=user_msg)
        else:
            return OrchestrationEvent(query=user_msg)

Going from shallow to deep, let's first look at how the orchestrate method handles agent orchestration:

The flowchart of the orchestrate method. Image by Author

This method first takes the list of available agents and the user state, and fills them into the ORCHESTRATION_PROMPT we wrote in agents.py, producing the complete system_prompt.

Then we pass the TransferToAgent tool, the system_prompt, and the chat_history to the LLM, letting it judge which agent to hand over to next.

class CustomerService(Workflow):
    ...

    @step
    async def orchestrate(
            self, ctx: Context, ev: OrchestrationEvent
    ) -> ActiveSpeakerEvent | StopEvent:
        chat_history = self.memory.get()
        user_state_str = await self._get_user_state_str(ctx)
        system_prompt = ORCHESTRATION_PROMPT.format(
            agent_configs_str=get_agent_configs_str(),
            user_state_str=user_state_str
        )
        messages = [ChatMessage(role="system", content=system_prompt)] + chat_history
        tools = [get_function_tool(TransferToAgent)]
        event, tool_calls, _ = await self.achat_to_tool_calls(ctx, tools, messages)
        if event is not None:
            return event
        tool_call = tool_calls[0]
        selected_agent = tool_call.tool_kwargs["agent_name"]
        await ctx.set("active_speaker", selected_agent)
        ctx.write_event_to_stream(
            ProgressEvent(msg=f"In step orchestrate:\nTransfer to agent: {selected_agent}")
        )
        return ActiveSpeakerEvent(query=ev.query)

After getting the latest agent, we update the context and throw an ActiveSpeakerEvent.
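One helper used above, get_function_tool, isn't shown elsewhere in the article. A minimal sketch of how it can be written (following the idea from the official example: wrap a Pydantic model as a tool the LLM can select, with a body that never actually needs to run) might be:

from pydantic import BaseModel

from llama_index.core.tools import FunctionTool


def get_function_tool(model: type[BaseModel]) -> FunctionTool:
    def _noop(**kwargs):
        # The tool body never runs; we only read the LLM's chosen arguments from the tool call.
        return None

    return FunctionTool.from_defaults(
        fn=_noop,
        name=model.__name__,
        description=model.__doc__ or "",
        fn_schema=model,
    )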

We also need to define two utility methods: achat_to_tool_calls and _get_user_state_str. achat_to_tool_calls is responsible for getting the tool calls the LLM wants to make; _get_user_state_str converts the user state into a string.

class CustomerService(Workflow):
    ...

    async def achat_to_tool_calls(
            self,
            ctx: Context,
            tools: list[FunctionTool],
            chat_history: list[ChatMessage]
    ) -> tuple[StopEvent | None, list[ToolSelection], ChatResponse]:
        response = await self.llm.achat_with_tools(tools, chat_history=chat_history)
        tool_calls: list[ToolSelection] = self.llm.get_tool_calls_from_response(
            response=response, error_on_no_tool_call=False
        )
        stop_event = None
        if len(tool_calls) == 0:
            # No tool call means the LLM answered directly, so stop the workflow with that answer.
            await self.memory.aput(response.message)
            stop_event = StopEvent(
                result=response.message.content
            )
        return stop_event, tool_calls, response

    @staticmethod
    async def _get_user_state_str(ctx: Context) -> str:
        user_state = await ctx.get("user_state", None)
        user_state_list = [f"{k}: {v}" for k, v in user_state.items()]
        return "\n".join(user_state_list)

After studying the Orchestrate branch, let's see how the ActiveSpeaker branch works, which is the speak_with_sub_agent method:

The flowchart of the speak_with_sub_agent method. Image by Author

This method first gets the agent currently providing service, as well as the chat_history and user_state.

It then uses the current agent's system_prompt and tools, together with the chat_history, to let the LLM judge which tools to call next.

class CustomerService(Workflow):
    ...

    @step
    async def speak_with_sub_agent(
            self, ctx: Context, ev: ActiveSpeakerEvent
    ) -> OrchestrationEvent | ToolCallEvent | StopEvent:
        active_speaker = await ctx.get("active_speaker", default="")
        agent_config: AgentConfig = get_agent_config_pair()[active_speaker]
        chat_history = self.memory.get()
        user_state_str = await self._get_user_state_str(ctx)

        system_prompt = (
                agent_config.system_prompt.strip()
                + f"\n\n<user state>:\n{user_state_str}"
        )
        llm_input = [ChatMessage(role="system", content=system_prompt)] + chat_history
        tools = [get_function_tool(RequestTransfer)] + agent_config.tools
        event, tool_calls, response = await self.achat_to_tool_calls(ctx, tools, llm_input)

        if event is not None:
            return event
        await ctx.set("num_tool_calls", len(tool_calls))
        for tool_call in tool_calls:
            if tool_call.tool_name == "RequestTransfer":
                await ctx.set("active_speaker", None)
                ctx.write_event_to_stream(
                    ProgressEvent(msg="The agent is requesting a transfer, please hold on...")
                )
                return OrchestrationEvent(query=ev.query)
            else:
                ctx.send_event(
                    ToolCallEvent(tool_call=tool_call, tools=agent_config.tools)
                )
        await self.memory.aput(response.message)

It is important to note that although the sample project is simple and each agent has only one tool, real projects often need to call multiple tools concurrently. So we iterate through tool_calls and emit a separate ToolCallEvent for each.

We also need to consider the situation where the current agent cannot handle the user's request and calls RequestTransfer; in that case we go back to the orchestrate step and re-select an agent.

Let's look at the handle_tool_calls step. There is a fair amount of code here, but what it does is very simple: get the tool to be executed and execute it. It's so simple that I won't even draw a flowchart.

class CustomerService(Workflow):
    ...

    @step(num_workers=4)
    async def handle_tool_calls(
            self, ctx: Context, ev: ToolCallEvent
    ) -> ToolCallResultEvent:
        tool_call = ev.tool_call
        tools_by_name = {tool.metadata.get_name(): tool for tool in ev.tools}
        # Use .get() so a missing tool doesn't raise a KeyError before we can report it.
        tool = tools_by_name.get(tool_call.tool_name)
        additional_kwargs = {
            "tool_call_id": tool_call.tool_id,
            "name": tool_call.tool_name
        }
        if not tool:
            tool_msg = ChatMessage(
                role="tool",
                content=f"Tool {tool_call.tool_name} does not exist.",
                additional_kwargs=additional_kwargs
            )
            return ToolCallResultEvent(chat_message=tool_msg)

        try:
            if isinstance(tool, FunctionToolWithContext):
                tool_output = await tool.acall(ctx, **tool_call.tool_kwargs)
            else:
                tool_output = await tool.acall(**tool_call.tool_kwargs)

            tool_msg = ChatMessage(
                role="tool",
                content=tool_output.content,
                additional_kwargs=additional_kwargs
            )
        except Exception as e:
            tool_msg = ChatMessage(
                role="tool",
                content=f"Encountered error in tool call: {e}",
                additional_kwargs=additional_kwargs
            )
        return ToolCallResultEvent(chat_message=tool_msg)

One small detail here: I set num_workers=4 on the step decorator. This tells the workflow to run at most 4 of these steps concurrently, avoiding excessive concurrency that could block downstream systems.

Then we come to the last method, aggregate_tool_results.

class CustomerService(Workflow):
    ...

    @step
    async def aggregate_tool_results(
            self, ctx: Context, ev: ToolCallResultEvent
    ) -> ActiveSpeakerEvent | None:
        num_tool_calls = await ctx.get("num_tool_calls")
        results = ctx.collect_events(ev, [ToolCallResultEvent] * num_tool_calls)
        if not results:
            return None

        for result in results:
            await self.memory.aput(result.chat_message)
        return ActiveSpeakerEvent(query="")

This method is relatively simple: it collects all tool_call execution results, writes them into ChatMemory, and finally hands control back to the agent to evaluate the results and answer the user.

Step six, check our hard work

At this point, our customer service team has been built. Let's check whether these agents are working hard. Start!

chainlit run src/app.py

Not bad. The front desk agent first asks for my name, simulating the login process, and then, based on my request, hands me over to the pre-sales agent:

When my request was handed off to the pre-sales agent. Image by Author

It can also be transferred to the after-sales agent based on my request:

When my request was handed off to the after-sales agent. Image by Author

I can even play with them all day!


Project Source Code

The code for this project is stored below; everyone can use or modify it without my permission: