CrewAI enables you to build multi-agent teams (crews) that collaborate on complex tasks. By integrating Cognis, you can give your crews persistent memory that spans crew executions — agents remember user preferences, past results, and context from previous runs.

What you’ll build: A travel-planning crew with a researcher agent and a planner agent, where Cognis injects user preferences into agent backstories and stores results for future reference.

Integration pattern: Manual memory injection — Cognis runs outside the crew to enrich agents before execution and store results afterward.
# Quick start (hosted): search memory -> enrich the agent -> run the crew -> store the result.
from lyzr import Cognis, CognisMessage
from crewai import Agent, Task, Crew

cog = Cognis()

# 1. Retrieve user preferences
memories = cog.search(query="travel preferences", owner_id="user_123", limit=5)
# Use an explicit placeholder when nothing is stored yet, so the backstory
# never ends in a dangling "You know this about the user:" header.
if memories:
    memory_context = "\n".join(f"- {m.content}" for m in memories)
else:
    memory_context = "No prior preferences known."

# 2. Inject memories into agent backstory
agent = Agent(
    role="Travel Researcher",
    goal="Find the best travel options",
    backstory=f"You know this about the user:\n{memory_context}",
)

# 3. Run the crew
task = Task(
    description="Research beach destinations",
    expected_output="Top 3 destinations",
    agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()

# 4. Store the result (stringified with str() before being saved as message content)
cog.add(
    messages=[
        CognisMessage(role="user", content="Plan a beach vacation"),
        CognisMessage(role="assistant", content=str(result)),
    ],
    owner_id="user_123",
    agent_id="travel_crew",
)
# Quick start (open source): search memory -> enrich the agent -> run the crew -> store the result.
from cognis import Cognis
from crewai import Agent, Task, Crew

m = Cognis(owner_id="user_123", agent_id="travel_crew")

# try/finally guarantees m.close() runs even if the crew run or the
# memory write raises.
try:
    # 1. Retrieve user preferences
    resp = m.search("travel preferences", limit=5)
    # Use an explicit placeholder when nothing is stored yet, so the backstory
    # never ends in a dangling "You know this about the user:" header.
    if resp["results"]:
        memory_context = "\n".join(f"- {r['content']}" for r in resp["results"])
    else:
        memory_context = "No prior preferences known."

    # 2. Inject memories into agent backstory
    agent = Agent(
        role="Travel Researcher",
        goal="Find the best travel options",
        backstory=f"You know this about the user:\n{memory_context}",
    )

    # 3. Run the crew
    task = Task(
        description="Research beach destinations",
        expected_output="Top 3 destinations",
        agent=agent,
    )
    crew = Crew(agents=[agent], tasks=[task])
    result = crew.kickoff()

    # 4. Store the result (stringified with str() before being saved as message content)
    m.add([
        {"role": "user", "content": "Plan a beach vacation"},
        {"role": "assistant", "content": str(result)},
    ])
finally:
    m.close()
import os

from crewai import Agent, Task, Crew, Process
from lyzr import Cognis, CognisMessage

# Hosted client; the API key is read from the LYZR_API_KEY environment variable.
cog = Cognis(api_key=os.getenv("LYZR_API_KEY"))

OWNER_ID = "traveler_007"
AGENT_ID = "travel_crew"


def get_memory_context(query: str, owner_id: str) -> str:
    """Return up to five matching memories as a bulleted string for a backstory.

    Args:
        query: Search text passed to ``cog.search``.
        owner_id: Owner whose memories are searched.

    Returns:
        One ``- <content>`` line per hit joined by newlines, or
        ``"No prior preferences known."`` when the search result is falsy.
    """
    hits = cog.search(query=query, owner_id=owner_id, limit=5)
    if hits:
        bullets = [f"- {hit.content}" for hit in hits]
        return "\n".join(bullets)
    return "No prior preferences known."


def store_crew_result(user_request: str, crew_output: str, owner_id: str):
    """Save one request/response exchange to Cognis under ``AGENT_ID``.

    Args:
        user_request: Text stored as the ``user`` message.
        crew_output: Text stored as the ``assistant`` message.
        owner_id: Owner the exchange is stored for.
    """
    exchange = [
        CognisMessage(role="user", content=user_request),
        CognisMessage(role="assistant", content=crew_output),
    ]
    cog.add(messages=exchange, owner_id=owner_id, agent_id=AGENT_ID)
from crewai import Agent, Task, Crew, Process
from cognis import Cognis

# Open-source client; owner and agent scope are fixed at construction time.
m = Cognis(owner_id="traveler_007", agent_id="travel_crew")


def get_memory_context(query: str) -> str:
    """Return up to five matching memories as a bulleted string for a backstory.

    Args:
        query: Search text passed to ``m.search``.

    Returns:
        One ``- <content>`` line per entry of the response's ``"results"``
        list joined by newlines, or ``"No prior preferences known."`` when
        that list is empty.
    """
    hits = m.search(query, limit=5)["results"]
    if hits:
        bullets = [f"- {hit['content']}" for hit in hits]
        return "\n".join(bullets)
    return "No prior preferences known."


def store_crew_result(user_request: str, crew_output: str):
    """Save one request/response exchange to Cognis via ``m.add``.

    Args:
        user_request: Text stored as the ``user`` message.
        crew_output: Text stored as the ``assistant`` message.
    """
    exchange = [
        {"role": "user", "content": user_request},
        {"role": "assistant", "content": crew_output},
    ]
    m.add(exchange)
Step 2: Build the Crew with Memory-Enriched Agents
Hosted (lyzr-adk)
Open Source (lyzr-cognis)
def plan_trip(destination: str, owner_id: str = OWNER_ID) -> str:
    """Run the two-agent travel crew for one destination and store the outcome.

    Args:
        destination: Place to plan for; interpolated into the agent goals,
            the task descriptions and the memory query.
        owner_id: Owner whose memories are searched and under whom the result
            is stored. Defaults to ``OWNER_ID``.

    Returns:
        The crew's final output converted with ``str()``.
    """
    # Looked up once and shared by both agents' backstories.
    preferences = get_memory_context(
        query=f"travel preferences for {destination}",
        owner_id=owner_id,
    )

    researcher = Agent(
        role="Travel Researcher",
        goal=f"Find the best travel options for {destination}",
        backstory=(
            "You are an expert travel researcher. "
            "Consider the traveler's known preferences:\n"
            f"{preferences}"
        ),
        verbose=True,
    )
    planner = Agent(
        role="Trip Planner",
        goal=f"Create a detailed itinerary for {destination}",
        backstory=(
            "You create personalized itineraries. "
            "You know this about the traveler:\n"
            f"{preferences}"
        ),
        verbose=True,
    )

    research_task = Task(
        description=f"Research top attractions, restaurants, and activities in {destination}.",
        expected_output="Recommended attractions, restaurants, and activities with descriptions.",
        agent=researcher,
    )
    planning_task = Task(
        description=(
            f"Create a 3-day itinerary for {destination} "
            "with morning, afternoon, and evening activities."
        ),
        expected_output="A detailed 3-day itinerary with times, locations, and tips.",
        agent=planner,
    )

    # Sequential process: tasks are passed in research-then-planning order.
    team = [researcher, planner]
    steps = [research_task, planning_task]
    crew = Crew(agents=team, tasks=steps, process=Process.sequential, verbose=True)

    itinerary = str(crew.kickoff())
    store_crew_result(f"Plan a trip to {destination}", itinerary, owner_id)
    return itinerary
def plan_trip(destination: str) -> str:
    """Run the two-agent travel crew for one destination and store the outcome.

    Args:
        destination: Place to plan for; interpolated into the agent goals,
            the task descriptions and the memory query.

    Returns:
        The crew's final output converted with ``str()``.
    """
    # Looked up once and shared by both agents' backstories.
    preferences = get_memory_context(f"travel preferences for {destination}")

    researcher = Agent(
        role="Travel Researcher",
        goal=f"Find the best travel options for {destination}",
        backstory=(
            "You are an expert travel researcher. "
            "Consider the traveler's known preferences:\n"
            f"{preferences}"
        ),
        verbose=True,
    )
    planner = Agent(
        role="Trip Planner",
        goal=f"Create a detailed itinerary for {destination}",
        backstory=(
            "You create personalized itineraries. "
            "You know this about the traveler:\n"
            f"{preferences}"
        ),
        verbose=True,
    )

    research_task = Task(
        description=f"Research top attractions, restaurants, and activities in {destination}.",
        expected_output="Recommended attractions, restaurants, and activities with descriptions.",
        agent=researcher,
    )
    planning_task = Task(
        description=(
            f"Create a 3-day itinerary for {destination} "
            "with morning, afternoon, and evening activities."
        ),
        expected_output="A detailed 3-day itinerary with times, locations, and tips.",
        agent=planner,
    )

    # Sequential process: tasks are passed in research-then-planning order.
    team = [researcher, planner]
    steps = [research_task, planning_task]
    crew = Crew(agents=team, tasks=steps, process=Process.sequential, verbose=True)

    itinerary = str(crew.kickoff())
    store_crew_result(f"Plan a trip to {destination}", itinerary)
    return itinerary
# Seed user preferences so the first planning run has something to retrieve.
seed_exchange = [
    CognisMessage(role="user", content="I love beach destinations and seafood. I prefer boutique hotels."),
    CognisMessage(role="assistant", content="Noted! You prefer beaches, seafood, and boutique hotels."),
]
cog.add(messages=seed_exchange, owner_id=OWNER_ID, agent_id=AGENT_ID)

# First trip — the seeded preferences reach the agents through their backstories.
result1 = plan_trip("Bali, Indonesia")

# Second trip — the stored Bali result is now also available to the memory search.
result2 = plan_trip("Lisbon, Portugal")
# try/finally guarantees m.close() runs even if seeding or a planning run raises.
try:
    # Seed user preferences
    m.add([
        {"role": "user", "content": "I love beach destinations and seafood. I prefer boutique hotels."},
        {"role": "assistant", "content": "Noted! You prefer beaches, seafood, and boutique hotels."},
    ])

    # First trip — agents receive preferences via backstory
    result1 = plan_trip("Bali, Indonesia")

    # Second trip — agents also see the Bali trip in their memory
    result2 = plan_trip("Lisbon, Portugal")
finally:
    m.close()
Create a CrewAI tool that agents can call mid-execution to search memories:
from crewai.tools import tool


# NOTE: the docstring below is kept short and unchanged on purpose —
# presumably CrewAI surfaces it to the agent as the tool description
# (confirm against the CrewAI tools documentation).
@tool("Search User Memory")
def search_user_memory(query: str) -> str:
    """Search the user's memory for relevant information."""
    # Works with either hosted or OSS — adjust the search call accordingly.
    # This is the hosted form: results are objects with a `.content` attribute.
    # The open-source client is called as m.search(query, limit=5) and returns
    # a dict whose "results" entries are read with r["content"].
    results = cog.search(query=query, owner_id=OWNER_ID, limit=5)
    if not results:
        return "No relevant memories found."
    return "\n".join(f"- {r.content}" for r in results)


# A complete constructor call replaces the original `...` placeholder, which
# was a syntax error (positional argument after keyword arguments).
researcher = Agent(
    role="Travel Researcher",
    goal="Find the best travel options",
    backstory=(
        "You are an expert travel researcher. "
        "Search the user's memory before making recommendations."
    ),
    tools=[search_user_memory],
)
Cross-session search (cross_session=True) is a hosted-only feature. Open-source Cognis searches the global (owner_id, agent_id) scope by default, which already spans sessions.
# Hosted client only: cross_session=True is passed to widen the search across sessions.
history_query = "travel history and preferences"
results = cog.search(
    query=history_query,
    owner_id=OWNER_ID,
    limit=10,
    cross_session=True,
)