banner
DarkMesh

Dark Flame Master

DarkMesh的小屋

データベース接続プールの設計方法 —— psycopg3の接続プールを設計する

概要#

これは design doc の読解、翻訳、要約です。先週、プロジェクト内の PG データベース接続プールを注意深く研究し、再構築するために時間を費やしました。以下にその要点をまとめ、さらなる再構築プロセスについての私の考えを列挙し、この過程での学びを共有します。

原文リンク:
https://www.psycopg.org/articles/2021/01/17/pool-design/


まず、原作者は Psycopg2 の既存の接続プールに以下の欠陥があることをまとめました:

  • コンテキストマネージャのサポートがない
  • クライアントが接続を使用する前に、接続プール内の接続状態が不明である
  • プールの min_size に達し、すべての接続が占有状態の場合、新しく作成された接続は使用後に破棄され、必要なクライアントによって再利用されない
  • クライアントが一度に要求する接続数が max_size を超えると、接続プールエラーが発生する

作者は Java のHikariCPなど、他の多くの接続プールの実装を参考にし、新しい Psycopg3 の接続プールを再設計および実装しました。

クライアント部分#

コンテキストマネージャ#

コンテキストマネージャのサポートが追加されました

with pool.connection() as conn: 
    cur = conn.cursor()
    cur.execute("...")    
    consume_data(cur.fetchall())

Connection.execute()メソッドが再実装されました:

with pool.connection() as conn:    
    conn.execute(DML_QUERY)

非同期操作とブロッキング動作#

枯渇した同期接続プールが新しい接続のリクエストを受け取ったとき、正しい反応はブロックし、新しい接続が作成されて利用可能になるまで待つか、タイムアウト後にエラーを報告することです。これにより、リクエストのピーク時にデータベースが大量のロールバック操作に直面し、パフォーマンスの低下を伴う可能性があります。以下のコードは:

async def worker(conn, work_for_sec):
    await conn.execute("select pg_sleep(%s)", (work_for_sec,))
    pool = psycopg3.AsyncPool(maxconn=4, connection_timeout_sec=1.0)
    for i in range(8):    
        async with pool.connection():        
            create_task(worker(conn, work_for_sec=0.5))

非同期サポートasyncioを追加した後のバージョンを示しています。同時に、同期バージョンの接続プールも Eventlet /gevent によって簡単にパッチされ、非同期プログラムになるべきです。
注: 現在の最新の Psycopg3 バージョンはこの点で完全に互換性がないため、詳細はこのgithub Issueを参照してください。

排他性接続は存在すべきか?#

作者は接続プールが排他性接続を管理することを許可すべきかどうかを探討しました。例えば、以下のサンプルコードをサポートすることです:

def listen_notifications():    
    with pool.exclusive() as conn:        
        conn.autocommit = True        
        for notify in conn.cursor().notifies():            
            handle_notification(notify)

しかし、接続プールが作成時に接続パラメータを同期的に受け入れることをサポートしているため、上記の点は無駄のように思えます。排他性接続プールの利用者は、以下のように操作することができます:

pool = Pool(maxconn=config.pool_maxconn, args=(config.db_dsn,))
def listen_notifications():    
    with psycopg3.connect(*pool.args, **pool.kwargs):        
        conn.autocommit = True        
        for notify in conn.cursor().notifies():            
            handle_notification(notify)

少し休憩して、次に内部実装部分を見てみましょう。


データベース側の実装#

接続プールのワーカースレッド#

作者は、データベース接続プールのパフォーマンスを向上させ、応答時間を短縮し、他のプロセスに負担をかけないために、以下の点を重視する必要があると提案しています:

  • 可能なリクエストのピークをどのように処理するか?
  • 可能な低谷期をどのように処理するか、min_connection を下げる必要があるか?
  • 現在の接続プール内に利用可能な接続がまだあるかどうかをどのように判断するか?

シンプルで明確な方法はポーリングです。利用可能な接続がない場合は、新しい接続を作成してプールに追加します。

class Pool:    
    def getconn(self):        
        conn = self._get_a_connection_from_the_pool()        
        if conn:            
            return conn        
        else:            
          conn = psycopg3.connect(*self.args, **self.kwargs)           
        self._put_it_in_the_pool_when_done(conn)            
        return conn

しかし、これは明らかに最適な方法ではなく、リクエストが頻繁で、使用時間が短く、接続に時間がかかる場合には特にそうです。したがって、別の一般的な方法は、接続プールのサイズを維持する作業を分離されたワーカースレッドに任せることです。例えば:

class Pool:    
    def getconn(self):        
        if not self._pool():
          self.worker.create_a_new_connection_and_put_it_in_the_pool()
        return self.wait_for_a_connection_from_the_pool()

これらのワーカースレッドは、接続の可用性を確認するために接続プールをポーリングしたり、接続がタイムアウトしたときにそれらを閉じたり、プール内の古い接続を新しい接続で置き換えたりするなど、さまざまなメンテナンス作業を担う能力があります。

接続失敗時の対策は何か?#

接続プールの使命は、アプリケーションとデータベースの接続が中断の影響を受けないようにすることですが、特別な状況下での例外があります。例えば、データベースが利用できなくなったり、誤ったデータベース接続設定を使用したり、データベースの位置が変更されたりすることです。この場合、アプリケーション層のエラーは、接続プールによる中断のない再接続の試行によって隠されてしまい、これは良い現象ではありません。

作者の個人的なプロジェクト経験は、稼働中のシステムをデバッグすることが複雑で困難な作業であることを示しています。特に、そのシステムが外部にログを出力したり、エラーメッセージを表示したりしない場合はなおさらです。しかし、時にはシステムの実行時状態も保持し、観察する価値があり、粗暴にシステムの実行を終了すべきではありません。

賢明な解決策は、システムが最初に起動したときに問題を早期に発見し、早期に実行を終了することです。そして、その後の実行中に指数バックオフの方法を使用して再試行の時間間隔を決定し、設定された再試行回数に達し、最新の再試行がまだ失敗した場合にシステム全体を終了します。

接続プールのデータ構造#

パフォーマンス、負荷分散、接続失効チェックの観点から、接続プール内部のデータ構造は以下の特徴を満たすべきです:

  1. 接続の取得および返却の時間計算量はできるだけ低く、理想的には O (1) であるべきです。
  2. プール内のすべての接続は、同じまたはほぼ同じ取得確率を持つべきです。
  3. 最新の接続を優先的に再利用すべきです。なぜなら、これらの接続の失効可能性は相対的に低いためです(自動タイムアウトなどのメカニズムの影響を受ける)。

以上のことから、双方向キュー(deque)とセグメントツリーの 2 つのデータ構造が条件を満たしているようです。セグメントツリーの取得と返却の時間計算量が高く、Python がネイティブの双方向キューをサポートしていることを考慮すると、著者は双方向キューを使用すべきだと考え、作者もこのデータ構造を選択しました。

提案された API#

ここでは省略します。必要な方は元の接続プールを派生させてください。

主要な構成パラメータ#

同上

greenlet/gevent の green worker のサポート#

この部分も重要です。なぜなら、ターゲットプロジェクトでは、Flask を対象にして同期メソッドを動的に非同期 Async メソッドに変更するために gevent を monkey patch ツールとして使用しているからです。しかし、Psycopg2 および最新の psycopg3.1.4 以降、c_wait の導入により、gunicorn 内部の gevent ワーカーを直接使用するとコードが正しくパッチされず、async 特性を失います。

簡単な解決策は、psycopg3.1.3 以前のバージョンを使用するか、psycogreenを使用してこの問題を回避することです。より根本的な解決策は、モジュール関数を自分で修正し、wait メソッドを Pure Python のバージョンに置き換えることです。

以下は私が継承した新しいクラスの一部コードで、主に以下の機能の変更を行いました:

  1. 接続タイムアウトの判断メカニズムを追加
  2. Psycopg3 に付属のコンテキストマネージャを再実装
  3. ビジネス面での config と URI の相互変換メソッドを追加

基本的に、Psycopg3 の接続プールはすでに深く利用可能であり、その上での変更は非常に簡単です。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@author: mesh
@version: 1.0.0
@license: MIT
@file: postgres.py
@time: 2023-07-31 16:34
"""
from __future__ import annotations

import logging
from contextlib import contextmanager
from typing import Any
from typing import Dict

from psycopg import Connection, OperationalError
from psycopg_pool import ConnectionPool

logger = logging.getLogger("gunicorn.error")
logger.setLevel(logging.INFO)

pg_config = {
    "host": "127.0.0.1",
    "port": 5432,
    "username": "test",
    "password": "123456",
    "db_name": 'default'
}

default_pg_configs = {
    'default': pg_config
}

class ConnectionPoolsManager:
    """
    psycopg3.1.3モジュールを使用して、複数の接続プールをPostgreSQL DBプールマネージャオブジェクトにカプセル化します。
    これをFlaskのアプリケーションコンテキストに入れ、@app.before_requestおよび@app.teardown_requestを使用して
    すべてのルートのためのグローバルプロキシを登録し、すべてのgunicornワーカーのために共有接続を取得します。
    """
    def __init__(self, conn_info: str | Dict[str, Any] | None, *args, **kwargs):
        self.__default_db_configs: dict[str, dict] = default_pg_configs
        self.__pools: dict[str, ConnectionPool] = dict()

        if isinstance(conn_info, str):
            # conn_info = db_id
            self._init_pool(conn_info)
        elif isinstance(conn_info, dict):
            # conn_info = {db_id: db_config}
            for db_id in conn_info.keys():
                self._init_pool(db_id)
        else:
            # 接続プールなしの空のPoolManagerを作成
            logger.info(f"空のプールマネージャを作成...")

    def get_pool(self, db_id: str) -> ConnectionPool:
        return self._get_pool_by_name(db_id)

    def _get_pool_by_name(self, db_id: str) -> ConnectionPool:
        pool = self.__pools.get(db_id)
        if pool:
            return pool
        else:
            # 現在のdb_nameのプールがない場合、デフォルトのdb設定で新しいプールを作成しようとします
            if db_id in self.__default_db_configs.keys():
                # デフォルト設定が存在する場合、これを使用して新しいプールを作成
                self._init_pool(db_id)
                return self.__pools.get(db_id)
            else:
                err_msg = (f'提供されたdb_name: {db_id}はdefault_pg_configに設定がありません。手動でプールを作成するか、設定を更新してください...')
                logger.error(err_msg)
                raise RuntimeError

    def create_pool(self, db_id: str):
        return self._init_pool(db_id)

    def _init_pool(self, db_id: str, min_size=5, max_size=10, **kwargs):
        config = self.__default_db_configs.get(db_id)
        new_pool = ConnectionPool(
            self.config_to_uri(config),
            min_size=min_size,
            max_size=max_size,
            **kwargs,
        )
        self.__pools[db_id] = new_pool

    @contextmanager
    def connection(self, db_id: str) -> Connection:
        # 生きた接続を返すことを保証
        pool = self.get_pool(db_id)
        conn = pool.getconn()
        conn.autocommit = True
        while not self._check_alive(conn):
            # 最悪の状況:プール全体をループ:T(getconn) * pool_size
            pool.putconn(conn)
            conn = pool.getconn()
            conn.autocommit = True
        try:
            with conn:
                yield conn
        finally:
            pool.putconn(conn)

    def status(self):
        for db_id, pool in self.__pools.items():
            logger.info(f"{db_id}: {pool.get_stats()}")

    def _close_pools(self):
        for pool in self.__pools.values():
            pool.close()

    def close(self):
        return self._close_pools()

    @staticmethod
    def _check_alive(conn: Connection) -> bool:
        try:
            conn.execute("SELECT 1")
        except OperationalError:
            return False
        return True

    @staticmethod
    def config_to_uri(config: dict[Any]) -> str:
        host = config.get("host", "127.0.0.1")
        port = config.get("port", 5432)
        username = config.get("username", "root")
        password = config.get("password", "root")
        db_name = config.get("db_name", "default")
        return f"postgresql://{username}:{password}@{host}:{port}/{db_name}?client_encoding=utf8"

    @staticmethod
    def uri_to_config(uri: str) -> dict[str, Any]:
        # サンプル : "postgresql://test:[email protected]:5432/default"
        _, username, password_host, port_db_name = uri.replace("//", " ").split(':')
        password, host = password_host.split("@")
        port, db_name = port_db_name.split("/")
        logger.info(f"{username}_{password}_{host}_{port}_{db_name}")
        return {
            "host": host,
            "port": port,
            "user": username,
            "password": password,
            "dbname": db_name,
        }

    def __del__(self):
        self.close()


def test_connection_manager():
    pass


def main():
    manager = ConnectionPoolsManager(None)
    uri = "postgresql://test:[email protected]:5432/default"
    config = manager.uri_to_config(uri)


if __name__ == '__main__':
    main()

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。