Skip to main content

Documentation Index

Fetch the complete documentation index at: https://crewai-lorenze-imp-docs-improvements.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

Overview

CrewAI Flows is a powerful feature designed to streamline the creation and management of AI workflows. Flows allow developers to combine and coordinate coding tasks and Crews efficiently, providing a robust framework for building sophisticated AI automations. Flows allow you to create structured, event-driven workflows. They provide a seamless way to connect multiple tasks, manage state, and control the flow of execution in your AI applications. With Flows, you can easily design and implement multi-step processes that leverage the full potential of CrewAI’s capabilities.
  1. Simplified Workflow Creation: Easily chain together multiple Crews and tasks to create complex AI workflows.
  2. State Management: Flows make it super easy to manage and share state between different tasks in your workflow.
  3. Event-Driven Architecture: Built on an event-driven model, allowing for dynamic and responsive workflows.
  4. Flexible Control Flow: Implement conditional logic, loops, and branching within your workflows.

When to Use Flows

  • You need deterministic orchestration and branching logic.
  • You need explicit state transitions across multiple steps.
  • You need resumable workflows with persistence.
  • You need to combine crews, direct model calls, and Python logic in one runtime.

When Not to Use Flows

  • A single prompt/response call is sufficient.
  • A single crew kickoff with no orchestration logic is sufficient.
  • You do not need stateful multi-step execution.

Getting Started

The example below shows a realistic Flow for support-ticket triage. It demonstrates features teams use in production: typed state, routing, memory access, and persistence.
Code
from crewai.flow.flow import Flow, listen, router, start
from crewai.flow.persistence import persist
from pydantic import BaseModel, Field


class SupportTriageState(BaseModel):
    ticket_id: str = ""
    customer_tier: str = "standard"  # standard | enterprise
    issue: str = ""
    urgency: str = "normal"
    route: str = ""
    draft_reply: str = ""
    internal_notes: list[str] = Field(default_factory=list)


@persist()
class SupportTriageFlow(Flow[SupportTriageState]):
    @start()
    def ingest_ticket(self):
        # kickoff(inputs={...}) is merged into typed state fields
        print(f"Flow State ID: {self.state.id}")

        self.remember(
            f"Ticket {self.state.ticket_id}: {self.state.issue}",
            scope=f"/support/{self.state.ticket_id}",
        )

        issue = self.state.issue.lower()
        if "security" in issue or "breach" in issue:
            self.state.urgency = "critical"
        elif self.state.customer_tier == "enterprise":
            self.state.urgency = "high"
        else:
            self.state.urgency = "normal"

        return self.state.issue

    @router(ingest_ticket)
    def route_ticket(self):
        issue = self.state.issue.lower()
        if "security" in issue or "breach" in issue:
            self.state.route = "security"
            return "security_review"
        if self.state.customer_tier == "enterprise" or self.state.urgency == "high":
            self.state.route = "priority"
            return "priority_queue"
        self.state.route = "standard"
        return "standard_queue"

    @listen("security_review")
    def handle_security(self):
        self.state.internal_notes.append("Escalated to Security Incident Response")
        self.state.draft_reply = (
            "We have escalated your case to our security team and will update you shortly."
        )
        return self.state.draft_reply

    @listen("priority_queue")
    def handle_priority(self):
        history = self.recall("SLA commitments for enterprise support", limit=2)
        self.state.internal_notes.append(
            f"Loaded {len(history)} memory hits for priority handling"
        )
        self.state.draft_reply = (
            "Your ticket has been prioritized and assigned to a senior support engineer."
        )
        return self.state.draft_reply

    @listen("standard_queue")
    def handle_standard(self):
        self.state.internal_notes.append("Routed to standard support queue")
        self.state.draft_reply = "Thanks for reporting this. Our team will follow up soon."
        return self.state.draft_reply


flow = SupportTriageFlow()
flow.plot("support_triage_flow")
result = flow.kickoff(
    inputs={
        "ticket_id": "TCK-1024",
        "customer_tier": "enterprise",
        "issue": "Cannot access SSO after enabling new policy",
    }
)
print("Final reply:", result)
print("Route:", flow.state.route)
print("Notes:", flow.state.internal_notes)
Flow Visual image In this example, one flow demonstrates several core features together:
  1. @start() initializes and normalizes state for downstream steps.
  2. @router() performs deterministic branching into labeled routes.
  3. Route listeners implement lane-specific behavior (security, priority, standard).
  4. @persist() keeps the flow state recoverable between runs.
  5. Built-in memory methods (remember, recall) add durable context beyond a single method call.
This pattern mirrors typical production workflows where request classification, policy-aware routing, and auditable state all happen in one orchestrated flow.

@start()

The @start() decorator marks entry points for a Flow. You can:
  • Declare multiple unconditional starts: @start()
  • Gate a start on a prior method or router label: @start("method_or_label")
  • Provide a callable condition to control when a start should fire
All satisfied @start() methods will execute (often in parallel) when the Flow begins or resumes.

@listen()

The @listen() decorator is used to mark a method as a listener for the output of another task in the Flow. The method decorated with @listen() will be executed when the specified task emits an output. The method can access the output of the task it is listening to as an argument.

Usage

The @listen() decorator can be used in several ways:
  1. Listening to a Method by Name: You can pass the name of the method you want to listen to as a string. When that method completes, the listener method will be triggered.
    Code
    @listen("upstream_method")
    def downstream_method(self, upstream_result):
        # Implementation
    
  2. Listening to a Method Directly: You can pass the method itself. When that method completes, the listener method will be triggered.
    Code
    @listen(upstream_method)
    def downstream_method(self, upstream_result):
        # Implementation
    

Flow Output

Accessing and handling the output of a Flow is essential for integrating your AI workflows into larger applications or systems. CrewAI Flows provide straightforward mechanisms to retrieve the final output, access intermediate results, and manage the overall state of your Flow.

Retrieving the Final Output

When you run a Flow, the final output is determined by the last method that completes. The kickoff() method returns the output of this final method. Here’s how you can access the final output:
from crewai.flow.flow import Flow, listen, start

class OutputExampleFlow(Flow):
    @start()
    def first_method(self):
        return "Output from first_method"

    @listen(first_method)
    def second_method(self, first_output):
        return f"Second method received: {first_output}"


flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()

print("---- Final Output ----")
print(final_output)
Flow Visual image In this example, the second_method is the last method to complete, so its output will be the final output of the Flow. The kickoff() method will return the final output, which is then printed to the console. The plot() method will generate the HTML file, which will help you understand the flow.

Accessing and Updating State

In addition to retrieving the final output, you can also access and update the state within your Flow. The state can be used to store and share data between different methods in the Flow. After the Flow has run, you can access the state to retrieve any information that was added or updated during the execution. Here’s an example of how to update and access the state:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StateExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        self.state.message = "Hello from first_method"
        self.state.counter += 1

    @listen(first_method)
    def second_method(self):
        self.state.message += " - updated by second_method"
        self.state.counter += 1
        return self.state.message

flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
Flow Visual image In this example, the state is updated by both first_method and second_method. After the Flow has run, you can access the final state to see the updates made by these methods. By ensuring that the final method’s output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems, while also maintaining and accessing the state throughout the Flow’s execution.

Flow State Management

Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management, allowing developers to choose the approach that best fits their application’s needs.

Unstructured State Management

In unstructured state management, all state is stored in the state attribute of the Flow class. This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema. Even with unstructured states, CrewAI Flows automatically generates and maintains a unique identifier (UUID) for each state instance.
Code
from crewai.flow.flow import Flow, listen, start

class UnstructuredExampleFlow(Flow):

    @start()
    def first_method(self):
        # The state automatically includes an 'id' field
        print(f"State ID: {self.state['id']}")
        self.state['counter'] = 0
        self.state['message'] = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated again"

        print(f"State after third_method: {self.state}")


flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
Flow Visual image Note: The id field is automatically generated and preserved throughout the flow’s execution. You don’t need to manage or set it manually, and it will be maintained even when updating the state with new data. Key Points:
  • Flexibility: You can dynamically add attributes to self.state without predefined constraints.
  • Simplicity: Ideal for straightforward workflows where state structure is minimal or varies significantly.

Structured State Management

Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow. By using models like Pydantic’s BaseModel, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments. Each state in CrewAI Flows automatically receives a unique identifier (UUID) to help track and manage state instances. This ID is automatically generated and managed by the Flow system.
Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    # Note: 'id' field is automatically added to all states
    counter: int = 0
    message: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        # Access the auto-generated ID if needed
        print(f"State ID: {self.state.id}")
        self.state.message = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = StructuredExampleFlow()
flow.kickoff()
Flow Visual image Key Points:
  • Defined Schema: ExampleState clearly outlines the state structure, enhancing code readability and maintainability.
  • Type Safety: Leveraging Pydantic ensures that state attributes adhere to the specified types, reducing runtime errors.
  • Auto-Completion: IDEs can provide better auto-completion and error checking based on the defined state model.

Choosing Between Unstructured and Structured State Management

  • Use Unstructured State Management when:
    • The workflow’s state is simple or highly dynamic.
    • Flexibility is prioritized over strict state definitions.
    • Rapid prototyping is required without the overhead of defining schemas.
  • Use Structured State Management when:
    • The workflow requires a well-defined and consistent state structure.
    • Type safety and validation are important for your application’s reliability.
    • You want to leverage IDE features like auto-completion and type checking for better developer experience.
By providing both unstructured and structured state management options, CrewAI Flows empowers developers to build AI workflows that are both flexible and robust, catering to a wide range of application requirements.

Flow Persistence

The @persist decorator enables automatic state persistence in CrewAI Flows, allowing you to maintain flow state across restarts or different workflow executions. This decorator can be applied at either the class level or method level, providing flexibility in how you manage state persistence.

Class-Level Persistence

When applied at the class level, the @persist decorator automatically persists all flow method states:
@persist  # Using SQLiteFlowPersistence by default
class MyFlow(Flow[MyState]):
    @start()
    def initialize_flow(self):
        # This method will automatically have its state persisted
        self.state.counter = 1
        print("Initialized flow. State ID:", self.state.id)

    @listen(initialize_flow)
    def next_step(self):
        # The state (including self.state.id) is automatically reloaded
        self.state.counter += 1
        print("Flow state is persisted. Counter:", self.state.counter)

Method-Level Persistence

For more granular control, you can apply @persist to specific methods:
class AnotherFlow(Flow[dict]):
    @persist  # Persists only this method's state
    @start()
    def begin(self):
        if "runs" not in self.state:
            self.state["runs"] = 0
        self.state["runs"] += 1
        print("Method-level persisted runs:", self.state["runs"])

How It Works

  1. Unique State Identification
    • Each flow state automatically receives a unique UUID
    • The ID is preserved across state updates and method calls
    • Supports both structured (Pydantic BaseModel) and unstructured (dictionary) states
  2. Default SQLite Backend
    • SQLiteFlowPersistence is the default storage backend
    • States are automatically saved to a local SQLite database
    • Robust error handling ensures clear messages if database operations fail
  3. Error Handling
    • Comprehensive error messages for database operations
    • Automatic state validation during save and load
    • Clear feedback when persistence operations encounter issues

Important Considerations

  • State Types: Both structured (Pydantic BaseModel) and unstructured (dictionary) states are supported
  • Automatic ID: The id field is automatically added if not present
  • State Recovery: Failed or restarted flows can automatically reload their previous state
  • Custom Implementation: You can provide your own FlowPersistence implementation for specialized storage needs

Technical Advantages

  1. Precise Control Through Low-Level Access
    • Direct access to persistence operations for advanced use cases
    • Fine-grained control via method-level persistence decorators
    • Built-in state inspection and debugging capabilities
    • Full visibility into state changes and persistence operations
  2. Enhanced Reliability
    • Automatic state recovery after system failures or restarts
    • Transaction-based state updates for data integrity
    • Comprehensive error handling with clear error messages
    • Robust validation during state save and load operations
  3. Extensible Architecture
    • Customizable persistence backend through FlowPersistence interface
    • Support for specialized storage solutions beyond SQLite
    • Compatible with both structured (Pydantic) and unstructured (dict) states
    • Seamless integration with existing CrewAI flow patterns
The persistence system’s architecture emphasizes technical precision and customization options, allowing developers to maintain full control over state management while benefiting from built-in reliability features.

Flow Control

Conditional Logic: or

The or_ function in Flows allows you to listen to multiple methods and trigger the listener method when any of the specified methods emit an output.
from crewai.flow.flow import Flow, listen, or_, start

class OrExampleFlow(Flow):

    @start()
    def start_method(self):
        return "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        return "Hello from the second method"

    @listen(or_(start_method, second_method))
    def logger(self, result):
        print(f"Logger: {result}")



flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
Flow Visual image When you run this Flow, the logger method will be triggered by the output of either the start_method or the second_method. The or_ function is used to listen to multiple methods and trigger the listener method when any of the specified methods emit an output.

Conditional Logic: and

The and_ function in Flows allows you to listen to multiple methods and trigger the listener method only when all the specified methods emit an output.
from crewai.flow.flow import Flow, and_, listen, start

class AndExampleFlow(Flow):

    @start()
    def start_method(self):
        self.state["greeting"] = "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        self.state["joke"] = "What do computers eat? Microchips."

    @listen(and_(start_method, second_method))
    def logger(self):
        print("---- Logger ----")
        print(self.state)

flow = AndExampleFlow()
flow.plot()
flow.kickoff()
Flow Visual image When you run this Flow, the logger method will be triggered only when both the start_method and the second_method emit an output. The and_ function is used to listen to multiple methods and trigger the listener method only when all the specified methods emit an output.

Router

The @router() decorator in Flows allows you to define conditional routing logic based on the output of a method. You can specify different routes based on the output of the method, allowing you to control the flow of execution dynamically.
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    success_flag: bool = False

class RouterFlow(Flow[ExampleState]):

    @start()
    def start_method(self):
        print("Starting the structured flow")
        random_boolean = random.choice([True, False])
        self.state.success_flag = random_boolean

    @router(start_method)
    def second_method(self):
        if self.state.success_flag:
            return "success"
        else:
            return "failed"

    @listen("success")
    def third_method(self):
        print("Third method running")

    @listen("failed")
    def fourth_method(self):
        print("Fourth method running")


flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()
Flow Visual image In the above example, the start_method generates a random boolean value and sets it in the state. The second_method uses the @router() decorator to define conditional routing logic based on the value of the boolean. If the boolean is True, the method returns "success", and if it is False, the method returns "failed". The third_method and fourth_method listen to the output of the second_method and execute based on the returned value. When you run this Flow, the output will change based on the random boolean value generated by the start_method.

Human in the Loop (human feedback)

The @human_feedback decorator requires CrewAI version 1.8.0 or higher.
The @human_feedback decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Do you approve this content?",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
    )
    def generate_content(self):
        return "Content to be reviewed..."

    @listen("approved")
    def on_approval(self, result: HumanFeedbackResult):
        print(f"Approved! Feedback: {result.feedback}")

    @listen("rejected")
    def on_rejection(self, result: HumanFeedbackResult):
        print(f"Rejected. Reason: {result.feedback}")
When emit is specified, the human’s free-form feedback is interpreted by an LLM and collapsed into one of the specified outcomes, which then triggers the corresponding @listen decorator. You can also use @human_feedback without routing to simply collect feedback:
Code
@start()
@human_feedback(message="Any comments on this output?")
def my_method(self):
    return "Output for review"

@listen(my_method)
def next_step(self, result: HumanFeedbackResult):
    # Access feedback via result.feedback
    # Access original output via result.output
    pass
Access all feedback collected during a flow via self.last_human_feedback (most recent) or self.human_feedback_history (all feedback as a list). For a complete guide on human feedback in flows, including async/non-blocking feedback with custom providers (Slack, webhooks, etc.), see Human Feedback in Flows.

Adding Agents to Flows

Agents can be seamlessly integrated into your flows, providing a lightweight alternative to full Crews when you need simpler, focused task execution. Here’s an example of how to use an Agent within a flow to perform market research:
import asyncio
from typing import Any, Dict, List

from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field

from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start


# Define a structured output format
class MarketAnalysis(BaseModel):
    key_trends: List[str] = Field(description="List of identified market trends")
    market_size: str = Field(description="Estimated market size")
    competitors: List[str] = Field(description="Major competitors in the space")


# Define flow state
class MarketResearchState(BaseModel):
    product: str = ""
    analysis: MarketAnalysis | None = None


# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
    @start()
    def initialize_research(self) -> Dict[str, Any]:
        print(f"Starting market research for {self.state.product}")
        return {"product": self.state.product}

    @listen(initialize_research)
    async def analyze_market(self) -> Dict[str, Any]:
        # Create an Agent for market research
        analyst = Agent(
            role="Market Research Analyst",
            goal=f"Analyze the market for {self.state.product}",
            backstory="You are an experienced market analyst with expertise in "
            "identifying market trends and opportunities.",
            tools=[SerperDevTool()],
            verbose=True,
        )

        # Define the research query
        query = f"""
        Research the market for {self.state.product}. Include:
        1. Key market trends
        2. Market size
        3. Major competitors

        Format your response according to the specified structure.
        """

        # Execute the analysis with structured output format
        result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
        if result.pydantic:
            print("result", result.pydantic)
        else:
            print("result", result)

        # Return the analysis to update the state
        return {"analysis": result.pydantic}

    @listen(analyze_market)
    def present_results(self, analysis) -> None:
        print("\nMarket Analysis Results")
        print("=====================")

        if isinstance(analysis, dict):
            # If we got a dict with 'analysis' key, extract the actual analysis object
            market_analysis = analysis.get("analysis")
        else:
            market_analysis = analysis

        if market_analysis and isinstance(market_analysis, MarketAnalysis):
            print("\nKey Market Trends:")
            for trend in market_analysis.key_trends:
                print(f"- {trend}")

            print(f"\nMarket Size: {market_analysis.market_size}")

            print("\nMajor Competitors:")
            for competitor in market_analysis.competitors:
                print(f"- {competitor}")
        else:
            print("No structured analysis data available.")
            print("Raw analysis:", analysis)


# Usage example
async def run_flow():
    flow = MarketResearchFlow()
    flow.plot("MarketResearchFlowPlot")
    result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
    return result


# Run the flow
if __name__ == "__main__":
    asyncio.run(run_flow())
Flow Visual image This example demonstrates several key features of using Agents in flows:
  1. Structured Output: Using Pydantic models to define the expected output format (MarketAnalysis) ensures type safety and structured data throughout the flow.
  2. State Management: The flow state (MarketResearchState) maintains context between steps and stores both inputs and outputs.
  3. Tool Integration: Agents can use tools (like WebsiteSearchTool) to enhance their capabilities.

Multi-Crew Flows and Plotting

Detailed build walkthroughs and project scaffolding are documented in guide pages to keep this concepts page focused. For visualization:
  • Use flow.plot("my_flow_plot") in code, or
  • Use crewai flow plot in CLI projects.

Running Flows

There are two ways to run a flow:

Using the Flow API

You can run a flow programmatically by creating an instance of your flow class and calling the kickoff() method:
flow = SupportTriageFlow()
result = flow.kickoff()

Streaming Flow Execution

For real-time visibility into flow execution, you can enable streaming to receive output as it’s generated:
class StreamingFlow(Flow):
    stream = True  # Enable streaming

    @start()
    def research(self):
        # Your flow implementation
        pass

# Iterate over streaming output
flow = StreamingFlow()
streaming = flow.kickoff()
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access final result
result = streaming.result
Learn more about streaming in the Streaming Flow Execution guide.

Memory in Flows

Every Flow automatically has access to CrewAI’s unified Memory system. You can store, recall, and extract memories directly inside any flow method using three built-in convenience methods.

Built-in Methods

MethodDescription
self.remember(content, **kwargs)Store content in memory. Accepts optional scope, categories, metadata, importance.
self.recall(query, **kwargs)Retrieve relevant memories. Accepts optional scope, categories, limit, depth.
self.extract_memories(content)Break raw text into discrete, self-contained memory statements.
A default Memory() instance is created automatically when the Flow initializes. You can also pass a custom one:
from crewai.flow.flow import Flow
from crewai import Memory

custom_memory = Memory(
    recency_weight=0.5,
    recency_half_life_days=7,
    embedder={"provider": "ollama", "config": {"model_name": "mxbai-embed-large"}},
)

flow = MyFlow(memory=custom_memory)

Example: Research and Analyze Flow

from crewai.flow.flow import Flow, listen, start


class ResearchAnalysisFlow(Flow):
    @start()
    def gather_data(self):
        # Simulate research findings
        findings = (
            "PostgreSQL handles 10k concurrent connections with connection pooling. "
            "MySQL caps at around 5k. MongoDB scales horizontally but adds complexity."
        )

        # Extract atomic facts and remember each one
        memories = self.extract_memories(findings)
        for mem in memories:
            self.remember(mem, scope="/research/databases")

        return findings

    @listen(gather_data)
    def analyze(self, raw_findings):
        # Recall relevant past research (from this run or previous runs)
        past = self.recall("database performance and scaling", limit=10, depth="shallow")

        context_lines = [f"- {m.record.content}" for m in past]
        context = "\n".join(context_lines) if context_lines else "No prior context."

        return {
            "new_findings": raw_findings,
            "prior_context": context,
            "total_memories": len(past),
        }


flow = ResearchAnalysisFlow()
result = flow.kickoff()
print(result)
Because memory persists across runs (backed by LanceDB on disk), the analyze step will recall findings from previous executions too — enabling flows that learn and accumulate knowledge over time. See the Memory documentation for details on scopes, slices, composite scoring, embedder configuration, and more.

Using the CLI

Starting from version 0.103.0, you can run flows using the crewai run command:
crewai run
This command automatically detects if your project is a flow (based on the type = "flow" setting in your pyproject.toml) and runs it accordingly. This is the recommended way to run flows from the command line. For backward compatibility, you can also use:
crewai flow kickoff
However, the crewai run command is now the preferred method as it works for both crews and flows.

Common Failure Modes

Router branch not firing

  • Cause: returned label does not match a @listen("label") value.
  • Fix: align router return strings with listener labels exactly.

State fields missing at runtime

  • Cause: untyped dynamic fields or missing kickoff inputs.
  • Fix: use typed state and validate required fields in @start().

Prompt/token growth over time

  • Cause: appending unbounded message history in state.
  • Fix: apply sliding-window state and summary compaction patterns.

Non-idempotent retries

  • Cause: side effects executed on retried steps.
  • Fix: add idempotency keys/markers to state and guard external writes.