Welcome to pgcom’s documentation!¶
Communication manager for PostgreSQL, providing a collection of convenience wrappers over the Psycopg adapter to simplify the use of basic Psycopg methods in conjunction with Pandas DataFrames.
Key Features¶
Reading from a database table to a Pandas DataFrame.
Writing from a DataFrame to a table.
Adaptation between DataFrames and COPY FROM.
Methods to resolve conflicts in a DataFrame before applying COPY FROM.
Tools for setting up asynchronous communication with the database using the LISTEN and NOTIFY commands.
Installation¶
Pgcom supports Python 3.6 or newer.
To install the package, you can simply use pip:
$ pip install pgcom
or clone the repository:
$ git clone git@github.com:viktorsapozhok/pgcom.git
$ cd pgcom
$ pip install .
To install the package in development mode, with the ability to run tests in a local environment, use the following:
$ git clone git@github.com:viktorsapozhok/pgcom.git
$ cd pgcom
$ pip install --no-cache-dir --editable .[test]
Tutorial¶
Basic Usage¶
Here is an interactive session showing basic usage.
>>> import pandas as pd
>>> from pgcom import Commuter
# create commuter
>>> commuter = Commuter(dbname="test", user="postgres", password="secret", host="localhost")
# execute a command: this creates a new table
>>> commuter.execute("CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);")
# write from DataFrame to a table
>>> df = pd.DataFrame([[100, "abc"], [200, "abc'def"]], columns=["num", "data"])
>>> commuter.insert(table_name="test", data=df)
# read from table to a DataFrame
>>> commuter.select("SELECT * FROM test")
id num data
0 1 100 abc
1 2 200 abc'def
# pass data to fill query placeholders
>>> commuter.select("SELECT * FROM test WHERE data = (%s)", ("abc'def",))
id num data
0 2 200 abc'def
Writing to a table with copy from¶
The PostgreSQL COPY FROM command copies data from a file-system file to a table (appending the data to whatever is in the table already). Commuter's copy_from method provides an adaptation between a Pandas DataFrame and COPY FROM; the DataFrame must, however, be compatible with the database table (data types, column order, etc).
>>> commuter.copy_from(table_name="test", data=df)
A slight adaptation of the DataFrame to the table structure can be attained by setting the format_data parameter to True. This makes it possible to use DataFrames with an incomplete set of columns, given in any order.
>>> df = pd.DataFrame([["abc", 100], ["abc'def", 200]], columns=["data", "num_2"])
# DataFrame has column "num_2" not present in the table
>>> commuter.copy_from("test", df)
pgcom.exc.CopyError: column "num_2" of relation "test" does not exist
>>> commuter.copy_from("test", df, format_data=True)
>>> commuter.select("SELECT * FROM test")
id num data
0 1 None abc
1 2 None abc'def
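Conceptually, format_data=True aligns the DataFrame with the table's column layout before the copy. Below is a pandas-only sketch of that alignment (table_columns is a hypothetical stand-in for the column layout pgcom reads from the database, not a pgcom API):

```python
import pandas as pd

# hypothetical column layout of the target table (without the serial "id")
table_columns = ["num", "data"]

df = pd.DataFrame([["abc", 100], ["abc'def", 200]], columns=["data", "num_2"])

# keep only the table's columns, in table order; columns missing from the
# DataFrame become NaN, which COPY FROM can then receive as NULL
aligned = df.reindex(columns=table_columns)
```

The extra column "num_2" is dropped, and the missing "num" column is filled with NaN, matching the None values seen in the table above.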
Upsert with copy from¶
If the DataFrame contains rows conflicting with table constraints and you need to implement UPSERT, you can specify the where parameter of the copy_from method. It then removes the matching rows from the table before applying COPY FROM. On the other hand, if you want to sanitize the DataFrame and remove conflicts from it rather than from the table, you can use resolve_primary_conflicts and resolve_foreign_conflicts.
>>> commuter.execute("CREATE TABLE test (id integer PRIMARY KEY, num integer, data varchar);")
>>> df_1 = pd.DataFrame([[1, 100, "a"], [2, 200, "b"]], columns=["id", "num", "data"])
>>> commuter.copy_from("test", df_1)
# df_2 has primary key conflict
>>> df_2 = pd.DataFrame([[2, 201, "bb"], [3, 300, "c"]], columns=["id", "num", "data"])
>>> commuter.copy_from("test", df_2)
pgcom.exc.CopyError: duplicate key value violates unique constraint "test_pkey"
# remove all rows from test table where id >= 2
>>> commuter.copy_from("test", df_2, where="id >= 2")
>>> commuter.select("SELECT * FROM test")
id num data
0 1 100 a
1 2 201 bb
2 3 300 c
>>> df_3 = pd.DataFrame([[3, 301, "cc"], [4, 400, "d"]], columns=["id", "num", "data"])
# remove conflicts from the DataFrame
>>> commuter.resolve_primary_conflicts("test", df_3)
id num data
0 4 400 d
Note
Be careful when resolving conflicts on a DataFrame. Since both methods query data from the table, the whole table will be queried if you don't specify the where parameter.
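The filtering that resolve_primary_conflicts performs can be illustrated with plain pandas: query the existing keys, then drop the conflicting rows. This is a sketch only; existing_ids is hard-coded here to stand in for the keys pgcom would query from the table:

```python
import pandas as pd

# keys already present in the table (hard-coded for illustration)
existing_ids = pd.Series([1, 2, 3], name="id")

df_3 = pd.DataFrame([[3, 301, "cc"], [4, 400, "d"]], columns=["id", "num", "data"])

# drop rows whose primary key already exists in the table
df_clean = df_3[~df_3["id"].isin(existing_ids)].reset_index(drop=True)
```

Only the non-conflicting row (id=4) survives, matching the output of the session above.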
Encode categorical columns¶
If the DataFrame contains a column with string categories that you want to place in a separate table with a serial primary key, and you want to replace the categories with the corresponding key values to minimize the original table size, you can use the encode_category method. It writes all the unique values of the categorical column given by the category parameter to the table given by the category_table parameter.
In the example below, we have a DataFrame with a categorical column city. We store it in a separate table called cities, and add the corresponding city_id to each row.
>>> df
city year
0 Berlin 2010
1 Berlin 2011
2 London 2015
3 Paris 2012
4 Berlin 2018
>>> commuter.execute("CREATE TABLE cities (city_id SERIAL PRIMARY KEY, city TEXT)")
>>> df = commuter.encode_category(
... data=df, category="city", key="city_id", category_table="cities")
>>> df
city year city_id
0 Berlin 2010 1
1 Berlin 2011 1
2 London 2015 2
3 Paris 2012 3
4 Berlin 2018 1
>>> commuter.select("SELECT * FROM cities")
city_id city
0 1 Berlin
1 2 London
2 3 Paris
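The mapping maintained by encode_category can be illustrated with plain pandas: unique categories receive sequential keys in order of appearance, and each row is joined with its key. This is a sketch only; the real method also persists the mapping in the category_table and reuses keys already stored there:

```python
import pandas as pd

df = pd.DataFrame({
    "city": ["Berlin", "Berlin", "London", "Paris", "Berlin"],
    "year": [2010, 2011, 2015, 2012, 2018],
})

# assign sequential ids to the unique categories, in order of appearance
cats = pd.DataFrame({"city": df["city"].unique()})
cats["city_id"] = range(1, len(cats) + 1)

# attach the key to every row
df = df.merge(cats, on="city", how="left")
```

The resulting city_id column matches the session output above: Berlin=1, London=2, Paris=3.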
When categories are represented by multiple columns, it is suggested to use the encode_composite_category method. It writes all the unique combinations given by multiple columns of the DataFrame to the table given by category_table.
>>> df
city year country
0 Berlin 2010 Germany
1 Berlin 2011 None
2 London 2015 UK
3 Paris 2012 France
4 Berlin 2018 UK
>>> cmd = "CREATE TABLE cities (city_id SERIAL PRIMARY KEY, city TEXT, country TEXT)"
>>> commuter.execute(cmd)
>>> cats = {"city": "city", "country": "country"}
>>> df = commuter.encode_composite_category(
... data=df, categories=cats, key="city_id",
... category_table="cities", na_value="NONE")
>>> df
city year country city_id
0 Berlin 2010 Germany 1
1 Berlin 2018 Germany 1
2 Berlin 2011 NONE 2
3 London 2015 UK 3
4 Paris 2012 France 4
>>> commuter.select("SELECT * FROM cities")
city_id city country
0 1 Berlin Germany
1 2 Berlin NONE
2 3 London UK
3 4 Paris France
Connection options¶
A connection pooling technique is used to maintain a "pool" of active database connections in memory, which are reused across requests.
Testing the connection for liveness is enabled with the pre_ping argument. This feature emits a "SELECT 1" statement on each request to the database; if an error is raised, the pool is immediately restarted.
>>> commuter = Commuter(pre_ping=True, **conn_args)
An exponential backoff strategy is used for reconnection. By default, it makes 3 reconnection attempts; this can be changed with the max_reconnects argument.
>>> commuter = Commuter(pre_ping=True, max_reconnects=5, **conn_args)
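The backoff idea can be sketched independently of pgcom: each failed attempt doubles the waiting time before the next one. This is a generic illustration; pgcom's internal delays, exception types, and retry logic may differ:

```python
import time

def with_reconnect(operation, max_reconnects=3, base_delay=0.5):
    """Retry operation with exponentially growing delays between attempts."""
    for attempt in range(max_reconnects):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_reconnects - 1:
                raise  # attempts exhausted, propagate the error
            time.sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
```

With max_reconnects=3, the operation is attempted at most three times before the error propagates.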
Note
When creating a new instance of Commuter, the connection pool is created by calling make_pool and the connection is established. The typical usage of Commuter is therefore once per particular database, held globally for the lifetime of a single application process.
Warning
So far a simple connection pool is used, and it can't be shared across different threads.
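Until a thread-safe pool is available, a common workaround is to keep one Commuter per thread, e.g. via threading.local. This is a sketch; get_commuter and its factory argument are hypothetical helpers, not part of pgcom:

```python
import threading

_local = threading.local()

def get_commuter(factory):
    # create the Commuter lazily, once per thread; factory would be
    # something like: lambda: Commuter(**conn_args)
    if not hasattr(_local, "commuter"):
        _local.commuter = factory()
    return _local.commuter
```

Each thread then gets its own connection pool, and repeated calls within a thread reuse the same instance.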
Extras¶
Here are the use cases of other Commuter methods.
Select data from the table and return a scalar value.
>>> commuter.select_one("SELECT MAX(num) FROM test")
300
Insert one row to the table passing values using key-value arguments.
>>> commuter.insert_row("test", id=5, num=500, data="abc'def")
When using a serial column to provide unique identifiers, it can be very handy to insert one row and return the serial ID assigned to it.
>>> row_id = commuter.insert_row("test", return_id="id", num=500, data="abc'def")
Insert rows using custom placeholders, e.g. to insert PostGIS data.
>>> commuter.insert("test", data,
... columns=["name", "geom"],
... placeholders=["%s", "ST_GeomFromText(%s, 4326)"])
Check if the table exists.
>>> commuter.is_table_exist("test")
True
Check if a specific entry exists in the table. It builds a simple query with a WHERE clause from kwargs.
# SELECT 1 FROM test WHERE id=5 AND num=500
>>> commuter.is_entry_exist("test", id=5, num=500)
True
Delete an entry from the table, specifying the WHERE clause via kwargs.
# DELETE FROM test WHERE id=5 AND num=500
>>> commuter.delete_entry("test", id=5, num=500)
True
Return the number of active connections to the database.
>>> commuter.get_connections_count()
9
Listener¶
PostgreSQL provides tools for setting up asynchronous interaction with a database session using the LISTEN and NOTIFY commands.
A client application registers as a listener on the notification channel with the LISTEN command (and can stop listening with the UNLISTEN command). When the command NOTIFY is executed, the application listening on the channel is notified. A payload can be passed to provide some extra data to the listener. This is commonly used when sending notifications that table rows have been modified.
Notifications are received after a trigger is fired; the poll method can be used to check for new notifications without wasting resources.
The create_notify_function and create_trigger methods provide basic query constructors, which can be used to define a new trigger and a new function associated with this trigger. More custom definitions can be done using the execute method.
Here is an example of a simple application receiving notifications when rows are inserted into the table. See the API reference for more details.
>>> from pgcom import Listener
>>> listener = Listener(dbname="test", user="postgres", password="secret", host="localhost")
# create a function called by the trigger; it generates a notification
# which is sent to test_channel
>>> listener.create_notify_function(func_name='notify_trigger', channel='test_channel')
# create a trigger executed AFTER INSERT STATEMENT
>>> listener.create_trigger(table_name='test', func_name='notify_trigger')
# register function callback activated on the notification
>>> def on_notify(payload):
... print("received notification")
# listening loop, callback is activated on every INSERT to "test" table
>>> listener.poll(channel='test_channel', on_notify=on_notify)
received notification
received notification
Note
Note that the payload is only available from PostgreSQL 9.0: notifications received from a previous version server will have the payload attribute set to the empty string.
API Reference¶
Connection handlers¶
Base class for all connectors.
Setting a connection with the database.
Communication agents¶
Base class for all commuters.
Communication agent.
Asynchronous interaction¶
Listener on the notification channel.
ChangeLog¶
0.2.9 (2022-04-04)¶
Fixed encode_composite_category for integer categories.
0.2.8 (2021-08-16)¶
Added encode_composite_category method.
0.2.7 (2021-06-28)¶
Fixed copy_from to make it compatible with the psycopg2 2.9 release.
0.2.5 (2021-03-11)¶
0.2.4 (2021-02-28)¶
Fixed issue with custom placeholders in insert method.
0.2.3 (2021-02-27)¶
Deprecated schema parameter
Added is_entry_exist method
Added delete_entry method
Added encode_category method
Added make_where method
Updated insert method to support customized placeholders
0.2.2 (2020-07-27)¶
Added adaptation for numpy int64 and float64.
0.2.1 (2020-07-13)¶
Added BaseConnector
Added BaseCommuter
0.2.0 (2020-07-04)¶
0.1.7 (2020-05-31)¶
Fixed #9
0.1.6 (2020-05-28)¶
Updated _format_data to fix text fields with commas
0.1.5 (2020-03-16)¶
Fixed data formatting on integer columns with missed values (#5)
0.1.4 (2020-01-21)¶
Changed where argument type in resolve_primary_conflicts from positional to optional
Changed where argument type in resolve_foreign_conflicts from positional to optional
Fixed bug in copying from DataFrame with incomplete set of columns (#3)
Added new test
0.1.3 (2020-01-19)¶
0.1.2 (2020-01-16)¶
0.1.1 (2020-01-10)¶
Added Listener class
Added fix_schema decorator
Added select_one method
Added where argument to resolve_foreign_conflicts method
Added where argument to copy_from method
Added _table_columns method
Added _primary_key method
Added _foreign_key method
Moved sql queries to queries.py
Deprecated f_key, filter_col arguments of resolve_foreign_conflicts method
Deprecated p_key, filter_col arguments of resolve_primary_conflicts method
Deprecated return_scalar argument of select method
Deprecated get_columns method
0.1.0 (2020-01-02)¶
Pre-release
MIT License (see LICENSE).