Signal Service

SignalService manages the complete lifecycle of trading signals: creation, validation, storage, retrieval, aggregation, performance tracking, and conflict resolution. It provides both sync and async interfaces.

Location

backend/services/signal_service.py

Architecture

API / Orchestration
   SignalService  ←── CacheableMixin (Redis)
        │                  │
   ValidationPipeline   BaseService (transactions, retries)
   SignalRepository  (TimescaleDB)

Quick Start

from backend.services.signal_service import SignalService
from backend.database.models.enums import SignalAction

svc = SignalService()

# Create a signal (validated, dedup-checked, stored, cached)
signal = svc.create_signal(
    db_session,
    symbol="AAPL",
    action=SignalAction.BUY,
    confidence=0.85,
    price=152.30,
    reasoning="Strong RSI reversal with volume confirmation",
)

# Async variant (must be awaited inside a coroutine)
signal = await svc.create_signal_async(
    async_db_session,
    symbol="AAPL",
    action=SignalAction.BUY,
    confidence=0.85,
    price=152.30,
)

API Reference

Creation

create_signal(db_session, symbol, action, confidence, price, ...)

Validates, dedup-checks, persists, and caches a new trading signal.

  • Runs the validation pipeline when one is injected.
  • Rejects duplicates (same symbol + action within 60 seconds).
  • Normalises the symbol to uppercase.
  • Caches the latest signal per symbol (TTL 30 min).

create_signal_async(db_session, symbol, action, confidence, price, ...)

Async variant of create_signal.

Retrieval

get_signal_by_id(db_session, signal_id) / get_signal_by_id_async(...)

Retrieves a signal by UUID. Uses cache-aside: checks Redis first, falls back to the database, and populates the cache on miss.
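The cache-aside flow described above can be sketched as follows. This is a minimal illustration, not the service's actual code: InMemoryCache stands in for Redis, and get_signal_cache_aside and load_from_db are hypothetical names. The key format and the 30 minute TTL are taken from the Caching Strategy table.

```python
import json
from typing import Callable, Optional


class InMemoryCache:
    """Hypothetical stand-in for Redis; records TTLs but does not expire keys."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}
        self.ttl_seen: dict[str, int] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: str, value: str, ttl_seconds: int) -> None:
        self._data[key] = value
        self.ttl_seen[key] = ttl_seconds


def get_signal_cache_aside(
    cache: InMemoryCache,
    signal_id: str,
    load_from_db: Callable[[str], Optional[dict]],
    ttl_seconds: int = 30 * 60,
) -> Optional[dict]:
    """Check the cache first, fall back to the loader, populate on miss."""
    key = f"signal:id:{signal_id}"
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)          # cache hit: no database access
    signal = load_from_db(signal_id)       # cache miss: query the database
    if signal is not None:
        cache.set(key, json.dumps(signal), ttl_seconds)
    return signal
```

A second lookup for the same ID is served from the cache, so the loader runs only once until the entry expires or is invalidated.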

get_latest_signal(db_session, symbol) / get_latest_signal_async(...)

Returns the most recent signal for a symbol.

get_signals(db_session, symbol, start?, end?, action?, min_confidence?, limit, offset) / get_signals_async(...)

Paginated query with optional filters (time range, action type, minimum confidence).

Aggregation

aggregate_signals(db_session, symbol, interval, start, end)

Time-bucket aggregation via TimescaleDB time_bucket.

interval   Bucket size
1h         1 hour
1d         1 day
1w         1 week

Results are cached for 1 hour.
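To illustrate what time-bucket aggregation produces, here is a pure-Python sketch that floors timestamps to the start of their bucket and summarises each bucket. It only imitates the idea behind time_bucket; the real query runs in the database. The names bucket_start and aggregate, the (timestamp, confidence) input shape, and the origin (a Monday, chosen here so that weekly buckets start on Mondays) are assumptions for the example.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

# Supported interval strings, as listed in the table above.
INTERVALS = {
    "1h": timedelta(hours=1),
    "1d": timedelta(days=1),
    "1w": timedelta(weeks=1),
}

# Assumed alignment origin: Monday 2000-01-03 00:00 UTC.
ORIGIN = datetime(2000, 1, 3, tzinfo=timezone.utc)


def bucket_start(ts: datetime, interval: str) -> datetime:
    """Floor a timezone-aware timestamp to the start of its bucket."""
    width = INTERVALS[interval]
    n = (ts - ORIGIN) // width  # whole buckets elapsed since the origin
    return ORIGIN + n * width


def aggregate(signals: list[tuple[datetime, float]], interval: str) -> dict:
    """Group (timestamp, confidence) pairs by bucket; return count and mean."""
    buckets: dict[datetime, list[float]] = defaultdict(list)
    for ts, confidence in signals:
        buckets[bucket_start(ts, interval)].append(confidence)
    return {
        start: {"count": len(vals), "avg_confidence": sum(vals) / len(vals)}
        for start, vals in sorted(buckets.items())
    }
```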

get_signal_statistics(db_session, symbol, start?, end?)

Returns summary statistics, including the most common reasoning patterns:

{
  "total": 42,
  "by_action": {"BUY": 20, "SELL": 15, "HOLD": 7},
  "avg_confidence": 0.78,
  "signals_per_day": 3.5,
  "top_reasoning_patterns": [
    ["Strong RSI reversal", 12],
    ["MACD crossover", 8]
  ]
}

Performance Tracking

track_signal_outcome(db_session, signal_id, outcome)

Attaches a market outcome dict to a signal's extra_metadata for later accuracy analysis. Invalidates relevant caches.

get_signal_performance(db_session, symbol, start?, end?)

Computes performance metrics, including the Pearson correlation between signal confidence and accuracy:

{
  "total_evaluated": 30,
  "win_rate": 0.6333,
  "avg_pnl": 2.15,
  "by_action": {
    "BUY": {"count": 20, "wins": 14, "total_pnl": 50.0},
    "SELL": {"count": 10, "wins": 5, "total_pnl": 14.5}
  },
  "confidence_accuracy_correlation": 0.72
}

A positive confidence_accuracy_correlation means higher-confidence signals tend to be correct more often. The value is null when fewer than 2 signals have been evaluated or when the data has zero variance, because the correlation is undefined in those cases.
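For reference, a Pearson correlation with the same null conditions could be computed as in the sketch below. The function name and the encoding of accuracy as 1.0 for a win and 0.0 for a loss are assumptions; the service may derive accuracy from the outcome dict differently.

```python
import math
from typing import Optional, Sequence


def confidence_accuracy_correlation(
    confidences: Sequence[float], wins: Sequence[bool]
) -> Optional[float]:
    """Pearson r between confidence and win/loss; None when undefined."""
    n = len(confidences)
    if n != len(wins):
        raise ValueError("confidences and wins must have the same length")
    if n < 2:
        return None  # fewer than 2 evaluated signals

    outcomes = [1.0 if w else 0.0 for w in wins]  # assumed accuracy encoding
    mean_c = sum(confidences) / n
    mean_o = sum(outcomes) / n

    cov = sum((c - mean_c) * (o - mean_o) for c, o in zip(confidences, outcomes))
    var_c = sum((c - mean_c) ** 2 for c in confidences)
    var_o = sum((o - mean_o) ** 2 for o in outcomes)

    if var_c == 0 or var_o == 0:
        return None  # zero variance: the correlation is undefined
    return cov / math.sqrt(var_c * var_o)
```

With confidences [0.9, 0.8, 0.3, 0.2] and outcomes [win, win, loss, loss] the result is strongly positive, since the confident signals are the ones that won.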

Conflict Resolution

detect_signal_conflicts(db_session, symbol, start, end)

Finds BUY vs SELL conflicts within a time window. Returns a list of conflict dicts with severity (absolute confidence difference).
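One way to picture the conflict check is the in-memory sketch below. It assumes that every BUY in the window is paired with every SELL in the same window, which the description above does not spell out; the Signal dataclass and detect_conflicts are hypothetical names, and the real method queries the database instead of a list.

```python
from dataclasses import dataclass
from datetime import datetime
from itertools import product


@dataclass(frozen=True)
class Signal:
    """Hypothetical minimal signal record for the example."""
    symbol: str
    action: str          # "BUY", "SELL", or "HOLD"
    confidence: float
    created_at: datetime


def detect_conflicts(
    signals: list[Signal], start: datetime, end: datetime
) -> list[dict]:
    """Pair each BUY with each SELL inside [start, end] and score the pair."""
    in_window = [s for s in signals if start <= s.created_at <= end]
    buys = [s for s in in_window if s.action == "BUY"]
    sells = [s for s in in_window if s.action == "SELL"]
    return [
        {
            "buy": buy,
            "sell": sell,
            # Severity is the absolute confidence difference.
            "severity": round(abs(buy.confidence - sell.confidence), 4),
        }
        for buy, sell in product(buys, sells)
    ]
```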

resolve_signal_conflict(signals, strategy)

Picks a winning signal from a list of conflicting signals. Logs the resolution for audit at INFO level.

Strategy             Behaviour
highest_confidence   Signal with the highest confidence wins
most_recent          Most recently created signal wins
weighted_average     Signal closest to the group's mean confidence wins
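The three strategies can be expressed compactly, as in this sketch. The Signal dataclass and resolve_conflict are illustrative names, and tie-breaking (the first matching signal wins) is an assumption rather than documented behaviour.

```python
import logging
from dataclasses import dataclass
from datetime import datetime

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class Signal:
    """Hypothetical minimal signal record for the example."""
    action: str
    confidence: float
    created_at: datetime


def resolve_conflict(signals: list[Signal], strategy: str) -> Signal:
    """Pick a winner from conflicting signals using the named strategy."""
    if not signals:
        raise ValueError("at least one signal is required")

    if strategy == "highest_confidence":
        winner = max(signals, key=lambda s: s.confidence)
    elif strategy == "most_recent":
        winner = max(signals, key=lambda s: s.created_at)
    elif strategy == "weighted_average":
        # Winner is the signal whose confidence is closest to the group mean.
        mean = sum(s.confidence for s in signals) / len(signals)
        winner = min(signals, key=lambda s: abs(s.confidence - mean))
    else:
        raise ValueError(f"unknown strategy: {strategy}")

    # Audit trail at INFO level, as described above.
    logger.info("Resolved conflict with %s: %s", strategy, winner)
    return winner
```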

Duplicate Detection

Before persisting, create_signal checks whether a signal with the same symbol and action was created within the last 60 seconds. If so, a ConflictError is raised. The window is configurable via DUPLICATE_WINDOW_SECONDS.
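The duplicate check amounts to a time-window comparison, sketched below against an in-memory list. ConflictError and DUPLICATE_WINDOW_SECONDS are redefined locally so the example is self-contained; ensure_not_duplicate, the tuple layout, and the inclusive window boundary are assumptions.

```python
from datetime import datetime, timedelta, timezone

DUPLICATE_WINDOW_SECONDS = 60  # local stand-in for the service setting


class ConflictError(Exception):
    """Local stand-in for the service's ConflictError."""


def ensure_not_duplicate(
    existing: list[tuple[str, str, datetime]],
    symbol: str,
    action: str,
    now: datetime,
    window_seconds: int = DUPLICATE_WINDOW_SECONDS,
) -> None:
    """Raise ConflictError if the same symbol + action exists in the window."""
    symbol = symbol.upper()  # symbols are normalised to uppercase
    cutoff = now - timedelta(seconds=window_seconds)
    for seen_symbol, seen_action, created_at in existing:
        if seen_symbol == symbol and seen_action == action and created_at >= cutoff:
            raise ConflictError(
                f"duplicate {action} signal for {symbol} within {window_seconds}s"
            )
```

A BUY for AAPL created 30 seconds ago blocks another AAPL BUY, while a SELL for AAPL, or a BUY whose predecessor is 61 seconds old, passes the check.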

Caching Strategy

Key pattern                  TTL      Invalidated on
signal:latest:{SYMBOL}       30 min   create_signal, track_signal_outcome
signal:id:{UUID}             30 min   track_signal_outcome
signal:recent:{SYMBOL}       15 min   create_signal
signal:agg:{SYMBOL}:{INTV}   1 hour   - (expires by TTL only)
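The table can be read as a small mapping from write operations to the keys they invalidate. The sketch below encodes it that way; the helper names are hypothetical, and uppercasing the symbol inside the key is an assumption based on the symbol normalisation done by create_signal.

```python
from typing import Optional

# TTLs in seconds, taken from the table above.
TTL_SECONDS = {"latest": 30 * 60, "id": 30 * 60, "recent": 15 * 60, "agg": 60 * 60}


def latest_key(symbol: str) -> str:
    return f"signal:latest:{symbol.upper()}"


def id_key(signal_id: str) -> str:
    return f"signal:id:{signal_id}"


def recent_key(symbol: str) -> str:
    return f"signal:recent:{symbol.upper()}"


def agg_key(symbol: str, interval: str) -> str:
    return f"signal:agg:{symbol.upper()}:{interval}"


def keys_to_invalidate(
    operation: str, symbol: str, signal_id: Optional[str] = None
) -> list[str]:
    """Return the cache keys a write operation should delete."""
    if operation == "create_signal":
        return [latest_key(symbol), recent_key(symbol)]
    if operation == "track_signal_outcome":
        keys = [latest_key(symbol)]
        if signal_id is not None:
            keys.append(id_key(signal_id))
        return keys
    raise ValueError(f"unknown operation: {operation}")
```

Aggregation keys never appear in the result because, per the table, they are not invalidated explicitly and simply age out after their TTL.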

Validation Pipeline Integration

When a ValidationPipeline is injected at construction, every create_signal call runs the full pipeline before persisting. If validation fails, a ValidationError is raised and nothing is stored.

from backend.validation.pipeline import ValidationPipeline
from backend.validation.validators.completeness import CompletenessValidator

pipeline = ValidationPipeline()
pipeline.add_validator(CompletenessValidator())

svc = SignalService(validation_pipeline=pipeline)

Testing

# Unit tests (37 tests)
pytest tests/services/test_signal_service.py -v

# Integration tests (18 tests)
pytest tests/integration/test_signal_service_integration.py -v