Tutorial¶
Basic Usage¶
Here is an interactive session showing the basic commands usage.
>>> import pandas as pd
>>> from pgcom import Commuter
# create commuter
>>> commuter = Commuter(dbname="test", user="postgres", password="secret", host="localhost")
# execute a command: this creates a new table
>>> commuter.execute("CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);")
# write from DataFrame to a table
>>> df = pd.DataFrame([[100, "abc"], [200, "abc'def"]], columns=["num", "data"])
>>> commuter.insert(table_name="test", data=df)
# read from table to a DataFrame
>>> commuter.select("SELECT * FROM test")
id num data
0 1 100 abc
1 2 200 abc'def
# pass data to fill a query placeholders
>>> commuter.select("SELECT * FROM test WHERE data = (%s)", ("abc'def",))
id num data
0 2 200 abc'def
Writing to a table with copy from¶
PostgreSQL COPY FROM command copies data from a file-system file to a table (appending the data to whatever is in the table already).
Commuter’s copy_from
method provides an adaptation
between Pandas DataFrame and COPY FROM, the DataFrame format however must be
compatible with database table (data types, columns order, etc).
>>> commuter.copy_from(table_name="test", data=df)
A slight adaptation of DataFrame to the table structure can be attained by setting
format_data
parameter to True
. This enables to use DataFrames with the incomplete
set of columns given in any order.
>>> df = pd.DataFrame([["abc", 100], ["abc'def", 200]], columns=["data", "num_2"])
# DataFrame has column "num_2" not presented in table
>>> commuter.copy_from("test", df)
pgcom.exc.CopyError: column "num_2" of relation "test" does not exist
>>> commuter.copy_from("test", df, format_data=True)
>>> commuter.select("SELECT * FROM test")
id num data
0 1 None abc
1 2 None abc'def
Upsert with copy from¶
If DataFrame contains rows conflicting with table constraints and you need to implement
UPSERT, you can specify
where
parameter of copy_from
method.
Then it removes rows from the table before applying COPY FROM.
On the other hand, if you want to sanitize DataFrame and remove conflicts from it rather than from the table,
you can use resolve_primary_conflicts
and resolve_foreign_conflicts
.
>>> commuter.execute("CREATE TABLE test (id integer PRIMARY KEY, num integer, data varchar);")
>>> df_1 = pd.DataFrame([[1, 100, "a"], [2, 200, "b"]], columns=["id", "num", "data"])
>>> commuter.copy_from("test", df_1)
# df_2 has primary key conflict
>>> df_2 = pd.DataFrame([[2, 201, "bb"], [3, 300, "c"]], columns=["id", "num", "data"])
>>> commuter.copy_from("test", df_2)
pgcom.exc.CopyError: duplicate key value violates unique constraint "test_pkey"
# remove all rows from test table where id >= 2
>>> commuter.copy_from("test", df_2, where="id >= 2")
>>> commuter.select("SELECT * FROM test")
id num data
0 1 100 a
1 2 201 bb
2 3 300 c
>>> df_3 = pd.DataFrame([[3, 301, "cc"], [4, 400, "d"]], columns=["id", "num", "data"])
# remove conflicts from the DataFrame
>>> commuter.resolve_primary_conflicts("test", df_3)
id num data
0 4 400 d
Note
Be careful when resolving conflicts on DataFrame. Since both methods query data from the table,
the whole table will be queried if you don’t specify where
parameter.
Encode categorical columns¶
If DataFrame contains a column with string categories which you want to place in a separate table
with a serial primary key. And you want to replace categories with the corresponding key value, to
minimize the original table size, you can use encode_category
method.
It implements writing of all the unique values in categorical column given by category_name
to the table given by parameter category_table
.
In the example below, we have a DataFrame with a categorical column city
. We store it in
a separate table called cities
. And replace column with the corresponding city_id
.
>>> df
city year
0 Berlin 2010
1 Berlin 2011
2 London 2015
3 Paris 2012
4 Berlin 2018
>>> commuter.execute("CREATE TABLE cities (city_id SERIAL PRIMARY KEY, city TEXT)")
>>> df = commuter.encode_category(
... data=df, category="city", key="city_id", category_table="cities")
>>> df
city year city_id
0 Berlin 2010 1
1 Berlin 2011 1
2 London 2015 2
3 Paris 2012 3
4 Berlin 2018 1
>>> commuter.select("SELECT * FROM cities")
city_id city
0 1 Berlin
1 2 London
2 3 Paris
When categories are presented by multiple columns, its suggested to use encode_composite_category
method. It implements writing of all the unique combinations given by multiple columns in
DataFrame to the table given by category_table
.
>>> df
city year country
0 Berlin 2010 Germany
1 Berlin 2011 None
2 London 2015 UK
3 Paris 2012 France
4 Berlin 2018 UK
>>> cmd = "CREATE TABLE cities (city_id SERIAL PRIMARY KEY, city TEXT, country TEXT)"
>>> commuter.execute(cmd)
>>> cats = {"city: city", "country": "country"}
>>> df = commuter.encode_composite_category(
... data=df, categories=cats, key="city_id",
... category_table="cities", na_value="NONE")
>>> df
city year country city_id
0 Berlin 2010 Germany 1
1 Berlin 2018 Germany 1
2 Berlin 2011 NONE 2
3 London 2015 UK 3
4 Paris 2012 France 4
>>> commuter.select("SELECT * FROM cities")
city_id city country
0 1 Berlin Germany
1 2 Berlin NONE
2 3 London UK
3 4 Paris France
Connection options¶
A connection pooling technique is used to maintain a “pool” of active database connections in memory which are reused across the requests.
Testing the connection for liveness is available by using pre_ping
argument. This feature
will normally emit “SELECT 1” statement on each request to the database. If an error is raised,
the pool will be immediately restarted.
>>> commuter = Commuter(pre_ping=True, **conn_args)
The exponential backoff strategy is used for reconnection. By default, it implements 3 reconnection attempts.
This can be changed by setting max_reconnects
argument.
>>> commuter = Commuter(pre_ping=True, max_reconnects=5, **conn_args)
Note
When creating a new instance of Commuter
, the connection pool
is created by calling make_pool
and the connection
is established. The typical usage of Commuter
is therefore once
per particular database, held globally for the lifetime of a single application process.
Warning
So far a simple connection pool is used and it can’t be shared across different threads.
Extras¶
Here is the use cases of other Commuter
methods.
Select data from the table and return a scalar value.
>>> commuter.select_one("SELECT MAX(num) FROM test")
300
Insert one row to the table passing values using key-value arguments.
>>> commuter.insert_row("test", id=5, num=500, data="abc'def")
When using a serial column to provide unique identifiers, it can be very handy to insert one row and return the serial ID assigned to it.
>>> row_id = commuter.insert_row("test", return_id="id", num=500, data="abc'def")
Insert rows using custom placeholders, e.g. to insert PostGIS data.
>>> commuter.insert("test", data,
... columns=["name", "geom"],
... placeholders=["%s", "ST_GeomFromText(%s, 4326)"])
Check if the table exists.
>>> commuter.is_table_exist("test")
True
Check if the specific entry exists in the table. It implements a simple query building a WHERE clause from kwargs.
# SELECT 1 FROM TABLE test WHERE id=5 AND num=500
>>> commuter.is_entry_exist("test", id=5, num=500)
True
Delete entry from the table, specifying a WHERE clause using kwargs.
# DELETE FROM TABLE test WHERE id=5 AND num=500
>>> commuter.delete_entry("test", id=5, num=500)
True
Return the number of active connections to the database.
>>> commuter.get_connections_count()
9
Listener¶
PostgreSQL provides tools for setting asynchronous interaction with database session using LISTEN and NOTIFY commands.
A client application registers as a listener on the notification channel with the LISTEN command (and can stop listening with the UNLISTEN command). When the command NOTIFY is executed, the application listening on the channel is notified. A payload can be passed to provide some extra data to the listener. This is commonly used when sending notifications that table rows have been modified.
Notifications are received after trigger is fired, the poll method can be used to check for the new notifications without wasting resources.
Methods create_notify_function
and
create_trigger
present basic query constructors,
which can be used to define a new trigger and a new function associated with this trigger.
Some custom definitions can be done using execute method.
Here is the example of simple application receiving notification when rows are inserted to the table. See API reference for more details
from pgcom import Listener
>>> listener = Listener(dbname="test", user="postgres", password="secret", host="localhost")
# create a function called by trigger, it generates a notification
# which is sending to test_channel
>>> listener.create_notify_function(func_name='notify_trigger', channel='test_channel')
# create a trigger executed AFTER INSERT STATEMENT
>>> listener.create_trigger(table_name='test', func_name='notify_trigger')
# register function callback activated on the notification
>>> def on_notify(payload):
... print("received notification")
# listening loop, callback is activated on every INSERT to "test" table
>>> listener.poll(channel='test_channel', on_notify=on_notify)
received notification
received notification
Note
Note that the payload is only available from PostgreSQL 9.0: notifications received from a previous version server will have the payload attribute set to the empty string.
df = commuter.encode_composite_category(data=df, categories={“city: city”, “country”: “country”}, key=”city_id”, category_table=”cities”)