Signal Service¶
Manages the complete lifecycle of trading signals: creation, validation, storage, retrieval, aggregation, performance tracking, and conflict resolution. Provides both sync and async interfaces.
Location¶
backend/services/signal_service.py
Architecture¶
        API / Orchestration
               │
          SignalService ←── CacheableMixin (Redis)
           │         │
ValidationPipeline   BaseService (transactions, retries)
           │
SignalRepository (TimescaleDB)
Quick Start¶
from backend.services.signal_service import SignalService
from backend.database.models.enums import SignalAction

svc = SignalService()

# Create a signal (validated, dedup-checked, stored, cached).
# db_session is a database session supplied by the caller.
signal = svc.create_signal(
    db_session,
    symbol="AAPL",
    action=SignalAction.BUY,
    confidence=0.85,
    price=152.30,
    reasoning="Strong RSI reversal with volume confirmation",
)

# Async variant (call from inside a coroutine, with an async session)
signal = await svc.create_signal_async(
    async_db_session,
    symbol="AAPL",
    action=SignalAction.BUY,
    confidence=0.85,
    price=152.30,
)
API Reference¶
Creation¶
create_signal(db_session, symbol, action, confidence, price, ...)¶
Validates, dedup-checks, persists, and caches a new trading signal.
- Runs the validation pipeline when one is injected.
- Rejects duplicates (same symbol + action within 60 seconds).
- Normalises the symbol to uppercase.
- Caches the latest signal per symbol (TTL 30 min).
create_signal_async(db_session, symbol, action, confidence, price, ...)¶
Async variant of create_signal.
Retrieval¶
get_signal_by_id(db_session, signal_id) / get_signal_by_id_async(...)¶
Retrieves a signal by UUID. Uses cache-aside: checks Redis first, falls back to the database, and populates the cache on miss.
get_latest_signal(db_session, symbol) / get_latest_signal_async(...)¶
Returns the most recent signal for a symbol.
get_signals(db_session, symbol, start?, end?, action?, min_confidence?, limit, offset) / get_signals_async(...)¶
Paginated query with optional filters (time range, action type, minimum confidence).
Aggregation¶
aggregate_signals(db_session, symbol, interval, start, end)¶
Time-bucket aggregation via TimescaleDB time_bucket.
| interval | Bucket size |
|---|---|
| 1h | 1 hour |
| 1d | 1 day |
| 1w | 1 week |
Results are cached for 1 hour.
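To illustrate what time-bucketing does with the interval values in the table above, here is a pure-Python sketch. The service itself delegates this to TimescaleDB; the function names, the per-bucket fields (`count`, `avg_confidence`), and the Monday-aligned origin below are assumptions made for the example.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, List, Tuple

# Interval strings from the table above, mapped to bucket widths.
BUCKET_WIDTHS = {
    "1h": timedelta(hours=1),
    "1d": timedelta(days=1),
    "1w": timedelta(weeks=1),
}

# Assumed origin: a Monday, so that weekly buckets start on Mondays.
ORIGIN = datetime(2000, 1, 3, tzinfo=timezone.utc)


def bucket_start(ts: datetime, interval: str) -> datetime:
    """Floor a timezone-aware timestamp to the start of its bucket."""
    width = BUCKET_WIDTHS[interval]
    return ORIGIN + ((ts - ORIGIN) // width) * width


def aggregate(signals: Iterable[Tuple[datetime, float]], interval: str) -> List[dict]:
    """Group (created_at, confidence) pairs into buckets with count and mean."""
    buckets: Dict[datetime, List[float]] = defaultdict(list)
    for created_at, confidence in signals:
        buckets[bucket_start(created_at, interval)].append(confidence)
    return [
        {"bucket": start, "count": len(vals), "avg_confidence": sum(vals) / len(vals)}
        for start, vals in sorted(buckets.items())
    ]
```

Buckets with no signals are simply absent from the result in this sketch; gap filling, if the service does any, is not modelled.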
get_signal_statistics(db_session, symbol, start?, end?)¶
Returns summary statistics including most common reasoning patterns:
{
  "total": 42,
  "by_action": {"BUY": 20, "SELL": 15, "HOLD": 7},
  "avg_confidence": 0.78,
  "signals_per_day": 3.5,
  "top_reasoning_patterns": [
    ["Strong RSI reversal", 12],
    ["MACD crossover", 8]
  ]
}
Performance Tracking¶
track_signal_outcome(db_session, signal_id, outcome)¶
Attaches a market outcome dict to a signal's extra_metadata for later
accuracy analysis. Invalidates relevant caches.
get_signal_performance(db_session, symbol, start?, end?)¶
Computes performance metrics including Pearson confidence-vs-accuracy correlation:
{
  "total_evaluated": 30,
  "win_rate": 0.6333,
  "avg_pnl": 2.15,
  "by_action": {
    "BUY": {"count": 20, "wins": 14, "total_pnl": 50.0},
    "SELL": {"count": 10, "wins": 5, "total_pnl": 14.5}
  },
  "confidence_accuracy_correlation": 0.72
}
A positive confidence_accuracy_correlation means higher-confidence
signals tend to be correct more often. The value is null when there are
fewer than 2 evaluated signals or when the data has zero variance, since
the correlation is undefined in both cases.
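The correlation and its null cases can be sketched as below. This is an illustration under assumptions: accuracy is modelled as a 0/1 "correct" flag per signal, and the function name is hypothetical. How the service derives correctness from the stored outcome is not described on this page.

```python
import math
from typing import Optional, Sequence


def confidence_accuracy_correlation(
    confidences: Sequence[float], correct: Sequence[bool]
) -> Optional[float]:
    """Pearson correlation between confidence and a 0/1 correctness flag."""
    n = len(confidences)
    if n != len(correct):
        raise ValueError("inputs must have the same length")
    if n < 2:
        return None  # fewer than 2 evaluated signals

    outcomes = [1.0 if c else 0.0 for c in correct]
    mean_x = sum(confidences) / n
    mean_y = sum(outcomes) / n
    dx = [x - mean_x for x in confidences]
    dy = [y - mean_y for y in outcomes]
    var_x = sum(d * d for d in dx)
    var_y = sum(d * d for d in dy)
    if var_x == 0 or var_y == 0:
        return None  # zero variance: the correlation is undefined

    cov = sum(a * b for a, b in zip(dx, dy))
    return cov / math.sqrt(var_x * var_y)
```

Returning `None` (serialised as null) instead of raising keeps the response shape stable when a symbol has too little evaluated history.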
Conflict Resolution¶
detect_signal_conflicts(db_session, symbol, start, end)¶
Finds BUY vs SELL conflicts within a time window. Returns a list of conflict dicts with severity (absolute confidence difference).
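A minimal sketch of this kind of detection follows, assuming every BUY inside the window conflicts with every SELL inside it. The `Signal` dataclass, the function name, and the `buy_id` / `sell_id` keys are hypothetical; only the severity definition (absolute confidence difference) comes from the description above.

```python
from dataclasses import dataclass
from datetime import datetime
from itertools import product
from typing import List


@dataclass(frozen=True)
class Signal:  # hypothetical stand-in for the real model
    id: str
    action: str  # "BUY", "SELL" or "HOLD"
    confidence: float
    created_at: datetime


def detect_conflicts(signals: List[Signal], start: datetime, end: datetime) -> List[dict]:
    """Pair every BUY with every SELL created inside [start, end]."""
    in_window = [s for s in signals if start <= s.created_at <= end]
    buys = [s for s in in_window if s.action == "BUY"]
    sells = [s for s in in_window if s.action == "SELL"]
    return [
        {
            "buy_id": b.id,
            "sell_id": s.id,
            # Severity as defined above: absolute confidence difference.
            "severity": abs(b.confidence - s.confidence),
        }
        for b, s in product(buys, sells)
    ]
```

HOLD signals never take part in a conflict in this sketch, which matches the "BUY vs SELL" wording.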
resolve_signal_conflict(signals, strategy)¶
Picks a winning signal from a list of conflicting signals. Logs the resolution for audit at INFO level.
| Strategy | Behaviour |
|---|---|
| highest_confidence | Signal with the highest confidence wins |
| most_recent | Most recently created signal wins |
| weighted_average | Signal closest to the group's mean confidence wins |
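The three strategies in the table above can be sketched as a single selection function. This is an illustration, not the service's code: the `Signal` dataclass and the `resolve` name are hypothetical, audit logging is omitted, and ties go to the first matching signal, which is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Sequence


@dataclass(frozen=True)
class Signal:  # hypothetical stand-in for the real model
    id: str
    confidence: float
    created_at: datetime


def resolve(signals: Sequence[Signal], strategy: str) -> Signal:
    """Pick the winning signal according to the named strategy."""
    if not signals:
        raise ValueError("no signals to resolve")
    if strategy == "highest_confidence":
        return max(signals, key=lambda s: s.confidence)
    if strategy == "most_recent":
        return max(signals, key=lambda s: s.created_at)
    if strategy == "weighted_average":
        # Winner is the signal whose confidence is nearest the group mean.
        mean = sum(s.confidence for s in signals) / len(signals)
        return min(signals, key=lambda s: abs(s.confidence - mean))
    raise ValueError(f"unknown strategy: {strategy}")
```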
Duplicate Detection¶
Before persisting, create_signal checks whether a signal with the same
symbol and action was created within the last 60 seconds. If so, a
ConflictError is raised. The window is configurable via
DUPLICATE_WINDOW_SECONDS.
Caching Strategy¶
| Key pattern | TTL | Invalidated on |
|---|---|---|
| signal:latest:{SYMBOL} | 30 min | create_signal, track_signal_outcome |
| signal:id:{UUID} | 30 min | track_signal_outcome |
| signal:recent:{SYMBOL} | 15 min | create_signal |
| signal:agg:{SYMBOL}:{INTV} | 1 hour | - |
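The key patterns and invalidation rules in the table above can be expressed as small helpers. This is a sketch: the helper names and the `keys_to_invalidate` function are hypothetical, and uppercasing the symbol inside the key is an assumption based on the symbol normalisation in create_signal.

```python
from typing import Dict, List

# TTLs in seconds, taken from the table above.
TTL_SECONDS: Dict[str, int] = {
    "latest": 30 * 60,
    "id": 30 * 60,
    "recent": 15 * 60,
    "agg": 60 * 60,
}


def latest_key(symbol: str) -> str:
    return f"signal:latest:{symbol.upper()}"


def id_key(signal_id: str) -> str:
    return f"signal:id:{signal_id}"


def recent_key(symbol: str) -> str:
    return f"signal:recent:{symbol.upper()}"


def agg_key(symbol: str, interval: str) -> str:
    return f"signal:agg:{symbol.upper()}:{interval}"


def keys_to_invalidate(event: str, symbol: str, signal_id: str = "") -> List[str]:
    """Return the cache keys each write operation invalidates, per the table."""
    if event == "create_signal":
        return [latest_key(symbol), recent_key(symbol)]
    if event == "track_signal_outcome":
        return [latest_key(symbol), id_key(signal_id)]
    return []  # aggregation keys are never invalidated; they expire by TTL
```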
Validation Pipeline Integration¶
When a ValidationPipeline is injected at construction, every
create_signal call runs the full pipeline before persisting. If
validation fails, a ValidationError is raised and nothing is stored.
from backend.validation.pipeline import ValidationPipeline
from backend.validation.validators.completeness import CompletenessValidator

# Build the pipeline and register validators
pipeline = ValidationPipeline()
pipeline.add_validator(CompletenessValidator())

# Inject the pipeline so every create_signal call is validated first
svc = SignalService(validation_pipeline=pipeline)