Event-Driven Backtest
Implement an event-driven backtesting loop that processes bar-by-bar signals, supporting realistic order simulation and state tracking.
Event-Driven Backtest
Overview
This notebook implements an event-driven backtesting engine (Notebook 67) — the natural evolution of the vectorized backtest (Notebook 66).
Instead of applying logic across arrays in bulk, the engine processes a prioritized event queue tick-by-tick on 1-minute candles. Each event is handled in strict chronological order, enabling:
| Feature | Description |
|---|---|
| News Events | Simulated macro announcements that spike volatility and widen spreads |
| ATR-Based Exits | Dynamic stop-loss and take-profit levels computed from Average True Range |
| Volatility Regime Filter | Blocks new entries when ATR exceeds a configurable multiplier |
| Trailing Stops | Stop-loss that ratchets up (long) or down (short) as price moves favorably |
| Time-Based Exits | Automatically close positions that have been open beyond a max bar limit |
| Direction Reversal | Close and flip on signal change — identical to vectorized version |
| Max Drawdown Guard | Hard stop at 50% equity loss |
Event Types (Priority Queue)
Events are processed in this priority order per timestamp:
| Priority | Event Type | Description |
|---|---|---|
| 1 | NEWS | Macro shock — widens spread, may force close |
| 2 | ATR_EXIT | Volatility-driven stop triggered |
| 3 | TRAILING_STOP | Trailing stop updated or triggered |
| 4 | TP_HIT | Take-profit level breached |
| 5 | SL_HIT | Stop-loss level breached |
| 6 | TIME_EXIT | Maximum hold duration exceeded |
| 7 | SIGNAL | New directional prediction from model |
Tick-level precision on 1-minute OHLCV data. For bulk vectorized approximation, refer to Notebook 66.
Step 1 — Install Dependencies
# Install required packages
!pip install pandas numpy --quiet
Step 2 — Import Libraries
import pandas as pd
import numpy as np
import heapq # Min-heap for the priority event queue
import warnings
from dataclasses import dataclass, field
from typing import Optional
from enum import IntEnum
warnings.filterwarnings('ignore')
pd.set_option('display.float_format', '{:.6f}'.format)
pd.set_option('display.max_columns', 20)
pd.set_option('display.width', 140)
print(f"pandas version : {pd.__version__}")
print(f"numpy version : {np.__version__}")
print("Environment ready.")
pandas version : 2.2.2 numpy version : 2.0.2 Environment ready.
Step 3 — Event System
Event Priority Enum
Each event type carries an integer priority. Lower number = processed first within the same timestamp. The priority ordering ensures news shocks are handled before TP/SL checks, and TP/SL before new signal entries.
class EventPriority(IntEnum):
"""
Processing priority for events sharing the same timestamp.
Lower value = processed first.
"""
NEWS = 1 # Macro shock — highest priority
ATR_EXIT = 2 # Volatility-driven exit
TRAILING_STOP = 3 # Trailing stop update/trigger
TP_HIT = 4 # Take-profit trigger
SL_HIT = 5 # Stop-loss trigger
TIME_EXIT = 6 # Max hold duration exceeded
SIGNAL = 7 # Directional model prediction — lowest priority
@dataclass(order=True)
class Event:
"""
A single event in the simulation queue.
Sorting key: (timestamp, priority) — heapq uses __lt__ via dataclass ordering.
Attributes
----------
timestamp : np.datetime64
When the event fires.
priority : int
Processing order within the same timestamp (EventPriority values).
event_type : str
Human-readable event label (e.g., 'SIGNAL', 'NEWS', 'TP_HIT').
payload : dict
Arbitrary event data (direction, price levels, news magnitude, etc.).
"""
timestamp : object # np.datetime64 — sortable
priority : int # EventPriority value
event_type : str = field(compare=False)
payload : dict = field(default_factory=dict, compare=False)
print("Event system defined.")
print(f" Event priorities: { {e.name: int(e) for e in EventPriority} }")
Event system defined.
Event priorities: {'NEWS': 1, 'ATR_EXIT': 2, 'TRAILING_STOP': 3, 'TP_HIT': 4, 'SL_HIT': 5, 'TIME_EXIT': 6, 'SIGNAL': 7}
Step 4 — ATR Calculator
The Average True Range (ATR) measures market volatility by averaging the true range over a lookback window.
True Range = max(High − Low, |High − Prev_Close|, |Low − Prev_Close|)
ATR(n) = EMA(True Range, n)
ATR serves two roles in this engine:
- Dynamic TP/SL sizing — TP = entry ± ATR ×
atr_tp_mult, SL = entry ∓ ATR ×atr_sl_mult - Volatility regime filter — If ATR > threshold, new entries are blocked
def compute_atr(np_1m: np.ndarray,
idx_high: int,
idx_low: int,
idx_close: int,
period: int = 14) -> np.ndarray:
"""
Compute ATR for each row of a 1-minute OHLCV array.
Parameters
----------
np_1m : np.ndarray — full 1-minute candle array
idx_high : int — column index for High
idx_low : int — column index for Low
idx_close: int — column index for Close
period : int — ATR lookback (default 14)
Returns
-------
np.ndarray of shape (n,) — ATR value at each row (NaN for first `period` rows)
"""
highs = np_1m[:, idx_high].astype(float)
lows = np_1m[:, idx_low].astype(float)
closes = np_1m[:, idx_close].astype(float)
n = len(highs)
tr = np.empty(n)
tr[0] = highs[0] - lows[0]
for i in range(1, n):
hl = highs[i] - lows[i]
hpc = abs(highs[i] - closes[i - 1])
lpc = abs(lows[i] - closes[i - 1])
tr[i] = max(hl, hpc, lpc)
# Wilder smoothing (equivalent to EMA with alpha = 1/period)
atr = np.full(n, np.nan)
atr[period - 1] = np.mean(tr[:period])
alpha = 1.0 / period
for i in range(period, n):
atr[i] = alpha * tr[i] + (1 - alpha) * atr[i - 1]
return atr
print("ATR calculator defined.")ATR calculator defined.
Step 5 — Mock Data Generation
Generates synthetic 1-minute OHLCV data and model predictions.
News events are randomly injected at ~2% of all timestamps to simulate macro announcements (e.g., Fed rate decisions, CPI prints). Each news event has a magnitude (0–1) representing shock severity.
def generate_mock_1m_data(n_days: int = 15,
base_price: float = 30000.0,
seed: int = 42) -> tuple:
"""
Generate synthetic 1-minute OHLCV candle data.
Returns
-------
tuple: (np_1m, idx_datetime, idx_open, idx_high, idx_low, idx_close)
"""
n_minutes = n_days * 24 * 60
datetimes = pd.date_range(start='2024-01-01', periods=n_minutes, freq='1min').values
np.random.seed(seed)
returns = np.random.normal(0, 0.001, n_minutes)
close_prices = base_price * np.cumprod(1 + returns)
open_prices = np.roll(close_prices, 1)
open_prices[0] = base_price
high_prices = np.maximum(open_prices, close_prices) * (
1 + np.abs(np.random.normal(0, 0.0005, n_minutes)))
low_prices = np.minimum(open_prices, close_prices) * (
1 - np.abs(np.random.normal(0, 0.0005, n_minutes)))
volumes = np.random.randint(1, 100, n_minutes).astype(float)
np_1m = np.column_stack([
datetimes.astype(object),
open_prices, high_prices, low_prices, close_prices, volumes
])
return np_1m, 0, 1, 2, 3, 4 # array, idx_dt, idx_open, idx_high, idx_low, idx_close
def generate_mock_predictions(n_periods: int = 80,
freq: str = '4h',
seed: int = 99) -> pd.DataFrame:
"""
Generate synthetic directional prediction signals (+1, -1, 0).
"""
datetimes = pd.date_range(start='2024-01-01', periods=n_periods, freq=freq)
np.random.seed(seed)
directions = np.random.choice([1, -1, 0], size=n_periods, p=[0.45, 0.45, 0.10])
return pd.DataFrame({'datetime': datetimes, 'predicted_direction': directions})
def generate_news_events(np_1m: np.ndarray,
idx_datetime: int,
news_rate: float = 0.02,
seed: int = 7) -> list:
"""
Randomly inject news events at ~`news_rate` fraction of 1-minute timestamps.
Returns
-------
list of dicts: [{timestamp, magnitude, label}, ...]
"""
np.random.seed(seed)
n = len(np_1m)
mask = np.random.random(n) < news_rate
indices = np.where(mask)[0]
news_labels = [
'Fed Rate Decision', 'CPI Print', 'NFP Report',
'GDP Release', 'FOMC Minutes', 'Earnings Surprise',
'Geopolitical Shock', 'Liquidity Crunch',
]
events = []
for idx in indices:
events.append({
'timestamp' : np_1m[idx, idx_datetime],
'magnitude' : round(np.random.uniform(0.1, 1.0), 2),
'label' : np.random.choice(news_labels),
})
return events
print("Mock data generators defined.")Mock data generators defined.
Step 6 — Event-Driven Engine
Architecture
The EventDrivenBacktest class manages:
- Event Queue — A min-heap sorted by
(timestamp, priority) - Position State —
in_position,buy_price,trailing_stop,bars_held - ATR Array — Pre-computed for every 1-minute candle
- Dispatch Table — Maps event types to handler methods
Key Handler Methods
| Method | Trigger | Action |
|---|---|---|
_handle_signal() | New model prediction | Open/close/flip position |
_handle_news() | News event fires | Widen spread; force-close if magnitude > threshold |
_handle_atr_exit() | ATR spike detected | Close position if volatility too high |
_handle_trailing_stop() | Price moves favorably | Ratchet stop; trigger if breached |
_handle_tp_sl() | Price crosses TP/SL | Close with P&L |
_handle_time_exit() | Bars held > max | Close regardless of P&L |
class EventDrivenBacktest:
"""
Event-driven backtesting engine with news, ATR, trailing stop,
time-exit, and volatility regime filtering.
Parameters
----------
np_1m : np.ndarray — 1-minute OHLCV array
idx_datetime : int — column index: datetime
idx_open : int — column index: open
idx_high : int — column index: high
idx_low : int — column index: low
idx_close : int — column index: close
starting_balance : float — initial capital (USD)
atr_period : int — ATR lookback window (minutes)
atr_sl_mult : float — ATR × mult = stop-loss distance
atr_tp_mult : float — ATR × mult = take-profit distance
atr_vol_threshold : float — ATR × mult above which entries are blocked
trailing_stop_mult : float — ATR × mult = trailing stop distance
max_bars_in_trade : int — max 1-minute bars before time exit
news_force_close_mag: float — news magnitude above which position is force-closed
transaction_fee : float — per-leg fee (%)
slippage : float — per-trade slippage (%)
leverage : float — leverage multiplier
buy_after_minutes : int — candle offset before entry
"""
def __init__(
self,
np_1m,
idx_datetime, idx_open, idx_high, idx_low, idx_close,
starting_balance = 1000.0,
atr_period = 14,
atr_sl_mult = 1.5,
atr_tp_mult = 2.5,
atr_vol_threshold = 3.0,
trailing_stop_mult = 1.0,
max_bars_in_trade = 240,
news_force_close_mag= 0.6,
transaction_fee = 0.05,
slippage = 0.0,
leverage = 1.0,
buy_after_minutes = 0,
):
# ── Data ──────────────────────────────────────────────────────────
self.np_1m = np_1m
self.idx_dt = idx_datetime
self.idx_open = idx_open
self.idx_high = idx_high
self.idx_low = idx_low
self.idx_close = idx_close
# ── Parameters ────────────────────────────────────────────────────
self.starting_balance = starting_balance
self.atr_period = atr_period
self.atr_sl_mult = atr_sl_mult
self.atr_tp_mult = atr_tp_mult
self.atr_vol_threshold = atr_vol_threshold
self.trailing_stop_mult = trailing_stop_mult
self.max_bars_in_trade = max_bars_in_trade
self.news_force_close_mag = news_force_close_mag
self.fee_pct = transaction_fee * leverage
self.slippage = slippage
self.leverage = leverage
self.buy_after_minutes = buy_after_minutes
# ── State ─────────────────────────────────────────────────────────
self.balance = starting_balance
self.in_position = False
self.direction = 0 # +1 long / -1 short
self.buy_price = 0.0
self.tp_price = 0.0
self.sl_price = 0.0
self.trailing_stop_px = 0.0
self.bars_held = 0
self.entry_bar_idx = 0
self.ledger = [] # list of trade records
# ── Pre-compute ATR ───────────────────────────────────────────────
self.atr_array = compute_atr(
np_1m, idx_high, idx_low, idx_close, period=atr_period
)
# ── Build datetime lookup ─────────────────────────────────────────
self.dt_series = pd.to_datetime(np_1m[:, idx_datetime])
self.dt_to_idx = {dt: i for i, dt in enumerate(self.dt_series)}
# ── Ledger ────────────────────────────────────────────────────────────
def _record(self, timestamp, event_type: str, action: str,
buy_price: float, sell_price: float, pnl: float, note: str = ''):
self.ledger.append({
'datetime' : timestamp,
'event_type': event_type,
'direction' : 'long' if self.direction > 0 else ('short' if self.direction < 0 else 'none'),
'action' : action,
'buy_price' : round(buy_price, 4),
'sell_price': round(sell_price, 4),
'balance' : round(self.balance, 4),
'pnl' : round(pnl, 4),
'note' : note,
})
# ── PnL Calculator ────────────────────────────────────────────────────
def _calc_pnl(self, sell_price: float) -> float:
if self.direction > 0:
raw = ((sell_price - self.buy_price) / self.buy_price) * 100
else:
raw = ((self.buy_price - sell_price) / self.buy_price) * 100
return raw * self.leverage - self.fee_pct - self.slippage
# ── Open Position ─────────────────────────────────────────────────────
def _open_position(self, timestamp, price: float, direction: int,
bar_idx: int, atr: float, event_type: str):
self.direction = direction
self.buy_price = price
self.in_position = True
self.bars_held = 0
self.entry_bar_idx = bar_idx
# ATR-based TP/SL
if direction > 0:
self.tp_price = price + atr * self.atr_tp_mult
self.sl_price = price - atr * self.atr_sl_mult
self.trailing_stop_px = price - atr * self.trailing_stop_mult
else:
self.tp_price = price - atr * self.atr_tp_mult
self.sl_price = price + atr * self.atr_sl_mult
self.trailing_stop_px = price + atr * self.trailing_stop_mult
# Entry fee
entry_pnl = -self.fee_pct - self.slippage
self.balance += self.balance * (entry_pnl / 100)
self._record(timestamp, event_type, 'BUY', price, 0.0, entry_pnl,
f'TP={self.tp_price:.2f} SL={self.sl_price:.2f} ATR={atr:.2f}')
# ── Close Position ────────────────────────────────────────────────────
def _close_position(self, timestamp, price: float,
event_type: str, note: str = ''):
pnl = self._calc_pnl(price)
self.balance += self.balance * (pnl / 100)
direction_save = self.direction
self.in_position = False
self.direction = 0
self.bars_held = 0
label = 'SELL-WIN' if pnl > 0 else 'SELL-LOSS'
self._record(timestamp, event_type, label,
self.buy_price, price, pnl, note)
# ── Event Handlers ────────────────────────────────────────────────────
def _handle_signal(self, ts, payload: dict, bar_idx: int):
new_dir = payload.get('direction', 0)
if new_dir == 0:
return # Neutral — hold or do nothing
atr = self.atr_array[bar_idx]
if np.isnan(atr):
return # Not enough history for ATR
# Volatility regime filter: block entry if ATR too high
price = float(self.np_1m[bar_idx, self.idx_open])
vol_threshold_px = price * (self.atr_vol_threshold / 100)
if atr > vol_threshold_px:
self._record(ts, 'SIGNAL', 'BLOCKED', price, 0.0, 0.0,
f'ATR {atr:.2f} > vol threshold {vol_threshold_px:.2f}')
return
if self.in_position:
if new_dir == self.direction:
return # Same direction — hold
# Direction flip — close then re-enter
self._close_position(ts, price, 'SIGNAL', note='direction_flip')
self._open_position(ts, price, new_dir, bar_idx, atr, 'SIGNAL')
def _handle_news(self, ts, payload: dict, bar_idx: int):
magnitude = payload.get('magnitude', 0.5)
label = payload.get('label', 'News')
price = float(self.np_1m[bar_idx, self.idx_close])
note = f'{label} mag={magnitude:.2f}'
if self.in_position and magnitude >= self.news_force_close_mag:
# High-magnitude news → force close with widened spread (extra slippage)
spread_penalty = magnitude * 0.5 # additional % cost
pnl = self._calc_pnl(price) - spread_penalty
self.balance += self.balance * (pnl / 100)
self.in_position = False
self.direction = 0
self.bars_held = 0
self._record(ts, 'NEWS', 'FORCE_CLOSE',
self.buy_price, price, pnl, note)
else:
# Low-magnitude news — log but do not close
self._record(ts, 'NEWS', 'NOTED', 0.0, 0.0, 0.0, note)
def _handle_atr_exit(self, ts, payload: dict, bar_idx: int):
"""Close position if current ATR has spiked beyond threshold."""
if not self.in_position:
return
atr = self.atr_array[bar_idx]
price = float(self.np_1m[bar_idx, self.idx_close])
if np.isnan(atr):
return
vol_threshold_px = price * (self.atr_vol_threshold / 100)
if atr > vol_threshold_px:
self._close_position(ts, price, 'ATR_EXIT',
note=f'ATR={atr:.2f} > threshold={vol_threshold_px:.2f}')
def _handle_trailing_stop(self, ts, payload: dict, bar_idx: int):
"""Update trailing stop and trigger if price breaches it."""
if not self.in_position:
return
high = float(self.np_1m[bar_idx, self.idx_high])
low = float(self.np_1m[bar_idx, self.idx_low])
atr = self.atr_array[bar_idx]
if np.isnan(atr):
return
trail_dist = atr * self.trailing_stop_mult
if self.direction > 0: # LONG — trail stop moves up with high
new_trail = high - trail_dist
if new_trail > self.trailing_stop_px:
self.trailing_stop_px = new_trail
if low <= self.trailing_stop_px:
self._close_position(ts, self.trailing_stop_px,
'TRAILING_STOP',
note=f'trail_px={self.trailing_stop_px:.2f}')
else: # SHORT — trail stop moves down with low
new_trail = low + trail_dist
if new_trail < self.trailing_stop_px:
self.trailing_stop_px = new_trail
if high >= self.trailing_stop_px:
self._close_position(ts, self.trailing_stop_px,
'TRAILING_STOP',
note=f'trail_px={self.trailing_stop_px:.2f}')
def _handle_tp_sl(self, ts, payload: dict, bar_idx: int):
"""Check static ATR-based TP and SL levels."""
if not self.in_position:
return
high = float(self.np_1m[bar_idx, self.idx_high])
low = float(self.np_1m[bar_idx, self.idx_low])
if self.direction > 0: # LONG
if high >= self.tp_price:
self._close_position(ts, self.tp_price, 'TP_HIT',
note=f'TP={self.tp_price:.2f}')
return
if low <= self.sl_price:
self._close_position(ts, self.sl_price, 'SL_HIT',
note=f'SL={self.sl_price:.2f}')
else: # SHORT
if low <= self.tp_price:
self._close_position(ts, self.tp_price, 'TP_HIT',
note=f'TP={self.tp_price:.2f}')
return
if high >= self.sl_price:
self._close_position(ts, self.sl_price, 'SL_HIT',
note=f'SL={self.sl_price:.2f}')
def _handle_time_exit(self, ts, payload: dict, bar_idx: int):
"""Force-close a position that has exceeded max_bars_in_trade."""
if not self.in_position:
return
price = float(self.np_1m[bar_idx, self.idx_close])
self._close_position(ts, price, 'TIME_EXIT',
note=f'bars_held={self.bars_held}')
# ── Dispatch Table ────────────────────────────────────────────────────
_HANDLERS = {
'NEWS' : _handle_news,
'ATR_EXIT' : _handle_atr_exit,
'TRAILING_STOP': _handle_trailing_stop,
'TP_HIT' : _handle_tp_sl,
'SL_HIT' : _handle_tp_sl,
'TIME_EXIT' : _handle_time_exit,
'SIGNAL' : _handle_signal,
}
# ── Build Event Queue ─────────────────────────────────────────────────
def _build_queue(self,
df_predictions: pd.DataFrame,
news_events : list) -> list:
"""
Populate the min-heap with SIGNAL and NEWS events.
Per-bar events (ATR_EXIT, TRAILING_STOP, TP_HIT, TIME_EXIT) are
injected on-the-fly during the main loop.
"""
heap = []
# ── SIGNAL events from model predictions ─────────────────────────
for _, row in df_predictions.iterrows():
ts = row['datetime']
evt = Event(
timestamp = ts,
priority = int(EventPriority.SIGNAL),
event_type = 'SIGNAL',
payload = {'direction': int(row['predicted_direction'])},
)
heapq.heappush(heap, evt)
# ── NEWS events ───────────────────────────────────────────────────
for n in news_events:
ts = pd.Timestamp(n['timestamp'])
evt = Event(
timestamp = ts,
priority = int(EventPriority.NEWS),
event_type = 'NEWS',
payload = {'magnitude': n['magnitude'], 'label': n['label']},
)
heapq.heappush(heap, evt)
return heap
# ── Main Run Loop ─────────────────────────────────────────────────────
def run(self,
df_predictions: pd.DataFrame,
news_events : list = None) -> pd.DataFrame:
"""
Execute the event-driven simulation.
Parameters
----------
df_predictions : pd.DataFrame
Columns: ['datetime', 'predicted_direction']
news_events : list of dicts
Each dict: {timestamp, magnitude, label}
Returns
-------
pd.DataFrame — trade ledger
"""
if news_events is None:
news_events = []
df_predictions = df_predictions.copy()
df_predictions['datetime'] = pd.to_datetime(df_predictions['datetime'])
heap = self._build_queue(df_predictions, news_events)
breaking_balance = self.starting_balance * 0.5
dt_series = pd.to_datetime(self.np_1m[:, self.idx_dt])
print(f"Queue loaded: {len(heap)} initial events (signals + news)")
print("Processing event queue...")
processed = 0
while heap:
evt = heapq.heappop(heap)
ts = pd.Timestamp(evt.timestamp)
# Find nearest 1-minute bar index
idx_candidates = np.searchsorted(dt_series.values, ts.to_datetime64())
bar_idx = int(min(idx_candidates, len(self.np_1m) - 1))
# Track bars held and inject per-bar events
if self.in_position:
self.bars_held = bar_idx - self.entry_bar_idx
# Inject ATR_EXIT check
heapq.heappush(heap, Event(ts, int(EventPriority.ATR_EXIT),
'ATR_EXIT', {}))
# Inject TRAILING_STOP update
heapq.heappush(heap, Event(ts, int(EventPriority.TRAILING_STOP),
'TRAILING_STOP', {}))
# Inject TP/SL check
heapq.heappush(heap, Event(ts, int(EventPriority.TP_HIT),
'TP_HIT', {}))
# Inject TIME_EXIT if over max bars
if self.bars_held >= self.max_bars_in_trade:
heapq.heappush(heap, Event(ts, int(EventPriority.TIME_EXIT),
'TIME_EXIT', {}))
# Dispatch event to handler
handler = self._HANDLERS.get(evt.event_type)
if handler:
handler(self, ts, evt.payload, bar_idx)
processed += 1
# Max drawdown guard
if self.balance < breaking_balance:
print(f" !! Max drawdown triggered at {ts} — balance ${self.balance:.2f}")
if self.in_position:
price = float(self.np_1m[bar_idx, self.idx_close])
self._close_position(ts, price, 'MAX_DRAWDOWN', note='50% equity loss')
break
print(f"Events processed: {processed}")
df_ledger = pd.DataFrame(self.ledger)
if not df_ledger.empty:
df_ledger['pnl_cumsum'] = df_ledger['pnl'].cumsum().round(4)
return df_ledger
print("EventDrivenBacktest class defined.")
EventDrivenBacktest class defined.
Functional Refactoring: Helper Functions
Instead of a class, the backtesting engine's state (balance, in_position, buy_price, ledger, etc.) will be managed by a dictionary, backtest_state, passed between functions.
These helper functions modify or calculate values based on the backtest_state.
def record_trade(backtest_state: dict, timestamp, event_type: str, action: str,
buy_price: float, sell_price: float, pnl: float, note: str = '') -> None:
"""
Records a trade or significant event in the ledger.
Modifies `backtest_state['ledger']` in place.
"""
direction_label = 'long' if backtest_state['current_state']['direction'] > 0 else \
('short' if backtest_state['current_state']['direction'] < 0 else 'none')
backtest_state['ledger'].append({
'datetime' : timestamp,
'event_type': event_type,
'direction' : direction_label,
'action' : action,
'buy_price' : round(buy_price, 4),
'sell_price': round(sell_price, 4),
'balance' : round(backtest_state['current_state']['balance'], 4),
'pnl' : round(pnl, 4),
'note' : note,
})def calculate_pnl(backtest_state: dict, sell_price: float) -> float:
"""
Calculates the PnL for a closed position.
This is a pure function that only reads from `backtest_state`.
"""
direction = backtest_state['current_state']['direction']
buy_price = backtest_state['current_state']['buy_price']
leverage = backtest_state['params']['leverage']
fee_pct = backtest_state['params']['fee_pct']
slippage_pct = backtest_state['params']['slippage']
if direction > 0:
raw = ((sell_price - buy_price) / buy_price) * 100
else:
raw = ((buy_price - sell_price) / buy_price) * 100
return raw * leverage - fee_pct - slippage_pctdef open_position(backtest_state: dict, timestamp, price: float, direction: int,
bar_idx: int, atr: float, event_type: str) -> dict:
"""
Opens a new position and updates the `backtest_state`.
"""
state_copy = backtest_state.copy()
current_state = state_copy['current_state']
params = state_copy['params']
current_state['direction'] = direction
current_state['buy_price'] = price
current_state['in_position'] = True
current_state['bars_held'] = 0
current_state['entry_bar_idx'] = bar_idx
# ATR-based TP/SL
if direction > 0:
current_state['tp_price'] = price + atr * params['atr_tp_mult']
current_state['sl_price'] = price - atr * params['atr_sl_mult']
current_state['trailing_stop_px'] = price - atr * params['trailing_stop_mult']
else:
current_state['tp_price'] = price - atr * params['atr_tp_mult']
current_state['sl_price'] = price + atr * params['atr_sl_mult']
current_state['trailing_stop_px'] = price + atr * params['trailing_stop_mult']
# Entry fee
entry_pnl = -params['fee_pct'] - params['slippage']
current_state['balance'] += current_state['balance'] * (entry_pnl / 100)
record_trade(state_copy, timestamp, event_type, 'BUY', price, 0.0, entry_pnl,
f"TP={current_state['tp_price']:.2f} SL={current_state['sl_price']:.2f} ATR={atr:.2f}")
return state_copydef close_position(backtest_state: dict, timestamp, price: float,
event_type: str, note: str = '') -> dict:
"""
Closes an open position and updates the `backtest_state`.
"""
state_copy = backtest_state.copy()
current_state = state_copy['current_state']
pnl = calculate_pnl(state_copy, price)
current_state['balance'] += current_state['balance'] * (pnl / 100)
label = 'SELL-WIN' if pnl > 0 else 'SELL-LOSS'
record_trade(state_copy, timestamp, event_type, label,
current_state['buy_price'], price, pnl, note)
# Reset position state
current_state['in_position'] = False
current_state['direction'] = 0
current_state['bars_held'] = 0
current_state['buy_price'] = 0.0
current_state['tp_price'] = 0.0
current_state['sl_price'] = 0.0
current_state['trailing_stop_px'] = 0.0
current_state['entry_bar_idx'] = 0
return state_copyFunctional Refactoring: Event Handlers
Each event handler is now a standalone function that takes the current backtest_state and relevant data, returning an updated backtest_state.
def handle_signal(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
"""
Handles a SIGNAL event: opens, closes, or flips a position.
"""
state_copy = backtest_state.copy()
current_state = state_copy['current_state']
params = state_copy['params']
runtime_data = state_copy['runtime_data']
data_indices = state_copy['data_indices']
new_dir = payload.get('direction', 0)
if new_dir == 0:
return state_copy # Neutral — hold or do nothing
atr = runtime_data['atr_array'][bar_idx]
if np.isnan(atr):
return state_copy # Not enough history for ATR
price = float(runtime_data['np_1m'][bar_idx, data_indices['idx_open']])
vol_threshold_px = price * (params['atr_vol_threshold'] / 100)
# Volatility regime filter: block entry if ATR too high
if atr > vol_threshold_px:
record_trade(state_copy, ts, 'SIGNAL', 'BLOCKED', price, 0.0, 0.0,
f'ATR {atr:.2f} > vol threshold {vol_threshold_px:.2f}')
return state_copy
if current_state['in_position']:
if new_dir == current_state['direction']:
return state_copy # Same direction — hold
# Direction flip — close then re-enter
state_copy = close_position(state_copy, ts, price, 'SIGNAL', note='direction_flip')
state_copy = open_position(state_copy, ts, price, new_dir, bar_idx, atr, 'SIGNAL')
return state_copydef handle_news(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
"""
Handles a NEWS event: force-closes a position if magnitude is high.
"""
state_copy = backtest_state.copy()
current_state = state_copy['current_state']
params = state_copy['params']
runtime_data = state_copy['runtime_data']
data_indices = state_copy['data_indices']
magnitude = payload.get('magnitude', 0.5)
label = payload.get('label', 'News')
price = float(runtime_data['np_1m'][bar_idx, data_indices['idx_close']])
note = f'{label} mag={magnitude:.2f}'
if current_state['in_position'] and magnitude >= params['news_force_close_mag']:
# High-magnitude news → force close with widened spread (extra slippage)
spread_penalty = magnitude * 0.5 # additional % cost
pnl = calculate_pnl(state_copy, price) - spread_penalty
current_state['balance'] += current_state['balance'] * (pnl / 100)
record_trade(state_copy, ts, 'NEWS', 'FORCE_CLOSE',
current_state['buy_price'], price, pnl, note)
# Reset position state
current_state['in_position'] = False
current_state['direction'] = 0
current_state['bars_held'] = 0
else:
# Low-magnitude news — log but do not close
record_trade(state_copy, ts, 'NEWS', 'NOTED', 0.0, 0.0, 0.0, note)
return state_copydef handle_atr_exit(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
"""
Closes position if current ATR has spiked beyond threshold.
"""
state_copy = backtest_state.copy()
current_state = state_copy['current_state']
params = state_copy['params']
runtime_data = state_copy['runtime_data']
data_indices = state_copy['data_indices']
if not current_state['in_position']:
return state_copy
atr = runtime_data['atr_array'][bar_idx]
if np.isnan(atr):
return state_copy
price = float(runtime_data['np_1m'][bar_idx, data_indices['idx_close']])
vol_threshold_px = price * (params['atr_vol_threshold'] / 100)
if atr > vol_threshold_px:
state_copy = close_position(state_copy, ts, price, 'ATR_EXIT',
note=f'ATR={atr:.2f} > threshold={vol_threshold_px:.2f}')
return state_copydef handle_trailing_stop(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
"""
Updates trailing stop and triggers if price breaches it.
"""
state_copy = backtest_state.copy()
current_state = state_copy['current_state']
params = state_copy['params']
runtime_data = state_copy['runtime_data']
data_indices = state_copy['data_indices']
if not current_state['in_position']:
return state_copy
high = float(runtime_data['np_1m'][bar_idx, data_indices['idx_high']])
low = float(runtime_data['np_1m'][bar_idx, data_indices['idx_low']])
atr = runtime_data['atr_array'][bar_idx]
if np.isnan(atr):
return state_copy
trail_dist = atr * params['trailing_stop_mult']
if current_state['direction'] > 0: # LONG — trail stop moves up with high
new_trail = high - trail_dist
if new_trail > current_state['trailing_stop_px']:
current_state['trailing_stop_px'] = new_trail
if low <= current_state['trailing_stop_px']:
state_copy = close_position(state_copy, ts, current_state['trailing_stop_px'],
'TRAILING_STOP',
note=f'trail_px={current_state['trailing_stop_px']:.2f}')
else: # SHORT — trail stop moves down with low
new_trail = low + trail_dist
if new_trail < current_state['trailing_stop_px']:
current_state['trailing_stop_px'] = new_trail
if high >= current_state['trailing_stop_px']:
state_copy = close_position(state_copy, ts, current_state['trailing_stop_px'],
'TRAILING_STOP',
note=f'trail_px={current_state['trailing_stop_px']:.2f}')
return state_copydef handle_tp_sl(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
"""
Checks static ATR-based Take Profit (TP) and Stop Loss (SL) levels.
"""
state_copy = backtest_state.copy()
current_state = state_copy['current_state']
runtime_data = state_copy['runtime_data']
data_indices = state_copy['data_indices']
if not current_state['in_position']:
return state_copy
high = float(runtime_data['np_1m'][bar_idx, data_indices['idx_high']])
low = float(runtime_data['np_1m'][bar_idx, data_indices['idx_low']])
if current_state['direction'] > 0: # LONG
if high >= current_state['tp_price']:
state_copy = close_position(state_copy, ts, current_state['tp_price'], 'TP_HIT',
note=f'TP={current_state['tp_price']:.2f}')
return state_copy
if low <= current_state['sl_price']:
state_copy = close_position(state_copy, ts, current_state['sl_price'], 'SL_HIT',
note=f'SL={current_state['sl_price']:.2f}')
return state_copy
else: # SHORT
if low <= current_state['tp_price']:
state_copy = close_position(state_copy, ts, current_state['tp_price'], 'TP_HIT',
note=f'TP={current_state['tp_price']:.2f}')
return state_copy
if high >= current_state['sl_price']:
state_copy = close_position(state_copy, ts, current_state['sl_price'], 'SL_HIT',
note=f'SL={current_state['sl_price']:.2f}')
return state_copy
return state_copydef handle_time_exit(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
"""
Force-closes a position that has exceeded `max_bars_in_trade`.
"""
state_copy = backtest_state.copy()
current_state = state_copy['current_state']
runtime_data = state_copy['runtime_data']
data_indices = state_copy['data_indices']
if not current_state['in_position']:
return state_copy
price = float(runtime_data['np_1m'][bar_idx, data_indices['idx_close']])
state_copy = close_position(state_copy, ts, price, 'TIME_EXIT',
note=f'bars_held={current_state['bars_held']}')
return state_copyFunctional Refactoring: Dispatch Table and Queue Builder
The EVENT_HANDLERS dictionary maps event types to their corresponding functional handlers. The build_event_queue function remains largely the same, populating the min-heap with initial events.
# Dispatch table mapping event types to handler functions
EVENT_HANDLERS = {
'NEWS' : handle_news,
'ATR_EXIT' : handle_atr_exit,
'TRAILING_STOP': handle_trailing_stop,
'TP_HIT' : handle_tp_sl,
'SL_HIT' : handle_tp_sl,
'TIME_EXIT' : handle_time_exit,
'SIGNAL' : handle_signal,
}
print("Event handler dispatch table defined.")Event handler dispatch table defined.
import pandas as pd
import heapq
from dataclasses import dataclass, field
from typing import Optional
from enum import IntEnum
# EventPriority and Event are assumed to be defined in the global scope (e.g., cell 4ygqUmwqWvk6)
def build_event_queue(df_predictions: pd.DataFrame, news_events: list) -> list:
"""
Populates the min-heap with SIGNAL and NEWS events.
Per-bar events are injected on-the-fly during the main loop.
"""
heap = []
# SIGNAL events from model predictions
for _, row in df_predictions.iterrows():
ts = row['datetime']
evt = Event(
timestamp = ts,
priority = int(EventPriority.SIGNAL),
event_type = 'SIGNAL',
payload = {'direction': int(row['predicted_direction'])},
)
heapq.heappush(heap, evt)
# NEWS events
for n in news_events:
ts = pd.Timestamp(n['timestamp'])
evt = Event(
timestamp = ts,
priority = int(EventPriority.NEWS),
event_type = 'NEWS',
payload = {'magnitude': n['magnitude'], 'label': n['label']},
)
heapq.heappush(heap, evt)
return heap
print("Event queue builder defined.")Event queue builder defined.
Functional Refactoring: Main Execution Loop
The execute_backtest_run function now encapsulates the core logic of the backtesting engine, managing the event queue and dispatching events to the appropriate handler functions. It takes an initial_state dictionary and returns the final backtest_state and the df_ledger.
import pandas as pd
import numpy as np
import heapq
from dataclasses import dataclass, field
from typing import Optional
from enum import IntEnum
# EventPriority and Event are assumed to be defined in the global scope (e.g., cell 4ygqUmwqWvk6)
def execute_backtest_run(
initial_state: dict,
df_predictions: pd.DataFrame,
news_events: list = None
) -> tuple[dict, pd.DataFrame]:
"""
Executes the event-driven simulation with a functional approach.
Parameters
----------
initial_state : dict
The initial state dictionary for the backtest.
df_predictions : pd.DataFrame
Columns: ['datetime', 'predicted_direction']
news_events : list of dicts, optional
Each dict: {timestamp, magnitude, label}
Returns
-------
tuple[dict, pd.DataFrame]
The final backtest state and the trade ledger DataFrame.
"""
if news_events is None:
news_events = []
# Create a mutable copy of the initial state
backtest_state = initial_state.copy()
backtest_state['ledger'] = initial_state['ledger'].copy() # ensure ledger is a mutable list
df_predictions = df_predictions.copy()
df_predictions['datetime'] = pd.to_datetime(df_predictions['datetime'])
# build_event_queue relies on Event and EventPriority
heap = build_event_queue(df_predictions, news_events)
breaking_balance = backtest_state['params']['starting_balance'] * 0.5
dt_series = backtest_state['runtime_data']['dt_series']
np_1m = backtest_state['runtime_data']['np_1m']
idx_dt = backtest_state['data_indices']['idx_dt']
print(f"Queue loaded: {len(heap)} initial events (signals + news)")
print("Processing event queue...")
processed = 0
while heap:
evt = heapq.heappop(heap)
ts = pd.Timestamp(evt.timestamp)
# Find nearest 1-minute bar index
# Corrected typo from to_datetime600() to to_datetime64()
idx_candidates = np.searchsorted(dt_series.values, ts.to_datetime64())
bar_idx = int(min(idx_candidates, len(np_1m) - 1))
# Track bars held and inject per-bar events
if backtest_state['current_state']['in_position']:
backtest_state['current_state']['bars_held'] = bar_idx - backtest_state['current_state']['entry_bar_idx']
# Inject ATR_EXIT check
heapq.heappush(heap, Event(ts, int(EventPriority.ATR_EXIT),
'ATR_EXIT', {}))
# Inject TRAILING_STOP update
heapq.heappush(heap, Event(ts, int(EventPriority.TRAILING_STOP),
'TRAILING_STOP', {}))
# Inject TP/SL check
heapq.heappush(heap, Event(ts, int(EventPriority.TP_HIT),
'TP_HIT', {}))
# Inject TIME_EXIT if over max bars
if backtest_state['current_state']['bars_held'] >= backtest_state['params']['max_bars_in_trade']:
heapq.heappush(heap, Event(ts, int(EventPriority.TIME_EXIT),
'TIME_EXIT', {}))
# Dispatch event to handler
handler = EVENT_HANDLERS.get(evt.event_type)
if handler:
# Pass the entire state and update it with the handler's return value
backtest_state = handler(backtest_state, ts, evt.payload, bar_idx)
processed += 1
# Max drawdown guard
if backtest_state['current_state']['balance'] < breaking_balance:
print(f" !! Max drawdown triggered at {ts} — balance ${backtest_state['current_state']['balance']:.2f}")
if backtest_state['current_state']['in_position']:
price = float(np_1m[bar_idx, backtest_state['data_indices']['idx_close']])
backtest_state = close_position(backtest_state, ts, price, 'MAX_DRAWDOWN', note='50% equity loss')
break
print(f"Events processed: {processed}")
df_ledger = pd.DataFrame(backtest_state['ledger'])
if not df_ledger.empty:
df_ledger['pnl_cumsum'] = df_ledger['pnl'].cumsum().round(4)
return backtest_state, df_ledger
print("Functional backtest execution loop defined.")Functional backtest execution loop defined.
Step 7 — Execute Event-Driven Backtest
Configure all parameters in the cell below, then run. The engine will:
- Generate 1-minute mock OHLCV data
- Generate model prediction signals
- Inject random news events
- Build and drain the event queue
- Return a full trade ledger
import pandas as pd
import numpy as np
def run_event_driven_simulation(
# Data
n_days = 15,
base_price = 30000.0,
n_periods = 80,
freq = '4h',
news_rate = 0.02,
# Capital
starting_balance = 1000.0,
leverage = 1.0,
transaction_fee = 0.05,
slippage = 0.0,
# ATR
atr_period = 14,
atr_sl_mult = 1.5,
atr_tp_mult = 2.5,
atr_vol_threshold = 3.0, # % of price; blocks entry if ATR > this
trailing_stop_mult = 1.0,
# Trade management
max_bars_in_trade = 240, # 4 hours at 1-minute resolution
news_force_close_mag = 0.6, # magnitude >= this → force close
buy_after_minutes = 0,
):
print("=" * 55)
print(" EVENT-DRIVEN BACKTEST — NOTEBOOK 67")
print("=" * 55)
# ── Generate data ──────────────────────────────────────────────
print("\n[1/4] Generating 1-minute OHLCV data...")
np_1m, idx_dt, idx_open, idx_high, idx_low, idx_close = generate_mock_1m_data(
n_days=n_days, base_price=base_price
)
print(f" Shape : {np_1m.shape}")
print(f" Range : {np_1m[0, idx_dt]} → {np_1m[-1, idx_dt]}")
# ── Generate predictions ───────────────────────────────────────
print("\n[2/4] Generating model predictions...")
df_pred = generate_mock_predictions(n_periods=n_periods, freq=freq)
print(f" Periods : {len(df_pred)}")
print(f" Signals : {df_pred['predicted_direction'].value_counts().to_dict()}")
# ── Generate news events ───────────────────────────────────────
print("\n[3/4] Generating news events...")
news = generate_news_events(np_1m, idx_dt, news_rate=news_rate)
print(f" News events injected : {len(news)}")
if news:
mags = [n['magnitude'] for n in news]
print(f" Magnitude range : {min(mags):.2f} – {max(mags):.2f}")
high_impact = [n for n in news if n['magnitude'] >= news_force_close_mag]
print(f" High-impact (>={news_force_close_mag:.1f}) : {len(high_impact)}")
# ── Initialize functional backtest state ───────────────────────
print("\n[4/4] Running event-driven engine (functional)...")
initial_state = {
'params': {
'starting_balance': starting_balance,
'atr_period': atr_period,
'atr_sl_mult': atr_sl_mult,
'atr_tp_mult': atr_tp_mult,
'atr_vol_threshold': atr_vol_threshold,
'trailing_stop_mult': trailing_stop_mult,
'max_bars_in_trade': max_bars_in_trade,
'news_force_close_mag': news_force_close_mag,
'fee_pct': transaction_fee * leverage,
'slippage': slippage,
'leverage': leverage,
'buy_after_minutes': buy_after_minutes,
},
'current_state': {
'balance': starting_balance,
'in_position': False,
'direction': 0,
'buy_price': 0.0,
'tp_price': 0.0,
'sl_price': 0.0,
'trailing_stop_px': 0.0,
'bars_held': 0,
'entry_bar_idx': 0,
},
'runtime_data': {
'np_1m': np_1m,
'atr_array': compute_atr(np_1m, idx_high, idx_low, idx_close, period=atr_period),
'dt_series': pd.to_datetime(np_1m[:, idx_dt]),
},
'data_indices': {
'idx_dt': idx_dt,
'idx_open': idx_open,
'idx_high': idx_high,
'idx_low': idx_low,
'idx_close': idx_close,
},
'ledger': [],
}
final_state, df_ledger = execute_backtest_run(initial_state, df_pred, news)
final_balance = round(final_state['current_state']['balance'], 2)
pnl_pct = round(df_ledger['pnl'].sum(), 4) if not df_ledger.empty else 0
print("\n" + "=" * 55)
print(f" Starting Balance : ${starting_balance:,.2f}")
print(f" Final Balance : ${final_balance:,.2f}")
print(f" Net PnL (%) : {pnl_pct:.4f}%")
print(f" Total Events : {len(df_ledger)}")
print("=" * 55)
return df_ledger, final_state
# ── Run with default parameters ────────────────────────────────────────────
df_ledger, engine = run_event_driven_simulation(
starting_balance = 1000.0,
leverage = 1.0,
transaction_fee = 0.05,
slippage = 0.0,
atr_period = 14,
atr_sl_mult = 1.5,
atr_tp_mult = 2.5,
atr_vol_threshold = 3.0,
trailing_stop_mult = 1.0,
max_bars_in_trade = 240,
news_force_close_mag= 0.6,
news_rate = 0.02,
)=======================================================
EVENT-DRIVEN BACKTEST — NOTEBOOK 67
=======================================================
[1/4] Generating 1-minute OHLCV data...
Shape : (21600, 6)
Range : 1704067200000000000 → 1705363140000000000
[2/4] Generating model predictions...
Periods : 80
Signals : {-1: 35, 1: 35, 0: 10}
[3/4] Generating news events...
News events injected : 415
Magnitude range : 0.10 – 0.99
High-impact (>=0.6) : 185
[4/4] Running event-driven engine (functional)...
Queue loaded: 495 initial events (signals + news)
Processing event queue...
Step 8 — Trade Ledger Inspection
The ledger records every event that produced an action.
event_type tells you what triggered each row.
if not df_ledger.empty:
pd.set_option('display.max_rows', 120)
pd.set_option('display.float_format', '{:.4f}'.format)
print(f"Total ledger records : {len(df_ledger)}")
print(f"Columns : {df_ledger.columns.tolist()}")
print()
display(df_ledger)
else:
print("No ledger records generated.")Step 9 — Performance Analysis
Breaks down results by exit type so you can see how much P&L came from:
- ATR-based TP hits vs SL hits
- Trailing stop exits
- News force-closes
- Time exits (held too long)
- Direction flips
if not df_ledger.empty:
# ── Event type distribution ────────────────────────────────────────
print("Event Type Distribution:")
print("-" * 40)
print(df_ledger['event_type'].value_counts().to_string())
print()
# ── Action distribution ────────────────────────────────────────────
print("Action Distribution:")
print("-" * 40)
print(df_ledger['action'].value_counts().to_string())
print()
# ── Exit-only P&L breakdown ────────────────────────────────────────
exit_actions = ['SELL-WIN', 'SELL-LOSS', 'FORCE_CLOSE']
df_exits = df_ledger[df_ledger['action'].isin(exit_actions)].copy()
if not df_exits.empty:
wins = df_exits[df_exits['pnl'] > 0]
losses = df_exits[df_exits['pnl'] <= 0]
win_rate = len(wins) / len(df_exits) * 100
avg_win = wins['pnl'].mean() if len(wins) > 0 else 0
avg_loss = losses['pnl'].mean() if len(losses) > 0 else 0
total_win = wins['pnl'].sum()
total_loss = losses['pnl'].sum()
profit_factor = (
total_win / abs(total_loss)
if total_loss != 0 else float('inf')
)
print("=" * 55)
print(" PERFORMANCE METRICS")
print("=" * 55)
print(f" Exit Events : {len(df_exits)}")
print(f" Winning Trades : {len(wins)}")
print(f" Losing Trades : {len(losses)}")
print(f" Win Rate : {win_rate:.1f}%")
print(f" Avg Win PnL : {avg_win:.4f}%")
print(f" Avg Loss PnL : {avg_loss:.4f}%")
print(f" Profit Factor : {profit_factor:.2f}")
print(f" Total PnL : {df_exits['pnl'].sum():.4f}%")
print(f" Final Balance : ${final_state['current_state']['balance']:,.2f}")
print("=" * 55)
# ── P&L by exit type ───────────────────────────────────────────
print("\nP&L by Exit Event Type:")
print("-" * 55)
pnl_by_type = (
df_exits.groupby('event_type')['pnl']
.agg(count='count', total_pnl='sum', avg_pnl='mean')
.round(4)
)
print(pnl_by_type.to_string())
# ── News event summary ─────────────────────────────────────────
df_news = df_ledger[df_ledger['event_type'] == 'NEWS']
if not df_news.empty:
force_closed = df_news[df_news['action'] == 'FORCE_CLOSE']
noted = df_news[df_news['action'] == 'NOTED']
print(f"\nNews Events Summary:")
print("-" * 40)
print(f" Total news events : {len(df_news)}")
print(f" Force closes : {len(force_closed)}")
print(f" Low impact (noted) : {len(noted)}")
if len(force_closed) > 0:
print(f" PnL from force-close: {force_closed['pnl'].sum():.4f}%")
else:
print("No exit events recorded.")
else:
print("Ledger is empty.")Operational Notes & Next Steps
Connecting Real Data
Replace generate_mock_1m_data() with your own OHLCV loader:
np_1m = your_dataframe[['datetime','open','high','low','close','volume']].to_numpy()
Column indices must match (0=datetime, 1=open, 2=high, 3=low, 4=close).
Connecting Real Predictions
Replace generate_mock_predictions() with your model output DataFrame — same two-column format: datetime, predicted_direction.
Connecting Real News
Replace generate_news_events() with a real news feed (e.g., Benzinga, NewsAPI). Map each article to a magnitude score using a sentiment model or keyword classifier.
ATR Parameter Tuning
| Parameter | Conservative | Aggressive |
|---|---|---|
atr_sl_mult | 2.0 | 1.0 |
atr_tp_mult | 3.0 | 1.5 |
atr_vol_threshold | 5.0 | 2.0 |
trailing_stop_mult | 1.5 | 0.5 |
max_bars_in_trade | 60 | 480 |
Production Upgrades
- Replace REST-based 1-minute data with WebSocket streaming for live deployment
- Add position sizing (Kelly criterion or fixed fractional) based on ATR signal confidence
- Log events to a database for post-trade analysis
- Add multi-symbol support by running one engine instance per instrument in parallel