Stateful Agents

Stateful agents maintain context and information across multiple interactions, enabling them to provide responses that take into account previous conversations and user preferences. This guide covers production patterns for building robust stateful agents with DSPy.rb.

Core Concepts

State vs Memory

State refers to temporary information that agents maintain during a conversation or session:

  • Current conversation context
  • User preferences for the session
  • Temporary calculations or intermediate results

Memory refers to persistent information stored across sessions at the application level:

  • User preferences and settings
  • Historical interactions
  • Learned patterns and behaviors

DSPy.rb provides the building blocks (modules, tools, ReAct agents) while your application manages persistence using your preferred storage backend (database, Redis, etc.).

Production Patterns

1. Session-Based Agent

A session-based agent maintains state during a conversation but doesn’t persist information between sessions:

class SessionAgent < DSPy::Module
  class ConversationSignature < DSPy::Signature
    description "Conversational agent that maintains context"

    input do
      const :user_message, String
      const :session_id, String
    end

    output do
      const :response, String
      const :context_summary, String
    end
  end

  def initialize
    super
    @sessions = {}
    @agent = DSPy::ReAct.new(ConversationSignature, tools: [])
  end

  def forward(user_message:, session_id:)
    # Get or create session context
    session = get_session(session_id)

    # Add context to the message
    contextual_message = build_contextual_message(user_message, session)

    # Get response from agent
    result = @agent.call(
      user_message: contextual_message,
      session_id: session_id
    )

    # Update session context
    update_session(session_id, user_message, result)

    result
  end

  private

  def get_session(session_id)
    @sessions[session_id] ||= {
      messages: [],
      context: "",
      started_at: Time.now
    }
  end

  def build_contextual_message(message, session)
    if session[:messages].empty?
      message
    else
      "Previous context: #{session[:context]}\n\nCurrent message: #{message}"
    end
  end

  def update_session(session_id, message, result)
    session = @sessions[session_id]
    session[:messages] << {
      user: message,
      assistant: result.response,
      timestamp: Time.now
    }
    session[:context] = result.context_summary
  end
end

2. Persistent Agent with Custom Storage

A persistent agent stores information across sessions using application-level storage. Define custom tools that wrap your storage backend:

# Custom tool for storing and retrieving data
class StorageTool < DSPy::Tools::Base
  tool_name "storage"
  tool_description "Store and retrieve key-value data"

  sig { params(action: String, key: String, value: T.nilable(String)).returns(String) }
  def call(action:, key:, value: nil)
    case action
    when "store"
      @store[key] = value
      "Stored '#{key}'"
    when "retrieve"
      @store.fetch(key, "No data found for '#{key}'")
    when "list"
      @store.keys.join(", ")
    else
      "Unknown action: #{action}"
    end
  end

  def initialize
    @store = {}
  end
end

class PersistentAgent < DSPy::Module
  class MemoryAwareSignature < DSPy::Signature
    description "Agent that stores and retrieves user context"

    input do
      const :user_message, String
      const :user_id, String
    end

    output do
      const :response, String
      const :actions_taken, T::Array[String]
    end
  end

  def initialize
    super
    @storage = StorageTool.new

    @agent = DSPy::ReAct.new(
      MemoryAwareSignature,
      tools: [@storage],
      max_iterations: 5
    )
  end

  def forward(user_message:, user_id:)
    result = @agent.call(
      user_message: user_message,
      user_id: user_id
    )

    # Application-level persistence (e.g., database, Redis)
    store_interaction(user_id, user_message, result.response)

    result
  end

  private

  def store_interaction(user_id, message, response)
    # Replace with your persistence layer (ActiveRecord, Redis, etc.)
    @interactions ||= Hash.new { |h, k| h[k] = [] }
    @interactions[user_id] << {
      user_message: message,
      assistant_response: response,
      timestamp: Time.now.iso8601
    }
  end
end

3. Multi-Context Agent

An agent that maintains different types of context and state:

class MultiContextAgent < DSPy::Module
  class ContextualSignature < DSPy::Signature
    description "Agent with rich context management"

    input do
      const :user_message, String
      const :user_id, String
      const :session_id, String
    end

    output do
      const :response, String
      const :confidence, Float
      const :context_used, T::Array[String]
    end
  end

  def initialize
    super
    @sessions = {}

    @agent = DSPy::ReAct.new(
      ContextualSignature,
      tools: DSPy::Tools::TextProcessingToolset.to_tools,
      max_iterations: 6
    )
  end

  def forward(user_message:, user_id:, session_id:)
    # 1. Get session context
    session_context = get_session_context(session_id)

    # 2. Build enriched context
    enriched_message = build_enriched_context(
      user_message,
      user_id,
      session_context
    )

    # 3. Get agent response
    result = @agent.call(
      user_message: enriched_message,
      user_id: user_id,
      session_id: session_id
    )

    # 4. Update contexts
    update_contexts(user_id, session_id, user_message, result)

    result
  end

  private

  def get_session_context(session_id)
    @sessions[session_id] ||= {
      turn_count: 0,
      topics: [],
      sentiment: "neutral",
      last_activity: Time.now
    }
  end

  def build_enriched_context(message, user_id, session_context)
    context_parts = [
      "User message: #{message}",
      "Session context: #{session_context[:turn_count]} turns, topics: #{session_context[:topics].join(', ')}"
    ]

    context_parts.join("\n")
  end

  def update_contexts(user_id, session_id, message, result)
    session = @sessions[session_id]
    session[:turn_count] += 1
    session[:last_activity] = Time.now
  end
end

4. Adaptive Learning Agent

An agent that learns from interactions and adapts its behavior:

class AdaptiveLearningAgent < DSPy::Module
  class LearningSignature < DSPy::Signature
    description "Agent that learns from interactions and adapts"

    input do
      const :user_message, String
      const :user_id, String
    end

    output do
      const :response, String
      const :learned_patterns, T::Array[String]
      const :adaptation_notes, String
    end
  end

  def initialize
    super
    @user_profiles = Hash.new { |h, k| h[k] = { patterns: [], interactions: 0 } }

    @agent = DSPy::ReAct.new(
      LearningSignature,
      tools: DSPy::Tools::TextProcessingToolset.to_tools,
      max_iterations: 8
    )
  end

  def forward(user_message:, user_id:)
    # Get user's interaction history for learning
    user_profile = @user_profiles[user_id]

    # Build adaptive prompt
    adaptive_message = build_adaptive_prompt(
      user_message,
      user_id,
      user_profile
    )

    result = @agent.call(
      user_message: adaptive_message,
      user_id: user_id
    )

    # Learn from this interaction
    learn_from_interaction(user_id, result)

    result
  end

  private

  def build_adaptive_prompt(message, user_id, profile)
    prompt_parts = [
      "User message: #{message}",
      "User ID: #{user_id}",
      "Interaction count: #{profile[:interactions]}",
      "Known patterns: #{profile[:patterns].last(5).join(', ')}"
    ]

    prompt_parts.join("\n")
  end

  def learn_from_interaction(user_id, result)
    profile = @user_profiles[user_id]
    profile[:interactions] += 1
    profile[:patterns].concat(result.learned_patterns)

    # Keep only recent patterns
    profile[:patterns] = profile[:patterns].last(50)
  end
end

Error Handling and Resilience

Failure Recovery

class ResilientAgent < DSPy::Module
  def initialize
    super
    @fallback_context = {}

    @agent = DSPy::ReAct.new(
      AgentSignature,
      tools: DSPy::Tools::TextProcessingToolset.to_tools
    )
  end

  def forward(user_message:, user_id:)
    begin
      result = @agent.call(
        user_message: user_message,
        user_id: user_id
      )

      # Store in fallback context as backup
      store_fallback(user_id, user_message, result.response)

      result
    rescue => e
      # Fall back to session-only mode
      DSPy.logger.warning("Agent failed: #{e.message}")
      fallback_response(user_message, user_id)
    end
  end

  private

  def store_fallback(user_id, message, response)
    @fallback_context[user_id] ||= []
    @fallback_context[user_id] << {
      message: message,
      response: response,
      timestamp: Time.now
    }

    # Keep only last 10 interactions per user
    @fallback_context[user_id] = @fallback_context[user_id].last(10)
  end

  def fallback_response(message, user_id)
    # Use fallback context for simple response
    context = @fallback_context[user_id]&.last(3) || []

    class SimpleSignature < DSPy::Signature
      description "Simple response without tools"

      input do
        const :message, String
        const :context, String
      end

      output do
        const :response, String
      end
    end

    simple_agent = DSPy::Predict.new(SimpleSignature)
    simple_agent.call(
      message: message,
      context: context.to_json
    )
  end
end

State Corruption Recovery

class StateRecoveryAgent < DSPy::Module
  class RecoverySignature < DSPy::Signature
    description "Agent with state recovery capabilities"

    input do
      const :user_message, String
      const :user_id, String
    end

    output do
      const :response, String
    end
  end

  def initialize
    super
    @state_version = 1
    @state = Hash.new { |h, k| h[k] = {} }
    @agent = DSPy::ReAct.new(RecoverySignature, tools: [])
  end

  def forward(user_message:, user_id:)
    # Check state integrity
    unless state_valid?(user_id)
      recover_state(user_id)
    end

    result = @agent.call(
      user_message: user_message,
      user_id: user_id
    )

    # Validate result before storing
    if result_valid?(result)
      store_with_checksum(user_id, result)
    else
      DSPy.logger.error("Invalid result detected for user #{user_id}")
    end

    result
  end

  private

  def state_valid?(user_id)
    state = @state[user_id]
    return true if state.empty?

    # Verify checksum integrity
    state[:checksum] == Digest::MD5.hexdigest(state[:response].to_s)
  end

  def recover_state(user_id)
    DSPy.logger.info("Recovering state for user #{user_id}")
    @state[user_id] = {}
  end

  def result_valid?(result)
    result.respond_to?(:response) &&
    result.response.is_a?(String) &&
    result.response.length > 0
  end

  def store_with_checksum(user_id, result)
    @state[user_id] = {
      response: result.response,
      checksum: Digest::MD5.hexdigest(result.response),
      version: @state_version,
      timestamp: Time.now.iso8601
    }
  end
end

Performance Considerations

State Management Optimization

class OptimizedStatefulAgent < DSPy::Module
  class OptimizedSignature < DSPy::Signature
    description "Optimized agent with state cleanup"

    input do
      const :user_message, String
      const :user_id, String
    end

    output do
      const :response, String
    end
  end

  MAX_HISTORY = 50
  CLEANUP_INTERVAL = 100

  def initialize
    super
    @interaction_count = 0
    @user_history = Hash.new { |h, k| h[k] = [] }
    @agent = DSPy::ReAct.new(OptimizedSignature, tools: [])
  end

  def forward(user_message:, user_id:)
    # Periodically trigger cleanup
    cleanup_old_state(user_id) if should_cleanup?

    result = @agent.call(
      user_message: user_message,
      user_id: user_id
    )

    # Store only essential information
    store_essential_data(user_id, user_message, result)

    result
  end

  private

  def should_cleanup?
    @interaction_count += 1
    @interaction_count % CLEANUP_INTERVAL == 0
  end

  def cleanup_old_state(user_id)
    history = @user_history[user_id]
    @user_history[user_id] = history.last(MAX_HISTORY) if history.size > MAX_HISTORY
  end

  def store_essential_data(user_id, message, result)
    return if result.response.length < 10

    @user_history[user_id] << {
      message_summary: message.length > 100 ? "#{message[0..97]}..." : message,
      timestamp: Time.now.iso8601
    }
  end
end

Testing Stateful Agents

Unit Testing

RSpec.describe SessionAgent do
  let(:agent) { described_class.new }

  describe '#forward' do
    it 'maintains session context across calls', vcr: { cassette_name: "session_agent_context" } do
      session_id = "test_session"

      # First interaction
      result1 = agent.call(
        user_message: "My name is Alice",
        session_id: session_id
      )

      # Second interaction
      result2 = agent.call(
        user_message: "What is my name?",
        session_id: session_id
      )

      expect(result2.response).to include("Alice")
    end

    it 'isolates different sessions', vcr: { cassette_name: "session_agent_isolation" } do
      agent.call(
        user_message: "My name is Alice",
        session_id: "session_1"
      )

      result = agent.call(
        user_message: "What is my name?",
        session_id: "session_2"
      )

      expect(result.response).not_to include("Alice")
    end
  end
end

Integration Testing

RSpec.describe "Stateful Agent Integration", vcr: { cassette_name: "stateful_agent_integration" } do
  let(:agent) { MultiContextAgent.new }

  it 'maintains context across a conversation' do
    session_id = "test_session_#{Time.now.to_i}"

    # Conversation flow
    responses = [
      "I'm planning a trip to Japan",
      "What's the weather like there?",
      "Should I pack warm clothes?"
    ].map do |message|
      agent.call(
        user_message: message,
        user_id: "test_user",
        session_id: session_id
      )
    end

    # Verify context awareness
    expect(responses.last.response).to include("Japan")
    expect(responses.last.context_used).to include("trip")
  end
end

Best Practices

1. State Management

  • Use application-level storage: Manage persistence with your preferred backend (database, Redis, etc.)
  • Keep state minimal: Don’t store every interaction; focus on important information
  • Clean up regularly: Implement cleanup strategies to prevent unbounded growth

2. Error Handling

  • Graceful degradation: Provide fallback behavior when state is unavailable
  • State validation: Validate stored state before using it
  • Recovery mechanisms: Implement ways to recover from corrupted state

3. Performance

  • Batch operations: Store multiple state updates in batches when possible
  • Selective storage: Only store information that will be useful later
  • Monitor usage: Track state management performance in production

4. Privacy and Security

  • Data minimization: Store only necessary information
  • User consent: Ensure users understand what information is being stored
  • Secure storage: Use appropriate security measures for sensitive data

Common Pitfalls

1. Unbounded State Growth

# BAD: Storing too much detail
@history[user_id] << full_conversation_transcript  # Grows forever

# GOOD: Store essential information with limits
@history[user_id] << conversation_summary
@history[user_id] = @history[user_id].last(50)  # Bounded

2. Context Confusion

# BAD: Mixing contexts
def build_context(user_id, session_id)
  all_history = @history[user_id]  # Too broad
  all_history.join("\n")
end

# GOOD: Focused context
def build_context(user_id, session_id)
  recent = @history[user_id].last(5)  # Relevant subset
  recent.map { |h| h[:summary] }.join("\n")
end

3. State Inconsistency

# BAD: Not validating state
def use_stored_preference(user_id)
  pref = get_user_preference(user_id)
  pref.value  # Could be nil or invalid
end

# GOOD: Validate before use
def use_stored_preference(user_id)
  pref = get_user_preference(user_id)
  return default_preference unless pref&.valid?
  pref.value
end

Stateful agents require careful design and implementation, but they enable much more sophisticated and personalized user experiences. By following these patterns and best practices, you can build robust agents that maintain context effectively while handling edge cases gracefully.