Data·Storage·Beginner

Store OHLCV in PostgreSQL

Design and populate a PostgreSQL schema for market data — table structure, indexing strategy, and bulk insert patterns.

postgresql·sql·storage

Data Storage Framework — PostgreSQL

This notebook defines a standardized protocol for persisting cleaned OHLCV data into a PostgreSQL relational database. It covers schema creation, datetime-indexed table definition, insertion, and read-back verification using SQLAlchemy and pandas.


1. Dependency Installation

[24]
!pip install pandas sqlalchemy psycopg2-binary
Requirement already satisfied: pandas in /usr/local/lib/python3.12/dist-packages (2.2.2)
Requirement already satisfied: sqlalchemy in /usr/local/lib/python3.12/dist-packages (2.0.49)
Requirement already satisfied: psycopg2-binary in /usr/local/lib/python3.12/dist-packages (2.9.12)
Requirement already satisfied: numpy>=1.26.0 in /usr/local/lib/python3.12/dist-packages (from pandas) (2.0.2)
Requirement already satisfied: python-dateutil>=2.8.2 in /usr/local/lib/python3.12/dist-packages (from pandas) (2.9.0.post0)
Requirement already satisfied: pytz>=2020.1 in /usr/local/lib/python3.12/dist-packages (from pandas) (2025.2)
Requirement already satisfied: tzdata>=2022.7 in /usr/local/lib/python3.12/dist-packages (from pandas) (2026.1)
Requirement already satisfied: greenlet>=1 in /usr/local/lib/python3.12/dist-packages (from sqlalchemy) (3.4.0)
Requirement already satisfied: typing-extensions>=4.6.0 in /usr/local/lib/python3.12/dist-packages (from sqlalchemy) (4.15.0)
Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.12/dist-packages (from python-dateutil>=2.8.2->pandas) (1.17.0)

2. Library Imports

[25]
import warnings
warnings.filterwarnings("ignore")

import pandas as pd
from sqlalchemy import create_engine, text

3. What Is PostgreSQL and Why Use It for OHLCV Data?

PostgreSQL is an open-source relational database management system (RDBMS). Data is organized into tables whose columns are strictly typed. Constraints such as NOT NULL and PRIMARY KEY are enforced by the database itself, so rows that violate them are rejected regardless of what the application layer sends.

For OHLCV data, PostgreSQL is appropriate when:

  • Data from multiple exchanges and symbols must be queried together with SQL JOIN operations.
  • Strict data integrity is required: in a table that holds one symbol and one timeframe, the PRIMARY KEY on datetime guarantees that no duplicate candles exist.
  • The dataset is moderate in size (millions of rows) and does not require the specialized time-series optimizations of TimescaleDB.

SQLAlchemy is a Python library that provides a unified interface to relational databases. Instead of managing driver-level connections and cursors by hand, you describe the target database with a connection URL, and SQLAlchemy handles connection pooling, transaction management, and dialect translation. Code that sticks to portable SQL can run against PostgreSQL, SQLite, or MySQL by changing only the URL; PostgreSQL-specific features such as TIMESTAMPTZ remain dialect-specific.
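
As a minimal sketch of the point about connection URLs, the snippet below runs a query through an in-memory SQLite engine. It assumes only that SQLAlchemy is installed; the `sqlite:///:memory:` URL is chosen here purely so the example runs without a server, and `demo_engine` is an illustrative name.

```python
from sqlalchemy import create_engine, text

# Assumption: in-memory SQLite stands in for a real server so the sketch is self-contained.
demo_engine = create_engine("sqlite:///:memory:")

with demo_engine.connect() as conn:
    # The same Connection.execute(text(...)) call is used for any supported dialect.
    answer = conn.execute(text("SELECT 1 + 1")).scalar_one()

print(demo_engine.dialect.name, answer)
```

Pointing `create_engine` at a PostgreSQL URL would leave the `connect` and `execute` calls unchanged.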


4. Dummy Dataset

[26]
raw_data = {
    "datetime": [
        "2024-01-01 00:00:00+00:00",
        "2024-01-01 00:01:00+00:00",
        "2024-01-01 00:02:00+00:00",
        "2024-01-01 00:03:00+00:00",
        "2024-01-01 00:04:00+00:00",
    ],
    "open":   [42100.0, 42200.0, 42150.0, 42300.0, 42250.0],
    "high":   [42300.0, 42400.0, 42350.0, 42500.0, 42450.0],
    "low":    [41900.0, 42000.0, 41950.0, 42100.0, 42050.0],
    "close":  [42200.0, 42150.0, 42300.0, 42250.0, 42400.0],
    "volume": [10.5,    8.2,     9.1,     11.3,    7.6    ],
}

df = pd.DataFrame(raw_data)
df["datetime"] = pd.to_datetime(df["datetime"], utc=True)

print("--- OHLCV DataFrame ---")
display(df)
df.info()
--- OHLCV DataFrame ---
datetime open high low close volume
0 2024-01-01 00:00:00+00:00 42100.0 42300.0 41900.0 42200.0 10.5
1 2024-01-01 00:01:00+00:00 42200.0 42400.0 42000.0 42150.0 8.2
2 2024-01-01 00:02:00+00:00 42150.0 42350.0 41950.0 42300.0 9.1
3 2024-01-01 00:03:00+00:00 42300.0 42500.0 42100.0 42250.0 11.3
4 2024-01-01 00:04:00+00:00 42250.0 42450.0 42050.0 42400.0 7.6
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 6 columns):
 #   Column    Non-Null Count  Dtype              
---  ------    --------------  -----              
 0   datetime  5 non-null      datetime64[ns, UTC]
 1   open      5 non-null      float64            
 2   high      5 non-null      float64            
 3   low       5 non-null      float64            
 4   close     5 non-null      float64            
 5   volume    5 non-null      float64            
dtypes: datetime64[ns, UTC](1), float64(5)
memory usage: 372.0 bytes

5. Database Configuration

[27]
DB_HOST = "172.27.112.1"
DB_PORT = 5432
DB_NAME = "criptofocus_db"
DB_USER = "postgres"
DB_PASS = "arsalan"
SCHEMA  = "data_ohlcv"
TABLE   = "ohlcv_btcusdt_1m"

DB_URL  = f"postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine  = create_engine(DB_URL, echo=False)
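
The configuration cell above embeds the password in the notebook. One possible alternative, sketched below, reads the settings from environment variables and assembles the URL with SQLAlchemy's `URL.create`, which also handles special characters in the password. The variable names (`PGHOST`, `PGPASSWORD`, and so on) follow libpq's conventions, but relying on them here is an assumption; `build_db_url` is a hypothetical helper and the fallback values are placeholders.

```python
import os
from sqlalchemy.engine import URL

def build_db_url(env=os.environ) -> URL:
    """Assemble a PostgreSQL URL from environment variables (hypothetical helper)."""
    return URL.create(
        drivername="postgresql+psycopg2",
        username=env.get("PGUSER", "postgres"),
        password=env.get("PGPASSWORD"),            # None if unset; never hard-coded
        host=env.get("PGHOST", "localhost"),
        port=int(env.get("PGPORT", "5432")),
        database=env.get("PGDATABASE", "criptofocus_db"),
    )

# Demonstration with an explicit mapping instead of the real environment.
demo_url = build_db_url(
    {"PGUSER": "postgres", "PGPASSWORD": "p@ss/word", "PGHOST": "db.example"}
)
# hide_password=True masks the secret so the URL is safe to print or log.
masked = demo_url.render_as_string(hide_password=True)
print(masked)
```

The resulting `URL` object can be passed to `create_engine` in place of the formatted string.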

6. Table Initialization Function

[28]
def initialize_table(engine, schema: str, table: str) -> None:
    ddl = f"""
        CREATE SCHEMA IF NOT EXISTS {schema};

        CREATE TABLE IF NOT EXISTS {schema}.{table} (
            datetime  TIMESTAMPTZ      NOT NULL,
            open      DOUBLE PRECISION NOT NULL,
            high      DOUBLE PRECISION NOT NULL,
            low       DOUBLE PRECISION NOT NULL,
            close     DOUBLE PRECISION NOT NULL,
            volume    DOUBLE PRECISION NOT NULL,
            CONSTRAINT {table}_pkey PRIMARY KEY (datetime)
        );
    """
    with engine.begin() as conn:
        conn.execute(text(ddl))

    print(f"Table '{schema}.{table}' verified.")

Code Logic

  • CREATE SCHEMA IF NOT EXISTS: Creates the target namespace only if absent — safe to re-execute.
  • CREATE TABLE IF NOT EXISTS: Idempotent table definition, safe on repeated runs without dropping existing data.
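  • TIMESTAMPTZ: PostgreSQL's timezone-aware timestamp type (TIMESTAMP WITH TIME ZONE). Input values are converted to UTC and stored as a UTC instant; the original offset is not retained, and values are rendered in the session's time zone on output. It pairs naturally with the pandas dtype datetime64[ns, UTC].
  • DOUBLE PRECISION: 64-bit IEEE 754 floating point, matching NumPy float64 (roughly 15 significant decimal digits), which is ample for BTC prices in the tens of thousands.
  • PRIMARY KEY (datetime): Enforces uniqueness on datetime at the database level and creates a unique B-tree index that also serves time-range queries. An attempt to insert a duplicate candle timestamp fails with a unique-violation error, which SQLAlchemy surfaces as IntegrityError.
  • engine.begin(): Opens a connection and starts a transaction. PostgreSQL DDL is transactional, so both statements are committed together on a clean exit from the with block and rolled back if an exception is raised.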
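
Because initialize_table interpolates schema and table directly into the DDL string, those values should never come from untrusted input. A minimal sketch of a guard follows, assuming names are restricted to unquoted lowercase PostgreSQL identifiers; `validate_identifier` is a hypothetical helper, not part of SQLAlchemy or of this notebook.

```python
import re

# Conservative pattern: a lowercase letter or underscore first, then letters,
# digits, or underscores, at most 63 characters (PostgreSQL's default identifier limit).
_IDENTIFIER_RE = re.compile(r"[a-z_][a-z0-9_]{0,62}")

def validate_identifier(name: str) -> str:
    """Return name unchanged if it is a safe unquoted identifier, else raise ValueError."""
    if not _IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name

print(validate_identifier("data_ohlcv"))
print(validate_identifier("ohlcv_btcusdt_1m"))

try:
    validate_identifier("ohlcv; DROP TABLE x")
except ValueError as exc:
    print(exc)
```

Calling the guard on both arguments at the top of initialize_table would reject anything that is not a plain identifier before the DDL is built.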

7. Save Function

[29]
def save_to_postgres(df: pd.DataFrame, engine, schema: str, table: str) -> int:
    """Append the OHLCV rows in df to schema.table and return the number of rows sent."""
    # set_index returns a new DataFrame, so the caller's frame is left untouched.
    df = df.set_index("datetime")

    df.to_sql(
        name        = table,
        con         = engine,
        schema      = schema,
        if_exists   = "append",
        index       = True,
        index_label = "datetime",
        method      = "multi",
    )

    print(f"Inserted {len(df)} row(s) → {schema}.{table}")
    return len(df)

Code Logic

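  • df.set_index("datetime"): Promotes datetime to the DataFrame index so that to_sql, with index=True and index_label="datetime", writes it to the table's datetime column instead of emitting a separate integer index column.
  • if_exists="append": Appends rows to the existing table without dropping or truncating it. If any row's datetime is already present, the PRIMARY KEY constraint rejects the INSERT and the whole statement fails, so re-running the save against a populated table raises IntegrityError.
  • method="multi": Passes many rows in a single multi-row INSERT statement (all rows at once unless chunksize is set), reducing round-trip overhead compared with one statement per row. For large DataFrames, setting chunksize bounds the size of each statement.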
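
Because the PRIMARY KEY rejects duplicate timestamps, re-running the save against a populated table raises an error. One way to make the insert idempotent is to pass to_sql a custom `method` callable that issues `INSERT ... ON CONFLICT DO NOTHING` through SQLAlchemy's PostgreSQL dialect. The sketch below is an assumption about how this could be wired up, not the notebook's own method; `insert_skip_duplicates` is a hypothetical name, the conflict target assumes the datetime primary key defined earlier, and `demo_table` exists only to compile the statement without a server.

```python
from sqlalchemy import Column, DateTime, Float, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

def insert_skip_duplicates(pd_table, conn, keys, data_iter):
    """to_sql `method` callable: multi-row INSERT that skips existing timestamps."""
    rows = [dict(zip(keys, row)) for row in data_iter]
    stmt = insert(pd_table.table).values(rows).on_conflict_do_nothing(
        index_elements=["datetime"]
    )
    result = conn.execute(stmt)
    return result.rowcount

# Offline check: compile the same statement shape without connecting to a server.
demo_table = Table(
    "ohlcv_btcusdt_1m",
    MetaData(),
    Column("datetime", DateTime(timezone=True), primary_key=True),
    Column("close", Float),
    schema="data_ohlcv",
)
demo_stmt = insert(demo_table).values(
    [{"datetime": "2024-01-01 00:00:00+00:00", "close": 42200.0}]
).on_conflict_do_nothing(index_elements=["datetime"])
compiled_sql = str(demo_stmt.compile(dialect=postgresql.dialect()))
print(compiled_sql)
```

In save_to_postgres, passing `method=insert_skip_duplicates` instead of `method="multi"` would skip candles that are already stored rather than failing the whole batch.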

8. Execution

[30]
initialize_table(engine, SCHEMA, TABLE)
rows_inserted = save_to_postgres(df, engine, SCHEMA, TABLE)
---------------------------------------------------------------------------
OperationalError                          Traceback (most recent call last)
/tmp/ipykernel_2991/3768471397.py in <cell line: 0>()
----> 1 initialize_table(engine, SCHEMA, TABLE)
      2 rows_inserted = save_to_postgres(df, engine, SCHEMA, TABLE)

/tmp/ipykernel_2991/1711263443.py in initialize_table(engine, schema, table)
     13         );
     14     """
---> 15     with engine.begin() as conn:
     16         conn.execute(text(ddl))
     17 

OperationalError: (psycopg2.OperationalError) connection to server at "172.27.112.1", port 5432 failed: Connection timed out
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/20/e3q8)
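
The OperationalError above is a TCP-level timeout: the kernel could not reach 172.27.112.1:5432, so no SQL was executed. That address lies in a private range, which is typically reachable only from inside the same network. A quick reachability probe before creating tables can make this failure faster and clearer. The sketch below uses only the standard library; `can_reach` is a hypothetical helper, and the timeout values are arbitrary choices.

```python
import socket

def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False

# Demonstration against a throwaway local listener instead of a real database.
listener = socket.socket()
listener.bind(("127.0.0.1", 0))   # port 0 lets the OS pick a free port
listener.listen(1)
open_port = listener.getsockname()[1]

reachable = can_reach("127.0.0.1", open_port)
listener.close()
unreachable = can_reach("127.0.0.1", open_port, timeout=0.5)
print(reachable, unreachable)
```

With the notebook's settings the call would be `can_reach(DB_HOST, DB_PORT)`. A successful probe only shows that the port is open; authentication and database existence are still checked when the engine connects.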

9. Verification — Read Back and Inspect

[ ]
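query = f"SELECT * FROM {SCHEMA}.{TABLE} ORDER BY datetime ASC;"
# Use a context manager so the connection is returned to the pool after the read.
with engine.connect() as conn:
    df_db = pd.read_sql(text(query), con=conn, parse_dates=["datetime"])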

print("--- PostgreSQL Read-Back ---")
display(df_db.head())

print("\n--- Schema Summary ---")
df_db.info()
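
Beyond inspecting head() and info() by eye, the read-back can be compared against the source frame programmatically. The sketch below assumes both frames hold the same columns; `frames_match` is a hypothetical helper, and the second frame is simulated in memory rather than read from PostgreSQL.

```python
import pandas as pd

def frames_match(original: pd.DataFrame, read_back: pd.DataFrame) -> bool:
    """Compare two OHLCV frames after sorting by datetime and aligning column order."""
    left = original.sort_values("datetime").reset_index(drop=True)
    right = read_back[left.columns].sort_values("datetime").reset_index(drop=True)
    try:
        pd.testing.assert_frame_equal(left, right)
        return True
    except AssertionError:
        return False

source = pd.DataFrame({
    "datetime": pd.to_datetime(
        ["2024-01-01 00:00:00+00:00", "2024-01-01 00:01:00+00:00"], utc=True
    ),
    "close": [42200.0, 42150.0],
})
simulated = source.iloc[::-1].reset_index(drop=True)   # same rows, reversed order
same = frames_match(source, simulated)

altered = simulated.copy()
altered.loc[0, "close"] = 0.0                          # corrupt one value
different = frames_match(source, altered)
print(same, different)
```

In the notebook, the equivalent check would be `frames_match(df, df_db)` once the read-back cell has run.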