Source code for clickhouse_pool.pool

"""Connection pool for clickhouse_driver

Heavily inspired by psycopg2/lib/pool.py, this module implements a thread-safe
connection pool. Each connection is an instance of a clickhouse_driver client.
"""
import threading
from contextlib import contextmanager
from clickhouse_driver import Client
from typing import Generator


class ChPoolError(Exception):
    """Base class for the errors raised by ChPool.

    Raised directly for generic pool misuse, e.g. operating on a closed pool
    or returning a client the pool does not know about.
    """


class TooManyConnections(ChPoolError):
    """Signals that handing out another client would exceed connections_max."""


class ChPool:
    """A ChPool is a pool of connections (Clients) to a ClickhouseServer.

    All public methods serialize access to the pool's bookkeeping through a
    single ``threading.Lock``.

    Attributes:
        connections_min (int): minimum number of connections to keep open
        connections_max (int): maximum number of connections allowed open
        closed (bool): if closed the pool has no connections and cannot be used
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, **kwargs):
        """Initialize the pool of clickhouse clients.

        ``connections_min`` clients are created eagerly.

        Args:
            **kwargs: similar to clickhouse-driver, all settings available are
                documented `here <https://clickhouse.tech/docs/en/single/#settings>`_.
                ``connections_min`` (default 10) and ``connections_max``
                (default 20) are consumed by the pool itself; ``host``
                defaults to ``"localhost"``; everything else is forwarded
                unchanged to every ``Client`` the pool creates.
        """
        self.connections_min = kwargs.pop("connections_min", 10)
        self.connections_max = kwargs.pop("connections_max", 20)

        self.connection_args = {
            "host": kwargs.pop("host", "localhost"),
            **kwargs,
        }
        self.closed = False
        # idle clients, ready to be handed out
        self._pool = []
        # key -> client currently handed out
        self._used = {}
        # similar to psycopg2 pools, _rused is used for mapping instances of conn
        # to their respective keys in _used
        self._rused = {}
        # counter backing the automatically generated keys
        self._keys = 0
        self._lock = threading.Lock()

        for _ in range(self.connections_min):
            self._connect()

    def _connect(self, key: str = None) -> "Client":
        """Create a new client and assign to a key.

        Args:
            key: When given, the new client is registered as in use under
                this key; otherwise it is appended to the idle pool.

        Returns:
            The newly created client.
        """
        client = Client(**self.connection_args)
        if key is not None:
            self._used[key] = client
            self._rused[id(client)] = key
        else:
            self._pool.append(client)
        return client

    def _get_key(self):
        """Get an unused key."""
        self._keys += 1
        return self._keys

    def pull(self, key: str = None) -> "Client":
        """Get an available client from the pool.

        Args:
            key: If known, the key of the client you would like. When omitted
                a fresh integer key is generated.

        Returns:
            A clickhouse-driver client. If ``key`` is already in use, the
            client registered under it is returned again.

        Raises:
            ChPoolError: The pool has been closed.
            TooManyConnections: No idle client is available and
                ``connections_max`` clients are already in use.
        """
        with self._lock:
            if self.closed:
                raise ChPoolError("pool closed")
            if key is None:
                key = self._get_key()

            if key in self._used:
                return self._used[key]

            if self._pool:
                self._used[key] = client = self._pool.pop()
                self._rused[id(client)] = key
                return client

            if len(self._used) >= self.connections_max:
                raise TooManyConnections("too many connections")
            return self._connect(key)

    def push(self, client: "Client" = None, key: str = None, close: bool = False):
        """Return a client to the pool for reuse.

        Either ``client`` or ``key`` is enough to identify what is being
        returned; the missing one is looked up from the pool's bookkeeping.

        Args:
            client: The client to return.
            key: If known, the key of the client.
            close: Close the client instead of adding back to pool.

        Raises:
            ChPoolError: The pool has been closed, the client is not one the
                pool handed out, or ``key`` is not currently in use.
        """
        with self._lock:
            if self.closed:
                raise ChPoolError("pool closed")
            if key is None:
                key = self._rused.get(id(client))
                if key is None:
                    raise ChPoolError("trying to put unkeyed client")
            # Validate before touching the client so a bad key cannot leave
            # the pool half-updated (client re-pooled but still marked used).
            if key not in self._used:
                raise ChPoolError("trying to put client with unknown key")
            if client is None:
                # key-only push: recover the client that was handed out
                client = self._used[key]

            if len(self._pool) < self.connections_min and not close:
                # TODO: verify connection still valid
                if client.connection.connected:
                    self._pool.append(client)
            else:
                client.disconnect()

            del self._used[key]
            self._rused.pop(id(client), None)

    def cleanup(self):
        """Close all open connections in the pool.

        This method loops through every client, idle or in use, calls
        disconnect on it and then marks the pool as closed.

        Raises:
            ChPoolError: The pool has already been closed.
        """
        with self._lock:
            if self.closed:
                raise ChPoolError("pool closed")
            for client in self._pool + list(self._used.values()):
                try:
                    client.disconnect()
                # TODO: handle problems with disconnect
                # Best effort: one failing client must not prevent the
                # remaining clients from being disconnected.
                except Exception:  # pylint: disable=broad-except
                    pass
            self.closed = True

    @contextmanager
    def get_client(self, key: str = None) -> Generator["Client", None, None]:
        """A clean way to grab a client via a contextmanager.

        The client is pushed back to the pool when the ``with`` block exits,
        whether or not it raised.

        Args:
            key: If known, the key of the client to grab.

        Yields:
            Client: a clickhouse-driver client
        """
        client = self.pull(key)
        try:
            yield client
        finally:
            self.push(client=client)