# Agent-Database Integration
Part of Data & Database. The agent-database integration layer connects all trading agents (Technical, Fundamental, Sentiment, News, Risk) with the persistence layer. It enables storing agent analyses in TimescaleDB, retrieving similar past situations via pgvector similarity search, caching results in Redis, and tracking debate outcomes. The services are designed for use by the orchestrator or by application code; instantiate and pass a database session (and optional Redis cache) as needed.
## Components

| Component | Location | Purpose |
|---|---|---|
| `AgentDatabaseService` | `backend/services/agent_database.py` | Store and query agent analyses |
| `DebateDatabaseService` | `backend/services/debate_database.py` | Manage debate lifecycle and transcripts |
| `AgentCache` | `backend/services/agent_cache.py` | Redis caching for agent analyses |
| `AgentPerformance` | `backend/services/agent_performance.py` | Track prediction accuracy and metrics |
## Database Models

| Model | Table | Storage |
|---|---|---|
| `AgentAnalysis` | `agent_analyses` | TimescaleDB hypertable |
| `TechnicalAnalysis` | `technical_analyses` | Specialized technical data |
| `FundamentalAnalysis` | `fundamental_analyses` | Valuation and financial health |
| `SentimentAnalysis` | `sentiment_analyses` | Sentiment scores and sources |
| `NewsAnalysis` | `news_analyses` | News items and impact scores |
| `RiskAnalysis` | `risk_analyses` | Risk scores and factors |
| `AgentMemory` | `agent_memory` | pgvector embeddings for similarity search |
| `DebateSession` | `debate_sessions` | Debate session metadata |
| `DebateMessage` | `debate_messages` | TimescaleDB hypertable for transcripts |
| `DebateVote` | `debate_votes` | Agent votes with confidence and reasoning |
## Agent Analysis Storage

### Storing Analyses

The `AgentDatabaseService` provides type-specific methods for each agent:

```python
from backend.services import AgentDatabaseService, AgentCache
from backend.database.models.enums import AgentType, TrendDirection, SentimentLabel, RiskLevel

# Initialize with an optional Redis cache
cache = AgentCache()  # connects to Redis
service = AgentDatabaseService(cache=cache)

# Store a technical analysis
service.store_technical_analysis(
    db_session,
    symbol="AAPL",
    analysis_text="Golden cross detected. RSI at 62.",
    confidence=0.85,
    indicators={"rsi": 62.3, "macd": 1.45, "sma_50": 155.2},
    trend=TrendDirection.BULLISH,
    support_levels=[148.0, 142.5],
    resistance_levels=[162.0, 170.0],
    session_id=session_id,  # links to the trading session
)

# Store a sentiment analysis
service.store_sentiment_analysis(
    db_session,
    symbol="AAPL",
    analysis_text="Positive news coverage post-earnings",
    confidence=0.90,
    sentiment_score=0.78,
    sentiment_label=SentimentLabel.POSITIVE,
    sources={"news_count": 15, "positive_ratio": 0.8},
)
```

Each `store_*` method writes to both the specialized table and the unified `agent_analyses` table, then updates the Redis cache.
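This dual-write-plus-cache flow can be sketched with in-memory stand-ins (purely illustrative; `FakeStores` and `store_technical_analysis_sketch` are hypothetical names, not part of the codebase, which writes through SQLAlchemy repositories and Redis):

```python
from dataclasses import dataclass, field

@dataclass
class FakeStores:
    """Hypothetical stand-ins for the two tables and the cache."""
    specialized: list = field(default_factory=list)  # e.g. technical_analyses
    unified: list = field(default_factory=list)      # agent_analyses
    cache: dict = field(default_factory=dict)        # Redis stand-in

def store_technical_analysis_sketch(stores, symbol, analysis_text, confidence, **extras):
    record = {"symbol": symbol, "text": analysis_text, "confidence": confidence, **extras}
    # 1. Write to the specialized table
    stores.specialized.append(record)
    # 2. Write to the unified agent_analyses table
    stores.unified.append({**record, "agent_type": "TECHNICAL"})
    # 3. Refresh the latest-analysis cache entry
    stores.cache[f"agent:TECHNICAL:{symbol}:latest"] = record
    return record
```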
### Querying Analyses

```python
from datetime import datetime, timedelta

# Latest analyses for a symbol (all agents)
analyses = service.get_latest_analyses(db_session, symbol="AAPL", limit=5)

# Historical analyses by agent type
start = datetime.now() - timedelta(days=30)
history = service.get_historical_analyses(
    db_session,
    agent_type=AgentType.TECHNICAL,
    symbol="AAPL",
    start=start,
    limit=100,
)
```
## Agent Memory (pgvector Similarity Search)
Agent memory enables agents to retrieve relevant past experiences using vector similarity search. Memories are stored with 1536-dimensional embeddings (OpenAI text-embedding-3-small).
### Storing Memories

```python
service.store_memory(
    db_session,
    agent_type=AgentType.TECHNICAL,
    situation="RSI below 30 with MACD bullish crossover on high volume",
    recommendation="Strong buy signal, similar to Q3 2024 pattern",
    embedding=embedding_vector,  # list[float], 1536 dimensions
    outcome="Price rose 12% in 2 weeks",  # optional, for learning
    extra_metadata={"symbol": "AAPL", "rsi": 28.5},
)
```
### Retrieving Similar Memories

The `find_similar_memories` method uses pgvector cosine distance with an HNSW index for fast approximate nearest-neighbor search:

```python
results = service.find_similar_memories(
    db_session,
    embedding=query_embedding,
    agent_type=AgentType.TECHNICAL,  # optional filter
    limit=5,
    threshold=0.7,  # minimum cosine similarity
)

for memory, similarity_score in results:
    print(f"Score: {similarity_score:.3f}")
    print(f"Situation: {memory.situation}")
    print(f"Recommendation: {memory.recommendation}")
```
### HNSW Index Configuration

The HNSW index is created in Alembic migration `alembic/versions/002_initial_schema.py`:

```sql
CREATE INDEX IF NOT EXISTS agent_memory_embedding_hnsw_idx ON agent_memory
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
```
| Parameter | Value | Effect |
|---|---|---|
| `m` | 16 | Connections per node (higher = better recall, more memory) |
| `ef_construction` | 64 | Build-time search width (higher = better index quality) |
| Distance metric | cosine | `1 - cosine_similarity`: a distance of 0 means identical vectors (similarity 1); 2 means opposite vectors (similarity -1) |
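Because pgvector reports cosine *distance* while `find_similar_memories` accepts a similarity *threshold*, the two must be converted. A minimal sketch of the conversion, assuming distance = 1 - similarity as stated above (helper names are illustrative, not the service's actual code):

```python
def similarity_from_distance(distance: float) -> float:
    """Convert pgvector cosine distance (0..2) to cosine similarity (-1..1)."""
    return 1.0 - distance

def distance_bound(min_similarity: float) -> float:
    """Distance cutoff equivalent to a minimum-similarity threshold."""
    return 1.0 - min_similarity
```

For example, a similarity threshold of 0.7 corresponds to keeping rows with distance at most 0.3.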
## Redis Caching

The `AgentCache` caches the latest analysis per agent type and symbol with a 30-minute TTL. Connections use the same Redis settings as the rest of the system (`REDIS_HOST`, `REDIS_PORT`, etc.); see the Configuration Reference. The TTL is configurable via `AgentCache(default_ttl=...)` (default 1800 seconds).
### Key Format

Example: `agent:TECHNICAL:AAPL:latest`
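A key of this shape can be built with a small helper (a sketch only; `cache_key` is a hypothetical name, and the real key construction lives inside `AgentCache`):

```python
def cache_key(agent_type: str, symbol: str) -> str:
    """Build the latest-analysis cache key: agent:<AGENT_TYPE>:<SYMBOL>:latest."""
    return f"agent:{agent_type.upper()}:{symbol.upper()}:latest"
```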
### Usage

```python
from backend.services import AgentCache
from backend.database.models.enums import AgentType

cache = AgentCache(default_ttl=1800)  # 30 minutes

# Get a cached analysis
data = cache.get(AgentType.TECHNICAL, "AAPL")

# Get all cached analyses for a symbol
all_data = cache.get_all_for_symbol("AAPL")

# Invalidate on new data
cache.invalidate(AgentType.TECHNICAL, "AAPL")
cache.invalidate_symbol("AAPL")  # all agent types
```
Caching is automatic when using AgentDatabaseService with a cache instance.
## Debate Storage
The DebateDatabaseService manages the full debate lifecycle.
### Debate Lifecycle

A debate proceeds through four steps: create a session, record agent messages, record votes, then finalize with the consensus decision.

### Schema
`DebateSession` stores the debate metadata:

| Field | Type | Description |
|---|---|---|
| `session_id` | UUID | Reference to the main trading session |
| `symbol` | String | Stock symbol being debated |
| `phase` | Enum | RESEARCH, RISK_ASSESSMENT, FINAL_DECISION |
| `start_time` | DateTime | When the debate started |
| `end_time` | DateTime | When the debate ended |
| `final_decision` | JSONB | Final decision (action, confidence, consensus) |
`DebateMessage` stores individual agent arguments (TimescaleDB hypertable):

| Field | Type | Description |
|---|---|---|
| `debate_session_id` | UUID | Reference to the debate session |
| `agent_type` | Enum | Agent posting the message |
| `message` | Text | Message content |
| `timestamp` | DateTime | When posted |
`DebateVote` stores agent votes:

| Field | Type | Description |
|---|---|---|
| `debate_session_id` | UUID | Reference to the debate session |
| `agent_type` | Enum | Agent casting the vote |
| `vote` | Enum | BULLISH, BEARISH, NEUTRAL |
| `confidence` | Float | Confidence in the vote (0-1) |
| `reasoning` | Text | Explanation for the vote |
### Usage

```python
from backend.services import DebateDatabaseService
from backend.database.models.enums import AgentType, DebatePhase, VoteType

debate_svc = DebateDatabaseService()

# 1. Create a session
debate = debate_svc.create_session(db_session, symbol="AAPL", phase=DebatePhase.RESEARCH)

# 2. Record messages
debate_svc.record_message(
    db_session, debate.id, AgentType.TECHNICAL,
    "RSI oversold at 28. MACD bullish crossover forming."
)

# 3. Record votes
debate_svc.record_vote(
    db_session, debate.id, AgentType.TECHNICAL,
    vote=VoteType.BULLISH, confidence=0.85,
    reasoning="Strong technical setup with oversold bounce pattern"
)

# 4. Finalize
debate_svc.finalize_session(
    db_session, debate.id,
    final_decision={"action": "BUY", "confidence": 0.80}
)

# Query voting patterns
patterns = debate_svc.get_voting_patterns(db_session, debate.id)
# Returns: {total_votes, breakdown: {BULLISH: {count, avg_confidence}}, by_agent: {...}}
```
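The shape of the `get_voting_patterns` result can be illustrated with a small in-memory aggregation over (agent, vote, confidence) tuples (a sketch under assumed field names, not the service's actual query):

```python
from collections import defaultdict

def aggregate_votes(votes):
    """votes: list of (agent, vote, confidence) tuples."""
    by_vote = defaultdict(list)
    by_agent = {}
    total = 0
    for agent, vote, confidence in votes:
        total += 1
        by_vote[vote].append(confidence)
        by_agent[agent] = {"vote": vote, "confidence": confidence}
    return {
        "total_votes": total,
        "breakdown": {
            v: {"count": len(cs), "avg_confidence": sum(cs) / len(cs)}
            for v, cs in by_vote.items()
        },
        "by_agent": by_agent,
    }
```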
## Performance Tracking

The `AgentPerformance` service evaluates agent prediction accuracy by comparing past analyses against recorded market outcomes.
### Recording Outcomes

After the market reveals the actual result, record it against the original analysis:

```python
from backend.services import AgentPerformance

perf = AgentPerformance()
perf.record_outcome(db_session, analysis_id, {
    "direction": "BULLISH",
    "price_change": 5.2,
    "timeframe_days": 5,
})
```
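One way an outcome could be scored against the original prediction is sketched below; the `realized_direction` helper and the 1% neutral band are assumptions for illustration, not the service's actual scoring rule:

```python
def realized_direction(price_change: float, neutral_band: float = 1.0) -> str:
    """Classify the realized move in percent; the neutral band is an assumed parameter."""
    if price_change > neutral_band:
        return "BULLISH"
    if price_change < -neutral_band:
        return "BEARISH"
    return "NEUTRAL"

def is_correct(predicted_direction: str, outcome: dict) -> bool:
    """A prediction counts as correct when it matches the realized direction."""
    return predicted_direction == realized_direction(outcome["price_change"])
```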
### Querying Metrics

```python
from backend.database.models.enums import AgentType

# Accuracy by agent type
accuracy = perf.get_accuracy(db_session, AgentType.TECHNICAL, symbol="AAPL")
# Returns: {total_predictions, correct_predictions, accuracy, by_direction}

# Precision / Recall / F1
metrics = perf.get_metrics(db_session, AgentType.TECHNICAL)
# Returns: {per_direction: {BULLISH: {precision, recall, f1}}, macro_precision, ...}

# Confidence calibration
calibration = perf.get_confidence_calibration(db_session, AgentType.TECHNICAL, buckets=10)
# Returns: [{bucket_min, bucket_max, count, accuracy}, ...]

# Summary across all agents
summary = perf.get_performance_summary(db_session)
```
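Confidence calibration groups predictions into equal-width confidence buckets and compares each bucket's realized accuracy against its confidence range. A sketch of that bucketing, assuming (confidence, correct) pairs as input (not the service's actual implementation):

```python
def calibration(predictions, buckets=10):
    """predictions: list of (confidence, correct) pairs, confidence in [0, 1]."""
    width = 1.0 / buckets
    report = []
    for i in range(buckets):
        lo, hi = i * width, (i + 1) * width
        # The last bucket also includes confidence == 1.0
        hits = [correct for conf, correct in predictions
                if lo <= conf < hi or (i == buckets - 1 and conf == 1.0)]
        report.append({
            "bucket_min": lo,
            "bucket_max": hi,
            "count": len(hits),
            "accuracy": sum(hits) / len(hits) if hits else None,
        })
    return report
```

A well-calibrated agent has bucket accuracy close to the bucket's confidence range, e.g. predictions made with ~0.8 confidence should be right about 80% of the time.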
## Testing

### Unit Tests

Unit tests use mocked repositories and Redis (no database required):

```bash
pytest tests/services/test_agent_cache.py
pytest tests/services/test_agent_database.py
pytest tests/services/test_agent_performance.py
pytest tests/services/test_debate_database.py
pytest tests/database/repositories/test_agent_analysis.py
pytest tests/database/repositories/test_agent_memory.py
pytest tests/database/repositories/test_debate.py
```
### Integration Tests

Integration tests require the Docker services (PostgreSQL with TimescaleDB and pgvector, plus Redis):

```bash
docker-compose up -d postgres redis
pytest tests/integration/test_agent_database.py -m integration
pytest tests/integration/test_agent_memory_search.py -m integration
pytest tests/integration/test_debate_storage.py -m integration
```

Tests skip automatically if PostgreSQL is unavailable.