Deep Dive into LlamaIndex Workflow: Event-driven LLM architecture

My take on its progress and shortcomings after hands-on practice

Deep Dive into LlamaIndex Workflows: Event-driven LLM architecture. Image by DALL-E-3

LlamaIndex recently introduced a new feature called Workflow, which brings event-driven execution and logic decoupling to LLM applications.

In today's article, we'll take a deep dive into this feature through a practical mini-project, exploring what's new and what's still lacking. Let's get started.


Introduction

Why event-driven?

More and more LLM applications are shifting towards intelligent agent architectures, expecting LLMs to meet user requests by calling different APIs or making multiple iterative calls.

This shift, however, brings a problem: as agent applications make more API calls, program responses slow down and code logic becomes more complex.

A typical example is ReActAgent, which involves steps like Thought, Action, Observation, and Final Answer, requiring at least three LLM calls and one tool call. If loops are needed, there will be even more I/O calls.

A typical ReAct agent will make at least three calls to LLM. Image by Author

Is there a way to optimize this?

As shown in the diagram above, in a traditional programming model, all I/O calls are linear; the next task must wait until the previous one is completed.

Although mainstream LLMs now support streaming output, in agent applications we still need to wait for the LLM to finish generating its result before returning or moving to the next phase.

Actually, we don’t need all I/O calls to proceed sequentially; they can be executed concurrently, as shown in the diagram below:

In concurrent programming, multiple steps are executed in parallel. Image by Author

Does this diagram look familiar? Yes, Python's asyncio package provides the ability to execute I/O-bound tasks concurrently, and nearly all I/O-based APIs, including LLM clients, support concurrent execution.
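To make the difference concrete, here is a minimal sketch using only the standard library. The fake_io_call coroutine is a hypothetical stand-in for an LLM or tool request, and the three calls are assumed to be independent of each other; calls that depend on a previous result still have to wait for it.

```python
import asyncio
import time


async def fake_io_call(name: str, delay: float) -> str:
    """Hypothetical stand-in for an I/O-bound call (LLM or tool request)."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_sequentially(delay: float) -> tuple[list[str], float]:
    """Await each call in turn: total time is roughly 3 * delay."""
    started = time.perf_counter()
    results = [await fake_io_call(f"call-{i}", delay) for i in range(3)]
    return results, time.perf_counter() - started


async def run_concurrently(delay: float) -> tuple[list[str], float]:
    """Schedule all calls at once: total time is roughly 1 * delay."""
    started = time.perf_counter()
    results = await asyncio.gather(
        *(fake_io_call(f"call-{i}", delay) for i in range(3))
    )
    return list(results), time.perf_counter() - started


if __name__ == "__main__":
    _, sequential = asyncio.run(run_sequentially(0.5))
    _, concurrent = asyncio.run(run_concurrently(0.5))
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

Both versions return the same results in the same order; only the waiting overlaps in the concurrent one.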

LlamaIndex's Workflow also utilizes the principles of concurrent programming. It goes further by not only encapsulating the details of the asyncio library but also providing an event mechanism that allows us to decouple different segments of the business process.

Now that we understand the background, let's step through LlamaIndex Workflow with an actual project.


First Impressions

Before the main course, let's have an appetizer by familiarizing ourselves with the elements and basic principles through a simple code example.

Importing necessary packages

First, we need to import the necessary tools. Workflow is already included in the latest version of LlamaIndex, so no separate installation is needed.

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    Context,
    step,
)

Defining some events

Since Workflow is an event-driven framework, we should start by defining some events.

To avoid inconsistencies, we can first define a BaseEvent, ensuring all events use the same payload key for message passing.

class BaseEvent(Event):
    payload: str | dict | None

Let’s define our first event of the day: SecondStepEvent

class SecondStepEvent(BaseEvent):
    ...

Starting simple

Next, let’s start coding our first Workflow program, which is a subclass of Workflow containing two methods:

class SimpleWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> SecondStepEvent:
        return SecondStepEvent(payload=ev.payload)
    
    @step
    async def second_step(self, ev: SecondStepEvent) -> StopEvent:
        return StopEvent(result=ev.payload)

  1. The start method accepts a StartEvent and returns a SecondStepEvent.
  2. The second_step method accepts a SecondStepEvent and returns a StopEvent.

Let's get the code up and running to see how it works.

s_wf = SimpleWorkflow(timeout=10, verbose=True)
result = await s_wf.run(payload="hello world")
print(result)

We have turned on the verbose option so that we can see in detail how the code is executed.

The result of the execution of our first Workflow program. Image by Author

Trying out the visualization tool

LlamaIndex also generously provides a small tool that renders the entire workflow as an HTML flowchart, which is very intuitive. Unlike Workflow itself, it ships in a separate package, llama-index-utils-workflow.

from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(SimpleWorkflow, filename="simple_workflow.html")

Flowchart of the first Workflow code. Image by Author

Explaining the principles

A quick look at the source code reveals that Workflow internally maintains a Context, which not only keeps an event queue but also maintains a dictionary containing each step.

Workflow uses a run_flow loop to listen for events and execute steps. Image by Author

When a Workflow is initialized, the step decorator analyzes the signature of each method to determine which events it receives and returns, starts listening to the event queue, and then stores the method in the step dictionary.

When the Workflow's run method is called, it starts a run_flow loop and places a StartEvent in the event queue. If there's a method that accepts this StartEvent, it executes and returns the corresponding event, which is put back into the event queue.

The step method can also directly call the Context's send_event method to place an event in the queue.

If the run_flow loop detects a StopEvent in the queue, it exits the flow and returns the final result.
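The mechanism described above can be sketched as a toy dispatcher in plain asyncio. This is not LlamaIndex's implementation: every name here (ToyWorkflow, ToyStart, ToySecond, ToyStop, add_step) is hypothetical, steps are registered by hand instead of through a decorator, and steps run one at a time, whereas the real Workflow can run steps concurrently.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyEvent:
    payload: Any = None


class ToyStart(ToyEvent): ...
class ToySecond(ToyEvent): ...
class ToyStop(ToyEvent): ...


class ToyWorkflow:
    def __init__(self) -> None:
        # Step dictionary: accepted event type -> step coroutine function.
        self.steps = {}

    def add_step(self, accepts: type, func) -> None:
        self.steps[accepts] = func

    async def run(self, payload: Any) -> Any:
        queue = asyncio.Queue()
        await queue.put(ToyStart(payload))  # seed the queue with a start event
        while True:
            event = await queue.get()
            if isinstance(event, ToyStop):  # a stop event ends the loop
                return event.payload
            step = self.steps[type(event)]  # find the step for this event
            await queue.put(await step(event))  # its result goes back in


async def start(ev: ToyStart) -> ToySecond:
    return ToySecond(ev.payload)


async def second_step(ev: ToySecond) -> ToyStop:
    return ToyStop(ev.payload)


def build_toy_workflow() -> ToyWorkflow:
    wf = ToyWorkflow()
    wf.add_step(ToyStart, start)
    wf.add_step(ToySecond, second_step)
    return wf


if __name__ == "__main__":
    print(asyncio.run(build_toy_workflow().run("hello world")))
```

The point of the sketch is that no step calls another step directly; each one only returns an event, and the loop decides what runs next.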

With a basic understanding of the elements and implementation principles, we can now explore the advantages and shortcomings of the Workflow through a hands-on project.


Hands-on Project

In today's hands-on project, we'll implement, step by step, an automated trading robot that listens to market sentiment and executes stock trades, demonstrating Workflow's branching and looping control, streaming events, and concurrent execution features.

Note: The following code uses pseudocode to explain the Workflow mechanism and does not contain any trading logic or investment advice.

Branching and looping control

In the first version of the trading robot, we'll continuously monitor the latest news of a certain stock, analyze the sentiment implied in the news, and then make corresponding trades.

The entire code logic is shown in the diagram below:

Flowchart of our TradeMonitor program. Image by Author

First, we'll define a StockTrader class whose buy and sell methods are async, using asyncio.sleep to simulate the I/O delay of placing a real order.

import asyncio
import random  # used later by the get_sentiment step


class StockTrader:
    async def buy(self, stock: str) -> None:
        await asyncio.sleep(0.5)
        print(f"Buying {stock}")
    
    async def sell(self, stock: str) -> None:
        await asyncio.sleep(0.5)
        print(f"Selling {stock}")

We also need to implement four events: LoopEvent, GetSentimentEvent, BuyEvent, and SellEvent, all of which are subclasses of BaseEvent, ensuring they follow a unified message-passing interface.

class LoopEvent(BaseEvent):
    ...
    
class GetSentimentEvent(BaseEvent):
    ...

class BuyEvent(BaseEvent):
    ...

class SellEvent(BaseEvent):
    ...

Next, we start implementing the TradeMonitorWorkflow class, which contains the core business logic.

class TradeMonitorWorkflow(Workflow):
    def __init__(self, total_cycle: int = 1, *args, **kwargs) -> None:
        self.total_cycle = total_cycle
        self.counter = 0
        self.trader = StockTrader()
        super().__init__(*args, **kwargs)
        
    @step    
    async def begin(self, ev: StartEvent | LoopEvent) \
            -> GetSentimentEvent | StopEvent:
        print("We now return to the begin step")
        if isinstance(ev, StartEvent):
            self.stock = ev.payload
         
        if self.counter < self.total_cycle:
            await asyncio.sleep(3)
            self.counter += 1
            return GetSentimentEvent(payload=self.stock)
        else:
            return StopEvent(result="We're done for the day.")
    
    @step
    async def get_sentiment(self, ev: GetSentimentEvent) -> BuyEvent | SellEvent:
        print(f"Will get the latest sentiment for stock {ev.payload}")
        if random.random() < 0.3:
            return SellEvent(payload='Bearish')
        else:
            return BuyEvent(payload='Bullish')
    
    @step    
    async def buy(self, ev: BuyEvent) -> LoopEvent:
        print(f"We now buy some stock with sentiment {ev.payload}.")
        await self.trader.buy(self.stock)
        return LoopEvent(payload="Start a new cycle.")
    
    @step
    async def sell(self, ev: SellEvent) -> LoopEvent:
        print(f"We now sell some stock with sentiment {ev.payload}")
        await self.trader.sell(self.stock)
        return LoopEvent(payload="Start a new cycle.")

  1. The begin method is our entry point, accepting both StartEvent and LoopEvent.
  2. The StartEvent is the default event that starts the workflow, and we pass the stock code through this event.
  3. The GetSentimentEvent triggers the get_sentiment method to obtain sentiment information. For simplicity, we use random.random() to pick one of two sentiments, Bullish or Bearish, and then return the corresponding BuyEvent or SellEvent.
  4. After a transaction is completed, the LoopEvent triggers the begin method again for a new round of looping. To keep things simple, total_cycle defaults to 1, so only one loop runs.
  5. In each loop, the begin method returns a GetSentimentEvent to trigger the acquisition of the latest stock sentiment. Once all loops are completed, it returns a StopEvent.
  6. When a BuyEvent or SellEvent is received, the corresponding step method executes the transaction, prints the sentiment carried in the payload, and returns a LoopEvent to start a new loop.

As you can see, by using events, we can decouple complex loops and branching processes, making it possible for corresponding events to trigger new loops.

Let's use the draw_all_possible_flows tool to see if the entire flow chart matches our designed business logic diagram.

draw_all_possible_flows(TradeMonitorWorkflow, filename="trade_monitor_workflow.html")

We use events to decouple branching and looping control. Image by Author

Is that all? If it's just about decoupling loops and branching processes, couldn't I achieve that with some coding tricks?

Yes, but flow control is just the most superficial layer. Next, let's experience the powerful potential unleashed by combining asyncio with Workflow.

Streaming events

When building an agent chain, one of the biggest headaches is how to report progress back to users during execution, so they can see how far the code has got.

In the code above, we use the print function to show progress in real time on the console, but this approach is not feasible for web applications.

One solution is to launch a separate pipeline to push messages to users in real-time, but when multiple steps are executed concurrently, how to handle this pipeline becomes a challenge.

Fortunately, the Workflow's Context directly provides a message streaming pipeline, and we can conveniently write messages into this pipeline and handle them uniformly at the calling end through an async for loop.

LlamaIndex Workflow uses a streaming queue to output messages. Image by Author
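The underlying pattern can be sketched with a plain asyncio.Queue. This is only an illustration of the idea of writing messages into a stream and consuming them with an async for loop; it does not use LlamaIndex's API, and trade_job, stream_events, and the message texts are all hypothetical.

```python
import asyncio
from typing import AsyncIterator

_DONE = object()  # sentinel that marks the end of the stream


async def trade_job(stream: asyncio.Queue) -> str:
    """Hypothetical long-running job that reports progress as it goes."""
    for message in ("fetching sentiment", "placing order"):
        await asyncio.sleep(0.01)  # simulated I/O
        await stream.put(message)  # write a progress message to the stream
    await stream.put(_DONE)
    return "finished"


async def stream_events(stream: asyncio.Queue) -> AsyncIterator[str]:
    """Yield messages from the queue until the sentinel arrives."""
    while (message := await stream.get()) is not _DONE:
        yield message


async def main() -> tuple[list[str], str]:
    stream = asyncio.Queue()
    job = asyncio.create_task(trade_job(stream))  # job runs in the background
    received = []
    async for message in stream_events(stream):  # caller handles messages
        received.append(message)
    return received, await job


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because every producer writes to the same queue, the calling end needs only one loop to handle messages, no matter how many steps are running.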

Let's modify our previous trading program:
