Observability
DSPy.rb provides an event-driven observability system based on OpenTelemetry. The system eliminates complex monkey-patching while providing powerful, extensible observability features.
Overview
The observability system offers:
- Event System: Simple DSPy.event() API for structured event emission
- Pluggable Listeners: Subscribe to events with pattern matching
- OpenTelemetry Integration: Automatic span creation with semantic conventions
- Langfuse Export: Zero-config export to Langfuse via OpenTelemetry (requires environment variables)
- Type Safety: Sorbet T::Struct event validation
- Non-Blocking Exports: Dedicated single-thread executor keeps telemetry off hot paths
- Zero Breaking Changes: All existing DSPy.log() calls work unchanged
Installation
Add the observability gems alongside dspy:
gem 'dspy'
gem 'dspy-o11y' # core spans + helpers
gem 'dspy-o11y-langfuse' # Langfuse/OpenTelemetry adapter (optional)
When hacking inside this monorepo, run DSPY_WITH_O11Y=1 DSPY_WITH_O11Y_LANGFUSE=1 bundle install to pull in the sibling gems.
Architecture
The event system is built around three core components:
# Event emission
DSPy.event('llm.generate', provider: 'openai', tokens: 150)
# Event listening
DSPy.events.subscribe('llm.*') { |name, attrs| track_usage(attrs) }
# Custom tracking (pattern for reusable subscribers)
class MyTracker
  def initialize
    @subscriptions = []
    @subscriptions << DSPy.events.subscribe('optimization.*') { |name, attrs| handle_trial(attrs) }
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end
end
Dedicated Export Worker
Telemetry export happens on a Concurrent::SingleThreadExecutor, so your LLM workflows never compete with OTLP networking. The queue buffers spans as they finish, and the dedicated worker:
- Drains spans in batches based on configurable thresholds
- Applies exponential backoff on failures without blocking request threads
- Shuts down cleanly during process exit while flushing remaining spans
This design keeps observability reliable while ensuring DSPy.rb stays out of your LLMs’ way.
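Conceptually, the worker follows the pattern sketched below. This is an illustration using concurrent-ruby, not DSPy.rb's actual internals; the class name, method names, and thresholds are hypothetical.
# Illustrative sketch of the batching export pattern (hypothetical names)
require 'concurrent'

class BatchingSpanExporter
  def initialize(exporter, batch_size: 100)
    @exporter   = exporter
    @batch_size = batch_size
    @backoff    = 1.0
    @queue      = Queue.new                             # buffers finished spans
    @executor   = Concurrent::SingleThreadExecutor.new  # the dedicated worker thread
  end

  # Called from request threads; enqueueing never blocks on network I/O
  def on_finish(span)
    @queue << span
    @executor.post { drain }
  end

  # Flush remaining spans during process exit
  def shutdown
    @executor.post { drain until @queue.empty? }
    @executor.shutdown
    @executor.wait_for_termination(5)
  end

  private

  def drain
    batch = []
    batch << @queue.pop(true) while batch.size < @batch_size && !@queue.empty?
    return if batch.empty?

    @exporter.export(batch)
    @backoff = 1.0
  rescue StandardError
    sleep(@backoff)                        # back off on export failure
    @backoff = [@backoff * 2, 30.0].min    # exponential backoff, capped
    batch.each { |span| @queue << span }   # requeue and retry later
  end
end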
Quick Start
Basic Event Emission
# Emit events with attributes
DSPy.event('llm.response', {
  provider: 'openai',
  model: 'gpt-4',
  tokens: 150,
  duration_ms: 1200
})
# Events automatically create OpenTelemetry spans and log entries
Event Listeners
# Subscribe to specific events
DSPy.events.subscribe('llm.response') do |event_name, attributes|
  puts "LLM call: #{attributes[:model]} used #{attributes[:tokens]} tokens"
end

# Pattern matching with wildcards
DSPy.events.subscribe('llm.*') do |event_name, attributes|
  track_llm_usage(attributes)
end
# Unsubscribe when done
subscription_id = DSPy.events.subscribe('test.*') { |name, attrs| }
DSPy.events.unsubscribe(subscription_id)
Custom Subscribers
class TokenTracker
  attr_reader :total_tokens

  def initialize
    @total_tokens = 0
    @subscriptions = []
    subscribe
  end

  def subscribe
    @subscriptions << DSPy.events.subscribe('llm.*') do |event_name, attributes|
      tokens = attributes['gen_ai.usage.total_tokens'] || 0
      @total_tokens += tokens
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end
end
tracker = TokenTracker.new
# Now automatically tracks token usage from any LLM events
Observation Types
DSPy.rb uses Langfuse’s semantic observation types to classify spans correctly in observability systems. These types provide meaningful categorization for different kinds of operations:
Observation Type Classification
# DSPy automatically selects appropriate observation types based on the module:
module_class = DSPy::ChainOfThought
observation_type = DSPy::ObservationType.for_module_class(module_class)
# => DSPy::ObservationType::Chain
# Available observation types:
DSPy::ObservationType::Generation # Direct LLM calls
DSPy::ObservationType::Agent # ReAct (core) and CodeAct (dspy-code_act) agents
DSPy::ObservationType::Tool # Tool invocations
DSPy::ObservationType::Chain # ChainOfThought reasoning
DSPy::ObservationType::Retriever # Memory/document search
DSPy::ObservationType::Embedding # Embedding generation
DSPy::ObservationType::Evaluator # Evaluation modules
DSPy::ObservationType::Span # Generic operations
DSPy::ObservationType::Event # Event emissions
When to Emit Each Type
Generation (generation):
- Direct LLM API calls (OpenAI, Anthropic, etc.)
- Raw prompt-response interactions
- Core inference operations
# Automatically used for:
lm = DSPy::LM.new('openai/gpt-4', api_key: ENV['OPENAI_API_KEY'])
lm.raw_chat([
  { role: 'user', content: 'What is 2+2?' }
])
# Creates span with langfuse.observation.type = 'generation'
Agent (agent):
- Multi-step reasoning agents (ReAct core, CodeAct via dspy-code_act)
- Iterative decision-making processes
- Tool-using autonomous agents
# Automatically used for:
DSPy::ReAct.new(signature, tools: [calculator]).forward(question: "Calculate 15 * 23")
# Creates spans with langfuse.observation.type = 'agent'
Tool (tool):
- External tool invocations
- Function calls within agents
- API integrations
# Automatically used for:
# Tool calls within ReAct agents get langfuse.observation.type = 'tool'
Chain (chain):
- Sequential reasoning operations
- ChainOfThought modules
- Multi-step logical processes
# Automatically used for:
DSPy::ChainOfThought.new(signature).forward(question: "Explain gravity")
# Creates spans with langfuse.observation.type = 'chain'
Retriever (retriever):
- Memory/document search operations
- RAG retrieval steps
- Similarity matching
# Automatically used for:
memory_manager = DSPy::Memory::MemoryManager.new
memory_manager.search_memories("find documents about Ruby")
# Creates spans with langfuse.observation.type = 'retriever'
Embedding (embedding):
- Text embedding generation
- Vector space operations
- Semantic encoding
# Automatically used for:
embedding_engine = DSPy::Memory::LocalEmbeddingEngine.new
embedding_engine.embed("Convert this text to vectors")
# Creates spans with langfuse.observation.type = 'embedding'
Custom Observation Types
For custom modules, specify observation types manually:
class CustomModule < DSPy::Module
  def forward_untyped(**input_values)
    DSPy::Context.with_span(
      operation: 'custom.process',
      **DSPy::ObservationType::Evaluator.langfuse_attributes, # Use evaluator type
      'custom.attribute' => 'value'
    ) do |span|
      # Your custom logic
      result
    end
  end
end
Built-in Events
DSPy modules automatically emit events following OpenTelemetry semantic conventions:
LLM Events
# Emitted automatically by DSPy::LM (lib/dspy/lm.rb:300)
DSPy.event('lm.tokens', {
  'gen_ai.system' => 'openai',
  'gen_ai.request.model' => 'gpt-4',
  input_tokens: 150,
  output_tokens: 50,
  total_tokens: 200,
  'dspy.signature' => 'QuestionAnswering',
  request_id: 'abc123def', # If available
  duration: 1.25           # Seconds, if available
})
Module Events
# ChainOfThought reasoning (lib/dspy/chain_of_thought.rb:199)
DSPy.event('chain_of_thought.reasoning_complete', {
  'dspy.signature' => 'QuestionAnswering',
  'cot.reasoning_steps' => 3,
  'cot.reasoning_length' => 245,
  'cot.has_reasoning' => true
})

# ReAct iterations (lib/dspy/re_act.rb:424)
DSPy.event('react.iteration_complete', {
  iteration: 2,
  thought: 'I need to search for information',
  action: 'search',
  observation: 'Found relevant results'
})

# CodeAct code execution (see dspy-code_act gem)
DSPy.event('codeact.iteration_complete', {
  iteration: 1,
  code_executed: 'puts "Hello World"',
  execution_result: 'Hello World'
})
Type-Safe Events
Create structured events with validation:
# Type-safe LLM event
llm_event = DSPy::Events::LLMEvent.new(
  name: 'llm.generate',
  provider: 'openai',
  model: 'gpt-4',
  usage: DSPy::Events::TokenUsage.new(
    prompt_tokens: 150,
    completion_tokens: 75
  ),
  duration_ms: 1250
)
DSPy.event(llm_event)
# Automatically includes OpenTelemetry semantic conventions
Available Event Types
# Basic event
DSPy::Events::Event.new(name: 'custom.event', attributes: { key: 'value' })
# Module execution event
DSPy::Events::ModuleEvent.new(
  name: 'module.forward',
  module_name: 'ChainOfThought',
  signature_name: 'QuestionAnswering'
)

# Optimization event
DSPy::Events::OptimizationEvent.new(
  name: 'optimization.trial_complete',
  optimizer_name: 'MIPROv2',
  score: 0.85
)
Common Patterns
Token Budget Tracking
class TokenBudgetTracker
  attr_reader :total_tokens, :total_cost

  def initialize(budget_limit: 10000)
    @budget_limit = budget_limit
    @total_tokens = 0
    @total_cost = 0.0
    @subscriptions = []
    subscribe
  end

  def subscribe
    @subscriptions << DSPy.events.subscribe('llm.*') do |event_name, attributes|
      prompt_tokens = attributes['gen_ai.usage.prompt_tokens'] || 0
      completion_tokens = attributes['gen_ai.usage.completion_tokens'] || 0
      @total_tokens += prompt_tokens + completion_tokens

      # Cost of this call (example pricing)
      model = attributes['gen_ai.request.model']
      cost_per_1k = model == 'gpt-4' ? 0.03 : 0.002
      @total_cost += ((prompt_tokens + completion_tokens) / 1000.0) * cost_per_1k
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end

  def budget_exceeded?
    @total_tokens > @budget_limit
  end
end
tracker = TokenBudgetTracker.new(budget_limit: 5000)
# Automatically tracks all LLM token usage
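To act on the budget, guard expensive calls with the tracker. For example (illustrative usage of the class above):
raise 'LLM token budget exhausted' if tracker.budget_exceeded?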
Optimization Progress Tracking
class OptimizationTracker
  attr_reader :trials, :best_score

  def initialize
    @trials = []
    @best_score = nil
    @subscriptions = []
    subscribe
  end

  def subscribe
    @subscriptions << DSPy.events.subscribe('optimization.*') do |event_name, attributes|
      case event_name
      when 'optimization.trial_complete'
        score = attributes[:score]
        @trials << { trial: attributes[:trial_number], score: score }
        @best_score = score if !@best_score || score > @best_score
      end
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end
end
tracker = OptimizationTracker.new
# Automatically tracks DSPy teleprompters like MIPROv2
Module Performance Tracking
class ModulePerformanceTracker
  attr_reader :module_stats

  def initialize
    @module_stats = Hash.new { |h, k|
      h[k] = { total_calls: 0, total_duration: 0, avg_duration: 0 }
    }
    @subscriptions = []
    subscribe
  end

  def subscribe
    @subscriptions << DSPy.events.subscribe('*.complete') do |event_name, attributes|
      module_name = event_name.split('.').first
      duration = attributes[:duration_ms] || 0
      stats = @module_stats[module_name]
      stats[:total_calls] += 1
      stats[:total_duration] += duration
      stats[:avg_duration] = stats[:total_duration] / stats[:total_calls].to_f
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end
end
tracker = ModulePerformanceTracker.new
# Tracks ChainOfThought, ReAct, CodeAct performance (CodeAct requires dspy-code_act)
Integration with External Systems
Event Filtering and Routing
# Route different events to different systems
class EventRouter
  def initialize(datadog_client:, slack_webhook:)
    @datadog = datadog_client
    @slack = slack_webhook
    @subscriptions = []
    subscribe
  end

  def subscribe
    # Send LLM events to Datadog for cost tracking
    @subscriptions << DSPy.events.subscribe('llm.*') do |event_name, attributes|
      @datadog.increment('dspy.llm.requests', tags: [
        "provider:#{attributes['gen_ai.system']}",
        "model:#{attributes['gen_ai.request.model']}"
      ])
    end

    # Send optimization events to Slack
    @subscriptions << DSPy.events.subscribe('optimization.trial_complete') do |event_name, attributes|
      if attributes[:score] > 0.9
        @slack.send("Trial #{attributes[:trial_number]} achieved #{attributes[:score]} score!")
      end
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end
end
Custom Analytics
class EventAnalytics
  def initialize
    @analytics = Concurrent::Map.new
    @subscriptions = []
    subscribe
  end

  def subscribe
    @subscriptions << DSPy.events.subscribe('*') do |event_name, attributes|
      # Thread-safe analytics collection (Concurrent::Map#compute is atomic)
      category = event_name.split('.').first
      @analytics.compute(category) { |old_val| (old_val || 0) + 1 }
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end

  def report
    snapshot = {}
    @analytics.each_pair { |category, count| snapshot[category] = count }
    snapshot
  end
end
Backward Compatibility
All existing DSPy.log() calls automatically benefit from the event system:
# Existing code (unchanged)
DSPy.log('chain_of_thought.reasoning_complete', {
  signature_name: 'QuestionAnswering',
  reasoning_steps: 3
})
# Now automatically:
# ✅ Logs to stdout/file (same as before)
# ✅ Creates OpenTelemetry spans
# ✅ Notifies event listeners
# ✅ Exports to Langfuse when configured
No code changes required - existing modules get enhanced observability automatically.
Configuration
DSPy.configure do |config|
  # Logger configuration (same as before)
  config.logger = Dry.Logger(:dspy, formatter: :json)
end
# Events work immediately - no additional setup needed
# Langfuse: Just set environment variables
# Custom subscribers: Create and they start working
Best Practices
- Use Semantic Names: Follow dot notation (llm.generate, module.forward)
- Clean Up Subscribers: Always call unsubscribe() when done
tracker = MyTracker.new
# ... use tracker
tracker.unsubscribe # Clean up listeners
- Handle Listener Errors: Event system isolates failures
DSPy.events.subscribe('llm.*') do |name, attrs|
  risky_operation(attrs)
rescue => e
  # Error logged automatically, other listeners continue
end
- Use OpenTelemetry Conventions: Follow semantic naming for LLM events
DSPy.event('llm.generate', {
  'gen_ai.system' => 'openai',          # Required
  'gen_ai.request.model' => 'gpt-4',    # Required
  'gen_ai.usage.prompt_tokens' => 100   # Recommended
})
- Pattern Matching: Use wildcards for broad tracking
add_subscription('optimization.*') # All optimization events
add_subscription('llm.*')          # All LLM events
add_subscription('*')              # All events (careful!)
Troubleshooting
Events Not Triggering Listeners
Check subscription patterns:
# Make sure pattern matches event names
DSPy.events.subscribe('llm.*') # Matches llm.generate, llm.stream
DSPy.events.subscribe('llm') # Only matches exact 'llm'
Memory Leaks with Subscribers
Always unsubscribe when done:
class MyClass
  def initialize
    @tracker = TokenTracker.new
  end

  def cleanup
    @tracker.unsubscribe # Important!
  end
end
Thread Safety
Event system is thread-safe by design:
# Multiple threads can safely emit events
threads = 10.times.map do |i|
  Thread.new { DSPy.event('test.event', thread_id: i) }
end
threads.each(&:join)
Langfuse Integration (Zero Configuration)
DSPy.rb includes zero-config Langfuse integration via OpenTelemetry. Simply set your Langfuse environment variables and DSPy will automatically export spans to Langfuse alongside the normal logging.
Note: Integration requires the opentelemetry-sdk and opentelemetry-exporter-otlp gems to be available and proper network connectivity to your Langfuse instance.
🆕 Enhanced in v0.25.0: Comprehensive span reporting improvements including proper input/output capture, hierarchical nesting, accurate timing, token usage tracking, and correct Langfuse observation types (generation, chain, span).
Setup
# Required environment variables
export LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key
export LANGFUSE_SECRET_KEY=sk-lf-your-secret-key
# Optional: specify host (defaults to cloud.langfuse.com)
export LANGFUSE_HOST=https://cloud.langfuse.com # or https://us.cloud.langfuse.com
How It Works
When Langfuse environment variables are detected, DSPy automatically:
- Configures OpenTelemetry SDK with OTLP exporter
- Creates dual output: Both structured logs AND OpenTelemetry spans
- Exports to Langfuse using proper authentication and endpoints
- Falls back gracefully if OpenTelemetry gems are missing or configuration fails
Important: Automatic configuration only occurs when required gems are available and environment variables are properly set. Always verify your setup in development before relying on it in production.
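A simple pre-flight check you can add yourself (plain Ruby, not a DSPy API) makes missing configuration visible early:
# Hypothetical startup check: warn if Langfuse auto-configuration cannot engage
missing = %w[LANGFUSE_PUBLIC_KEY LANGFUSE_SECRET_KEY].reject { |key| ENV[key] }
warn "Langfuse export disabled; missing env vars: #{missing.join(', ')}" if missing.any?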
Example Output
With Langfuse configured, your DSPy applications will send traces like this:
In your logs (as usual):
{
  "severity": "INFO",
  "time": "2025-08-08T22:02:57Z",
  "trace_id": "abc-123-def",
  "span_id": "span-456",
  "parent_span_id": "span-789",
  "operation": "ChainOfThought.forward",
  "dspy.module": "ChainOfThought",
  "event": "span.start"
}
In Langfuse (automatically):
Trace: abc-123-def
├─ ChainOfThought.forward [2000ms]
│ ├─ Module: ChainOfThought
│ ├─ Observation Type: chain
│ └─ llm.generate [1000ms]
│ ├─ Model: gpt-4-0613
│ ├─ Observation Type: generation
│ ├─ Temperature: 0.7
│ ├─ Tokens: 100 in / 50 out / 150 total
│ └─ Cost: $0.0021 (calculated by Langfuse)
Trace Examples by Observation Type
Based on actual DSPy.rb implementation, here’s what traces look like for different observation types:
Generation Type (Direct LLM calls):
Trace: gen-trace-123
├─ llm.generate [800ms]
│ ├─ Observation Type: generation
│ ├─ Provider: openai
│ ├─ Model: gpt-4
│ ├─ Response Model: gpt-4-0613
│ ├─ Input: [{"role":"user","content":"What is 2+2?"}]
│ ├─ Output: "4"
│ └─ Tokens: 10 in / 2 out / 12 total
Chain Type (ChainOfThought reasoning):
Trace: cot-trace-456
├─ ChainOfThought.forward [2100ms]
│ ├─ Observation Type: chain
│ ├─ Signature: QuestionAnswering
│ ├─ Input: {"question":"Explain gravity"}
│ ├─ Output: {"answer":"Gravity is...","reasoning":"..."}
│ └─ llm.generate [1800ms]
│ ├─ Observation Type: generation
│ ├─ Provider: openai
│ ├─ Model: gpt-4
│ └─ Tokens: 45 in / 120 out / 165 total
Agent Type (ReAct multi-step reasoning):
Trace: react-trace-789
├─ ReAct.forward [5200ms]
│ ├─ Observation Type: agent
│ ├─ Signature: AgentSignature
│ ├─ Tools: [calculator, search]
│ ├─ Iterations: 3
│ ├─ Final Answer: "The answer is 42"
│ ├─ llm.generate (Iteration 1) [1200ms]
│ │ ├─ Observation Type: generation
│ │ └─ Tokens: 80 in / 30 out / 110 total
│ ├─ Tool: calculator [50ms]
│ │ ├─ Observation Type: tool
│ │ ├─ Input: "15 * 23"
│ │ └─ Output: "345"
│ ├─ llm.generate (Iteration 2) [1100ms]
│ │ ├─ Observation Type: generation
│ │ └─ Tokens: 95 in / 25 out / 120 total
│ └─ llm.generate (Iteration 3) [900ms]
│ ├─ Observation Type: generation
│ └─ Tokens: 70 in / 20 out / 90 total
GenAI Semantic Conventions
DSPy automatically includes OpenTelemetry GenAI semantic conventions:
# LLM operations automatically include:
{
  "gen_ai.system": "openai",
  "gen_ai.request.model": "gpt-4",
  "gen_ai.response.model": "gpt-4-0613",
  "gen_ai.usage.prompt_tokens": 100,
  "gen_ai.usage.completion_tokens": 50,
  "gen_ai.usage.total_tokens": 150
}
Manual Configuration (Advanced)
For custom OpenTelemetry setups, you can disable auto-configuration and set up manually:
# Disable auto-config by not setting Langfuse env vars
# Then configure OpenTelemetry yourself:
require 'opentelemetry/sdk'
require 'opentelemetry/exporter/otlp'
OpenTelemetry::SDK.configure do |config|
  config.service_name = 'my-dspy-app'
  # Your custom configuration
end
Dependencies
The Langfuse integration requires these gems (automatically included):
- opentelemetry-sdk (~> 1.8)
- opentelemetry-exporter-otlp (~> 0.30)
If these gems are not available, DSPy gracefully falls back to logging-only mode.
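If you manage these gems explicitly in your own Gemfile, the constraints above correspond to:
gem 'opentelemetry-sdk', '~> 1.8'
gem 'opentelemetry-exporter-otlp', '~> 0.30'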
Troubleshooting Langfuse Integration
Spans not appearing in Langfuse:
- Verify environment variables are set correctly
- Check Langfuse host/region (EU vs US)
- Ensure network connectivity to Langfuse endpoints
OpenTelemetry errors:
- Check that required gems are installed: bundle install
- Look for observability error logs: grep "observability.error" log/production.log
Authentication issues:
- Verify your public and secret keys are correct
- Check that keys have proper permissions in Langfuse dashboard
Score Reporting
DSPy.rb provides a Score Reporting API for exporting evaluation scores to Langfuse. This enables you to track model quality metrics alongside your traces.
Basic Usage
# Create a simple score
DSPy.score('accuracy', 0.95)
# With a comment
DSPy.score('relevance', 0.87, comment: 'High semantic similarity')
# Boolean score
DSPy.score('is_valid', 1, data_type: DSPy::Scores::DataType::Boolean)
# Categorical score
DSPy.score('sentiment', 'positive', data_type: DSPy::Scores::DataType::Categorical)
Score Data Types
DSPy uses Sorbet T::Enum for type-safe score data types:
# Available data types
DSPy::Scores::DataType::Numeric # Default - for 0.0 to 1.0 scores
DSPy::Scores::DataType::Boolean # For pass/fail scores (0 or 1)
DSPy::Scores::DataType::Categorical # For string labels like 'positive', 'negative'
Built-in Evaluators
DSPy provides common evaluators in DSPy::Scores::Evaluators:
# Exact string match (1.0 if equal, 0.0 otherwise)
score = DSPy::Scores::Evaluators.exact_match(
  output: prediction.answer,
  expected: example.expected_answer,
  name: 'answer_accuracy'
)

# Case-insensitive match
score = DSPy::Scores::Evaluators.exact_match(
  output: prediction.answer,
  expected: example.expected_answer,
  ignore_case: true
)

# Substring containment
score = DSPy::Scores::Evaluators.contains(
  output: prediction.response,
  expected: 'required keyword'
)

# Regex pattern matching
score = DSPy::Scores::Evaluators.regex_match(
  output: prediction.email,
  pattern: /\A[\w.+-]+@[\w.-]+\.[a-z]{2,}\z/i,
  name: 'email_format'
)

# Length validation
score = DSPy::Scores::Evaluators.length_check(
  output: prediction.summary,
  min_length: 50,
  max_length: 200
)

# Levenshtein similarity (0.0 to 1.0)
score = DSPy::Scores::Evaluators.similarity(
  output: prediction.answer,
  expected: example.expected_answer
)

# JSON validity check
score = DSPy::Scores::Evaluators.json_valid(
  output: prediction.json_response
)
Automatic Score Export with Evals
The DSPy::Evals evaluator can automatically export scores for each example:
evaluator = DSPy::Evals.new(
  program,
  metric: my_metric,
  export_scores: true,       # Enable automatic score export
  score_name: 'qa_accuracy'  # Custom score name
)
result = evaluator.evaluate(test_examples)
# Scores are automatically exported for each example
# A batch score is created at the end with overall pass rate
Async Langfuse Export
For production use, configure the async exporter to send scores to Langfuse:
# Configure the exporter (typically in an initializer)
exporter = DSPy::Scores::Exporter.configure(
  public_key: ENV['LANGFUSE_PUBLIC_KEY'],
  secret_key: ENV['LANGFUSE_SECRET_KEY'],
  host: 'https://cloud.langfuse.com' # or your Langfuse host
)
# Scores are now automatically exported in the background
DSPy.score('accuracy', 0.95)
# Shutdown gracefully when done (waits up to 5 seconds by default)
exporter.shutdown
The exporter:
- Uses a background thread with a Thread::Queue
- Automatically subscribes to score.create events
- Queues scores for async processing
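In a long-running process, one option (an illustrative pattern, not something DSPy.rb requires) is to tie the graceful shutdown shown above to process exit:
# Flush any queued scores before the process exits
at_exit { exporter.shutdown }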
Context Propagation
Scores automatically inherit the current trace context:
# Inside a traced operation, scores attach to the current trace
DSPy::Context.with_span(operation: 'evaluate_response') do |span|
  # This score will be attached to the current trace
  DSPy.score('response_quality', 0.92)
end
# Explicit trace_id override
DSPy.score('accuracy', 0.95, trace_id: 'custom-trace-id')
Event-Driven Architecture
Scores emit score.create events that you can subscribe to:
# Subscribe to score events
DSPy.events.subscribe('score.create') do |event_name, attrs|
  puts "Score created: #{attrs[:score_name]} = #{attrs[:score_value]}"
  # Access all score attributes
  # attrs[:score_id]
  # attrs[:score_name]
  # attrs[:score_value]
  # attrs[:score_data_type]
  # attrs[:score_comment]
  # attrs[:trace_id]
  # attrs[:observation_id]
  # attrs[:timestamp]
end
Summary
The DSPy.rb event system provides:
- Event API: Simple DSPy.event() for structured emission
- Pluggable Listeners: Subscribe to events with pattern matching
- OpenTelemetry Integration: Automatic span creation and Langfuse export
- Type Safety: Sorbet T::Struct event validation
- Backward Compatibility: Existing DSPy.log() calls enhanced automatically
Key benefits:
- Zero breaking changes: All existing code works unchanged
- Clean API: Rails-like event system developers expect
- Extensible: Easy to add custom observability providers
- Type safe: Structured events with validation
- Thread safe: Production-ready concurrent access
- Standard dependencies: Builds on the existing OpenTelemetry gems
The system eliminates complex monkey-patching while providing powerful observability features. See examples/event_system_demo.rb for a hands-on demonstration.