CrewAI Flows 이해
CrewAI Flows를 사용하여 AI 워크플로우를 생성하고 관리하는 방법
소개
CrewAI Flows는 AI 워크플로우 생성을 간소화하고 관리하기 위해 설계된 강력한 기능입니다. Flows를 사용하면 개발자가 여러 코드 작업과 크루를 효율적으로 결합하고 조정할 수 있어, 복잡한 AI 자동화를 구축할 수 있는 견고한 프레임워크를 제공합니다.
Flows는 구조화된 이벤트 기반 워크플로우를 생성할 수 있게 해줍니다. 이를 통해 여러 작업을 연결하고 상태를 관리하며, AI 애플리케이션 내 실행 흐름을 제어할 수 있습니다. Flows를 사용하면 CrewAI의 기능을 최대한 활용하는 다단계 프로세스를 쉽게 설계하고 구현할 수 있습니다.
간편한 워크플로우 생성: 여러 크루와 태스크를 쉽게 연결하여 복잡한 AI 워크플로우를 생성할 수 있습니다.
상태 관리: 작업 간 상태를 쉽게 공유하고 관리할 수 있습니다.
이벤트 기반 아키텍처: 이벤트 기반 모델 위에 구축되어 동적이고 반응적인 워크플로우를 구현할 수 있습니다.
유연한 제어 흐름: 조건문, 루프, 분기 등을 워크플로우 내에 구현할 수 있습니다.
시작하기
OpenAI를 사용하여 첫 번째 태스크에서 무작위 도시를 생성하고, 두 번째 태스크에서 해당 도시에 대한 재미있는 사실을 생성하는 간단한 Flow를 만들어 봅시다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
class ExampleFlow(Flow):
model = "gpt-4o-mini"
@start()
def generate_city(self):
print("Starting flow")
# 각 Flow 상태는 자동으로 고유 ID를 가집니다
print(f"Flow State ID: {self.state['id']}")
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": "Return the name of a random city in the world.",
},
],
)
random_city = response["choices"][0]["message"]["content"]
# 상태에 도시 저장
self.state["city"] = random_city
print(f"Random City: {random_city}")
return random_city
@listen(generate_city)
def generate_fun_fact(self, random_city):
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": f"Tell me a fun fact about {random_city}",
},
],
)
fun_fact = response["choices"][0]["message"]["content"]
# 상태에 재미있는 사실 저장
self.state["fun_fact"] = fun_fact
return fun_fact
flow = ExampleFlow()
result = flow.kickoff()
print(f"Generated fun fact: {result}")
위 예제에서는 OpenAI를 이용해 무작위 도시를 생성하고, 해당 도시에 대한 재미있는 사실을 생성하는 간단한 Flow를 구성했습니다. Flow는 generate_city
와 generate_fun_fact
라는 두 개의 태스크로 구성되어 있으며, generate_city
는 Flow의 시작 지점이고, generate_fun_fact
는 generate_city
의 출력을 수신(listen)합니다.
각 Flow 인스턴스는 자동으로 고유 식별자(UUID)를 상태에 부여받아, Flow 실행을 추적하고 관리하는 데 도움을 줍니다. 상태는 실행 도중 생성된 도시와 재미있는 사실과 같은 데이터를 저장하는 데도 사용됩니다.
Flow를 실행하면 다음과 같은 순서로 동작합니다:
- Flow 상태에 고유 ID를 생성
- 무작위 도시를 생성하고 상태에 저장
- 해당 도시에 대한 재미있는 사실을 생성하고 상태에 저장
- 콘솔에 결과 출력
고유 ID와 상태에 저장된 데이터는 Flow 실행을 추적하고 태스크 간 컨텍스트를 유지하는 데 유용합니다.
주의: .env
파일을 설정하여 OPENAI_API_KEY
를 저장해야 합니다. 이 키는 OpenAI API 요청을 인증하는 데 필요합니다.
@start()
@start()
데코레이터는 Flow의 시작 지점을 지정하는 데 사용됩니다. Flow가 시작되면 @start()
가 적용된 모든 메서드가 병렬로 실행됩니다. 하나의 Flow에서 여러 개의 시작 메서드를 가질 수 있으며, Flow가 시작되면 이들 모두 실행됩니다.
@listen()
@listen()
데코레이터는 Flow 내 다른 태스크의 출력을 수신하는 리스너 메서드를 지정합니다. 지정된 태스크가 출력을 생성하면, @listen()
이 적용된 메서드가 실행됩니다. 이 메서드는 리스닝 대상 태스크의 출력을 인자로 받을 수 있습니다.
사용법
@listen()
데코레이터는 다음과 같은 방식으로 사용할 수 있습니다:
메서드 이름으로 리스닝: 문자열로 리스닝 대상 메서드 이름을 지정할 수 있습니다.
1 2 3
@listen("generate_city") def generate_fun_fact(self, random_city): # 구현 내용
메서드를 직접 지정: 리스닝 대상 메서드를 직접 참조할 수도 있습니다.
1 2 3
@listen(generate_city) def generate_fun_fact(self, random_city): # 구현 내용
Flow 출력
Flow의 출력을 접근하고 처리하는 것은 AI 워크플로우를 더 큰 애플리케이션이나 시스템에 통합할 때 중요합니다. CrewAI Flows는 최종 출력, 중간 결과, 전체 상태에 쉽게 접근하고 관리할 수 있는 메커니즘을 제공합니다.
최종 출력값 가져오기
Flow를 실행하면 마지막에 완료된 메서드의 출력이 최종 출력으로 간주됩니다. kickoff()
메서드는 이 마지막 메서드의 결과를 반환합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@start()
def first_method(self):
return "Output from first_method"
@listen(first_method)
def second_method(self, first_output):
return f"Second method received: {first_output}"
flow = OutputExampleFlow()
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
이 예시에서는 second_method
가 마지막으로 실행되므로, 해당 출력이 Flow의 최종 출력이 됩니다. kickoff()
메서드가 이 값을 반환하며, 콘솔에 출력됩니다.
상태 접근 및 업데이트
최종 출력값 외에도 Flow 실행 도중 상태에 접근하고 업데이트할 수 있습니다. 상태는 Flow 내 여러 메서드 간 데이터를 저장하고 공유하는 데 사용됩니다. Flow 실행 후에는 저장된 상태를 통해 어떤 데이터가 갱신되었는지 확인할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StateExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
self.state.message = "Hello from first_method"
self.state.counter += 1
@listen(first_method)
def second_method(self):
self.state.message += " - updated by second_method"
self.state.counter += 1
return self.state.message
flow = StateExampleFlow()
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
이 예시에서 상태는 first_method
와 second_method
모두에 의해 업데이트됩니다. Flow 실행 후 상태를 확인하면 이들 메서드가 어떻게 상태를 수정했는지 확인할 수 있습니다.
Flow 상태 관리
안정적이고 유지보수 가능한 AI 워크플로우를 구축하려면 상태 관리를 효과적으로 수행하는 것이 중요합니다. CrewAI Flows는 비구조적 상태와 구조적 상태 모두를 지원하는 강력한 메커니즘을 제공합니다.
비구조적 상태 관리
비구조적 상태에서는 모든 상태가 Flow
클래스의 state
속성에 저장됩니다. 이 방식은 유연성을 제공하여 엄격한 스키마 없이 상태 속성을 자유롭게 추가하거나 수정할 수 있습니다. 비구조적 상태에서도 CrewAI는 각 상태 인스턴스에 대해 자동으로 고유 식별자(UUID)를 생성하고 유지합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from crewai.flow.flow import Flow, listen, start
class UnstructuredExampleFlow(Flow):
@start()
def first_method(self):
# 상태에는 자동으로 'id' 필드가 포함됩니다
print(f"State ID: {self.state['id']}")
self.state['counter'] = 0
self.state['message'] = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated"
@listen(second_method)
def third_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated again"
print(f"State after third_method: {self.state}")
flow = UnstructuredExampleFlow()
flow.kickoff()
참고: id
필드는 자동으로 생성되며 Flow 실행 전체에 걸쳐 유지됩니다. 수동으로 설정할 필요 없이 새로운 데이터를 추가할 때도 자동으로 보존됩니다.
핵심 요점:
- 유연성:
self.state
에 속성을 동적으로 추가할 수 있습니다. - 단순성: 상태 구조가 최소하거나 작업마다 크게 달라질 경우 적합합니다.
구조적 상태 관리
구조적 상태 관리는 사전에 정의된 스키마를 활용하여 워크플로우 전반에 걸쳐 일관성과 타입 안정성을 보장합니다. Pydantic의 BaseModel
과 같은 모델을 사용하면 상태의 형태를 명확하게 정의할 수 있어, 개발 환경에서 더 나은 유효성 검사 및 자동 완성을 활용할 수 있습니다.
CrewAI Flows에서 각 상태는 자동으로 고유 식별자(UUID)를 부여받아 상태 인스턴스를 추적하고 관리하는 데 도움을 줍니다. 이 ID는 Flow 시스템에 의해 자동으로 생성되고 관리됩니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
# 참고: 'id' 필드는 모든 상태에 자동으로 추가됩니다
counter: int = 0
message: str = ""
class StructuredExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
# 필요 시 자동 생성된 ID 접근
print(f"State ID: {self.state.id}")
self.state.message = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state.counter += 1
self.state.message += " - updated"
@listen(second_method)
def third_method(self):
self.state.counter += 1
self.state.message += " - updated again"
print(f"State after third_method: {self.state}")
flow = StructuredExampleFlow()
flow.kickoff()
핵심 요점:
- 명확한 스키마:
ExampleState
는 상태 구조를 명확하게 정의하여 코드 가독성과 유지보수성을 높여줍니다. - 타입 안정성: Pydantic을 활용하면 상태 속성이 지정된 타입을 따르도록 보장하여 런타임 오류를 줄일 수 있습니다.
- 자동 완성 지원: 명시된 상태 모델을 기반으로 IDE에서 자동 완성과 오류 검사를 효과적으로 지원합니다.
비구조적 상태 vs 구조적 상태 선택 기준
- 비구조적 상태 관리가 적합한 경우:
- 워크플로우 상태가 간단하거나 매우 동적일 때
- 엄격한 상태 정의보다 유연성이 우선시될 때
- 스키마 정의 없이 빠르게 프로토타이핑할 때
- 구조적 상태 관리가 적합한 경우:
- 워크플로우에 명확하고 일관된 상태 구조가 필요할 때
- 애플리케이션의 신뢰성을 위해 타입 안정성과 유효성 검사가 중요한 경우
- IDE의 자동 완성과 타입 검사 기능을 적극 활용하고 싶을 때
CrewAI Flows는 이러한 두 가지 상태 관리 방식을 모두 제공함으로써, 다양한 애플리케이션 요구사항에 맞춰 유연하면서도 견고한 AI 워크플로우를 구축할 수 있도록 돕습니다.
Flow 상태 지속성 (Persistence)
@persist
데코레이터를 사용하면 Flow 상태를 자동으로 저장(persist)할 수 있어, 워크플로우 실행 간 또는 재시작 후에도 상태를 유지할 수 있습니다. 이 데코레이터는 클래스 단위 또는 메서드 단위로 사용할 수 있어, 상태 지속성에 유연성을 제공합니다.
클래스 레벨 지속성
클래스에 @persist
를 적용하면 해당 Flow의 모든 메서드 상태가 자동으로 저장됩니다:
1
2
3
4
5
6
7
8
9
10
11
12
13
@persist # 기본적으로 SQLiteFlowPersistence 사용
class MyFlow(Flow[MyState]):
@start()
def initialize_flow(self):
# 이 메서드의 상태는 자동으로 저장됩니다
self.state.counter = 1
print("Initialized flow. State ID:", self.state.id)
@listen(initialize_flow)
def next_step(self):
# 상태가 자동으로 복원됨
self.state.counter += 1
print("Flow state is persisted. Counter:", self.state.counter)
메서드 레벨 지속성
더 세밀한 제어를 원할 경우, @persist
를 개별 메서드에 적용할 수 있습니다:
1
2
3
4
5
6
7
8
class AnotherFlow(Flow[dict]):
@persist # 해당 메서드 상태만 저장됨
@start()
def begin(self):
if "runs" not in self.state:
self.state["runs"] = 0
self.state["runs"] += 1
print("Method-level persisted runs:", self.state["runs"])
@persist
기능을 활용하면 CrewAI Flows의 상태를 다양한 방식으로 저장 및 재사용할 수 있어, 장기 실행 흐름이나 중단 복원 시 매우 유용합니다.
작동 방식
- 고유 상태 식별
- 각 Flow 상태는 자동으로 고유 UUID를 부여받습니다
- 이 ID는 상태 업데이트와 메서드 호출 전반에서 유지됩니다
- 구조적 상태(Pydantic BaseModel)와 비구조적 상태(dict) 모두 지원됩니다
- 기본 SQLite 백엔드
- 기본 저장소는
SQLiteFlowPersistence
입니다 - 상태는 로컬 SQLite 데이터베이스에 자동 저장됩니다
- 데이터베이스 작업 실패 시 명확한 에러 메시지 제공으로 강력한 오류 처리 기능 제공
- 기본 저장소는
- 오류 처리
- 데이터베이스 작업 시 명확하고 포괄적인 오류 메시지 제공
- 저장 및 로딩 중 자동 상태 검증 수행
- 상태 지속성 처리 중 문제가 발생하면 명확한 피드백 제공
주요 고려사항
- 상태 타입: 구조적(Pydantic BaseModel), 비구조적(dict) 상태 모두 지원됨
- 자동 ID 부여:
id
필드는 없을 경우 자동으로 추가됨 - 상태 복구: 실패하거나 재시작된 Flow는 자동으로 이전 상태를 복원할 수 있음
- 커스텀 구현 가능: 특수 저장소 요구사항을 위해 사용자 정의
FlowPersistence
구현 제공 가능
기술적 장점
- 낮은 수준의 접근을 통한 정밀한 제어
- 고급 사용 사례를 위한 상태 지속성 작업 직접 접근 가능
- 메서드 수준의 지속성 데코레이터로 세밀한 제어 가능
- 내장 상태 검사 및 디버깅 기능 제공
- 상태 변경 및 지속성 작업에 대한 완전한 가시성 확보
- 신뢰성 향상
- 시스템 장애나 재시작 이후에도 상태 자동 복구 가능
- 트랜잭션 기반 상태 업데이트로 데이터 무결성 확보
- 명확한 에러 메시지를 포함한 포괄적인 오류 처리
- 저장 및 로딩 시 강력한 검증 수행
- 확장 가능한 아키텍처
FlowPersistence
인터페이스를 통한 커스텀 백엔드 구현 가능- SQLite를 넘어선 특수 저장소 솔루션 지원
- 구조적(Pydantic), 비구조적(dict) 상태 모두와 호환됨
- 기존 CrewAI Flow 패턴과 매끄럽게 통합 가능
이러한 상태 지속성 시스템 아키텍처는 기술적 정밀성과 커스터마이징 옵션을 강조하여, 개발자가 상태 관리를 완전히 통제하면서도 내장된 신뢰성 기능을 활용할 수 있도록 지원합니다.
Flow 제어
조건 로직: or
or_
함수는 여러 메서드의 출력을 리슨(listen)할 수 있게 해주며, 그중 하나라도 출력이 발생하면 해당 리스너 메서드를 실행합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from crewai.flow.flow import Flow, listen, or_, start
class OrExampleFlow(Flow):
@start()
def start_method(self):
return "Hello from the start method"
@listen(start_method)
def second_method(self):
return "Hello from the second method"
@listen(or_(start_method, second_method))
def logger(self, result):
print(f"Logger: {result}")
flow = OrExampleFlow()
flow.kickoff()
출력 예시:
Logger: Hello from the start method
Logger: Hello from the second method
or_
함수는 지정된 메서드 중 하나라도 출력이 발생하면 logger
메서드를 트리거합니다.
조건 로직: and
and_
함수는 지정된 모든 메서드의 출력이 발생해야 리스너 메서드를 실행합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from crewai.flow.flow import Flow, and_, listen, start
class AndExampleFlow(Flow):
@start()
def start_method(self):
self.state["greeting"] = "Hello from the start method"
@listen(start_method)
def second_method(self):
self.state["joke"] = "What do computers eat? Microchips."
@listen(and_(start_method, second_method))
def logger(self):
print("---- Logger ----")
print(self.state)
flow = AndExampleFlow()
flow.kickoff()
출력 예시:
---- Logger ----
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
and_
함수는 지정된 모든 메서드가 출력을 생성한 경우에만 logger
메서드를 실행합니다. 이를 통해 다단계 조건 로직을 쉽게 구성할 수 있습니다.
라우터 (@router
)
Flows에서 @router()
데코레이터를 사용하면 메서드 출력에 따라 조건부 라우팅 로직을 정의할 수 있습니다. 메서드 출력 값에 따라 서로 다른 실행 흐름을 제어할 수 있도록 해줍니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class ExampleState(BaseModel):
success_flag: bool = False
class RouterFlow(Flow[ExampleState]):
@start()
def start_method(self):
print("Starting the structured flow")
random_boolean = random.choice([True, False])
self.state.success_flag = random_boolean
@router(start_method)
def second_method(self):
if self.state.success_flag:
return "success"
else:
return "failed"
@listen("success")
def third_method(self):
print("Third method running")
@listen("failed")
def fourth_method(self):
print("Fourth method running")
flow = RouterFlow()
flow.kickoff()
이 예제에서 start_method
는 무작위 불리언 값을 생성하고 상태에 저장합니다. second_method
는 @router()
를 사용해 해당 값에 따라 "success"
또는 "failed"
를 반환합니다. 그리고 third_method
와 fourth_method
는 각각 그 출력값에 리슨하여 조건에 따라 실행됩니다.
Flow를 실행할 때, start_method
가 생성한 불리언 값에 따라 실행 흐름이 달라집니다.
Flow에 에이전트 추가하기
에이전트는 Flow에 자연스럽게 통합되어, 전체 Crew 없이 간단하고 집중된 작업 실행을 가능하게 합니다. 아래는 시장 조사 업무를 수행하는 에이전트를 Flow 내에서 사용하는 예시입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import asyncio
from typing import Any, Dict, List
from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start
# 구조화된 출력 형식 정의
class MarketAnalysis(BaseModel):
key_trends: List[str] = Field(description="List of identified market trends")
market_size: str = Field(description="Estimated market size")
competitors: List[str] = Field(description="Major competitors in the space")
# Flow 상태 정의
class MarketResearchState(BaseModel):
product: str = ""
analysis: MarketAnalysis | None = None
# Flow 클래스 정의
class MarketResearchFlow(Flow[MarketResearchState]):
@start()
def initialize_research(self) -> Dict[str, Any]:
print(f"Starting market research for {self.state.product}")
return {"product": self.state.product}
@listen(initialize_research)
async def analyze_market(self) -> Dict[str, Any]:
# 시장 조사용 에이전트 생성
analyst = Agent(
role="Market Research Analyst",
goal=f"Analyze the market for {self.state.product}",
backstory="You are an experienced market analyst with expertise in identifying market trends and opportunities.",
tools=[SerperDevTool()],
verbose=True,
)
# 조사 요청 쿼리 정의
query = f"""
Research the market for {self.state.product}. Include:
1. Key market trends
2. Market size
3. Major competitors
Format your response according to the specified structure.
"""
# 구조화된 출력 형식으로 분석 실행
result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
if result.pydantic:
print("result", result.pydantic)
else:
print("result", result)
return {"analysis": result.pydantic}
@listen(analyze_market)
def present_results(self, analysis) -> None:
print("\nMarket Analysis Results")
print("=====================")
if isinstance(analysis, dict):
market_analysis = analysis.get("analysis")
else:
market_analysis = analysis
if market_analysis and isinstance(market_analysis, MarketAnalysis):
print("\nKey Market Trends:")
for trend in market_analysis.key_trends:
print(f"- {trend}")
print(f"\nMarket Size: {market_analysis.market_size}")
print("\nMajor Competitors:")
for competitor in market_analysis.competitors:
print(f"- {competitor}")
else:
print("No structured analysis data available.")
print("Raw analysis:", analysis)
# 사용 예시
async def run_flow():
flow = MarketResearchFlow()
result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
return result
if __name__ == "__main__":
asyncio.run(run_flow())
이 예제에서는 다음과 같은 핵심 기능들을 보여줍니다:
- 구조화된 출력:
MarketAnalysis
모델을 사용해 예상되는 출력 형식을 정의함으로써 타입 안정성과 구조화된 데이터를 유지합니다. - 상태 관리:
MarketResearchState
를 통해 입력과 출력을 저장하고 단계 간 컨텍스트를 유지합니다. - 도구 통합: 에이전트는
SerperDevTool
과 같은 도구를 사용하여 기능을 강화할 수 있습니다.
Flow에 크루(Crews) 추가하기
CrewAI에서는 여러 크루를 포함하는 Flow를 쉽게 생성할 수 있습니다.
다음 명령어를 실행하면 여러 크루를 포함하는 Flow 생성을 위한 기본 템플릿이 자동으로 구성된 새로운 CrewAI 프로젝트를 생성할 수 있습니다:
1
crewai create flow name_of_flow
이 명령어는 필요한 폴더 구조와 함께 미리 정의된 poem_crew
크루가 포함된 프로젝트를 생성합니다. poem_crew
를 복사 및 수정하여 새로운 크루를 쉽게 만들 수 있습니다.
폴더 구조 예시
명령어 실행 후 생성되는 폴더 구조는 다음과 유사합니다:
디렉토리/파일 | 설명 |
---|---|
name_of_flow/ | Flow의 루트 디렉토리 |
├── crews/ | 개별 크루를 위한 디렉토리 |
│ └── poem_crew/ | poem_crew 의 구성과 스크립트를 담고 있는 디렉토리 |
│ ├── config/ | poem_crew 의 설정 파일 디렉토리 |
│ │ ├── agents.yaml | poem_crew 의 에이전트를 정의한 YAML 파일 |
│ │ └── tasks.yaml | poem_crew 의 태스크를 정의한 YAML 파일 |
│ ├── poem_crew.py | poem_crew 의 실제 기능 구현 스크립트 |
├── tools/ | Flow에서 사용하는 추가 도구 디렉토리 |
│ └── custom_tool.py | 커스텀 도구 구현 파일 |
├── main.py | Flow를 실행하는 메인 스크립트 |
├── README.md | 프로젝트 설명 및 실행 가이드 |
├── pyproject.toml | 프로젝트 설정 및 의존성 관리 파일 |
└── .gitignore | 버전 관리에서 제외할 파일 및 디렉토리 목록 |
크루 구성하기
crews
폴더 내에서 여러 크루를 정의할 수 있습니다. 각 크루는 자신만의 디렉토리를 가지며 설정 파일과 크루 정의 스크립트를 포함합니다. 예: poem_crew
폴더는 다음과 같은 구성으로 되어 있습니다:
config/agents.yaml
: 크루의 에이전트를 정의config/tasks.yaml
: 크루의 태스크를 정의poem_crew.py
: 에이전트, 태스크, 크루 정의 포함
기존 poem_crew
를 복사해서 다른 크루를 쉽게 만들 수 있습니다.
main.py
에서 크루 연결하기
main.py
파일에서 Flow 클래스를 정의하고, @start
, @listen
데코레이터를 활용하여 크루 간 흐름을 연결할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/usr/bin/env python
from random import randint
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew
class PoemState(BaseModel):
sentence_count: int = 1
poem: str = ""
class PoemFlow(Flow[PoemState]):
@start()
def generate_sentence_count(self):
print("Generating sentence count")
self.state.sentence_count = randint(1, 5)
@listen(generate_sentence_count)
def generate_poem(self):
print("Generating poem")
result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
print("Poem generated", result.raw)
self.state.poem = result.raw
@listen(generate_poem)
def save_poem(self):
print("Saving poem")
with open("poem.txt", "w") as f:
f.write(self.state.poem)
def kickoff():
poem_flow = PoemFlow()
poem_flow.kickoff()
def plot():
poem_flow = PoemFlow()
poem_flow.plot()
if __name__ == "__main__":
kickoff()
이 예시에서는 PoemFlow
클래스가 문장 수를 생성하고, PoemCrew
를 사용해 시를 생성하며, 결과를 파일로 저장하는 전체 흐름을 정의합니다.
Flow 실행하기
선택 사항으로, Flow 실행 전에 다음 명령어로 의존성을 설치할 수 있습니다:
1
crewai install
의존성 설치 후, 가상 환경을 활성화하려면:
1
source .venv/bin/activate
이후 Flow를 실행하려면 다음 명령어 중 하나를 사용하면 됩니다:
1
crewai flow kickoff
또는
1
uv run kickoff
실행 결과는 콘솔에 출력됩니다.
Flow 시각화 (Plot)
AI 워크플로우를 시각화하면 구조와 실행 경로를 더 쉽게 이해하고 최적화할 수 있습니다. CrewAI는 Flow를 인터랙티브하게 시각화할 수 있는 강력한 플롯 도구를 제공합니다.
플롯(Plots)이란?
CrewAI의 플롯은 AI 워크플로우를 시각적으로 표현한 것입니다. 각 태스크, 그들 간의 연결, 데이터 흐름을 그래픽으로 보여주며, 실행 순서 파악, 병목 현상 식별, 논리 흐름 검증에 매우 유용합니다.
플롯 생성 방법
CrewAI는 Flow의 플롯을 생성할 수 있는 두 가지 방법을 제공합니다:
옵션 1: plot()
메서드 사용
Flow 인스턴스에서 직접 plot()
메서드를 호출하면 HTML 형태의 인터랙티브한 플롯이 생성됩니다.
1
2
# Flow 인스턴스를 가지고 있다면
flow.plot("my_flow_plot")
현재 디렉토리에 my_flow_plot.html
파일이 생성되며, 웹 브라우저에서 열어볼 수 있습니다.
옵션 2: CLI(Command Line Interface) 사용
CrewAI 프로젝트에서는 CLI를 통해 플롯을 생성할 수 있습니다:
1
crewai flow plot
이 명령어도 plot()
메서드와 동일하게 HTML 파일을 생성하며, 프로젝트 디렉토리에 저장된 플롯 파일을 웹 브라우저에서 열어볼 수 있습니다.
플롯 이해하기
생성된 플롯은 노드 형태로 각 태스크를 표현하며, 화살표는 실행 흐름을 나타냅니다. 인터랙티브 방식으로 확대/축소가 가능하고, 노드에 마우스를 올리면 추가 정보를 확인할 수 있습니다.
플롯을 통해 워크플로우 구조를 시각적으로 파악할 수 있어 디버깅, 최적화, 협업 시 유용하게 활용할 수 있습니다.
결론
플롯 기능은 복잡한 AI 워크플로우를 설계하고 관리하는 데 있어 매우 강력한 도구입니다. plot()
메서드 또는 CLI를 통해 플롯을 생성하면 시각적으로 전체 흐름을 쉽게 이해하고 공유할 수 있습니다.
다음 단계
더 많은 예시를 보고 싶다면, CrewAI의 예제 저장소에서 다양한 Flow 사례를 확인해보세요. 아래는 대표적인 4가지 예시입니다:
- Email Auto Responder Flow
- 무한 루프 기반으로 이메일 자동 응답을 반복 수행
- 반복적인 백그라운드 작업 자동화에 적합
- 예시 보기
- Lead Score Flow
- Router를 활용한 조건 분기 + Human-in-the-loop 피드백 처리 흐름 예시
- 동적 의사결정 및 사용자 피드백 반영이 필요한 워크플로우에 적합
- 예시 보기
- Write a Book Flow
- 여러 크루를 체이닝하여 책의 목차 생성 → 챕터 생성 → 전체 책 작성 흐름 구현
- 복잡한 멀티 스텝 작업 및 크루 간 연동 예시로 적합
- 예시 보기
- Meeting Assistant Flow
- 단일 이벤트 발생 후 Trello, Slack, DB 저장 등 여러 후속 작업을 브로드캐스트로 처리
- 다양한 작업이 동시에 트리거되어야 하는 협업 워크플로우에 적합
- 예시 보기
이 예시들을 통해 반복 작업 자동화, 멀티태스킹 관리, 사람 중심 피드백 등 다양한 활용 방식을 참고할 수 있습니다.
Flow 실행하기
Flow를 실행하는 방법은 두 가지입니다:
1. Flow API 사용
직접 인스턴스를 생성하여 kickoff()
메서드 호출:
1
2
flow = ExampleFlow()
result = flow.kickoff()
2. CLI 사용
CrewAI v0.103.0 이상부터는 crewai run
명령어로 실행하는 것이 권장됩니다:
1
crewai run
pyproject.toml
에 type = "flow"
가 설정되어 있다면 자동으로 Flow로 인식하여 실행됩니다.
하위 호환을 위해 기존 방식도 지원됩니다:
1
crewai flow kickoff
하지만 crewai run
명령이 크루와 플로우 모두에 대응되므로 더 추천되는 방식입니다.