这是一篇design doc的阅读,翻译、转述与总结。上周花了一段时间来仔细研究并重构了项目中的PG数据库连接池,下面我将总结其中的重点、列举我对其进一步的重构过程,以及该过程中我的学习收获。
原文链接:
https://www.psycopg.org/articles/2021/01/17/pool-design/
首先原作者总结了Psycopg2的现存连接池存在以下缺陷:
作者参考了大量其他连接池的实现,例如Java的HikariCP, 并重新设计与实现了新的Psycopg3的连接池。
新增了上下文管理器的支持
with pool.connection() as conn:
cur = conn.cursor()
cur.execute("...")
consume_data(cur.fetchall())
重写了Connection.execute()方法:
with pool.connection() as conn:
conn.execute(DML_QUERY)
当一个耗竭的同步连接池接收到新连接的请求时,正确的反应应该是阻塞,直到新的连接被创捷且可用,或者超时后报错。这样会面临一个问题,当请求尖峰spike来临时,数据库可能会面临大量的rollback操作,并有可能伴随着性能降级。而下列代码:
async def worker(conn, work_for_sec):
await conn.execute("select pg_sleep(%s)", (work_for_sec,))
pool = psycopg3.AsyncPool(maxconn=4, connection_timeout_sec=1.0)
for i in range(8):
async with pool.connection():
create_task(worker(conn, work_for_sec=0.5))
展示了加入了异步支持asyncio 后的版本。与此同时,同步版本的连接池也应该可以轻松被Eventlet / gevent所patch而成为异步程序
备注: 目前最新的Psycopg3版本还不能完美兼容这一点,具体详见该github Issue
作者也探讨了是否应该允许连接池来管理排他性连接的问题,例如支持下面的示例代码:
def listen_notifications():
with pool.exclusive() as conn:
conn.autocommit = True
for notify in conn.cursor().notifies():
handle_notification(notify)
但是鉴于连接池已经支持在创建的时候同步接受连接参数,上面这一点似乎是多此一举的,需要排他性连接池的使用者完全可以按照如下示例操作:
pool = Pool(maxconn=config.pool_maxconn, args=(config.db_dsn,))
def listen_notifications():
with psycopg3.connect(*pool.args, **pool.kwargs):
conn.autocommit = True
for notify in conn.cursor().notifies():
handle_notification(notify)
让我们休息一下,接下来看内部实现部分
作者提出,如果需要提高一个数据库连接池的性能,缩短响应时间,且不拖累其他进程,那么以下几点需要着重考虑:
一个简单明了的做法是轮询,如果没有可用的连接就新建一个添加到池中。
class Pool:
def getconn(self):
conn = self._get_a_connection_from_the_pool()
if conn:
return conn
else:
conn = psycopg3.connect(*self.args, **self.kwargs)
self._put_it_in_the_pool_when_done(conn)
return conn
然而,这明显不是最优的办法,在请求频繁,使用时长短且连接耗时久的情形下更是如此。所以,另一个常用的办法是将维护连接池大小的工作交由分离的工作线程去做,例如:
class Pool:
def getconn(self):
if not self._pool():
self.worker.create_a_new_connection_and_put_it_in_the_pool()
return self.wait_for_a_connection_from_the_pool()
这些工作线程还有能力承担更多种类的维护工作,例如轮询连接池以检查连接的可用性,或是在连接超时的时候关闭它们,亦或者使用新的连接替换掉池中的旧连接等等。
连接池的使命应该是保证应用和数据库的连接不受中断影响,但是有特殊情形下的例外情况,例如数据库不再可用,使用了错误的数据库连接配置,亦或是数据库改变了位置等等。这时应用层的错误就会被不间断尝试重连的连接池所掩盖,而这并不是一种好的现象。
作者的个人项目经验告诉我们,调试一个正在运作的系统是复杂而困难的工作,尤其是当该系统不具备对外输出日志,或者错误提示的时候;但有些时候系统的运行时状态也是值得保留且观察的,而不应该粗暴地终止系统运行。
一种明智的解决方案应该是,在系统首次启动的时候,早发现问题,早终止运行;而在后续的运行过程中,使用指数回退的方式来确定重试的时间间隔,并且在达到预设好的重试次数后、且最新一次重试仍然失败时终止整个系统。
从性能,负载均衡和连接失效检查的角度考虑,连接池内部的数据结构应该满足如下特征:
综上所述,双向队列(deque)和树状数组这两种数据结构似乎均满足条件。考虑到树状数组的取出归还时间复杂度更高,且Python具备原生的双向队列支持,笔者认为应该使用双向队列,作者也是选择了该数据结构。
此处略过,需要的同学可以自行派生原版的连接池
同上
这部分也是一个重点,因为在目标项目中,我们使用gevent作为monkey path的工具来针对Flask进行修改,将同步方法动态修改为异步Async方法。然而在Psycopg2以及最新的psycopg3.1.4之后,由于c_wait的引入,直接使用例如gunicorn内部的gevent worker将不能正确patch代码,从而失去async特性。
简单一些的解决方案是使用psycopg3.1.3以前的版本,或者使用psycogreen绕过该问题。更根本的解决方案是自行修改模块函数,将wait方法替换为Pure Python的版本。
以下是我继承的新类的部分代码, 主要做了下列功能的改动:
基本上可以看到,Psycopg3的连接池已经深度符合可用,在此基础上面的改动很简单。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: mesh
@version: 1.0.0
@license: MIT
@file: postgres.py
@time: 2023-07-31 16:34
"""
from __future__ import annotations
import logging
from contextlib import contextmanager
from typing import Any
from typing import Dict
from psycopg import Connection, OperationalError
from psycopg_pool import ConnectionPool
logger = logging.getLogger("gunicorn.error")
logger.setLevel(logging.INFO)
pg_config = {
"host": "127.0.0.1",
"port": 5432,
"username": "test",
"password": "123456",
"db_name": 'default'
}
default_pg_configs = {
'default': pg_config
}
class ConnectionPoolsManager:
"""
we encapsulate multiple connection pools into PostgreSQL DB Pools Manager object using psycopg3.1.3 module.
You can put it into flask's app context and using @app.before_request and @app.teardown_request to register global
proxy for all routes and get sharing connections for all gunicorn workers like:
"""
def __init__(self, conn_info: str | Dict[str, Any] | None, *args, **kwargs):
self.__default_db_configs: dict[str, dict] = default_pg_configs
self.__pools: dict[str, ConnectionPool] = dict()
if isinstance(conn_info, str):
# conn_info = db_id
self._init_pool(conn_info)
elif isinstance(conn_info, dict):
# conn_info = {db_id: db_config}
for db_id in conn_info.keys():
self._init_pool(db_id)
else:
# create an empty PoolManager without any connection pools
logger.info(f"create an empty pool manager...")
def get_pool(self, db_id: str) -> ConnectionPool:
return self._get_pool_by_name(db_id)
def _get_pool_by_name(self, db_id: str) -> ConnectionPool:
pool = self.__pools.get(db_id)
if pool:
return pool
else:
# no pool for current db_name, trying to build a new one with default db configs
if db_id in self.__default_db_configs.keys():
# default config existed, create a new pool by this
self._init_pool(db_id)
return self.__pools.get(db_id)
else:
err_msg = (f'The provided db_name: {db_id} have no configs in default_pg_config, please create the '
f'pool manually or update config...')
logger.error(err_msg)
raise RuntimeError
def create_pool(self, db_id: str):
return self._init_pool(db_id)
def _init_pool(self, db_id: str, min_size=5, max_size=10, **kwargs):
config = self.__default_db_configs.get(db_id)
new_pool = ConnectionPool(
self.config_to_uri(config),
min_size=min_size,
max_size=max_size,
**kwargs,
)
self.__pools[db_id] = new_pool
@contextmanager
def connection(self, db_id: str) -> Connection:
# guarantee to return an alive connection
pool = self.get_pool(db_id)
conn = pool.getconn()
conn.autocommit = True
while not self._check_alive(conn):
# worst situation: loop entire pool: T(getconn) * pool_size
pool.putconn(conn)
conn = pool.getconn()
conn.autocommit = True
try:
with conn:
yield conn
finally:
pool.putconn(conn)
def status(self):
for db_id, pool in self.__pools.items():
logger.info(f"{db_id}: {pool.get_stats()}")
def _close_pools(self):
for pool in self.__pools.values():
pool.close()
def close(self):
return self._close_pools()
@staticmethod
def _check_alive(conn: Connection) -> bool:
try:
conn.execute("SELECT 1")
except OperationalError:
return False
return True
@staticmethod
def config_to_uri(config: dict[Any]) -> str:
host = config.get("host", "127.0.0.1")
port = config.get("port", 5432)
username = config.get("username", "root")
password = config.get("password", "root")
db_name = config.get("db_name", "default")
return f"postgresql://{username}:{password}@{host}:{port}/{db_name}?client_encoding=utf8"
@staticmethod
def uri_to_config(uri: str) -> dict[str, Any]:
# sample : "postgresql://test:[email protected]:5432/default"
_, username, password_host, port_db_name = uri.replace("//", " ").split(':')
password, host = password_host.split("@")
port, db_name = port_db_name.split("/")
logger.info(f"{username}_{password}_{host}_{port}_{db_name}")
return {
"host": host,
"port": port,
"user": username,
"password": password,
"dbname": db_name,
}
def __del__(self):
self.close()
def test_connection_manager():
pass
def main():
manager = ConnectionPoolsManager(None)
uri = "postgresql://test:[email protected]:5432/default"
config = manager.uri_to_config(uri)
if __name__ == '__main__':
main()