Agent-Database Integration

Part of Data & Database. The agent-database integration layer connects all trading agents (Technical, Fundamental, Sentiment, News, Risk) with the persistence layer. It enables storing agent analyses in TimescaleDB, retrieving similar past situations via pgvector similarity search, caching results in Redis, and tracking debate outcomes. The services are designed for use by the orchestrator or by application code; instantiate and pass a database session (and optional Redis cache) as needed.

Components

| Component | Location | Purpose |
| --- | --- | --- |
| AgentDatabaseService | backend/services/agent_database.py | Store and query agent analyses |
| DebateDatabaseService | backend/services/debate_database.py | Manage debate lifecycle and transcripts |
| AgentCache | backend/services/agent_cache.py | Redis caching for agent analyses |
| AgentPerformance | backend/services/agent_performance.py | Track prediction accuracy and metrics |

Database Models

| Model | Table | Storage |
| --- | --- | --- |
| AgentAnalysis | agent_analyses | TimescaleDB hypertable |
| TechnicalAnalysis | technical_analyses | Specialized technical data |
| FundamentalAnalysis | fundamental_analyses | Valuation and financial health |
| SentimentAnalysis | sentiment_analyses | Sentiment scores and sources |
| NewsAnalysis | news_analyses | News items and impact scores |
| RiskAnalysis | risk_analyses | pgvector embeddings for similarity search — see below for AgentMemory; risk scores and factors |

Agent Analysis Storage

Storing Analyses

The AgentDatabaseService provides type-specific methods for each agent:

from backend.services import AgentDatabaseService, AgentCache
from backend.database.models.enums import AgentType, TrendDirection, SentimentLabel, RiskLevel

# Initialize with optional Redis cache
cache = AgentCache()  # connects to Redis
service = AgentDatabaseService(cache=cache)

# Store a technical analysis
service.store_technical_analysis(
    db_session,
    symbol="AAPL",
    analysis_text="Golden cross detected. RSI at 62.",
    confidence=0.85,
    indicators={"rsi": 62.3, "macd": 1.45, "sma_50": 155.2},
    trend=TrendDirection.BULLISH,
    support_levels=[148.0, 142.5],
    resistance_levels=[162.0, 170.0],
    session_id=session_id,  # links to trading session
)

# Store a sentiment analysis
service.store_sentiment_analysis(
    db_session,
    symbol="AAPL",
    analysis_text="Positive news coverage post-earnings",
    confidence=0.90,
    sentiment_score=0.78,
    sentiment_label=SentimentLabel.POSITIVE,
    sources={"news_count": 15, "positive_ratio": 0.8},
)

Each store_* method writes to both the specialized table and the unified agent_analyses table, then updates the Redis cache.
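The dual-write-plus-cache pattern can be sketched with plain dicts standing in for the two tables and Redis. This is illustrative only, not the service's real internals; all names below are hypothetical:

```python
# Write-through sketch of a store_* call: specialized table, unified table,
# then cache refresh. Dicts stand in for TimescaleDB tables and Redis.
technical_table = {}   # stand-in for technical_analyses
unified_table = {}     # stand-in for agent_analyses
redis_cache = {}       # stand-in for Redis

def store_technical_analysis(symbol: str, payload: dict) -> None:
    # 1. Write the specialized row
    technical_table[symbol] = payload
    # 2. Write the shared fields to the unified table
    unified_table[symbol] = {
        "agent_type": "TECHNICAL",
        "analysis_text": payload["analysis_text"],
        "confidence": payload["confidence"],
    }
    # 3. Refresh the cache so the next read sees the latest analysis
    redis_cache[f"agent:TECHNICAL:{symbol}:latest"] = unified_table[symbol]

store_technical_analysis("AAPL", {"analysis_text": "Golden cross", "confidence": 0.85})
```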

Querying Analyses

# Latest analyses for a symbol (all agents)
analyses = service.get_latest_analyses(db_session, symbol="AAPL", limit=5)

# Historical analyses by agent type
from datetime import datetime, timedelta
start = datetime.now() - timedelta(days=30)
history = service.get_historical_analyses(
    db_session,
    agent_type=AgentType.TECHNICAL,
    symbol="AAPL",
    start=start,
    limit=100,
)

Agent Memory

Agent memory enables agents to retrieve relevant past experiences using vector similarity search. Memories are stored with 1536-dimensional embeddings (OpenAI text-embedding-3-small).
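Producing the 1536-dimensional vector is the caller's job. A minimal sketch of an `embed` helper (hypothetical name); in production it would call the OpenAI embeddings API, while the hash-based vector below is a deterministic stand-in for illustration:

```python
import hashlib
import math

EMBEDDING_DIM = 1536  # matches text-embedding-3-small

def embed(text: str) -> list[float]:
    """Return a 1536-dim unit vector for `text`.

    Production code would call the OpenAI API instead, e.g.:
        client.embeddings.create(model="text-embedding-3-small", input=text)
    """
    digest = hashlib.sha256(text.encode()).digest()
    # Tile the 32-byte digest out to 1536 components (illustrative only)
    raw = [digest[i % len(digest)] / 255.0 for i in range(EMBEDDING_DIM)]
    norm = math.sqrt(sum(x * x for x in raw))
    return [x / norm for x in raw]  # unit-normalize, as embedding APIs do

embedding_vector = embed("RSI below 30 with MACD bullish crossover on high volume")
```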

Storing Memories

service.store_memory(
    db_session,
    agent_type=AgentType.TECHNICAL,
    situation="RSI below 30 with MACD bullish crossover on high volume",
    recommendation="Strong buy signal, similar to Q3 2024 pattern",
    embedding=embedding_vector,  # list[float], 1536 dimensions
    outcome="Price rose 12% in 2 weeks",  # optional, for learning
    extra_metadata={"symbol": "AAPL", "rsi": 28.5},
)

Retrieving Similar Memories

The find_similar_memories method uses pgvector cosine distance with an HNSW index for fast approximate nearest neighbor search:

results = service.find_similar_memories(
    db_session,
    embedding=query_embedding,
    agent_type=AgentType.TECHNICAL,  # optional filter
    limit=5,
    threshold=0.7,  # minimum cosine similarity
)

for memory, similarity_score in results:
    print(f"Score: {similarity_score:.3f}")
    print(f"Situation: {memory.situation}")
    print(f"Recommendation: {memory.recommendation}")

HNSW Index Configuration

The HNSW index is created in Alembic migration alembic/versions/002_initial_schema.py:

CREATE INDEX IF NOT EXISTS agent_memory_embedding_hnsw_idx ON agent_memory
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

| Parameter | Value | Effect |
| --- | --- | --- |
| m | 16 | Connections per node (higher = better recall, more memory) |
| ef_construction | 64 | Build-time search width (higher = better index quality) |
| Distance metric | cosine | 1 - cosine_similarity; a distance of 0 means identical vectors (similarity 1), 2 means opposite vectors (similarity -1) |
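The distance/similarity relationship matters when translating the service's similarity `threshold` into a pgvector query: pgvector's `<=>` operator returns cosine distance, so a similarity threshold of 0.7 becomes a distance cutoff of 0.3. A small sketch:

```python
# pgvector's <=> operator returns cosine distance; the equivalent SQL is:
#   WHERE embedding <=> :query_embedding < :distance_cutoff
#   ORDER BY embedding <=> :query_embedding

def similarity_from_distance(distance: float) -> float:
    # similarity = 1 - cosine_distance
    return 1.0 - distance

def distance_cutoff(similarity_threshold: float) -> float:
    # e.g. a 0.7 similarity threshold maps to a 0.3 distance cutoff
    return 1.0 - similarity_threshold
```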

Redis Caching

The AgentCache caches the latest analysis per agent type and symbol with a 30-minute TTL. Connection uses the same Redis settings as the rest of the system (REDIS_HOST, REDIS_PORT, etc.); see Configuration Reference. TTL is configurable when constructing AgentCache(default_ttl=...) (default 1800 seconds).

Key Format

agent:{AGENT_TYPE}:{SYMBOL}:latest

Example: agent:TECHNICAL:AAPL:latest
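The key format implies a small builder like the following sketch (the real one lives inside AgentCache; upper-casing the inputs is an assumption based on the example key):

```python
def cache_key(agent_type: str, symbol: str) -> str:
    # agent:{AGENT_TYPE}:{SYMBOL}:latest — normalization to upper case assumed
    return f"agent:{agent_type.upper()}:{symbol.upper()}:latest"

cache_key("technical", "aapl")  # → "agent:TECHNICAL:AAPL:latest"
```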

Usage

from backend.services import AgentCache
from backend.database.models.enums import AgentType

cache = AgentCache(default_ttl=1800)  # 30 minutes

# Get cached analysis
data = cache.get(AgentType.TECHNICAL, "AAPL")

# Get all cached analyses for a symbol
all_data = cache.get_all_for_symbol("AAPL")

# Invalidate on new data
cache.invalidate(AgentType.TECHNICAL, "AAPL")
cache.invalidate_symbol("AAPL")  # all agent types

Caching is automatic when using AgentDatabaseService with a cache instance.
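The read side follows the standard cache-aside pattern. A sketch with a dict standing in for Redis and a callable standing in for the database query (all names hypothetical):

```python
cache_store = {}  # stand-in for Redis

def fetch_latest(agent_type: str, symbol: str, query_db) -> dict:
    """Cache-aside read: try the cache, fall back to the DB, then populate."""
    key = f"agent:{agent_type}:{symbol}:latest"
    if key in cache_store:
        return cache_store[key]            # cache hit
    result = query_db(agent_type, symbol)  # cache miss: query the database
    cache_store[key] = result              # populate for the next reader
    return result

db_calls = []
def fake_db(agent_type, symbol):
    db_calls.append((agent_type, symbol))
    return {"confidence": 0.85}

fetch_latest("TECHNICAL", "AAPL", fake_db)  # miss: queries the DB
fetch_latest("TECHNICAL", "AAPL", fake_db)  # hit: served from the cache
```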


Debate Storage

The DebateDatabaseService manages the full debate lifecycle.

Debate Lifecycle

create_session -> record_message (N times) -> record_vote (per agent) -> finalize_session

Schema

DebateSession stores the debate metadata:

| Field | Type | Description |
| --- | --- | --- |
| session_id | UUID | Reference to main trading session |
| symbol | String | Stock symbol being debated |
| phase | Enum | RESEARCH, RISK_ASSESSMENT, FINAL_DECISION |
| start_time | DateTime | When the debate started |
| end_time | DateTime | When the debate ended |
| final_decision | JSONB | Final decision (action, confidence, consensus) |

DebateMessage stores individual agent arguments (TimescaleDB hypertable):

| Field | Type | Description |
| --- | --- | --- |
| debate_session_id | UUID | Reference to debate session |
| agent_type | Enum | Agent posting the message |
| message | Text | Message content |
| timestamp | DateTime | When posted |

DebateVote stores agent votes:

| Field | Type | Description |
| --- | --- | --- |
| debate_session_id | UUID | Reference to debate session |
| agent_type | Enum | Agent casting the vote |
| vote | Enum | BULLISH, BEARISH, NEUTRAL |
| confidence | Float | Confidence in vote (0-1) |
| reasoning | Text | Explanation for the vote |

Usage

from backend.services import DebateDatabaseService
from backend.database.models.enums import AgentType, DebatePhase, VoteType

debate_svc = DebateDatabaseService()

# 1. Create session
debate = debate_svc.create_session(db_session, symbol="AAPL", phase=DebatePhase.RESEARCH)

# 2. Record messages
debate_svc.record_message(
    db_session, debate.id, AgentType.TECHNICAL,
    "RSI oversold at 28. MACD bullish crossover forming."
)

# 3. Record votes
debate_svc.record_vote(
    db_session, debate.id, AgentType.TECHNICAL,
    vote=VoteType.BULLISH, confidence=0.85,
    reasoning="Strong technical setup with oversold bounce pattern"
)

# 4. Finalize
debate_svc.finalize_session(
    db_session, debate.id,
    final_decision={"action": "BUY", "confidence": 0.80}
)

# Query voting patterns
patterns = debate_svc.get_voting_patterns(db_session, debate.id)
# Returns: {total_votes, breakdown: {BULLISH: {count, avg_confidence}}, by_agent: {...}}
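Given the documented shape of the get_voting_patterns result, a consensus check can be derived client-side. A sketch (the sample dict is illustrative, not real output):

```python
def consensus(patterns: dict) -> tuple[str, float]:
    """Return (winning_vote, vote_share) from a get_voting_patterns result."""
    total = patterns["total_votes"]
    # Pick the vote type with the highest count
    winner, info = max(patterns["breakdown"].items(), key=lambda kv: kv[1]["count"])
    return winner, info["count"] / total

sample = {
    "total_votes": 5,
    "breakdown": {
        "BULLISH": {"count": 3, "avg_confidence": 0.82},
        "BEARISH": {"count": 1, "avg_confidence": 0.60},
        "NEUTRAL": {"count": 1, "avg_confidence": 0.55},
    },
}
consensus(sample)  # → ("BULLISH", 0.6)
```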

Performance Tracking

The AgentPerformance service evaluates agent prediction accuracy by comparing past analyses against recorded market outcomes.

Recording Outcomes

After the market reveals the actual result, record it against the original analysis:

from backend.services import AgentPerformance

perf = AgentPerformance()
perf.record_outcome(db_session, analysis_id, {
    "direction": "BULLISH",
    "price_change": 5.2,
    "timeframe_days": 5,
})
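How a prediction is scored against an outcome is an implementation detail of AgentPerformance; a plausible sketch (hypothetical scoring rule, not the service's actual code) compares the predicted trend with the recorded direction:

```python
def is_correct(predicted_trend: str, outcome: dict) -> bool:
    # Hypothetical rule: a prediction counts as correct when the predicted
    # trend matches the market direction recorded in the outcome.
    return predicted_trend == outcome["direction"]

is_correct("BULLISH", {"direction": "BULLISH", "price_change": 5.2, "timeframe_days": 5})
```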

Querying Metrics

# Accuracy by agent type
accuracy = perf.get_accuracy(db_session, AgentType.TECHNICAL, symbol="AAPL")
# Returns: {total_predictions, correct_predictions, accuracy, by_direction}

# Precision / Recall / F1
metrics = perf.get_metrics(db_session, AgentType.TECHNICAL)
# Returns: {per_direction: {BULLISH: {precision, recall, f1}}, macro_precision, ...}

# Confidence calibration
calibration = perf.get_confidence_calibration(db_session, AgentType.TECHNICAL, buckets=10)
# Returns: [{bucket_min, bucket_max, count, accuracy}, ...]

# Summary across all agents
summary = perf.get_performance_summary(db_session)
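The calibration buckets can be collapsed into a single expected-calibration-error number: a well-calibrated agent's bucket accuracy tracks the bucket's confidence midpoint. A sketch assuming the documented bucket shape (the sample buckets are illustrative):

```python
def expected_calibration_error(buckets: list[dict]) -> float:
    """Count-weighted average gap between confidence midpoint and accuracy."""
    total = sum(b["count"] for b in buckets)
    if total == 0:
        return 0.0
    ece = 0.0
    for b in buckets:
        midpoint = (b["bucket_min"] + b["bucket_max"]) / 2
        ece += (b["count"] / total) * abs(b["accuracy"] - midpoint)
    return ece

sample_buckets = [
    {"bucket_min": 0.8, "bucket_max": 0.9, "count": 10, "accuracy": 0.85},
    {"bucket_min": 0.9, "bucket_max": 1.0, "count": 10, "accuracy": 0.80},
]
expected_calibration_error(sample_buckets)
```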

Testing

Unit Tests

Unit tests use mocked repositories and Redis (no database required):

pytest tests/services/test_agent_cache.py
pytest tests/services/test_agent_database.py
pytest tests/services/test_agent_performance.py
pytest tests/services/test_debate_database.py
pytest tests/database/repositories/test_agent_analysis.py
pytest tests/database/repositories/test_agent_memory.py
pytest tests/database/repositories/test_debate.py

Integration Tests

Integration tests require Docker services (PostgreSQL + TimescaleDB + pgvector + Redis):

docker-compose up -d postgres redis
pytest tests/integration/test_agent_database.py -m integration
pytest tests/integration/test_agent_memory_search.py -m integration
pytest tests/integration/test_debate_storage.py -m integration

Tests skip automatically if PostgreSQL is unavailable.