banner
DarkMesh

Dark Flame Master

DarkMesh的小屋

如何设计一个数据库的连接池 —— Designing a connection pool for psycopg3

Abstract#

这是一篇 design doc 的阅读,翻译、转述与总结。上周花了一段时间来仔细研究并重构了项目中的 PG 数据库连接池,下面我将总结其中的重点、列举我对其进一步的重构过程,以及该过程中我的学习收获。

原文链接:
https://www.psycopg.org/articles/2021/01/17/pool-design/


首先原作者总结了 Psycopg2 的现存连接池存在以下缺陷:

  • 没有上下文管理器支持
  • 客户端在使用连接之前,连接池中的连接状态是不可知的
  • 如果池子的 min_size 已经达到且连接全部为占用状态,那么新创建的连接会在用后即被丢弃,而不是继续被需要的客户端原地复用。
  • 当一个客户端一次性申请的连接数超过 max_size 的时候,会收到一个连接池错误

作者参考了大量其他连接池的实现,例如 Java 的HikariCP, 并重新设计与实现了新的 Psycopg3 的连接池。

客户端部分#

上下文管理器#

新增了上下文管理器的支持

with pool.connection() as conn: 
    cur = conn.cursor()
    cur.execute("...")    
    consume_data(cur.fetchall())

重写了Connection.execute()方法:

with pool.connection() as conn:    
    conn.execute(DML_QUERY)

异步操作和阻塞行为#

当一个耗竭的同步连接池接收到新连接的请求时,正确的反应应该是阻塞,直到新的连接被创捷且可用,或者超时后报错。这样会面临一个问题,当请求尖峰 spike 来临时,数据库可能会面临大量的 rollback 操作,并有可能伴随着性能降级。而下列代码:

async def worker(conn, work_for_sec):
    await conn.execute("select pg_sleep(%s)", (work_for_sec,))
    pool = psycopg3.AsyncPool(maxconn=4, connection_timeout_sec=1.0)
    for i in range(8):    
        async with pool.connection():        
            create_task(worker(conn, work_for_sec=0.5))

展示了加入了异步支持asyncio 后的版本。与此同时,同步版本的连接池也应该可以轻松被 Eventlet /gevent 所 patch 而成为异步程序
备注: 目前最新的 Psycopg3 版本还不能完美兼容这一点,具体详见该github Issue

排他性连接是否应该存在?#

作者也探讨了是否应该允许连接池来管理排他性连接的问题,例如支持下面的示例代码:

def listen_notifications():    
    with pool.exclusive() as conn:        
        conn.autocommit = True        
        for notify in conn.cursor().notifies():            
            handle_notification(notify)

但是鉴于连接池已经支持在创建的时候同步接受连接参数,上面这一点似乎是多此一举的,需要排他性连接池的使用者完全可以按照如下示例操作:

pool = Pool(maxconn=config.pool_maxconn, args=(config.db_dsn,))
def listen_notifications():    
    with psycopg3.connect(*pool.args, **pool.kwargs):        
        conn.autocommit = True        
        for notify in conn.cursor().notifies():            
            handle_notification(notify)

让我们休息一下,接下来看内部实现部分


数据库端实现#

连接池工作线程#

作者提出,如果需要提高一个数据库连接池的性能,缩短响应时间,且不拖累其他进程,那么以下几点需要着重考虑:

  • 如何处理可能存在的请求尖峰 spike?
  • 如何处理可能存在的低谷期,是否需要降低 min_connection?
  • 如何判断当前连接池内部是否还有可用的连接?

一个简单明了的做法是轮询,如果没有可用的连接就新建一个添加到池中。

class Pool:    
    def getconn(self):        
        conn = self._get_a_connection_from_the_pool()        
        if conn:            
            return conn        
        else:            
          conn = psycopg3.connect(*self.args, **self.kwargs)           
        self._put_it_in_the_pool_when_done(conn)            
        return conn

然而,这明显不是最优的办法,在请求频繁,使用时长短且连接耗时久的情形下更是如此。所以,另一个常用的办法是将维护连接池大小的工作交由分离的工作线程去做,例如:

class Pool:    
    def getconn(self):        
        if not self._pool():
          self.worker.create_a_new_connection_and_put_it_in_the_pool()
        return self.wait_for_a_connection_from_the_pool()

这些工作线程还有能力承担更多种类的维护工作,例如轮询连接池以检查连接的可用性,或是在连接超时的时候关闭它们,亦或者使用新的连接替换掉池中的旧连接等等。

连接失败时的对策应该是什么?#

连接池的使命应该是保证应用和数据库的连接不受中断影响,但是有特殊情形下的例外情况,例如数据库不再可用,使用了错误的数据库连接配置,亦或是数据库改变了位置等等。这时应用层的错误就会被不间断尝试重连的连接池所掩盖,而这并不是一种好的现象。

作者的个人项目经验告诉我们,调试一个正在运作的系统是复杂而困难的工作,尤其是当该系统不具备对外输出日志,或者错误提示的时候;但有些时候系统的运行时状态也是值得保留且观察的,而不应该粗暴地终止系统运行。

一种明智的解决方案应该是,在系统首次启动的时候,早发现问题,早终止运行;而在后续的运行过程中,使用指数回退的方式来确定重试的时间间隔,并且在达到预设好的重试次数后、且最新一次重试仍然失败时终止整个系统。

连接池的数据结构#

从性能,负载均衡和连接失效检查的角度考虑,连接池内部的数据结构应该满足如下特征:

  1. 取出及归还连接的时间复杂度应尽量低,最好是 O (1)
  2. 所有池内的连接应该具备相同,或者近似相同的获取概率
  3. 应该优先复用最新的连接,因为他们的失效可能性相对最低(有自动超时等机制影响)

综上所述,双向队列 (deque) 和树状数组这两种数据结构似乎均满足条件。考虑到树状数组的取出归还时间复杂度更高,且 Python 具备原生的双向队列支持,笔者认为应该使用双向队列,作者也是选择了该数据结构。

Proposed API#

此处略过,需要的同学可以自行派生原版的连接池

Key configuration parameters#

同上

对于 greenlet/gevent 的 green worker 的支持#

这部分也是一个重点,因为在目标项目中,我们使用 gevent 作为 monkey path 的工具来针对 Flask 进行修改,将同步方法动态修改为异步 Async 方法。然而在 Psycopg2 以及最新的 psycopg3.1.4 之后,由于 c_wait 的引入,直接使用例如 gunicorn 内部的 gevent worker 将不能正确 patch 代码,从而失去 async 特性。

简单一些的解决方案是使用 psycopg3.1.3 以前的版本,或者使用psycogreen绕过该问题。更根本的解决方案是自行修改模块函数,将 wait 方法替换为 Pure Python 的版本。

以下是我继承的新类的部分代码,主要做了下列功能的改动:

  1. 增加了连接超时的判断机制
  2. 重写了 Psycopg3 自带的上下文管理器
  3. 增加了业务方面 config 和 URI 互转的方法

基本上可以看到,Psycopg3 的连接池已经深度符合可用,在此基础上面的改动很简单。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@author: mesh
@version: 1.0.0
@license: MIT
@file: postgres.py
@time: 2023-07-31 16:34
"""
from __future__ import annotations

import logging
from contextlib import contextmanager
from typing import Any
from typing import Dict

from psycopg import Connection, OperationalError
from psycopg_pool import ConnectionPool

logger = logging.getLogger("gunicorn.error")
logger.setLevel(logging.INFO)

pg_config = {
    "host": "127.0.0.1",
    "port": 5432,
    "username": "test",
    "password": "123456",
    "db_name": 'default'
}

default_pg_configs = {
    'default': pg_config
}

class ConnectionPoolsManager:
    """
    we encapsulate multiple connection pools into PostgreSQL DB Pools Manager object using psycopg3.1.3 module.
    You can put it into flask's app context and using @app.before_request and @app.teardown_request to register global
    proxy for all routes and get sharing connections for all gunicorn workers like:
    """
    def __init__(self, conn_info: str | Dict[str, Any] | None, *args, **kwargs):
        self.__default_db_configs: dict[str, dict] = default_pg_configs
        self.__pools: dict[str, ConnectionPool] = dict()

        if isinstance(conn_info, str):
            # conn_info = db_id
            self._init_pool(conn_info)
        elif isinstance(conn_info, dict):
            # conn_info = {db_id: db_config}
            for db_id in conn_info.keys():
                self._init_pool(db_id)
        else:
            # create an empty PoolManager without any connection pools
            logger.info(f"create an empty pool manager...")

    def get_pool(self, db_id: str) -> ConnectionPool:
        return self._get_pool_by_name(db_id)

    def _get_pool_by_name(self, db_id: str) -> ConnectionPool:
        pool = self.__pools.get(db_id)
        if pool:
            return pool
        else:
            # no pool for current db_name, trying to build a new one with default db configs
            if db_id in self.__default_db_configs.keys():
                # default config existed, create a new pool by this
                self._init_pool(db_id)
                return self.__pools.get(db_id)
            else:
                err_msg = (f'The provided db_name: {db_id} have no configs in default_pg_config, please create the '
                           f'pool manually or update config...')
                logger.error(err_msg)
                raise RuntimeError

    def create_pool(self, db_id: str):
        return self._init_pool(db_id)

    def _init_pool(self, db_id: str, min_size=5, max_size=10, **kwargs):
        config = self.__default_db_configs.get(db_id)
        new_pool = ConnectionPool(
            self.config_to_uri(config),
            min_size=min_size,
            max_size=max_size,
            **kwargs,
        )
        self.__pools[db_id] = new_pool

    @contextmanager
    def connection(self, db_id: str) -> Connection:
        # guarantee to return an alive connection
        pool = self.get_pool(db_id)
        conn = pool.getconn()
        conn.autocommit = True
        while not self._check_alive(conn):
            # worst situation: loop entire pool: T(getconn) * pool_size
            pool.putconn(conn)
            conn = pool.getconn()
            conn.autocommit = True
        try:
            with conn:
                yield conn
        finally:
            pool.putconn(conn)

    def status(self):
        for db_id, pool in self.__pools.items():
            logger.info(f"{db_id}: {pool.get_stats()}")

    def _close_pools(self):
        for pool in self.__pools.values():
            pool.close()

    def close(self):
        return self._close_pools()

    @staticmethod
    def _check_alive(conn: Connection) -> bool:
        try:
            conn.execute("SELECT 1")
        except OperationalError:
            return False
        return True

    @staticmethod
    def config_to_uri(config: dict[Any]) -> str:
        host = config.get("host", "127.0.0.1")
        port = config.get("port", 5432)
        username = config.get("username", "root")
        password = config.get("password", "root")
        db_name = config.get("db_name", "default")
        return f"postgresql://{username}:{password}@{host}:{port}/{db_name}?client_encoding=utf8"

    @staticmethod
    def uri_to_config(uri: str) -> dict[str, Any]:
        # sample : "postgresql://test:[email protected]:5432/default"
        _, username, password_host, port_db_name = uri.replace("//", " ").split(':')
        password, host = password_host.split("@")
        port, db_name = port_db_name.split("/")
        logger.info(f"{username}_{password}_{host}_{port}_{db_name}")
        return {
            "host": host,
            "port": port,
            "user": username,
            "password": password,
            "dbname": db_name,
        }

    def __del__(self):
        self.close()


def test_connection_manager():
    pass


def main():
    manager = ConnectionPoolsManager(None)
    uri = "postgresql://test:[email protected]:5432/default"
    config = manager.uri_to_config(uri)


if __name__ == '__main__':
    main()

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.