Backtesting·Custom Engines·Intermediate

Event-Driven Backtest

Implement an event-driven backtesting loop that processes bar-by-bar signals, supporting realistic order simulation and state tracking.

event-drivenorder simulationbacktest

Event-Driven Backtest


Overview

This notebook implements an event-driven backtesting engine (Notebook 67) — the natural evolution of the vectorized backtest (Notebook 66).

Instead of applying logic across arrays in bulk, the engine processes a prioritized event queue tick-by-tick on 1-minute candles. Each event is handled in strict chronological order, enabling:

FeatureDescription
News EventsSimulated macro announcements that spike volatility and widen spreads
ATR-Based ExitsDynamic stop-loss and take-profit levels computed from Average True Range
Volatility Regime FilterBlocks new entries when ATR exceeds a configurable multiplier
Trailing StopsStop-loss that ratchets up (long) or down (short) as price moves favorably
Time-Based ExitsAutomatically close positions that have been open beyond a max bar limit
Direction ReversalClose and flip on signal change — identical to vectorized version
Max Drawdown GuardHard stop at 50% equity loss

Event Types (Priority Queue)

Events are processed in this priority order per timestamp:

PriorityEvent TypeDescription
1NEWSMacro shock — widens spread, may force close
2ATR_EXITVolatility-driven stop triggered
3TRAILING_STOPTrailing stop updated or triggered
4TP_HITTake-profit level breached
5SL_HITStop-loss level breached
6TIME_EXITMaximum hold duration exceeded
7SIGNALNew directional prediction from model

Tick-level precision on 1-minute OHLCV data. For bulk vectorized approximation, refer to Notebook 66.

Step 1 — Install Dependencies

[1]
# Install required packages
!pip install pandas numpy --quiet

Step 2 — Import Libraries

[2]
import pandas as pd
import numpy as np
import heapq          # Min-heap for the priority event queue
import warnings
from dataclasses import dataclass, field
from typing import Optional
from enum import IntEnum

warnings.filterwarnings('ignore')
pd.set_option('display.float_format', '{:.6f}'.format)
pd.set_option('display.max_columns', 20)
pd.set_option('display.width', 140)

print(f"pandas version : {pd.__version__}")
print(f"numpy version  : {np.__version__}")
print("Environment ready.")
pandas version : 2.2.2
numpy version  : 2.0.2
Environment ready.

Step 3 — Event System

Event Priority Enum

Each event type carries an integer priority. Lower number = processed first within the same timestamp. The priority ordering ensures news shocks are handled before TP/SL checks, and TP/SL before new signal entries.

[3]
class EventPriority(IntEnum):
    """
    Processing priority for events sharing the same timestamp.
    Lower value = processed first.
    """
    NEWS          = 1   # Macro shock — highest priority
    ATR_EXIT      = 2   # Volatility-driven exit
    TRAILING_STOP = 3   # Trailing stop update/trigger
    TP_HIT        = 4   # Take-profit trigger
    SL_HIT        = 5   # Stop-loss trigger
    TIME_EXIT     = 6   # Max hold duration exceeded
    SIGNAL        = 7   # Directional model prediction — lowest priority


@dataclass(order=True)
class Event:
    """
    A single event in the simulation queue.

    Sorting key: (timestamp, priority) — heapq uses __lt__ via dataclass ordering.

    Attributes
    ----------
    timestamp : np.datetime64
        When the event fires.
    priority : int
        Processing order within the same timestamp (EventPriority values).
    event_type : str
        Human-readable event label (e.g., 'SIGNAL', 'NEWS', 'TP_HIT').
    payload : dict
        Arbitrary event data (direction, price levels, news magnitude, etc.).
    """
    timestamp  : object          # np.datetime64 — sortable
    priority   : int             # EventPriority value
    event_type : str  = field(compare=False)
    payload    : dict = field(default_factory=dict, compare=False)


print("Event system defined.")
print(f"  Event priorities: { {e.name: int(e) for e in EventPriority} }")
Event system defined.
  Event priorities: {'NEWS': 1, 'ATR_EXIT': 2, 'TRAILING_STOP': 3, 'TP_HIT': 4, 'SL_HIT': 5, 'TIME_EXIT': 6, 'SIGNAL': 7}

Step 4 — ATR Calculator

The Average True Range (ATR) measures market volatility by averaging the true range over a lookback window.

True Range = max(High − Low, |High − Prev_Close|, |Low − Prev_Close|)
ATR(n)     = EMA(True Range, n)

ATR serves two roles in this engine:

  1. Dynamic TP/SL sizing — TP = entry ± ATR × atr_tp_mult, SL = entry ∓ ATR × atr_sl_mult
  2. Volatility regime filter — If ATR > threshold, new entries are blocked
[4]
def compute_atr(np_1m: np.ndarray,
               idx_high: int,
               idx_low: int,
               idx_close: int,
               period: int = 14) -> np.ndarray:
    """
    Compute ATR for each row of a 1-minute OHLCV array.

    Parameters
    ----------
    np_1m    : np.ndarray — full 1-minute candle array
    idx_high : int        — column index for High
    idx_low  : int        — column index for Low
    idx_close: int        — column index for Close
    period   : int        — ATR lookback (default 14)

    Returns
    -------
    np.ndarray of shape (n,) — ATR value at each row (NaN for first `period` rows)
    """
    highs  = np_1m[:, idx_high].astype(float)
    lows   = np_1m[:, idx_low].astype(float)
    closes = np_1m[:, idx_close].astype(float)

    n  = len(highs)
    tr = np.empty(n)
    tr[0] = highs[0] - lows[0]

    for i in range(1, n):
        hl   = highs[i]  - lows[i]
        hpc  = abs(highs[i]  - closes[i - 1])
        lpc  = abs(lows[i]   - closes[i - 1])
        tr[i] = max(hl, hpc, lpc)

    # Wilder smoothing (equivalent to EMA with alpha = 1/period)
    atr = np.full(n, np.nan)
    atr[period - 1] = np.mean(tr[:period])
    alpha = 1.0 / period
    for i in range(period, n):
        atr[i] = alpha * tr[i] + (1 - alpha) * atr[i - 1]

    return atr


print("ATR calculator defined.")
ATR calculator defined.

Step 5 — Mock Data Generation

Generates synthetic 1-minute OHLCV data and model predictions.

News events are randomly injected at ~2% of all timestamps to simulate macro announcements (e.g., Fed rate decisions, CPI prints). Each news event has a magnitude (0–1) representing shock severity.

[5]
def generate_mock_1m_data(n_days: int = 15,
                         base_price: float = 30000.0,
                         seed: int = 42) -> tuple:
    """
    Generate synthetic 1-minute OHLCV candle data.

    Returns
    -------
    tuple: (np_1m, idx_datetime, idx_open, idx_high, idx_low, idx_close)
    """
    n_minutes = n_days * 24 * 60
    datetimes = pd.date_range(start='2024-01-01', periods=n_minutes, freq='1min').values

    np.random.seed(seed)
    returns      = np.random.normal(0, 0.001, n_minutes)
    close_prices = base_price * np.cumprod(1 + returns)
    open_prices  = np.roll(close_prices, 1)
    open_prices[0] = base_price

    high_prices = np.maximum(open_prices, close_prices) * (
        1 + np.abs(np.random.normal(0, 0.0005, n_minutes)))
    low_prices  = np.minimum(open_prices, close_prices) * (
        1 - np.abs(np.random.normal(0, 0.0005, n_minutes)))
    volumes     = np.random.randint(1, 100, n_minutes).astype(float)

    np_1m = np.column_stack([
        datetimes.astype(object),
        open_prices, high_prices, low_prices, close_prices, volumes
    ])

    return np_1m, 0, 1, 2, 3, 4  # array, idx_dt, idx_open, idx_high, idx_low, idx_close


def generate_mock_predictions(n_periods: int = 80,
                               freq: str = '4h',
                               seed: int = 99) -> pd.DataFrame:
    """
    Generate synthetic directional prediction signals (+1, -1, 0).
    """
    datetimes = pd.date_range(start='2024-01-01', periods=n_periods, freq=freq)
    np.random.seed(seed)
    directions = np.random.choice([1, -1, 0], size=n_periods, p=[0.45, 0.45, 0.10])
    return pd.DataFrame({'datetime': datetimes, 'predicted_direction': directions})


def generate_news_events(np_1m: np.ndarray,
                         idx_datetime: int,
                         news_rate: float = 0.02,
                         seed: int = 7) -> list:
    """
    Randomly inject news events at ~`news_rate` fraction of 1-minute timestamps.

    Returns
    -------
    list of dicts: [{timestamp, magnitude, label}, ...]
    """
    np.random.seed(seed)
    n = len(np_1m)
    mask = np.random.random(n) < news_rate
    indices = np.where(mask)[0]

    news_labels = [
        'Fed Rate Decision', 'CPI Print', 'NFP Report',
        'GDP Release', 'FOMC Minutes', 'Earnings Surprise',
        'Geopolitical Shock', 'Liquidity Crunch',
    ]

    events = []
    for idx in indices:
        events.append({
            'timestamp' : np_1m[idx, idx_datetime],
            'magnitude' : round(np.random.uniform(0.1, 1.0), 2),
            'label'     : np.random.choice(news_labels),
        })
    return events


print("Mock data generators defined.")
Mock data generators defined.

Step 6 — Event-Driven Engine

Architecture

The EventDrivenBacktest class manages:

  1. Event Queue — A min-heap sorted by (timestamp, priority)
  2. Position Statein_position, buy_price, trailing_stop, bars_held
  3. ATR Array — Pre-computed for every 1-minute candle
  4. Dispatch Table — Maps event types to handler methods

Key Handler Methods

MethodTriggerAction
_handle_signal()New model predictionOpen/close/flip position
_handle_news()News event firesWiden spread; force-close if magnitude > threshold
_handle_atr_exit()ATR spike detectedClose position if volatility too high
_handle_trailing_stop()Price moves favorablyRatchet stop; trigger if breached
_handle_tp_sl()Price crosses TP/SLClose with P&L
_handle_time_exit()Bars held > maxClose regardless of P&L
[6]
class EventDrivenBacktest:
    """
    Event-driven backtesting engine with news, ATR, trailing stop,
    time-exit, and volatility regime filtering.

    Parameters
    ----------
    np_1m               : np.ndarray  — 1-minute OHLCV array
    idx_datetime        : int         — column index: datetime
    idx_open            : int         — column index: open
    idx_high            : int         — column index: high
    idx_low             : int         — column index: low
    idx_close           : int         — column index: close
    starting_balance    : float       — initial capital (USD)
    atr_period          : int         — ATR lookback window (minutes)
    atr_sl_mult         : float       — ATR × mult = stop-loss distance
    atr_tp_mult         : float       — ATR × mult = take-profit distance
    atr_vol_threshold   : float       — ATR × mult above which entries are blocked
    trailing_stop_mult  : float       — ATR × mult = trailing stop distance
    max_bars_in_trade   : int         — max 1-minute bars before time exit
    news_force_close_mag: float       — news magnitude above which position is force-closed
    transaction_fee     : float       — per-leg fee (%)
    slippage            : float       — per-trade slippage (%)
    leverage            : float       — leverage multiplier
    buy_after_minutes   : int         — candle offset before entry
    """

    def __init__(
        self,
        np_1m,
        idx_datetime, idx_open, idx_high, idx_low, idx_close,
        starting_balance    = 1000.0,
        atr_period          = 14,
        atr_sl_mult         = 1.5,
        atr_tp_mult         = 2.5,
        atr_vol_threshold   = 3.0,
        trailing_stop_mult  = 1.0,
        max_bars_in_trade   = 240,
        news_force_close_mag= 0.6,
        transaction_fee     = 0.05,
        slippage            = 0.0,
        leverage            = 1.0,
        buy_after_minutes   = 0,
    ):
        # ── Data ──────────────────────────────────────────────────────────
        self.np_1m          = np_1m
        self.idx_dt         = idx_datetime
        self.idx_open       = idx_open
        self.idx_high       = idx_high
        self.idx_low        = idx_low
        self.idx_close      = idx_close

        # ── Parameters ────────────────────────────────────────────────────
        self.starting_balance     = starting_balance
        self.atr_period           = atr_period
        self.atr_sl_mult          = atr_sl_mult
        self.atr_tp_mult          = atr_tp_mult
        self.atr_vol_threshold    = atr_vol_threshold
        self.trailing_stop_mult   = trailing_stop_mult
        self.max_bars_in_trade    = max_bars_in_trade
        self.news_force_close_mag = news_force_close_mag
        self.fee_pct              = transaction_fee * leverage
        self.slippage             = slippage
        self.leverage             = leverage
        self.buy_after_minutes    = buy_after_minutes

        # ── State ─────────────────────────────────────────────────────────
        self.balance          = starting_balance
        self.in_position      = False
        self.direction        = 0       # +1 long / -1 short
        self.buy_price        = 0.0
        self.tp_price         = 0.0
        self.sl_price         = 0.0
        self.trailing_stop_px = 0.0
        self.bars_held        = 0
        self.entry_bar_idx    = 0
        self.ledger           = []      # list of trade records

        # ── Pre-compute ATR ───────────────────────────────────────────────
        self.atr_array = compute_atr(
            np_1m, idx_high, idx_low, idx_close, period=atr_period
        )

        # ── Build datetime lookup ─────────────────────────────────────────
        self.dt_series = pd.to_datetime(np_1m[:, idx_datetime])
        self.dt_to_idx = {dt: i for i, dt in enumerate(self.dt_series)}

    # ── Ledger ────────────────────────────────────────────────────────────
    def _record(self, timestamp, event_type: str, action: str,
                buy_price: float, sell_price: float, pnl: float, note: str = ''):
        self.ledger.append({
            'datetime'  : timestamp,
            'event_type': event_type,
            'direction' : 'long' if self.direction > 0 else ('short' if self.direction < 0 else 'none'),
            'action'    : action,
            'buy_price' : round(buy_price, 4),
            'sell_price': round(sell_price, 4),
            'balance'   : round(self.balance, 4),
            'pnl'       : round(pnl, 4),
            'note'      : note,
        })

    # ── PnL Calculator ────────────────────────────────────────────────────
    def _calc_pnl(self, sell_price: float) -> float:
        if self.direction > 0:
            raw = ((sell_price - self.buy_price) / self.buy_price) * 100
        else:
            raw = ((self.buy_price - sell_price) / self.buy_price) * 100
        return raw * self.leverage - self.fee_pct - self.slippage

    # ── Open Position ─────────────────────────────────────────────────────
    def _open_position(self, timestamp, price: float, direction: int,
                       bar_idx: int, atr: float, event_type: str):
        self.direction    = direction
        self.buy_price    = price
        self.in_position  = True
        self.bars_held    = 0
        self.entry_bar_idx = bar_idx

        # ATR-based TP/SL
        if direction > 0:
            self.tp_price         = price + atr * self.atr_tp_mult
            self.sl_price         = price - atr * self.atr_sl_mult
            self.trailing_stop_px = price - atr * self.trailing_stop_mult
        else:
            self.tp_price         = price - atr * self.atr_tp_mult
            self.sl_price         = price + atr * self.atr_sl_mult
            self.trailing_stop_px = price + atr * self.trailing_stop_mult

        # Entry fee
        entry_pnl      = -self.fee_pct - self.slippage
        self.balance  += self.balance * (entry_pnl / 100)

        self._record(timestamp, event_type, 'BUY', price, 0.0, entry_pnl,
                     f'TP={self.tp_price:.2f} SL={self.sl_price:.2f} ATR={atr:.2f}')

    # ── Close Position ────────────────────────────────────────────────────
    def _close_position(self, timestamp, price: float,
                        event_type: str, note: str = ''):
        pnl           = self._calc_pnl(price)
        self.balance += self.balance * (pnl / 100)
        direction_save = self.direction
        self.in_position  = False
        self.direction     = 0
        self.bars_held     = 0

        label = 'SELL-WIN' if pnl > 0 else 'SELL-LOSS'
        self._record(timestamp, event_type, label,
                     self.buy_price, price, pnl, note)

    # ── Event Handlers ────────────────────────────────────────────────────
    def _handle_signal(self, ts, payload: dict, bar_idx: int):
        new_dir = payload.get('direction', 0)

        if new_dir == 0:
            return  # Neutral — hold or do nothing

        atr = self.atr_array[bar_idx]
        if np.isnan(atr):
            return  # Not enough history for ATR

        # Volatility regime filter: block entry if ATR too high
        price = float(self.np_1m[bar_idx, self.idx_open])
        vol_threshold_px = price * (self.atr_vol_threshold / 100)
        if atr > vol_threshold_px:
            self._record(ts, 'SIGNAL', 'BLOCKED', price, 0.0, 0.0,
                         f'ATR {atr:.2f} > vol threshold {vol_threshold_px:.2f}')
            return

        if self.in_position:
            if new_dir == self.direction:
                return  # Same direction — hold
            # Direction flip — close then re-enter
            self._close_position(ts, price, 'SIGNAL', note='direction_flip')

        self._open_position(ts, price, new_dir, bar_idx, atr, 'SIGNAL')

    def _handle_news(self, ts, payload: dict, bar_idx: int):
        magnitude = payload.get('magnitude', 0.5)
        label     = payload.get('label', 'News')
        price     = float(self.np_1m[bar_idx, self.idx_close])

        note = f'{label} mag={magnitude:.2f}'

        if self.in_position and magnitude >= self.news_force_close_mag:
            # High-magnitude news → force close with widened spread (extra slippage)
            spread_penalty = magnitude * 0.5  # additional % cost
            pnl = self._calc_pnl(price) - spread_penalty
            self.balance  += self.balance * (pnl / 100)
            self.in_position = False
            self.direction   = 0
            self.bars_held   = 0
            self._record(ts, 'NEWS', 'FORCE_CLOSE',
                         self.buy_price, price, pnl, note)
        else:
            # Low-magnitude news — log but do not close
            self._record(ts, 'NEWS', 'NOTED', 0.0, 0.0, 0.0, note)

    def _handle_atr_exit(self, ts, payload: dict, bar_idx: int):
        """Close position if current ATR has spiked beyond threshold."""
        if not self.in_position:
            return
        atr   = self.atr_array[bar_idx]
        price = float(self.np_1m[bar_idx, self.idx_close])
        if np.isnan(atr):
            return

        vol_threshold_px = price * (self.atr_vol_threshold / 100)
        if atr > vol_threshold_px:
            self._close_position(ts, price, 'ATR_EXIT',
                                 note=f'ATR={atr:.2f} > threshold={vol_threshold_px:.2f}')

    def _handle_trailing_stop(self, ts, payload: dict, bar_idx: int):
        """Update trailing stop and trigger if price breaches it."""
        if not self.in_position:
            return

        high  = float(self.np_1m[bar_idx, self.idx_high])
        low   = float(self.np_1m[bar_idx, self.idx_low])
        atr   = self.atr_array[bar_idx]
        if np.isnan(atr):
            return

        trail_dist = atr * self.trailing_stop_mult

        if self.direction > 0:   # LONG — trail stop moves up with high
            new_trail = high - trail_dist
            if new_trail > self.trailing_stop_px:
                self.trailing_stop_px = new_trail
            if low <= self.trailing_stop_px:
                self._close_position(ts, self.trailing_stop_px,
                                     'TRAILING_STOP',
                                     note=f'trail_px={self.trailing_stop_px:.2f}')
        else:                     # SHORT — trail stop moves down with low
            new_trail = low + trail_dist
            if new_trail < self.trailing_stop_px:
                self.trailing_stop_px = new_trail
            if high >= self.trailing_stop_px:
                self._close_position(ts, self.trailing_stop_px,
                                     'TRAILING_STOP',
                                     note=f'trail_px={self.trailing_stop_px:.2f}')

    def _handle_tp_sl(self, ts, payload: dict, bar_idx: int):
        """Check static ATR-based TP and SL levels."""
        if not self.in_position:
            return

        high = float(self.np_1m[bar_idx, self.idx_high])
        low  = float(self.np_1m[bar_idx, self.idx_low])

        if self.direction > 0:   # LONG
            if high >= self.tp_price:
                self._close_position(ts, self.tp_price, 'TP_HIT',
                                     note=f'TP={self.tp_price:.2f}')
                return
            if low <= self.sl_price:
                self._close_position(ts, self.sl_price, 'SL_HIT',
                                     note=f'SL={self.sl_price:.2f}')
        else:                    # SHORT
            if low <= self.tp_price:
                self._close_position(ts, self.tp_price, 'TP_HIT',
                                     note=f'TP={self.tp_price:.2f}')
                return
            if high >= self.sl_price:
                self._close_position(ts, self.sl_price, 'SL_HIT',
                                     note=f'SL={self.sl_price:.2f}')

    def _handle_time_exit(self, ts, payload: dict, bar_idx: int):
        """Force-close a position that has exceeded max_bars_in_trade."""
        if not self.in_position:
            return
        price = float(self.np_1m[bar_idx, self.idx_close])
        self._close_position(ts, price, 'TIME_EXIT',
                             note=f'bars_held={self.bars_held}')

    # ── Dispatch Table ────────────────────────────────────────────────────
    _HANDLERS = {
        'NEWS'         : _handle_news,
        'ATR_EXIT'     : _handle_atr_exit,
        'TRAILING_STOP': _handle_trailing_stop,
        'TP_HIT'       : _handle_tp_sl,
        'SL_HIT'       : _handle_tp_sl,
        'TIME_EXIT'    : _handle_time_exit,
        'SIGNAL'       : _handle_signal,
    }

    # ── Build Event Queue ─────────────────────────────────────────────────
    def _build_queue(self,
                     df_predictions: pd.DataFrame,
                     news_events   : list) -> list:
        """
        Populate the min-heap with SIGNAL and NEWS events.
        Per-bar events (ATR_EXIT, TRAILING_STOP, TP_HIT, TIME_EXIT) are
        injected on-the-fly during the main loop.
        """
        heap = []

        # ── SIGNAL events from model predictions ─────────────────────────
        for _, row in df_predictions.iterrows():
            ts  = row['datetime']
            evt = Event(
                timestamp  = ts,
                priority   = int(EventPriority.SIGNAL),
                event_type = 'SIGNAL',
                payload    = {'direction': int(row['predicted_direction'])},
            )
            heapq.heappush(heap, evt)

        # ── NEWS events ───────────────────────────────────────────────────
        for n in news_events:
            ts  = pd.Timestamp(n['timestamp'])
            evt = Event(
                timestamp  = ts,
                priority   = int(EventPriority.NEWS),
                event_type = 'NEWS',
                payload    = {'magnitude': n['magnitude'], 'label': n['label']},
            )
            heapq.heappush(heap, evt)

        return heap

    # ── Main Run Loop ─────────────────────────────────────────────────────
    def run(self,
            df_predictions: pd.DataFrame,
            news_events   : list = None) -> pd.DataFrame:
        """
        Execute the event-driven simulation.

        Parameters
        ----------
        df_predictions : pd.DataFrame
            Columns: ['datetime', 'predicted_direction']
        news_events : list of dicts
            Each dict: {timestamp, magnitude, label}

        Returns
        -------
        pd.DataFrame — trade ledger
        """
        if news_events is None:
            news_events = []

        df_predictions = df_predictions.copy()
        df_predictions['datetime'] = pd.to_datetime(df_predictions['datetime'])

        heap = self._build_queue(df_predictions, news_events)

        breaking_balance = self.starting_balance * 0.5
        dt_series        = pd.to_datetime(self.np_1m[:, self.idx_dt])

        print(f"Queue loaded: {len(heap)} initial events (signals + news)")
        print("Processing event queue...")

        processed = 0

        while heap:
            evt = heapq.heappop(heap)
            ts  = pd.Timestamp(evt.timestamp)

            # Find nearest 1-minute bar index
            idx_candidates = np.searchsorted(dt_series.values, ts.to_datetime64())
            bar_idx = int(min(idx_candidates, len(self.np_1m) - 1))

            # Track bars held and inject per-bar events
            if self.in_position:
                self.bars_held = bar_idx - self.entry_bar_idx

                # Inject ATR_EXIT check
                heapq.heappush(heap, Event(ts, int(EventPriority.ATR_EXIT),
                                           'ATR_EXIT', {}))
                # Inject TRAILING_STOP update
                heapq.heappush(heap, Event(ts, int(EventPriority.TRAILING_STOP),
                                           'TRAILING_STOP', {}))
                # Inject TP/SL check
                heapq.heappush(heap, Event(ts, int(EventPriority.TP_HIT),
                                           'TP_HIT', {}))
                # Inject TIME_EXIT if over max bars
                if self.bars_held >= self.max_bars_in_trade:
                    heapq.heappush(heap, Event(ts, int(EventPriority.TIME_EXIT),
                                               'TIME_EXIT', {}))

            # Dispatch event to handler
            handler = self._HANDLERS.get(evt.event_type)
            if handler:
                handler(self, ts, evt.payload, bar_idx)

            processed += 1

            # Max drawdown guard
            if self.balance < breaking_balance:
                print(f"  !! Max drawdown triggered at {ts} — balance ${self.balance:.2f}")
                if self.in_position:
                    price = float(self.np_1m[bar_idx, self.idx_close])
                    self._close_position(ts, price, 'MAX_DRAWDOWN', note='50% equity loss')
                break

        print(f"Events processed: {processed}")

        df_ledger = pd.DataFrame(self.ledger)
        if not df_ledger.empty:
            df_ledger['pnl_cumsum'] = df_ledger['pnl'].cumsum().round(4)

        return df_ledger


print("EventDrivenBacktest class defined.")
EventDrivenBacktest class defined.

Functional Refactoring: Helper Functions

Instead of a class, the backtesting engine's state (balance, in_position, buy_price, ledger, etc.) will be managed by a dictionary, backtest_state, passed between functions.

These helper functions modify or calculate values based on the backtest_state.

[7]
def record_trade(backtest_state: dict, timestamp, event_type: str, action: str,
                 buy_price: float, sell_price: float, pnl: float, note: str = '') -> None:
    """
    Records a trade or significant event in the ledger.
    Modifies `backtest_state['ledger']` in place.
    """
    direction_label = 'long' if backtest_state['current_state']['direction'] > 0 else \
                      ('short' if backtest_state['current_state']['direction'] < 0 else 'none')
    backtest_state['ledger'].append({
        'datetime'  : timestamp,
        'event_type': event_type,
        'direction' : direction_label,
        'action'    : action,
        'buy_price' : round(buy_price, 4),
        'sell_price': round(sell_price, 4),
        'balance'   : round(backtest_state['current_state']['balance'], 4),
        'pnl'       : round(pnl, 4),
        'note'      : note,
    })
[8]
def calculate_pnl(backtest_state: dict, sell_price: float) -> float:
    """
    Calculates the PnL for a closed position.
    This is a pure function that only reads from `backtest_state`.
    """
    direction   = backtest_state['current_state']['direction']
    buy_price   = backtest_state['current_state']['buy_price']
    leverage    = backtest_state['params']['leverage']
    fee_pct     = backtest_state['params']['fee_pct']
    slippage_pct = backtest_state['params']['slippage']

    if direction > 0:
        raw = ((sell_price - buy_price) / buy_price) * 100
    else:
        raw = ((buy_price - sell_price) / buy_price) * 100
    return raw * leverage - fee_pct - slippage_pct
[9]
def open_position(backtest_state: dict, timestamp, price: float, direction: int,
                  bar_idx: int, atr: float, event_type: str) -> dict:
    """
    Opens a new position and updates the `backtest_state`.
    """
    state_copy = backtest_state.copy()
    current_state = state_copy['current_state']
    params = state_copy['params']

    current_state['direction']    = direction
    current_state['buy_price']    = price
    current_state['in_position']  = True
    current_state['bars_held']    = 0
    current_state['entry_bar_idx'] = bar_idx

    # ATR-based TP/SL
    if direction > 0:
        current_state['tp_price']         = price + atr * params['atr_tp_mult']
        current_state['sl_price']         = price - atr * params['atr_sl_mult']
        current_state['trailing_stop_px'] = price - atr * params['trailing_stop_mult']
    else:
        current_state['tp_price']         = price - atr * params['atr_tp_mult']
        current_state['sl_price']         = price + atr * params['atr_sl_mult']
        current_state['trailing_stop_px'] = price + atr * params['trailing_stop_mult']

    # Entry fee
    entry_pnl = -params['fee_pct'] - params['slippage']
    current_state['balance'] += current_state['balance'] * (entry_pnl / 100)

    record_trade(state_copy, timestamp, event_type, 'BUY', price, 0.0, entry_pnl,
                 f"TP={current_state['tp_price']:.2f} SL={current_state['sl_price']:.2f} ATR={atr:.2f}")

    return state_copy
[10]
def close_position(backtest_state: dict, timestamp, price: float,
                   event_type: str, note: str = '') -> dict:
    """
    Closes an open position and updates the `backtest_state`.
    """
    state_copy = backtest_state.copy()
    current_state = state_copy['current_state']

    pnl = calculate_pnl(state_copy, price)
    current_state['balance'] += current_state['balance'] * (pnl / 100)

    label = 'SELL-WIN' if pnl > 0 else 'SELL-LOSS'

    record_trade(state_copy, timestamp, event_type, label,
                 current_state['buy_price'], price, pnl, note)

    # Reset position state
    current_state['in_position']  = False
    current_state['direction']     = 0
    current_state['bars_held']     = 0
    current_state['buy_price']     = 0.0
    current_state['tp_price']      = 0.0
    current_state['sl_price']      = 0.0
    current_state['trailing_stop_px'] = 0.0
    current_state['entry_bar_idx'] = 0

    return state_copy

Functional Refactoring: Event Handlers

Each event handler is now a standalone function that takes the current backtest_state and relevant data, returning an updated backtest_state.

[11]
def handle_signal(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
    """
    Handles a SIGNAL event: opens, closes, or flips a position.
    """
    state_copy = backtest_state.copy()
    current_state = state_copy['current_state']
    params = state_copy['params']
    runtime_data = state_copy['runtime_data']
    data_indices = state_copy['data_indices']

    new_dir = payload.get('direction', 0)

    if new_dir == 0:
        return state_copy  # Neutral — hold or do nothing

    atr = runtime_data['atr_array'][bar_idx]
    if np.isnan(atr):
        return state_copy  # Not enough history for ATR

    price = float(runtime_data['np_1m'][bar_idx, data_indices['idx_open']])
    vol_threshold_px = price * (params['atr_vol_threshold'] / 100)

    # Volatility regime filter: block entry if ATR too high
    if atr > vol_threshold_px:
        record_trade(state_copy, ts, 'SIGNAL', 'BLOCKED', price, 0.0, 0.0,
                     f'ATR {atr:.2f} > vol threshold {vol_threshold_px:.2f}')
        return state_copy

    if current_state['in_position']:
        if new_dir == current_state['direction']:
            return state_copy  # Same direction — hold
        # Direction flip — close then re-enter
        state_copy = close_position(state_copy, ts, price, 'SIGNAL', note='direction_flip')

    state_copy = open_position(state_copy, ts, price, new_dir, bar_idx, atr, 'SIGNAL')
    return state_copy
[12]
def handle_news(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
    """
    Handles a NEWS event: force-closes a position if magnitude is high.
    """
    state_copy = backtest_state.copy()
    current_state = state_copy['current_state']
    params = state_copy['params']
    runtime_data = state_copy['runtime_data']
    data_indices = state_copy['data_indices']

    magnitude = payload.get('magnitude', 0.5)
    label     = payload.get('label', 'News')
    price     = float(runtime_data['np_1m'][bar_idx, data_indices['idx_close']])

    note = f'{label} mag={magnitude:.2f}'

    if current_state['in_position'] and magnitude >= params['news_force_close_mag']:
        # High-magnitude news → force close with widened spread (extra slippage)
        spread_penalty = magnitude * 0.5  # additional % cost
        pnl = calculate_pnl(state_copy, price) - spread_penalty
        current_state['balance'] += current_state['balance'] * (pnl / 100)

        record_trade(state_copy, ts, 'NEWS', 'FORCE_CLOSE',
                     current_state['buy_price'], price, pnl, note)

        # Reset position state
        current_state['in_position'] = False
        current_state['direction']   = 0
        current_state['bars_held']   = 0

    else:
        # Low-magnitude news — log but do not close
        record_trade(state_copy, ts, 'NEWS', 'NOTED', 0.0, 0.0, 0.0, note)

    return state_copy
[13]
def handle_atr_exit(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
    """
    Closes position if current ATR has spiked beyond threshold.
    """
    state_copy = backtest_state.copy()
    current_state = state_copy['current_state']
    params = state_copy['params']
    runtime_data = state_copy['runtime_data']
    data_indices = state_copy['data_indices']

    if not current_state['in_position']:
        return state_copy

    atr = runtime_data['atr_array'][bar_idx]
    if np.isnan(atr):
        return state_copy

    price = float(runtime_data['np_1m'][bar_idx, data_indices['idx_close']])
    vol_threshold_px = price * (params['atr_vol_threshold'] / 100)

    if atr > vol_threshold_px:
        state_copy = close_position(state_copy, ts, price, 'ATR_EXIT',
                                     note=f'ATR={atr:.2f} > threshold={vol_threshold_px:.2f}')

    return state_copy
[14]
def handle_trailing_stop(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
    """
    Updates trailing stop and triggers if price breaches it.
    """
    state_copy = backtest_state.copy()
    current_state = state_copy['current_state']
    params = state_copy['params']
    runtime_data = state_copy['runtime_data']
    data_indices = state_copy['data_indices']

    if not current_state['in_position']:
        return state_copy

    high  = float(runtime_data['np_1m'][bar_idx, data_indices['idx_high']])
    low   = float(runtime_data['np_1m'][bar_idx, data_indices['idx_low']])
    atr   = runtime_data['atr_array'][bar_idx]

    if np.isnan(atr):
        return state_copy

    trail_dist = atr * params['trailing_stop_mult']

    if current_state['direction'] > 0:   # LONG — trail stop moves up with high
        new_trail = high - trail_dist
        if new_trail > current_state['trailing_stop_px']:
            current_state['trailing_stop_px'] = new_trail
        if low <= current_state['trailing_stop_px']:
            state_copy = close_position(state_copy, ts, current_state['trailing_stop_px'],
                                         'TRAILING_STOP',
                                         note=f'trail_px={current_state['trailing_stop_px']:.2f}')
    else:                     # SHORT — trail stop moves down with low
        new_trail = low + trail_dist
        if new_trail < current_state['trailing_stop_px']:
            current_state['trailing_stop_px'] = new_trail
        if high >= current_state['trailing_stop_px']:
            state_copy = close_position(state_copy, ts, current_state['trailing_stop_px'],
                                         'TRAILING_STOP',
                                         note=f'trail_px={current_state['trailing_stop_px']:.2f}')

    return state_copy
[15]
def handle_tp_sl(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
    """
    Checks static ATR-based Take Profit (TP) and Stop Loss (SL) levels.
    """
    state_copy = backtest_state.copy()
    current_state = state_copy['current_state']
    runtime_data = state_copy['runtime_data']
    data_indices = state_copy['data_indices']

    if not current_state['in_position']:
        return state_copy

    high = float(runtime_data['np_1m'][bar_idx, data_indices['idx_high']])
    low  = float(runtime_data['np_1m'][bar_idx, data_indices['idx_low']])

    if current_state['direction'] > 0:   # LONG
        if high >= current_state['tp_price']:
            state_copy = close_position(state_copy, ts, current_state['tp_price'], 'TP_HIT',
                                         note=f'TP={current_state['tp_price']:.2f}')
            return state_copy
        if low <= current_state['sl_price']:
            state_copy = close_position(state_copy, ts, current_state['sl_price'], 'SL_HIT',
                                         note=f'SL={current_state['sl_price']:.2f}')
            return state_copy
    else:                    # SHORT
        if low <= current_state['tp_price']:
            state_copy = close_position(state_copy, ts, current_state['tp_price'], 'TP_HIT',
                                         note=f'TP={current_state['tp_price']:.2f}')
            return state_copy
        if high >= current_state['sl_price']:
            state_copy = close_position(state_copy, ts, current_state['sl_price'], 'SL_HIT',
                                         note=f'SL={current_state['sl_price']:.2f}')
            return state_copy

    return state_copy
[16]
def handle_time_exit(backtest_state: dict, ts, payload: dict, bar_idx: int) -> dict:
    """
    Force-closes a position that has exceeded `max_bars_in_trade`.
    """
    state_copy = backtest_state.copy()
    current_state = state_copy['current_state']
    runtime_data = state_copy['runtime_data']
    data_indices = state_copy['data_indices']

    if not current_state['in_position']:
        return state_copy

    price = float(runtime_data['np_1m'][bar_idx, data_indices['idx_close']])
    state_copy = close_position(state_copy, ts, price, 'TIME_EXIT',
                                 note=f'bars_held={current_state['bars_held']}')
    return state_copy

Functional Refactoring: Dispatch Table and Queue Builder

The EVENT_HANDLERS dictionary maps event types to their corresponding functional handlers. The build_event_queue function remains largely the same, populating the min-heap with initial events.

[17]
# Dispatch table mapping event types to handler functions
EVENT_HANDLERS = {
    'NEWS'         : handle_news,
    'ATR_EXIT'     : handle_atr_exit,
    'TRAILING_STOP': handle_trailing_stop,
    'TP_HIT'       : handle_tp_sl,
    'SL_HIT'       : handle_tp_sl,
    'TIME_EXIT'    : handle_time_exit,
    'SIGNAL'       : handle_signal,
}

print("Event handler dispatch table defined.")
Event handler dispatch table defined.
[18]
import pandas as pd
import heapq
from dataclasses import dataclass, field
from typing import Optional
from enum import IntEnum

# EventPriority and Event are assumed to be defined in the global scope (e.g., cell 4ygqUmwqWvk6)

def build_event_queue(df_predictions: pd.DataFrame, news_events: list) -> list:
    """
    Populates the min-heap with SIGNAL and NEWS events.
    Per-bar events are injected on-the-fly during the main loop.
    """
    heap = []

    # SIGNAL events from model predictions
    for _, row in df_predictions.iterrows():
        ts  = row['datetime']
        evt = Event(
            timestamp  = ts,
            priority   = int(EventPriority.SIGNAL),
            event_type = 'SIGNAL',
            payload    = {'direction': int(row['predicted_direction'])},
        )
        heapq.heappush(heap, evt)

    # NEWS events
    for n in news_events:
        ts  = pd.Timestamp(n['timestamp'])
        evt = Event(
            timestamp  = ts,
            priority   = int(EventPriority.NEWS),
            event_type = 'NEWS',
            payload    = {'magnitude': n['magnitude'], 'label': n['label']},
        )
        heapq.heappush(heap, evt)

    return heap

print("Event queue builder defined.")
Event queue builder defined.

Functional Refactoring: Main Execution Loop

The execute_backtest_run function now encapsulates the core logic of the backtesting engine, managing the event queue and dispatching events to the appropriate handler functions. It takes an initial_state dictionary and returns the final backtest_state and the df_ledger.

[19]
import pandas as pd
import numpy as np
import heapq
from dataclasses import dataclass, field
from typing import Optional
from enum import IntEnum

# EventPriority and Event are assumed to be defined in the global scope (e.g., cell 4ygqUmwqWvk6)

def execute_backtest_run(
    initial_state: dict,
    df_predictions: pd.DataFrame,
    news_events: list = None
) -> tuple[dict, pd.DataFrame]:
    """
    Executes the event-driven simulation with a functional approach.

    Parameters
    ----------
    initial_state : dict
        The initial state dictionary for the backtest.
    df_predictions : pd.DataFrame
        Columns: ['datetime', 'predicted_direction']
    news_events : list of dicts, optional
        Each dict: {timestamp, magnitude, label}

    Returns
    -------
    tuple[dict, pd.DataFrame]
        The final backtest state and the trade ledger DataFrame.
    """
    if news_events is None:
        news_events = []

    # Create a mutable copy of the initial state
    backtest_state = initial_state.copy()
    backtest_state['ledger'] = initial_state['ledger'].copy() # ensure ledger is a mutable list

    df_predictions = df_predictions.copy()
    df_predictions['datetime'] = pd.to_datetime(df_predictions['datetime'])

    # build_event_queue relies on Event and EventPriority
    heap = build_event_queue(df_predictions, news_events)

    breaking_balance = backtest_state['params']['starting_balance'] * 0.5
    dt_series        = backtest_state['runtime_data']['dt_series']
    np_1m            = backtest_state['runtime_data']['np_1m']
    idx_dt           = backtest_state['data_indices']['idx_dt']

    print(f"Queue loaded: {len(heap)} initial events (signals + news)")
    print("Processing event queue...")

    processed = 0

    while heap:
        evt = heapq.heappop(heap)
        ts  = pd.Timestamp(evt.timestamp)

        # Find nearest 1-minute bar index
        # Corrected typo from to_datetime600() to to_datetime64()
        idx_candidates = np.searchsorted(dt_series.values, ts.to_datetime64())
        bar_idx = int(min(idx_candidates, len(np_1m) - 1))

        # Track bars held and inject per-bar events
        if backtest_state['current_state']['in_position']:
            backtest_state['current_state']['bars_held'] = bar_idx - backtest_state['current_state']['entry_bar_idx']

            # Inject ATR_EXIT check
            heapq.heappush(heap, Event(ts, int(EventPriority.ATR_EXIT),
                                       'ATR_EXIT', {}))
            # Inject TRAILING_STOP update
            heapq.heappush(heap, Event(ts, int(EventPriority.TRAILING_STOP),
                                       'TRAILING_STOP', {}))
            # Inject TP/SL check
            heapq.heappush(heap, Event(ts, int(EventPriority.TP_HIT),
                                       'TP_HIT', {}))
            # Inject TIME_EXIT if over max bars
            if backtest_state['current_state']['bars_held'] >= backtest_state['params']['max_bars_in_trade']:
                heapq.heappush(heap, Event(ts, int(EventPriority.TIME_EXIT),
                                           'TIME_EXIT', {}))

        # Dispatch event to handler
        handler = EVENT_HANDLERS.get(evt.event_type)
        if handler:
            # Pass the entire state and update it with the handler's return value
            backtest_state = handler(backtest_state, ts, evt.payload, bar_idx)

        processed += 1

        # Max drawdown guard
        if backtest_state['current_state']['balance'] < breaking_balance:
            print(f"  !! Max drawdown triggered at {ts} — balance ${backtest_state['current_state']['balance']:.2f}")
            if backtest_state['current_state']['in_position']:
                price = float(np_1m[bar_idx, backtest_state['data_indices']['idx_close']])
                backtest_state = close_position(backtest_state, ts, price, 'MAX_DRAWDOWN', note='50% equity loss')
            break

    print(f"Events processed: {processed}")

    df_ledger = pd.DataFrame(backtest_state['ledger'])
    if not df_ledger.empty:
        df_ledger['pnl_cumsum'] = df_ledger['pnl'].cumsum().round(4)

    return backtest_state, df_ledger

print("Functional backtest execution loop defined.")
Functional backtest execution loop defined.

Step 7 — Execute Event-Driven Backtest

Configure all parameters in the cell below, then run. The engine will:

  1. Generate 1-minute mock OHLCV data
  2. Generate model prediction signals
  3. Inject random news events
  4. Build and drain the event queue
  5. Return a full trade ledger
[ ]
import pandas as pd
import numpy as np

def run_event_driven_simulation(
    # Data
    n_days          = 15,
    base_price      = 30000.0,
    n_periods       = 80,
    freq            = '4h',
    news_rate       = 0.02,

    # Capital
    starting_balance = 1000.0,
    leverage         = 1.0,
    transaction_fee  = 0.05,
    slippage         = 0.0,

    # ATR
    atr_period         = 14,
    atr_sl_mult        = 1.5,
    atr_tp_mult        = 2.5,
    atr_vol_threshold  = 3.0,   # % of price; blocks entry if ATR > this
    trailing_stop_mult = 1.0,

    # Trade management
    max_bars_in_trade    = 240,   # 4 hours at 1-minute resolution
    news_force_close_mag = 0.6,   # magnitude >= this → force close
    buy_after_minutes    = 0,
):
    print("=" * 55)
    print("  EVENT-DRIVEN BACKTEST — NOTEBOOK 67")
    print("=" * 55)

    # ── Generate data ──────────────────────────────────────────────
    print("\n[1/4] Generating 1-minute OHLCV data...")
    np_1m, idx_dt, idx_open, idx_high, idx_low, idx_close = generate_mock_1m_data(
        n_days=n_days, base_price=base_price
    )
    print(f"      Shape : {np_1m.shape}")
    print(f"      Range : {np_1m[0, idx_dt]}{np_1m[-1, idx_dt]}")

    # ── Generate predictions ───────────────────────────────────────
    print("\n[2/4] Generating model predictions...")
    df_pred = generate_mock_predictions(n_periods=n_periods, freq=freq)
    print(f"      Periods : {len(df_pred)}")
    print(f"      Signals : {df_pred['predicted_direction'].value_counts().to_dict()}")

    # ── Generate news events ───────────────────────────────────────
    print("\n[3/4] Generating news events...")
    news = generate_news_events(np_1m, idx_dt, news_rate=news_rate)
    print(f"      News events injected : {len(news)}")
    if news:
        mags = [n['magnitude'] for n in news]
        print(f"      Magnitude range     : {min(mags):.2f}{max(mags):.2f}")
        high_impact = [n for n in news if n['magnitude'] >= news_force_close_mag]
        print(f"      High-impact (>={news_force_close_mag:.1f}) : {len(high_impact)}")

    # ── Initialize functional backtest state ───────────────────────
    print("\n[4/4] Running event-driven engine (functional)...")
    initial_state = {
        'params': {
            'starting_balance': starting_balance,
            'atr_period': atr_period,
            'atr_sl_mult': atr_sl_mult,
            'atr_tp_mult': atr_tp_mult,
            'atr_vol_threshold': atr_vol_threshold,
            'trailing_stop_mult': trailing_stop_mult,
            'max_bars_in_trade': max_bars_in_trade,
            'news_force_close_mag': news_force_close_mag,
            'fee_pct': transaction_fee * leverage,
            'slippage': slippage,
            'leverage': leverage,
            'buy_after_minutes': buy_after_minutes,
        },
        'current_state': {
            'balance': starting_balance,
            'in_position': False,
            'direction': 0,
            'buy_price': 0.0,
            'tp_price': 0.0,
            'sl_price': 0.0,
            'trailing_stop_px': 0.0,
            'bars_held': 0,
            'entry_bar_idx': 0,
        },
        'runtime_data': {
            'np_1m': np_1m,
            'atr_array': compute_atr(np_1m, idx_high, idx_low, idx_close, period=atr_period),
            'dt_series': pd.to_datetime(np_1m[:, idx_dt]),
        },
        'data_indices': {
            'idx_dt': idx_dt,
            'idx_open': idx_open,
            'idx_high': idx_high,
            'idx_low': idx_low,
            'idx_close': idx_close,
        },
        'ledger': [],
    }

    final_state, df_ledger = execute_backtest_run(initial_state, df_pred, news)

    final_balance = round(final_state['current_state']['balance'], 2)
    pnl_pct       = round(df_ledger['pnl'].sum(), 4) if not df_ledger.empty else 0

    print("\n" + "=" * 55)
    print(f"  Starting Balance : ${starting_balance:,.2f}")
    print(f"  Final Balance    : ${final_balance:,.2f}")
    print(f"  Net PnL (%)     : {pnl_pct:.4f}%")
    print(f"  Total Events     : {len(df_ledger)}")
    print("=" * 55)

    return df_ledger, final_state


# ── Run with default parameters ────────────────────────────────────────────
df_ledger, engine = run_event_driven_simulation(
    starting_balance    = 1000.0,
    leverage            = 1.0,
    transaction_fee     = 0.05,
    slippage            = 0.0,
    atr_period          = 14,
    atr_sl_mult         = 1.5,
    atr_tp_mult         = 2.5,
    atr_vol_threshold   = 3.0,
    trailing_stop_mult  = 1.0,
    max_bars_in_trade   = 240,
    news_force_close_mag= 0.6,
    news_rate           = 0.02,
)
=======================================================
  EVENT-DRIVEN BACKTEST — NOTEBOOK 67
=======================================================

[1/4] Generating 1-minute OHLCV data...
      Shape : (21600, 6)
      Range : 1704067200000000000  →  1705363140000000000

[2/4] Generating model predictions...
      Periods : 80
      Signals : {-1: 35, 1: 35, 0: 10}

[3/4] Generating news events...
      News events injected : 415
      Magnitude range     : 0.10 – 0.99
      High-impact (>=0.6) : 185

[4/4] Running event-driven engine (functional)...
Queue loaded: 495 initial events (signals + news)
Processing event queue...

Step 8 — Trade Ledger Inspection

The ledger records every event that produced an action. event_type tells you what triggered each row.

[ ]
if not df_ledger.empty:
    pd.set_option('display.max_rows', 120)
    pd.set_option('display.float_format', '{:.4f}'.format)

    print(f"Total ledger records : {len(df_ledger)}")
    print(f"Columns              : {df_ledger.columns.tolist()}")
    print()
    display(df_ledger)
else:
    print("No ledger records generated.")

Step 9 — Performance Analysis

Breaks down results by exit type so you can see how much P&L came from:

  • ATR-based TP hits vs SL hits
  • Trailing stop exits
  • News force-closes
  • Time exits (held too long)
  • Direction flips
[ ]
if not df_ledger.empty:

    # ── Event type distribution ────────────────────────────────────────
    print("Event Type Distribution:")
    print("-" * 40)
    print(df_ledger['event_type'].value_counts().to_string())
    print()

    # ── Action distribution ────────────────────────────────────────────
    print("Action Distribution:")
    print("-" * 40)
    print(df_ledger['action'].value_counts().to_string())
    print()

    # ── Exit-only P&L breakdown ────────────────────────────────────────
    exit_actions = ['SELL-WIN', 'SELL-LOSS', 'FORCE_CLOSE']
    df_exits = df_ledger[df_ledger['action'].isin(exit_actions)].copy()

    if not df_exits.empty:
        wins   = df_exits[df_exits['pnl'] > 0]
        losses = df_exits[df_exits['pnl'] <= 0]

        win_rate     = len(wins) / len(df_exits) * 100
        avg_win      = wins['pnl'].mean()   if len(wins)   > 0 else 0
        avg_loss     = losses['pnl'].mean() if len(losses) > 0 else 0
        total_win    = wins['pnl'].sum()
        total_loss   = losses['pnl'].sum()
        profit_factor = (
            total_win / abs(total_loss)
            if total_loss != 0 else float('inf')
        )

        print("=" * 55)
        print("  PERFORMANCE METRICS")
        print("=" * 55)
        print(f"  Exit Events      : {len(df_exits)}")
        print(f"  Winning Trades   : {len(wins)}")
        print(f"  Losing Trades    : {len(losses)}")
        print(f"  Win Rate         : {win_rate:.1f}%")
        print(f"  Avg Win PnL      : {avg_win:.4f}%")
        print(f"  Avg Loss PnL     : {avg_loss:.4f}%")
        print(f"  Profit Factor    : {profit_factor:.2f}")
        print(f"  Total PnL        : {df_exits['pnl'].sum():.4f}%")
        print(f"  Final Balance    : ${final_state['current_state']['balance']:,.2f}")
        print("=" * 55)

        # ── P&L by exit type ───────────────────────────────────────────
        print("\nP&L by Exit Event Type:")
        print("-" * 55)
        pnl_by_type = (
            df_exits.groupby('event_type')['pnl']
            .agg(count='count', total_pnl='sum', avg_pnl='mean')
            .round(4)
        )
        print(pnl_by_type.to_string())

        # ── News event summary ─────────────────────────────────────────
        df_news = df_ledger[df_ledger['event_type'] == 'NEWS']
        if not df_news.empty:
            force_closed = df_news[df_news['action'] == 'FORCE_CLOSE']
            noted        = df_news[df_news['action'] == 'NOTED']
            print(f"\nNews Events Summary:")
            print("-" * 40)
            print(f"  Total news events   : {len(df_news)}")
            print(f"  Force closes        : {len(force_closed)}")
            print(f"  Low impact (noted)  : {len(noted)}")
            if len(force_closed) > 0:
                print(f"  PnL from force-close: {force_closed['pnl'].sum():.4f}%")
    else:
        print("No exit events recorded.")
else:
    print("Ledger is empty.")

Operational Notes & Next Steps

Connecting Real Data

Replace generate_mock_1m_data() with your own OHLCV loader:

np_1m = your_dataframe[['datetime','open','high','low','close','volume']].to_numpy()

Column indices must match (0=datetime, 1=open, 2=high, 3=low, 4=close).

Connecting Real Predictions

Replace generate_mock_predictions() with your model output DataFrame — same two-column format: datetime, predicted_direction.

Connecting Real News

Replace generate_news_events() with a real news feed (e.g., Benzinga, NewsAPI). Map each article to a magnitude score using a sentiment model or keyword classifier.

ATR Parameter Tuning

ParameterConservativeAggressive
atr_sl_mult2.01.0
atr_tp_mult3.01.5
atr_vol_threshold5.02.0
trailing_stop_mult1.50.5
max_bars_in_trade60480

Production Upgrades

  • Replace REST-based 1-minute data with WebSocket streaming for live deployment
  • Add position sizing (Kelly criterion or fixed fractional) based on ATR signal confidence
  • Log events to a database for post-trade analysis
  • Add multi-symbol support by running one engine instance per instrument in parallel