Store OHLCV in PostgreSQL
Design and populate a PostgreSQL schema for market data — table structure, indexing strategy, and bulk insert patterns.
Data Storage Framework — PostgreSQL
This notebook defines a standardized protocol for persisting cleaned OHLCV data into a PostgreSQL relational database. It covers schema creation, datetime-indexed table definition, insertion, and read-back verification using SQLAlchemy and pandas.
1. Dependency Installation
!pip install pandas sqlalchemy psycopg2-binary
2. Library Imports
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
from sqlalchemy import create_engine, text
3. What Is PostgreSQL and Why Use It for OHLCV Data?
PostgreSQL is an open-source relational database management system
(RDBMS). Data is organized into tables with strictly typed columns and
rows. PostgreSQL enforces declared constraints (e.g., NOT NULL,
PRIMARY KEY) at the database level: any row that violates one, such as a
candle with a missing price or a repeated timestamp, is rejected regardless
of what the application layer sends.
For OHLCV data, PostgreSQL is appropriate when:
- Data from multiple exchanges and symbols must be queried together with SQL JOIN operations.
- Strict data integrity is required: the PRIMARY KEY on datetime guarantees that no two candles in a table share a timestamp.
- The dataset is moderate in size (millions of rows) and does not require the specialized time-series optimizations of TimescaleDB, a PostgreSQL extension.
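The first point can be illustrated with a short JOIN. This is a minimal sketch, not part of the original notebook: the two per-exchange tables, their names, and their closing prices are hypothetical, and Python's built-in sqlite3 in-memory database stands in for PostgreSQL so the example runs without a server. The SQL uses only standard syntax.

```python
import sqlite3

import pandas as pd

# Hypothetical stand-in: two per-exchange tables in an in-memory SQLite database
conn = sqlite3.connect(":memory:")
timestamps = ["2024-01-01 00:00:00+00:00", "2024-01-01 00:01:00+00:00"]
for table, closes in [("ohlcv_exchange_a_btcusdt_1m", [42200.0, 42150.0]),
                      ("ohlcv_exchange_b_btcusdt_1m", [42210.0, 42140.0])]:
    conn.execute(f"CREATE TABLE {table} (datetime TEXT PRIMARY KEY, close REAL NOT NULL)")
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", zip(timestamps, closes))

# Inner JOIN on the shared timestamp: one row per minute present on both exchanges
query = """
    SELECT a.datetime,
           a.close           AS close_a,
           b.close           AS close_b,
           a.close - b.close AS spread
    FROM ohlcv_exchange_a_btcusdt_1m AS a
    JOIN ohlcv_exchange_b_btcusdt_1m AS b ON b.datetime = a.datetime
    ORDER BY a.datetime
"""
spread = pd.read_sql_query(query, conn)
print(spread)
```

Against PostgreSQL, the same query would reference the schema-qualified tables and could be run through pd.read_sql with a SQLAlchemy connection.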
SQLAlchemy is a Python library that provides a unified interface to relational databases. Rather than managing DBAPI connections and cursors by hand, you describe the target database with a connection URL, and SQLAlchemy handles connection pooling, transaction management, and dialect translation. The same Python code can target PostgreSQL, SQLite, or MySQL by changing the URL, provided the SQL it sends is portable; raw DDL such as the TIMESTAMPTZ column type used in this notebook is PostgreSQL-specific.
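A minimal sketch of that interface, assuming an in-memory SQLite database (URL sqlite://) so it runs without a PostgreSQL server; the candles table is hypothetical. It exercises the three pieces this notebook relies on: create_engine, engine.begin() for a transaction that commits on exit, and text() with bound parameters.

```python
from sqlalchemy import create_engine, text

# In-memory SQLite stands in for PostgreSQL so the sketch runs without a server.
# For PostgreSQL only the URL changes, e.g. "postgresql+psycopg2://user:password@host:5432/dbname".
demo_engine = create_engine("sqlite://")

# engine.begin(): connection + transaction, committed on clean exit, rolled back on error
with demo_engine.begin() as conn:
    conn.execute(text("CREATE TABLE candles (ts TEXT PRIMARY KEY, close REAL NOT NULL)"))
    # :ts and :close are bound parameters; a list of dicts runs the statement once per row
    conn.execute(
        text("INSERT INTO candles (ts, close) VALUES (:ts, :close)"),
        [
            {"ts": "2024-01-01 00:00:00+00:00", "close": 42200.0},
            {"ts": "2024-01-01 00:01:00+00:00", "close": 42150.0},
        ],
    )

# engine.connect() without begin() is enough for a read-only query
with demo_engine.connect() as conn:
    rows = conn.execute(text("SELECT ts, close FROM candles ORDER BY ts")).all()

for row in rows:
    print(row.ts, row.close)
```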
4. Dummy Dataset
raw_data = {
    "datetime": [
        "2024-01-01 00:00:00+00:00",
        "2024-01-01 00:01:00+00:00",
        "2024-01-01 00:02:00+00:00",
        "2024-01-01 00:03:00+00:00",
        "2024-01-01 00:04:00+00:00",
    ],
    "open":   [42100.0, 42200.0, 42150.0, 42300.0, 42250.0],
    "high":   [42300.0, 42400.0, 42350.0, 42500.0, 42450.0],
    "low":    [41900.0, 42000.0, 41950.0, 42100.0, 42050.0],
    "close":  [42200.0, 42150.0, 42300.0, 42250.0, 42400.0],
    "volume": [10.5, 8.2, 9.1, 11.3, 7.6],
}
df = pd.DataFrame(raw_data)
df["datetime"] = pd.to_datetime(df["datetime"], utc=True)
print("--- OHLCV DataFrame ---")
display(df)
df.info()
--- OHLCV DataFrame ---
| | datetime | open | high | low | close | volume |
|---|---|---|---|---|---|---|
| 0 | 2024-01-01 00:00:00+00:00 | 42100.0 | 42300.0 | 41900.0 | 42200.0 | 10.5 |
| 1 | 2024-01-01 00:01:00+00:00 | 42200.0 | 42400.0 | 42000.0 | 42150.0 | 8.2 |
| 2 | 2024-01-01 00:02:00+00:00 | 42150.0 | 42350.0 | 41950.0 | 42300.0 | 9.1 |
| 3 | 2024-01-01 00:03:00+00:00 | 42300.0 | 42500.0 | 42100.0 | 42250.0 | 11.3 |
| 4 | 2024-01-01 00:04:00+00:00 | 42250.0 | 42450.0 | 42050.0 | 42400.0 | 7.6 |
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 6 columns):
 #   Column    Non-Null Count  Dtype
---  ------    --------------  -----
 0   datetime  5 non-null      datetime64[ns, UTC]
 1   open      5 non-null      float64
 2   high      5 non-null      float64
 3   low       5 non-null      float64
 4   close     5 non-null      float64
 5   volume    5 non-null      float64
dtypes: datetime64[ns, UTC](1), float64(5)
memory usage: 372.0 bytes
5. Database Configuration
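import os
from sqlalchemy.engine import URL

# Read connection settings from the standard libpq variables instead of hard-coding credentials
DB_HOST = os.environ.get("PGHOST", "172.27.112.1")
DB_PORT = int(os.environ.get("PGPORT", "5432"))
DB_NAME = os.environ.get("PGDATABASE", "criptofocus_db")
DB_USER = os.environ.get("PGUSER", "postgres")
DB_PASS = os.environ.get("PGPASSWORD")
SCHEMA = "data_ohlcv"
TABLE = "ohlcv_btcusdt_1m"
# URL.create escapes special characters such as "@" or ":" in the password
DB_URL = URL.create("postgresql+psycopg2", username=DB_USER, password=DB_PASS,
                    host=DB_HOST, port=DB_PORT, database=DB_NAME)
# connect_timeout (seconds) is a libpq option; an unreachable host fails fast instead of hanging
engine = create_engine(DB_URL, echo=False, connect_args={"connect_timeout": 10})
6. Table Initialization Function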
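def initialize_table(engine, schema: str, table: str) -> None:
    """Create the schema and the OHLCV table if they do not already exist."""
    # schema and table are interpolated directly into SQL, so they must be trusted constants
    create_schema = f"CREATE SCHEMA IF NOT EXISTS {schema};"
    create_table = f"""
        CREATE TABLE IF NOT EXISTS {schema}.{table} (
            datetime TIMESTAMPTZ      NOT NULL,
            open     DOUBLE PRECISION NOT NULL,
            high     DOUBLE PRECISION NOT NULL,
            low      DOUBLE PRECISION NOT NULL,
            close    DOUBLE PRECISION NOT NULL,
            volume   DOUBLE PRECISION NOT NULL,
            CONSTRAINT {table}_pkey PRIMARY KEY (datetime)
        );
    """
    # Both statements run in one transaction: committed on exit, rolled back on error
    with engine.begin() as conn:
        conn.execute(text(create_schema))
        conn.execute(text(create_table))
    print(f"Table '{schema}.{table}' verified.")
Code Logic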
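- CREATE SCHEMA IF NOT EXISTS: Creates the target namespace only if it is absent, so the cell is safe to re-execute.
- CREATE TABLE IF NOT EXISTS: Idempotent table definition; repeated runs keep existing data. It does not alter an existing table whose definition differs from this DDL.
- TIMESTAMPTZ: PostgreSQL's timezone-aware timestamp. Inputs are converted to UTC and stored as UTC; the original offset is not retained, and values are displayed in the session's TimeZone setting. It is the counterpart of pandas' datetime64[ns, UTC] (DatetimeTZDtype), except that PostgreSQL stores microsecond resolution, so any nanosecond component is lost.
- DOUBLE PRECISION: 64-bit IEEE 754 floating point, equivalent to NumPy's float64. Its roughly 15 significant decimal digits are ample for BTC prices in the tens of thousands; NUMERIC is the choice when exact decimal arithmetic is required.
- PRIMARY KEY (datetime): Enforces at most one row per timestamp in this table. Inserting a duplicate candle timestamp fails with a unique-violation error, which SQLAlchemy raises as sqlalchemy.exc.IntegrityError, and the enclosing transaction is rolled back. PostgreSQL backs the key with a unique B-tree index, which also serves time-range queries on datetime.
- engine.begin(): Opens a connection and starts a transaction. Because PostgreSQL supports transactional DDL, the DDL is committed as a unit when the block exits normally and rolled back if it raises.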
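The duplicate-rejection behavior can be sketched without a PostgreSQL server, assuming in-memory SQLite as a stand-in: SQLite also enforces PRIMARY KEY uniqueness, and SQLAlchemy raises its violation as the same sqlalchemy.exc.IntegrityError. The ohlcv_demo table and the try_insert helper are hypothetical.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError

# Stand-in database: in-memory SQLite, which also enforces PRIMARY KEY uniqueness
demo_engine = create_engine("sqlite://")
INSERT_SQL = text("INSERT INTO ohlcv_demo (datetime, close) VALUES (:ts, :close)")

with demo_engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE ohlcv_demo ("
        " datetime TEXT NOT NULL,"
        " close REAL NOT NULL,"
        " CONSTRAINT ohlcv_demo_pkey PRIMARY KEY (datetime))"
    ))
    conn.execute(INSERT_SQL, {"ts": "2024-01-01 00:00:00+00:00", "close": 42200.0})


def try_insert(ts: str, close: float) -> bool:
    """Insert one candle in its own transaction; return False if the primary key rejects it."""
    try:
        with demo_engine.begin() as conn:
            conn.execute(INSERT_SQL, {"ts": ts, "close": close})
        return True
    except IntegrityError:
        # engine.begin() has already rolled the transaction back; the stored row is unchanged
        return False


print(try_insert("2024-01-01 00:00:00+00:00", 99999.0))  # duplicate timestamp: prints False
print(try_insert("2024-01-01 00:01:00+00:00", 42150.0))  # new timestamp: prints True
```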
7. Save Function
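def save_to_postgres(df: pd.DataFrame, engine, schema: str, table: str) -> int:
    """Append OHLCV rows to schema.table and return the number of rows written."""
    # Make datetime the index so it lands in the "datetime" column,
    # instead of pandas adding its default RangeIndex as an extra column
    df = df.copy().set_index("datetime")
    df.to_sql(
        name=table,
        con=engine,
        schema=schema,
        if_exists="append",   # never drop or replace the existing table
        index=True,
        index_label="datetime",
        method="multi",       # multi-row INSERT ... VALUES statements
        chunksize=1_000,      # bounds the size of each generated INSERT statement
    )
    print(f"Inserted {len(df)} row(s) → {schema}.{table}")
    return len(df)
Code Logic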
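- df.set_index("datetime"): Promotes datetime to the DataFrame index so that to_sql writes it into the table's datetime column (the primary key) instead of adding pandas' default integer index as an extra column.
- if_exists="append": Inserts into the existing table without dropping or truncating prior data. If any row repeats a stored datetime, the PRIMARY KEY constraint rejects it; with pandas 2.x and an Engine the whole call runs in one transaction, so the call fails with IntegrityError and none of its rows are kept. Re-running this cell with the same data therefore raises rather than silently skipping duplicates.
- method="multi": Sends many rows in a single INSERT ... VALUES statement (one per chunk when chunksize is set, otherwise one for the whole frame), cutting round trips compared with one statement per row. For large frames, a chunksize keeps each generated statement, and the memory needed to build it, bounded.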
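If re-running the notebook should skip candles that are already stored instead of failing, to_sql accepts a callable for method. The sketch below follows the ON CONFLICT DO NOTHING pattern shown in the pandas to_sql documentation; insert_skip_duplicates is a hypothetical name, and the SQLite branch exists only so the demo at the bottom runs without a PostgreSQL server.

```python
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert

# Dialect-specific insert() constructs that support ON CONFLICT
_INSERT_BY_DIALECT = {"postgresql": pg_insert, "sqlite": sqlite_insert}


def insert_skip_duplicates(pd_table, conn, keys, data_iter):
    """Insertion callable for DataFrame.to_sql(method=...).

    pandas calls it once per chunk with its SQLTable wrapper, a SQLAlchemy
    connection, the column names, and an iterator of row tuples. Rows whose
    datetime already exists are skipped instead of failing the whole call.
    """
    rows = [dict(zip(keys, row)) for row in data_iter]
    if not rows:
        return 0
    insert = _INSERT_BY_DIALECT.get(conn.dialect.name)
    if insert is None:
        raise NotImplementedError(f"no ON CONFLICT insert wired up for {conn.dialect.name!r}")
    stmt = (
        insert(pd_table.table)
        .values(rows)  # one multi-row INSERT ... VALUES per chunk
        .on_conflict_do_nothing(index_elements=["datetime"])
    )
    return conn.execute(stmt).rowcount  # rows actually inserted in this chunk


# Demo against in-memory SQLite: writing the same frame twice keeps one copy of each candle
demo_engine = create_engine("sqlite://")
with demo_engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE ohlcv_demo (datetime TIMESTAMP NOT NULL PRIMARY KEY, close REAL NOT NULL)"
    ))

demo_df = pd.DataFrame({
    "datetime": pd.to_datetime(["2024-01-01 00:00:00+00:00", "2024-01-01 00:01:00+00:00"], utc=True),
    "close": [42200.0, 42150.0],
})
for _ in range(2):
    demo_df.set_index("datetime").to_sql(
        "ohlcv_demo", demo_engine, if_exists="append",
        index=True, index_label="datetime", method=insert_skip_duplicates,
    )

with demo_engine.connect() as conn:
    stored = conn.execute(text("SELECT COUNT(*) FROM ohlcv_demo")).scalar()
print(stored)
```

In the notebook, the to_sql call inside save_to_postgres would pass method=insert_skip_duplicates in place of method="multi"; each chunk is still sent as one multi-row INSERT.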
8. Execution
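A cheap pre-flight probe can surface connection problems, such as the timeout recorded for this cell, before any DDL runs. This is a minimal sketch; check_connection is a hypothetical helper that opens one connection and runs SELECT 1.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError


def check_connection(engine) -> bool:
    """Return True if the database answers a trivial query, False if connecting fails."""
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return True
    except OperationalError as exc:
        # exc.orig is the underlying DBAPI error, e.g. psycopg2's "Connection timed out"
        print(f"Database unreachable: {exc.orig}")
        return False


# In-memory SQLite always connects, so this prints True
print(check_connection(create_engine("sqlite://")))
```

In the notebook it would guard the execution cell, e.g. `if check_connection(engine): initialize_table(engine, SCHEMA, TABLE)`. With PostgreSQL, passing `connect_args={"connect_timeout": 5}` to create_engine keeps the probe from hanging until the operating system gives up on the TCP connection.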
initialize_table(engine, SCHEMA, TABLE)
rows_inserted = save_to_postgres(df, engine, SCHEMA, TABLE)
OperationalError: (psycopg2.OperationalError) connection to server at "172.27.112.1", port 5432 failed: Connection timed out
Is the server running on that host and accepting TCP/IP connections?
(Background on this error at: https://sqlalche.me/e/20/e3q8)
The recorded run stopped at the first line of this cell: engine.begin() could not open a TCP connection to 172.27.112.1:5432, so no schema or table was created and save_to_postgres never ran. A timeout (as opposed to "connection refused") usually means no reply came back at all, because the host is unreachable from the machine running the kernel or a firewall drops traffic on port 5432. 172.27.112.1 lies in the private 172.16.0.0/12 range, which a remotely hosted notebook generally cannot reach.
9. Verification — Read Back and Inspect
query = f"SELECT * FROM {SCHEMA}.{TABLE} ORDER BY datetime ASC;"
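with engine.connect() as conn:
    # utc=True keeps the column in UTC regardless of the session's TimeZone setting
    df_db = pd.read_sql(text(query), con=conn, parse_dates={"datetime": {"utc": True}})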
print("--- PostgreSQL Read-Back ---")
display(df_db.head())
print("\n--- Schema Summary ---")
df_db.info()
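head() and info() confirm shape and dtypes but not values. A small helper can compare the read-back frame with the in-memory df candle by candle. This is a sketch under stated assumptions: verify_roundtrip is a hypothetical name, timestamps are normalized to UTC on both sides because PostgreSQL returns TIMESTAMPTZ values in the session's time zone, and prices are compared with a tight relative tolerance rather than exact equality.

```python
import numpy as np
import pandas as pd

VALUE_COLS = ["open", "high", "low", "close", "volume"]


def _normalized(frame: pd.DataFrame) -> pd.DataFrame:
    """Copy with datetime parsed as UTC, sorted by time, and a fresh 0..n-1 index."""
    out = frame.copy()
    out["datetime"] = pd.to_datetime(out["datetime"], utc=True)
    return out.sort_values("datetime").reset_index(drop=True)


def verify_roundtrip(original: pd.DataFrame, read_back: pd.DataFrame) -> bool:
    """Return True if read_back holds the same candles as original (row order ignored)."""
    if len(original) != len(read_back):
        return False
    a, b = _normalized(original), _normalized(read_back)
    same_times = (a["datetime"] == b["datetime"]).all()
    # float8 normally round-trips float64 exactly; the tolerance only absorbs last-digit noise
    same_values = np.allclose(
        a[VALUE_COLS].to_numpy(dtype=float),
        b[VALUE_COLS].to_numpy(dtype=float),
        rtol=1e-12,
        atol=0.0,
    )
    return bool(same_times and same_values)
```

In this notebook it would be called as `verify_roundtrip(df, df_db)` after the read-back cell.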