__all__ = ["Connector"]
from contextlib import contextmanager
import random
import time
from typing import Any, Iterator, Mapping, Sequence, Union
import psycopg2
from psycopg2 import pool
from .base import BaseConnector
QueryParams = Union[Sequence[Any], Mapping[str, Any]]
[docs]class Connector(BaseConnector):
"""Setting a connection with database.
Besides the basic connection parameters any other
connection parameter supported by
`psycopg2.connect <https://www.psycopg.org/docs/module.html>`_
can be passed as a keyword.
Args:
pool_size:
The maximum amount of connections the pool will support.
pre_ping:
If True, the pool will emit a "ping" on the connection to
test if the connection is alive. If not, the connection will
be reconnected.
max_reconnects:
The maximum amount of reconnects, defaults to 3.
"""
_pool: pool.SimpleConnectionPool
def __init__(
self,
pool_size: int = 20,
pre_ping: bool = False,
max_reconnects: int = 3,
**kwargs: str
) -> None:
super().__init__(**kwargs)
self.pool_size = pool_size
self.pre_ping = pre_ping
self.max_reconnects = max_reconnects
self._pool = self.make_pool()
[docs] @contextmanager
def open_connection(self) -> Iterator[psycopg2.connect]:
"""Generate a free connection from the pool.
If ``pre_ping`` is True, then the connection is tested
whether its alive or not. If not, then reconnect.
"""
conn = self._pool.getconn()
if self.pre_ping:
for n in range(self.max_reconnects):
if not self.ping(conn):
if n > 0:
time.sleep(self._back_off_time(n - 1))
self._pool = self.restart_pool()
conn = self._pool.getconn()
else:
break
try:
yield conn
finally:
self._pool.putconn(conn)
[docs] def restart_pool(self) -> pool.SimpleConnectionPool:
"""Close all the connections and create a new pool."""
self.close_all()
return self.make_pool()
[docs] def close_all(self) -> None:
"""Close all the connections handled by the pool."""
if not self._pool.closed:
self._pool.closeall()
[docs] def make_pool(self) -> pool.SimpleConnectionPool:
"""Create a connection pool.
A connection pool that can't be shared
across different threads.
"""
return pool.SimpleConnectionPool(
minconn=1, maxconn=self.pool_size, **self._kwargs
)
[docs] @staticmethod
def ping(conn: psycopg2.connect) -> bool:
"""Ping the connection for liveness.
Implements a ping ("SELECT 1") on the connection.
Return True if the connection is alive, otherwise False.
Args:
conn:
The connection object to ping.
"""
is_alive = False
with conn.cursor() as cur:
cur.execute("SELECT 1")
if cur.description is not None:
fetched = cur.fetchall()
try:
is_alive = fetched[0][0] == 1
except IndexError:
pass
return is_alive
@staticmethod
def _back_off_time(n: int) -> int:
return (2 ** n) + (random.randint(0, 1000) / 1000)