Abstract#
This post is a reading, translation, paraphrase, and summary of a design document. Last week I spent some time carefully studying and restructuring the PostgreSQL connection pool in the project. Below I summarize the key points of the design, describe my own restructuring, and share what I learned along the way.
Original link:
https://www.psycopg.org/articles/2021/01/17/pool-design/
First, the original author lists the shortcomings of the existing Psycopg2 connection pool:
- No support for context managers.
- The state of a pooled connection is unknown until a client actually uses it.
- If the pool's min_size is reached and all connections are in use, newly created connections are discarded after use instead of being kept for the next client that needs one.
- When clients request more than max_size connections at the same time, they receive a connection pool error.
The author referenced a large number of other connection pool implementations, such as Java's HikariCP, and redesigned and implemented a new connection pool for Psycopg3.
Client Side#
Context Manager#
Support for context managers has been added.
with pool.connection() as conn:
    cur = conn.cursor()
    cur.execute("...")
    consume_data(cur.fetchall())
Psycopg3 connections also provide a Connection.execute() shortcut, so the minimal form is:
with pool.connection() as conn:
    conn.execute(DML_QUERY)
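As a rough illustration of what such a context manager does internally, the sketch below wraps a check-out/check-in pair with `contextlib.contextmanager`. `ToyPool` and `FakeConnection` are hypothetical stand-ins, not psycopg classes, and the commit-on-success, rollback-on-error policy is an assumption made for the example.

```python
from collections import deque
from contextlib import contextmanager


class FakeConnection:
    """Hypothetical stand-in for a database connection; records what happened."""

    def __init__(self):
        self.log = []

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")


class ToyPool:
    """Hypothetical pool: hands out connections and always takes them back."""

    def __init__(self, size):
        self._idle = deque(FakeConnection() for _ in range(size))

    @contextmanager
    def connection(self):
        conn = self._idle.popleft()      # check out
        try:
            yield conn
        except BaseException:
            conn.rollback()              # the block failed: undo its work
            raise
        else:
            conn.commit()                # the block succeeded: persist its work
        finally:
            self._idle.append(conn)      # check in, whatever happened

    def idle_count(self):
        return len(self._idle)
```

The point of the `finally` clause is that a client can no longer forget to return a connection, which is the main practical gain over a bare getconn()/putconn() pair.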
Asynchronous Operations and Blocking Behavior#
When an exhausted synchronous connection pool receives a request for a connection, it should block until a connection becomes available, or raise an error after a timeout. This has a cost: during a spike in requests, the database may face a large number of rollback operations, potentially with degraded performance. The following code:
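import asyncio

async def worker(pool, work_for_sec):
    # Each task holds a pooled connection for as long as its query runs.
    async with pool.connection() as conn:
        await conn.execute("select pg_sleep(%s)", (work_for_sec,))

async def main():
    # AsyncPool and its parameters are the API proposed in the design doc.
    pool = psycopg3.AsyncPool(maxconn=4, connection_timeout_sec=1.0)
    await asyncio.gather(*(worker(pool, work_for_sec=0.5) for _ in range(8)))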
demonstrates the asynchronous version built on asyncio. The synchronous pool should also be easy to patch with Eventlet or gevent so that it behaves cooperatively.
Note: The latest version of Psycopg3 currently does not fully support this; see the GitHub issue for details.
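The "block, then raise after a timeout" behavior described in this section can be sketched with the standard library's `queue.Queue`. `BlockingPool` and the local `PoolTimeout` exception are hypothetical names defined only for this example; strings stand in for real connections.

```python
import queue


class PoolTimeout(Exception):
    """Raised when no connection becomes free in time (defined for this example)."""


class BlockingPool:
    """Hypothetical fixed-size pool whose getconn() blocks, then times out."""

    def __init__(self, connections, timeout_sec):
        self._idle = queue.Queue()
        for conn in connections:
            self._idle.put(conn)
        self._timeout_sec = timeout_sec

    def getconn(self):
        try:
            # Block until another client returns a connection, or give up.
            return self._idle.get(timeout=self._timeout_sec)
        except queue.Empty:
            raise PoolTimeout(f"no connection after {self._timeout_sec}s") from None

    def putconn(self, conn):
        self._idle.put(conn)
```

With this shape, a burst of requests queues up behind the busy connections instead of opening new ones, and only clients that wait longer than the timeout see an error.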
Should Exclusive Connections Exist?#
The author also explored whether connection pools should manage exclusive connections, such as supporting the following example code:
def listen_notifications():
    with pool.exclusive() as conn:
        conn.autocommit = True
        for notify in conn.cursor().notifies():
            handle_notification(notify)
However, since the pool already receives the connection parameters when it is created, this feature seems redundant. Users who need an exclusive connection can simply open one with the pool's parameters:
pool = Pool(maxconn=config.pool_maxconn, args=(config.db_dsn,))

def listen_notifications():
    # Open a dedicated connection with the same parameters the pool uses.
    with psycopg3.connect(*pool.args, **pool.kwargs) as conn:
        conn.autocommit = True
        for notify in conn.cursor().notifies():
            handle_notification(notify)
Let's take a break and then look at the internal implementation part.
Database Side Implementation#
Connection Pool Worker Threads#
The author suggested that to improve the performance of a database connection pool, shorten response times, and not burden other processes, the following points need to be considered:
- How to handle potential request spikes?
- How to handle potential low periods? Should min_connection be reduced?
- How to determine if there are available connections in the current connection pool?
A simple and clear approach is to create connections on demand: if the pool has no available connection, getconn() opens a new one itself and adds it to the pool when the client is done.
class Pool:
    def getconn(self):
        conn = self._get_a_connection_from_the_pool()
        if conn:
            return conn
        else:
            conn = psycopg3.connect(*self.args, **self.kwargs)
            self._put_it_in_the_pool_when_done(conn)
            return conn
However, this is clearly not optimal, especially when requests are frequent, each use is short, and establishing a connection is slow, because the requesting client pays the full connection cost itself. Therefore, another common method is to delegate the task of maintaining the pool size to separate worker threads, such as:
class Pool:
    def getconn(self):
        if not self._pool():
            self.worker.create_a_new_connection_and_put_it_in_the_pool()
        return self.wait_for_a_connection_from_the_pool()
These worker threads can also handle various maintenance tasks, such as polling the connection pool to check the availability of connections, closing them when they time out, or replacing old connections in the pool with new ones, etc.
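A minimal sketch of the worker-thread idea, assuming a single background thread and a `connect` callable supplied by the caller. `WorkerPool` and all of its method names are hypothetical; the maintenance tasks mentioned above (health checks, expiry, replacement) are left out to keep the example short.

```python
import queue
import threading


class WorkerPool:
    """Hypothetical pool whose connections are opened by a background thread."""

    def __init__(self, connect, max_size):
        self._connect = connect            # callable that opens a connection
        self._max_size = max_size
        self._size = 0                     # connections opened or being opened
        self._lock = threading.Lock()
        self._idle = queue.Queue()         # connections ready for clients
        self._tasks = queue.Queue()        # work items for the worker thread
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        # The worker, not the client, pays the cost of opening connections.
        while True:
            task = self._tasks.get()
            if task is None:               # sentinel: shut down
                return
            self._idle.put(self._connect())

    def getconn(self, timeout_sec=1.0):
        with self._lock:
            if self._idle.empty() and self._size < self._max_size:
                self._size += 1
                self._tasks.put("grow")    # ask the worker for one more
        # Take whichever connection becomes available first;
        # queue.Empty is raised if none arrives within timeout_sec.
        return self._idle.get(timeout=timeout_sec)

    def putconn(self, conn):
        self._idle.put(conn)

    def close(self):
        self._tasks.put(None)
        self._worker.join()
```

Because a waiting client takes whichever connection arrives first, it may be served by a connection another client has just returned rather than by the one it asked the worker to open.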
What Should Be the Strategy When Connections Fail?#
The job of a connection pool is to keep the application connected to the database. There are exceptional situations, however: the database may be unavailable, the connection configuration may be wrong, or the database may have moved. In these cases the pool's continuous retries can mask errors that the application layer should see, which is undesirable.
The author's project experience is that debugging a running system is complex and difficult, especially when it emits no logs or error messages; on the other hand, the runtime state of a system is sometimes worth preserving and observing rather than terminating it abruptly.
A sensible approach is to detect problems early and fail at startup if the first connection attempt does not succeed; during later operation, retry with exponential backoff, and terminate the whole system if the last retry still fails after a preset number of attempts.
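The retry policy can be sketched as follows. `connect_with_backoff` and its parameters are hypothetical names; catching the built-in `ConnectionError` is an assumption made to keep the example self-contained (with psycopg one would more likely catch `OperationalError`), and no random jitter is added.

```python
import time


def connect_with_backoff(connect, max_attempts=5, base_delay_sec=0.5,
                         max_delay_sec=30.0, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure.

    Re-raises the last error once max_attempts is exhausted, so a persistent
    misconfiguration surfaces instead of being retried forever.
    """
    delay = base_delay_sec
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise                       # give up: let the application see it
            sleep(delay)
            delay = min(delay * 2, max_delay_sec)
```

Calling it with `max_attempts=1` at startup gives the fail-fast behavior, while a larger value during normal operation gives the bounded retry loop.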
Data Structure of the Connection Pool#
From the perspective of performance, load balancing, and connection failure checking, the internal data structure of the connection pool should meet the following characteristics:
- The time complexity of retrieving and returning connections should be as low as possible, ideally O(1).
- All connections in the pool should have the same or approximately the same probability of being obtained.
- The most recently used connections should be prioritized for reuse, as their failure probability is relatively low (due to mechanisms like automatic timeout).
In summary, both double-ended queues (deque) and tree arrays seem to meet these conditions. Because retrieving and returning connections costs more with tree arrays, and Python supports double-ended queues natively, the author concludes that a deque is the better fit and chose it.
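A small sketch of the two access patterns a deque offers a pool, using Python's `collections.deque`; the connection names are placeholders. Appending and popping at either end are O(1) operations.

```python
from collections import deque

idle = deque(["conn-a", "conn-b", "conn-c"])

# FIFO: take from the left, return to the right. Every connection is used in
# turn, so usage is spread evenly across the pool.
conn = idle.popleft()
idle.append(conn)
fifo_order = list(idle)

# LIFO: take from the right, return to the right. The most recently returned
# connection is reused first; connections on the left stay idle the longest.
conn = idle.pop()
idle.append(conn)
lifo_order = list(idle)
```

Which end to take from is a policy choice: the first pattern favors even usage, the second favors recently used connections.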
Proposed API#
This part is omitted; interested readers can consult the original article.
Key Configuration Parameters#
Same as above.
Support for Greenlet/Gevent Green Workers#
This part is also a key point because the target project uses gevent's monkey patching under Flask to turn synchronous, blocking calls into cooperative, asynchronous ones. However, with Psycopg2 and with the latest psycopg 3.1.4, where c_wait was introduced, running gevent workers directly under gunicorn does not patch the code correctly, so the asynchronous behavior is lost.
A simple workaround is to use psycopg 3.1.3 or earlier, or to use psycogreen. A more fundamental fix is to replace the module's wait function with a pure Python version.
Here is part of the code of my new class, which makes the following functional changes:
- Added a check for timed-out (dead) connections before a connection is handed out.
- Rewrote the built-in context manager of Psycopg3.
- Added methods for converting between business-related config and URI.
As the code shows, the Psycopg3 connection pool is already very usable on its own, so the modifications built on top of it are simple.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: mesh
@version: 1.0.0
@license: MIT
@file: postgres.py
@time: 2023-07-31 16:34
"""
from __future__ import annotations

import logging
from contextlib import contextmanager
from typing import Any, Dict, Iterator
from urllib.parse import urlsplit

from psycopg import Connection, OperationalError
from psycopg_pool import ConnectionPool

logger = logging.getLogger("gunicorn.error")
logger.setLevel(logging.INFO)

pg_config = {
    "host": "127.0.0.1",
    "port": 5432,
    "username": "test",
    "password": "123456",
    "db_name": "default",
}
default_pg_configs = {
    "default": pg_config,
}


class ConnectionPoolsManager:
    """
    Wraps multiple psycopg_pool connection pools (psycopg 3.1.3) in one PostgreSQL pools manager object.
    You can put it into Flask's app context and use @app.before_request and @app.teardown_request to register a
    global proxy for all routes, so that every request served by a gunicorn worker reuses the pooled connections.
    """

    def __init__(self, conn_info: str | Dict[str, Any] | None, *args, **kwargs):
        # Copy the defaults so per-instance configs do not leak into the module-level dict.
        self.__default_db_configs: dict[str, dict] = dict(default_pg_configs)
        self.__pools: dict[str, ConnectionPool] = dict()
        if isinstance(conn_info, str):
            # conn_info = db_id
            self._init_pool(conn_info)
        elif isinstance(conn_info, dict):
            # conn_info = {db_id: db_config}; register the configs before building the pools
            self.__default_db_configs.update(conn_info)
            for db_id in conn_info:
                self._init_pool(db_id)
        else:
            # create an empty PoolManager without any connection pools
            logger.info("create an empty pool manager...")

    def get_pool(self, db_id: str) -> ConnectionPool:
        return self._get_pool_by_name(db_id)

    def _get_pool_by_name(self, db_id: str) -> ConnectionPool:
        pool = self.__pools.get(db_id)
        if pool is not None:
            return pool
        # no pool for this db_id yet: try to build one from the known configs
        if db_id in self.__default_db_configs:
            self._init_pool(db_id)
            return self.__pools[db_id]
        err_msg = (f"The provided db_id {db_id!r} has no entry in default_pg_configs; please create the "
                   f"pool manually or update the config.")
        logger.error(err_msg)
        raise RuntimeError(err_msg)

    def create_pool(self, db_id: str):
        return self._init_pool(db_id)

    def _init_pool(self, db_id: str, min_size=5, max_size=10, **kwargs):
        config = self.__default_db_configs.get(db_id)
        if config is None:
            raise RuntimeError(f"no config found for db_id {db_id!r}")
        new_pool = ConnectionPool(
            self.config_to_uri(config),
            min_size=min_size,
            max_size=max_size,
            **kwargs,
        )
        self.__pools[db_id] = new_pool

    @contextmanager
    def connection(self, db_id: str) -> Iterator[Connection]:
        # guarantee to return an alive connection
        pool = self.get_pool(db_id)
        conn = pool.getconn()
        conn.autocommit = True
        while not self._check_alive(conn):
            # worst situation: loop entire pool: T(getconn) * pool_size
            pool.putconn(conn)
            conn = pool.getconn()
            conn.autocommit = True
        try:
            # Do not wrap this in "with conn:": in psycopg 3 that closes the
            # connection on exit, so the pool would have to replace it every time.
            yield conn
        finally:
            pool.putconn(conn)

    def status(self):
        for db_id, pool in self.__pools.items():
            logger.info(f"{db_id}: {pool.get_stats()}")

    def _close_pools(self):
        for pool in self.__pools.values():
            pool.close()

    def close(self):
        return self._close_pools()

    @staticmethod
    def _check_alive(conn: Connection) -> bool:
        try:
            conn.execute("SELECT 1")
        except OperationalError:
            return False
        return True

    @staticmethod
    def config_to_uri(config: dict[str, Any]) -> str:
        host = config.get("host", "127.0.0.1")
        port = config.get("port", 5432)
        username = config.get("username", "root")
        password = config.get("password", "root")
        db_name = config.get("db_name", "default")
        return f"postgresql://{username}:{password}@{host}:{port}/{db_name}?client_encoding=utf8"

    @staticmethod
    def uri_to_config(uri: str) -> dict[str, Any]:
        # sample : "postgresql://test:123456@127.0.0.1:5432/default"
        parts = urlsplit(uri)
        db_name = parts.path.lstrip("/")
        # Log everything except the password.
        logger.info(f"{parts.username}_{parts.hostname}_{parts.port}_{db_name}")
        # Keys follow the libpq keyword names (user, dbname), not the pg_config layout.
        return {
            "host": parts.hostname,
            "port": parts.port,
            "user": parts.username,
            "password": parts.password,
            "dbname": db_name,
        }

    def __del__(self):
        self.close()


def test_connection_manager():
    pass


def main():
    manager = ConnectionPoolsManager(None)
    uri = "postgresql://test:123456@127.0.0.1:5432/default"
    config = manager.uri_to_config(uri)


if __name__ == '__main__':
    main()
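One caveat with the URI helpers above: config_to_uri() interpolates the credentials directly, so a password containing `@`, `:` or `/` would produce an ambiguous URI. The sketch below shows one possible way to make the conversion round-trip safely, using only `urllib.parse` from the standard library. These standalone functions are illustrative variants, not part of the class above, and they assume the pg_config key layout (`username`, `db_name`).

```python
from urllib.parse import quote, unquote, urlsplit


def config_to_uri(config):
    """Build a PostgreSQL URI, percent-encoding the credentials."""
    user = quote(config["username"], safe="")
    password = quote(config["password"], safe="")
    return (f"postgresql://{user}:{password}"
            f"@{config['host']}:{config['port']}/{config['db_name']}")


def uri_to_config(uri):
    """Parse a PostgreSQL URI back into the same dictionary layout."""
    parts = urlsplit(uri)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "username": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        "db_name": parts.path.lstrip("/"),
    }
```

With both directions using the same keys, `uri_to_config(config_to_uri(cfg))` returns the original dictionary, which makes the pair easy to test.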