banner
DarkMesh

Dark Flame Master

DarkMesh的小屋

如何設計一個資料庫的連接池 —— Designing a connection pool for psycopg3

摘要#

這是一篇 design doc 的閱讀,翻譯、轉述與總結。上週花了一段時間來仔細研究並重構了專案中的 PG 資料庫連接池,下面我將總結其中的重點、列舉我對其進一步的重構過程,以及該過程中的我的學習收穫。

原文連結:
https://www.psycopg.org/articles/2021/01/17/pool-design/


首先原作者總結了 Psycopg2 的現存連接池存在以下缺陷:

  • 沒有上下文管理器支持
  • 客戶端在使用連接之前,連接池中的連接狀態是不可知的
  • 如果池子的 min_size 已經達到且連接全部為佔用狀態,那麼新創建的連接會在用後即被丟棄,而不是繼續被需要的客戶端原地復用。
  • 當一個客戶端一次性申請的連接數超過 max_size 的時候,會收到一個連接池錯誤

作者參考了大量其他連接池的實現,例如 Java 的HikariCP,並重新設計與實現了新的 Psycopg3 的連接池。

客戶端部分#

上下文管理器#

新增了上下文管理器的支持

with pool.connection() as conn: 
    cur = conn.cursor()
    cur.execute("...")    
    consume_data(cur.fetchall())

重寫了Connection.execute()方法:

with pool.connection() as conn:    
    conn.execute(DML_QUERY)

非同步操作和阻塞行為#

當一個耗竭的同步連接池接收到新連接的請求時,正確的反應應該是阻塞,直到新的連接被創建且可用,或者超時後報錯。這樣會面臨一個問題,當請求尖峰 spike 來臨時,資料庫可能會面臨大量的 rollback 操作,並有可能伴隨著性能降級。而下列代碼:

async def worker(conn, work_for_sec):
    await conn.execute("select pg_sleep(%s)", (work_for_sec,))
    pool = psycopg3.AsyncPool(maxconn=4, connection_timeout_sec=1.0)
    for i in range(8):    
        async with pool.connection():        
            create_task(worker(conn, work_for_sec=0.5))

展示了加入了非同步支持asyncio後的版本。與此同時,同步版本的連接池也應該可以輕鬆被 Eventlet /gevent 所 patch 而成為非同步程序
備註: 目前最新的 Psycopg3 版本還不能完美兼容這一點,具體詳見該github Issue

排他性連接是否應該存在?#

作者也探討了是否應該允許連接池來管理排他性連接的問題,例如支持下面的示例代碼:

def listen_notifications():    
    with pool.exclusive() as conn:        
        conn.autocommit = True        
        for notify in conn.cursor().notifies():            
            handle_notification(notify)

但是鑑於連接池已經支持在創建的時候同步接受連接參數,上面這一點似乎是多此一舉的,需要排他性連接池的使用者完全可以按照如下示例操作:

pool = Pool(maxconn=config.pool_maxconn, args=(config.db_dsn,))
def listen_notifications():    
    with psycopg3.connect(*pool.args, **pool.kwargs):        
        conn.autocommit = True        
        for notify in conn.cursor().notifies():            
            handle_notification(notify)

讓我們休息一下,接下來看內部實現部分


資料庫端實現#

連接池工作線程#

作者提出,如果需要提高一個資料庫連接池的性能,縮短響應時間,且不拖累其他進程,那麼以下幾點需要著重考慮:

  • 如何處理可能存在的請求尖峰 spike?
  • 如何處理可能存在的低谷期,是否需要降低 min_connection?
  • 如何判斷當前連接池內部是否還有可用的連接?

一個簡單明瞭的做法是輪詢,如果沒有可用的連接就新建一個添加到池中。

class Pool:    
    def getconn(self):        
        conn = self._get_a_connection_from_the_pool()        
        if conn:            
            return conn        
        else:            
          conn = psycopg3.connect(*self.args, **self.kwargs)           
        self._put_it_in_the_pool_when_done(conn)            
        return conn

然而,這明顯不是最優的辦法,在請求頻繁,使用時長短且連接耗時久的情形下更是如此。所以,另一個常用的辦法是將維護連接池大小的工作交由分離的工作線程去做,例如:

class Pool:    
    def getconn(self):        
        if not self._pool():
          self.worker.create_a_new_connection_and_put_it_in_the_pool()
        return self.wait_for_a_connection_from_the_pool()

這些工作線程還有能力承擔更多種類的維護工作,例如輪詢連接池以檢查連接的可用性,或是在連接超時的時候關閉它們,亦或者使用新的連接替換掉池中的舊連接等等。

連接失敗時的對策應該是什麼?#

連接池的使命應該是保證應用和資料庫的連接不受中斷影響,但是有特殊情形下的例外情況,例如資料庫不再可用,使用了錯誤的資料庫連接配置,亦或是資料庫改變了位置等等。這時應用層的錯誤就會被不間斷嘗試重連的連接池所掩蓋,而這並不是一種好的現象。

作者的個人專案經驗告訴我們,調試一個正在運作的系統是複雜而困難的工作,尤其是當該系統不具備對外輸出日誌,或者錯誤提示的時候;但有些時候系統的運行時狀態也是值得保留且觀察的,而不應該粗暴地終止系統運行。

一種明智的解決方案應該是,在系統首次啟動的時候,早發現問題,早終止運行;而在後續的運行過程中,使用指數回退的方式來確定重試的時間間隔,並且在達到預設好的重試次數後、且最新一次重試仍然失敗時終止整個系統。

連接池的資料結構#

從性能,負載均衡和連接失效檢查的角度考慮,連接池內部的資料結構應該滿足如下特徵:

  1. 取出及歸還連接的時間複雜度應盡量低,最好是 O (1)
  2. 所有池內的連接應該具備相同,或者近似相同的獲取概率
  3. 應該優先復用最新的連接,因為它們的失效可能性相對最低(有自動超時等機制影響)

綜上所述,雙向隊列 (deque) 和樹狀數組這兩種資料結構似乎均滿足條件。考慮到樹狀數組的取出歸還時間複雜度更高,且 Python 具備原生的雙向隊列支持,筆者認為應該使用雙向隊列,作者也是選擇了該資料結構。

Proposed API#

此處略過,需要的同學可以自行派生原版的連接池

Key configuration parameters#

同上

對於 greenlet/gevent 的 green worker 的支持#

這部分也是一個重點,因為在目標專案中,我們使用 gevent 作為 monkey path 的工具來針對 Flask 進行修改,將同步方法動態修改為非同步 Async 方法。然而在 Psycopg2 以及最新的 psycopg3.1.4 之後,由於 c_wait 的引入,直接使用例如 gunicorn 內部的 gevent worker 將不能正確 patch 代碼,從而失去 async 特性。

簡單一些的解決方案是使用 psycopg3.1.3 以前的版本,或者使用psycogreen繞過該問題。更根本的解決方案是自行修改模組函數,將 wait 方法替換為 Pure Python 的版本。

以下是我繼承的新類的部分代碼,主要做了下列功能的改動:

  1. 增加了連接超時的判斷機制
  2. 重寫了 Psycopg3 自帶的上下文管理器
  3. 增加了業務方面 config 和 URI 互轉的方法

基本上可以看到,Psycopg3 的連接池已經深度符合可用,在此基礎上面的改動很簡單。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@author: mesh
@version: 1.0.0
@license: MIT
@file: postgres.py
@time: 2023-07-31 16:34
"""
from __future__ import annotations

import logging
from contextlib import contextmanager
from typing import Any
from typing import Dict

from psycopg import Connection, OperationalError
from psycopg_pool import ConnectionPool

logger = logging.getLogger("gunicorn.error")
logger.setLevel(logging.INFO)

pg_config = {
    "host": "127.0.0.1",
    "port": 5432,
    "username": "test",
    "password": "123456",
    "db_name": 'default'
}

default_pg_configs = {
    'default': pg_config
}

class ConnectionPoolsManager:
    """
    we encapsulate multiple connection pools into PostgreSQL DB Pools Manager object using psycopg3.1.3 module.
    You can put it into flask's app context and using @app.before_request and @app.teardown_request to register global
    proxy for all routes and get sharing connections for all gunicorn workers like:
    """
    def __init__(self, conn_info: str | Dict[str, Any] | None, *args, **kwargs):
        self.__default_db_configs: dict[str, dict] = default_pg_configs
        self.__pools: dict[str, ConnectionPool] = dict()

        if isinstance(conn_info, str):
            # conn_info = db_id
            self._init_pool(conn_info)
        elif isinstance(conn_info, dict):
            # conn_info = {db_id: db_config}
            for db_id in conn_info.keys():
                self._init_pool(db_id)
        else:
            # create an empty PoolManager without any connection pools
            logger.info(f"create an empty pool manager...")

    def get_pool(self, db_id: str) -> ConnectionPool:
        return self._get_pool_by_name(db_id)

    def _get_pool_by_name(self, db_id: str) -> ConnectionPool:
        pool = self.__pools.get(db_id)
        if pool:
            return pool
        else:
            # no pool for current db_name, trying to build a new one with default db configs
            if db_id in self.__default_db_configs.keys():
                # default config existed, create a new pool by this
                self._init_pool(db_id)
                return self.__pools.get(db_id)
            else:
                err_msg = (f'The provided db_name: {db_id} have no configs in default_pg_config, please create the '
                           f'pool manually or update config...')
                logger.error(err_msg)
                raise RuntimeError

    def create_pool(self, db_id: str):
        return self._init_pool(db_id)

    def _init_pool(self, db_id: str, min_size=5, max_size=10, **kwargs):
        config = self.__default_db_configs.get(db_id)
        new_pool = ConnectionPool(
            self.config_to_uri(config),
            min_size=min_size,
            max_size=max_size,
            **kwargs,
        )
        self.__pools[db_id] = new_pool

    @contextmanager
    def connection(self, db_id: str) -> Connection:
        # guarantee to return an alive connection
        pool = self.get_pool(db_id)
        conn = pool.getconn()
        conn.autocommit = True
        while not self._check_alive(conn):
            # worst situation: loop entire pool: T(getconn) * pool_size
            pool.putconn(conn)
            conn = pool.getconn()
            conn.autocommit = True
        try:
            with conn:
                yield conn
        finally:
            pool.putconn(conn)

    def status(self):
        for db_id, pool in self.__pools.items():
            logger.info(f"{db_id}: {pool.get_stats()}")

    def _close_pools(self):
        for pool in self.__pools.values():
            pool.close()

    def close(self):
        return self._close_pools()

    @staticmethod
    def _check_alive(conn: Connection) -> bool:
        try:
            conn.execute("SELECT 1")
        except OperationalError:
            return False
        return True

    @staticmethod
    def config_to_uri(config: dict[Any]) -> str:
        host = config.get("host", "127.0.0.1")
        port = config.get("port", 5432)
        username = config.get("username", "root")
        password = config.get("password", "root")
        db_name = config.get("db_name", "default")
        return f"postgresql://{username}:{password}@{host}:{port}/{db_name}?client_encoding=utf8"

    @staticmethod
    def uri_to_config(uri: str) -> dict[str, Any]:
        # sample : "postgresql://test:[email protected]:5432/default"
        _, username, password_host, port_db_name = uri.replace("//", " ").split(':')
        password, host = password_host.split("@")
        port, db_name = port_db_name.split("/")
        logger.info(f"{username}_{password}_{host}_{port}_{db_name}")
        return {
            "host": host,
            "port": port,
            "user": username,
            "password": password,
            "dbname": db_name,
        }

    def __del__(self):
        self.close()


def test_connection_manager():
    pass


def main():
    manager = ConnectionPoolsManager(None)
    uri = "postgresql://test:[email protected]:5432/default"
    config = manager.uri_to_config(uri)


if __name__ == '__main__':
    main()

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。