When you run advanced SQL in Citus, what’s possible? Which SQL statements work, and which ones don’t? Citus is a PostgreSQL extension that adds powerful sharding capabilities to PostgreSQL. However, every solution does have limitations. Therefore, it makes sense to take a look at the latest version of Citus and learn how to properly use the most frequently-needed SQL features. Note that this is not a comprehensive overview, rather it is a guide through 6 of the most commonly-needed SQL tools:
- Naming Citus databases
- Loading data using
- Advanced SQL: Using ordered sets
- Advanced SQL: Window functions with Citus and PostgreSQL
- Rethinking grouping sets
- Using triggers with Citus
ALTER TABLEin Citus
The first thing people typically notice is that with Citus, managing databases is not as straightforward as in a “normal PostgreSQL deployment” anymore. You might notice that most Citus examples use the “postgres” database for demonstration purposes. The reason is simple:
testbox:citus hs$ createdb somename NOTICE: Citus partially supports CREATE DATABASE for distributed databases DETAIL: Citus does not propagate CREATE DATABASE command to workers HINT: You can manually create a database and its extensions on workers.
The Citus workers are bound to a certain database because in PostgreSQL, a database is a strict separation. Therefore people are supposed to use schemas rather than multiple databases. However, if you really run analytics at scale, you’ll most likely not need multiple databases on the same infrastructure anyway. You can build multiple clusters in Kubernetes using Patroni to ensure high availability.
The first takeaway is therefore:
Just use the
postgresdatabase and you’ll be OK.
Bulk loading data using
COPY is the key to loading data quickly. Yes, you can use the PostgreSQL
COPY command with Citus. Here’s how it works:
postgres=# CREATE TABLE t_oil ( region text, country text, year int, production int, consumption int ); CREATE TABLE postgres=# SELECT create_distributed_table('t_oil', 'country'); create_distributed_table -------------------------- (1 row) postgres=# COPY t_oil FROM PROGRAM 'curl https://www.cybertec-postgresql.com/secret/oil_ext.txt'; COPY 644
Data was loaded successfully. The system will automatically route the tuples to the right shard and ensure that there is full transparency.
Bulk loading data with
COPYin Citus works just fine.
What is an ordered set? We’re all familiar with calculating the “average”. However, this is not what we want in many cases. Let us imagine that we have three people: Person A earns 1 USD, person B earns 3 USD and person C earns 1 million USD. On average everybody makes north of 300k USD so we should be fine? Actually, no, because two people are starving. The average of a data set is therefore often not what reveals the real situation. A more meaningful value is the “median” of the data set, which is the value in the middle if you order the data set. In the case of 1, 3 and 1000000, the middle value is 3. 50% of the values in the data set are smaller than the median, and 50% of the values are bigger. In SQL, this means that we have to sort the data and get the middle of this ordered data set.
WITHIN GROUP (ORDER BY …) is necessary for aggregates like the median that require a certain ordering.
So let’s see if we can run ordered sets in a Citus-enabled PostgreSQL database:
postgres=# SELECT country, avg(production), percentile_disc(0.5) WITHIN GROUP (ORDER BY production) FROM t_oil GROUP BY country ORDER BY avg(production) DESC NULLS LAST; country | avg | percentile_disc ----------------------+-----------------------+----------------- USA | 9141.3478260869565217 | 9159 Saudi Arabia | 7641.8260869565217391 | 8820 Iran | 3631.6956521739130435 | 3730 Mexico | 2359.5217391304347826 | 2930 Canada | 2123.2173913043478261 | 1967 Kuwait | 2083.6956521739130435 | 2182 United Arab Emirates | 1936.0434782608695652 | 1937 Iraq | 1780.4130434782608696 | 1899 Qatar | 609.8695652173913043 | 461 Oman | 586.4545454545454545 | 625 Syria | 335.9767441860465116 | 341 Yemen | 307.2800000000000000 | 346 Other Middle East | 74.2173913043478261 | 53 Israel | | (14 rows)
Here is the execution plan of the query:
postgres=# explain analyze SELECT country, avg(production), percentile_disc(0.5) WITHIN GROUP (ORDER BY production) FROM t_oil GROUP BY country ORDER BY avg(production) DESC NULLS LAST; QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------- Sort (cost=12406.82..12656.82 rows=100000 width=68) (actual time=11.035..11.037 rows=14 loops=1) Sort Key: remote_scan.avg DESC NULLS LAST Sort Method: quicksort Memory: 25kB -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=68) (actual time=11.012..11.014 rows=14 loops=1) Task Count: 32 Tuple data received from nodes: 384 bytes Tasks Shown: One of 32 -> Task Tuple data received from node: 81 bytes Node: host=localhost port=6004 dbname=postgres -> GroupAggregate (cost=8.28..9.71 rows=3 width=42) (actual time=0.097..0.133 rows=3 loops=1) Group Key: country -> Sort (cost=8.28..8.63 rows=138 width=10) (actual time=0.068..0.077 rows=138 loops=1) Sort Key: country Sort Method: quicksort Memory: 32kB -> Seq Scan on t_oil_102255 t_oil (cost=0.00..3.38 rows=138 width=10) (actual time=0.008..0.028 rows=138 loops=1) Planning Time: 0.025 ms Execution Time: 0.147 ms Planning Time: 1.153 ms Execution Time: 11.069 ms (20 rows)
Voila, it works.
The takeaway is:
Yes, ordered set aggregates can be used with Citus DB.
When analyzing time series, it’s often necessary to calculate the difference between rows. One example would be to calculate the difference between a value and the difference to the previous period. We might want to know if production has risen or fallen. The following example does exactly that:
postgres=# SELECT year, production, production - lag(production) OVER (ORDER BY year) AS diff, avg(production) OVER (ORDER BY year ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) FROM t_oil WHERE country = 'USA' AND year > 2000 ; year | production | diff | avg ------+------------+------+----------------------- 2001 | 7669 | | 7565.0000000000000000 2002 | 7626 | -43 | 7480.7500000000000000 2003 | 7400 | -226 | 7363.6000000000000000 2004 | 7228 | -172 | 7198.0000000000000000 2005 | 6895 | -333 | 7042.2000000000000000 2006 | 6841 | -54 | 6909.0000000000000000 2007 | 6847 | 6 | 6917.6000000000000000 2008 | 6734 | -113 | 7041.2000000000000000 2009 | 7271 | 537 | 7091.2500000000000000 2010 | 7513 | 242 | 7172.6666666666666667 (10 rows)
We can indeed run window functions and analytics in Citus. Of course, we have to keep in mind that we are operating in a sharded environment. Window functions usually need sorted input.
Citus supports window functions, but your data should be aligned in a way that the system can provide sorted input to achieve a decent level of efficiency.
If you are working on big reporting projects, you’ll inevitably need some kind of grouping sets. What is a grouping set? The goal is to perform more than one aggregation at once. Maybe we want to calculate the average production per country, but also have a bottom line that contains the “overall average” of all rows. The way to do that in advanced SQL is to use
SELECT country, avg(production) FROM t_oil GROUP BY ROLLUP (country) ORDER BY avg(production) DESC NULLS LAST; ERROR: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP HINT: Consider using an equality filter on the distributed table's partition column.
Basically, the idea behind
ROLLUP is to group by “country” as well as by “nothing” which leaves us with two grouping criteria:
SELECT country, avg(production) FROM t_oil GROUP BY GROUPING SETS ((), (country)) ORDER BY avg(production) DESC NULLS LAST; ERROR: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP HINT: Consider using an equality filter on the distributed table's partition column.
Neither feature is in Citus (yet?), therefore we have to work around this missing feature.
The way to model grouping sets is to make use of
SELECT country, avg(production) FROM t_oil GROUP BY country UNION ALL SELECT NULL, avg(production) FROM t_oil ORDER BY country; country | avg ----------------------+----------------------- Canada | 2123.2173913043478261 Iran | 3631.6956521739130435 Iraq | 1780.4130434782608696 Israel | Kuwait | 2083.6956521739130435 Mexico | 2359.5217391304347826 Oman | 586.4545454545454545 Other Middle East | 74.2173913043478261 Qatar | 609.8695652173913043 Saudi Arabien | 7641.8260869565217391 Syria | 335.9767441860465116 USA | 9141.3478260869565217 United Arab Emirates | 1936.0434782608695652 Yemen | 307.2800000000000000 | 2607.5139860139860140 (15 rows)
The downside is that we have to read the data twice (once for the country list and once for the overall average).
Emulating grouping sets with
UNION ALLin a column store will still perform better than using grouping sets with a row store.
Often, people want to run triggers on their data. However, there is a catch.
Let’s write a basic trigger and see what happens:
CREATE FUNCTION trig_func() RETURNS trigger AS $$ BEGIN NEW.production := round(NEW.production, -1); RAISE NOTICE 'rounded production: %', NEW.production; RETURN NEW; END; $$ LANGUAGE 'plpgsql'; CREATE TRIGGER mytrig BEFORE INSERT ON t_oil FOR EACH ROW EXECUTE PROCEDURE trig_func(); ERROR: triggers are not supported on distributed tables
We CANNOT use triggers on distributed Citus tables.
The takeaway here is:
It’s not possible to use triggers on distributed tables.
Once in a while, the data structure of a table must be changed. In SQL, the command to do that is
ALTER TABLE. However, there are some implications which we have to keep in mind:
postgres=# ALTER TABLE t_oil ADD COLUMN data_verified boolean DEFAULT true; ALTER TABLE
Adding columns is actually simple and can be easily done. However, dropping columns is more delicate:
postgres=# ALTER TABLE t_oil DROP COLUMN country; ERROR: cannot execute ALTER TABLE command involving partition column
In case the partitioning column is touched, Citus will error out and PostgreSQL will not drop the column. This kind of behavior is totally expected and it is logical to prevent this operation from happening, because it would destroy the entire setup.
What is possible is to rename the sharding column. Citus and PostgreSQL will handle this nicely for us:
postgres=# ALTER TABLE t_oil RENAME COLUMN country TO data_country; ALTER TABLE
There are some restrictions to changing the table structure, particularly when the sharding column is involved.
- Check out our other Citus blogs.
- Want to improve your PostgreSQL database performance? See our latest performance content here.
- In case you need any assistance, please feel free to contact us.