When you run advanced SQL in Citus, what’s possible? Which SQL statements work, and which ones don’t? Citus is a PostgreSQL extension that adds powerful sharding capabilities to PostgreSQL. However, every solution does have limitations. Therefore, it makes sense to take a look at the latest version of Citus and learn how to properly use the most frequently-needed SQL features. Note that this is not a comprehensive overview, rather it is a guide through 6 of the most commonly-needed SQL tools:

  1. Naming Citus databases
  2. The first thing people typically notice is that with Citus, managing databases is not as straightforward as in a “normal PostgreSQL deployment” anymore. You might notice that most Citus examples use the “postgres” database for demonstration purposes. The reason is simple:

    testbox:citus hs$ createdb somename
    NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
    DETAIL:  Citus does not propagate CREATE DATABASE command to workers
    HINT:  You can manually create a database and its extensions on workers.
    

    The Citus workers are bound to a certain database because in PostgreSQL, a database is a strict separation. Therefore people are supposed to use schemas rather than multiple databases. However, if you really run analytics at scale, you’ll most likely not need multiple databases on the same infrastructure anyway. You can build multiple clusters in Kubernetes using Patroni to ensure high availability.

    The first takeaway is therefore:

    Just use the postgres database and you’ll be OK.

  3. Loading data using COPY
  4. Bulk loading data using COPY is the key to loading data quickly. Yes, you can use the PostgreSQL COPY command with Citus. Here’s how it works:

    postgres=# CREATE TABLE t_oil (
    region 	text, 
    country 	text, 
    year 		int, 
    production int, 
    consumption int
    );
    CREATE TABLE
    postgres=# SELECT create_distributed_table('t_oil', 'country');
     create_distributed_table 
    --------------------------
     
    (1 row)
    
    postgres=# COPY t_oil 
    FROM PROGRAM 
    'curl https://www.cybertec-postgresql.com/secret/oil_ext.txt';
    COPY 644
    

    Data was loaded successfully. The system will automatically route the tuples to the right shard and ensure that there is full transparency.

    Takeaway:

    Bulk loading data with COPY in Citus works just fine.

  5. Advanced SQL: Using ordered sets
  6. What is an ordered set? We’re all familiar with calculating the “average”. However, this is not what we want in many cases. Let us imagine that we have three people: Person A earns 1 USD, person B earns 3 USD and person C earns 1 million USD. On average everybody makes north of 300k USD so we should be fine? Actually, no, because two people are starving. The average of a data set is therefore often not what reveals the real situation. A more meaningful value is the “median” of the data set, which is the value in the middle if you order the data set. In the case of 1, 3 and 1000000, the middle value is 3. 50% of the values in the data set are smaller than the median, and 50% of the values are bigger. In SQL, this means that we have to sort the data and get the middle of this ordered data set. WITHIN GROUP (ORDER BY …) is necessary for aggregates like the median that require a certain ordering.

    So let’s see if we can run ordered sets in a Citus-enabled PostgreSQL database:

    postgres=# SELECT  country, 
       avg(production), 
       percentile_disc(0.5) 
    WITHIN GROUP (ORDER BY production) 
    FROM 	   t_oil 
    GROUP BY country 
    ORDER BY avg(production) DESC NULLS LAST;
           country        |          avg          | percentile_disc 
    ----------------------+-----------------------+-----------------
     USA                  | 9141.3478260869565217 |            9159
     Saudi Arabia         | 7641.8260869565217391 |            8820
     Iran                 | 3631.6956521739130435 |            3730
     Mexico               | 2359.5217391304347826 |            2930
     Canada               | 2123.2173913043478261 |            1967
     Kuwait               | 2083.6956521739130435 |            2182
     United Arab Emirates | 1936.0434782608695652 |            1937
     Iraq                 | 1780.4130434782608696 |            1899
     Qatar                |  609.8695652173913043 |             461
     Oman                 |  586.4545454545454545 |             625
     Syria                |  335.9767441860465116 |             341
     Yemen                |  307.2800000000000000 |             346
     Other Middle East    |   74.2173913043478261 |              53
     Israel               |                       |                
    (14 rows)
    

    Here is the execution plan of the query:

    postgres=# explain analyze 
    SELECT  country, 
       avg(production), 
       percentile_disc(0.5) 
    WITHIN GROUP (ORDER BY production) 
    FROM 	   t_oil 
    GROUP BY country
    ORDER BY avg(production) DESC NULLS LAST;
                                                                      QUERY PLAN                                                                   
    -----------------------------------------------------------------------------------------------------------------------------------------------
     Sort  (cost=12406.82..12656.82 rows=100000 width=68) (actual time=11.035..11.037 rows=14 loops=1)
       Sort Key: remote_scan.avg DESC NULLS LAST
       Sort Method: quicksort  Memory: 25kB
       ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=68) (actual time=11.012..11.014 rows=14 loops=1)
             Task Count: 32
             Tuple data received from nodes: 384 bytes
             Tasks Shown: One of 32
             ->  Task
                   Tuple data received from node: 81 bytes
                   Node: host=localhost port=6004 dbname=postgres
                   ->  GroupAggregate  (cost=8.28..9.71 rows=3 width=42) (actual time=0.097..0.133 rows=3 loops=1)
                         Group Key: country
                         ->  Sort  (cost=8.28..8.63 rows=138 width=10) (actual time=0.068..0.077 rows=138 loops=1)
                               Sort Key: country
                               Sort Method: quicksort  Memory: 32kB
                               ->  Seq Scan on t_oil_102255 t_oil  (cost=0.00..3.38 rows=138 width=10) (actual time=0.008..0.028 rows=138 loops=1)
                       Planning Time: 0.025 ms
                       Execution Time: 0.147 ms
     Planning Time: 1.153 ms
     Execution Time: 11.069 ms
    (20 rows)
    

    Voila, it works.

    The takeaway is:

    Yes, ordered set aggregates can be used with Citus DB.

  7. Advanced SQL: Window functions with Citus and PostgreSQL
  8. When analyzing time series, it’s often necessary to calculate the difference between rows. One example would be to calculate the difference between a value and the difference to the previous period. We might want to know if production has risen or fallen. The following example does exactly that:

    postgres=# SELECT	
    year,
    production,
    production - lag(production) OVER (ORDER BY year) AS diff,
    avg(production) OVER (ORDER BY year 
    ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
    FROM  t_oil
    WHERE country = 'USA'
     AND year > 2000
    ;
     year | production | diff |          avg          
    ------+------------+------+-----------------------
     2001 |       7669 |      | 7565.0000000000000000
     2002 |       7626 |  -43 | 7480.7500000000000000
     2003 |       7400 | -226 | 7363.6000000000000000
     2004 |       7228 | -172 | 7198.0000000000000000
     2005 |       6895 | -333 | 7042.2000000000000000
     2006 |       6841 |  -54 | 6909.0000000000000000
     2007 |       6847 |    6 | 6917.6000000000000000
     2008 |       6734 | -113 | 7041.2000000000000000
     2009 |       7271 |  537 | 7091.2500000000000000
     2010 |       7513 |  242 | 7172.6666666666666667
    (10 rows)
    

    We can indeed run window functions and analytics in Citus. Of course, we have to keep in mind that we are operating in a sharded environment. Window functions usually need sorted input.

    Takeaway:

    Citus supports window functions, but your data should be aligned in a way that the system can provide sorted input to achieve a decent level of efficiency.

  9. Rethinking grouping sets
  10. If you are working on big reporting projects, you’ll inevitably need some kind of grouping sets. What is a grouping set? The goal is to perform more than one aggregation at once. Maybe we want to calculate the average production per country, but also have a bottom line that contains the “overall average” of all rows. The way to do that in advanced SQL is to use ROLLUP:

    SELECT  country, avg(production) 
    FROM    t_oil 
    GROUP BY ROLLUP (country) 
    ORDER BY avg(production) DESC NULLS LAST;
    ERROR:  could not run distributed query with GROUPING SETS, CUBE, or ROLLUP
    HINT:  Consider using an equality filter on the distributed table's partition column.
    

    Basically, the idea behind ROLLUP is to group by “country” as well as by “nothing” which leaves us with two grouping criteria:

    SELECT  country, avg(production) 
    FROM    t_oil 
    GROUP BY GROUPING SETS ((), (country)) 
    ORDER BY avg(production) DESC NULLS LAST;
    ERROR:  could not run distributed query with GROUPING SETS, CUBE, or ROLLUP
    HINT:  Consider using an equality filter on the distributed table's partition column.
    

    Neither feature is in Citus (yet?), therefore we have to work around this missing feature.

    The way to model grouping sets is to make use of UNION ALL:

    SELECT  country, 
            avg(production) 
    FROM    t_oil 
    GROUP BY country 
    UNION ALL 
    SELECT  NULL, 
            avg(production) 
    FROM    t_oil
    ORDER BY country;
           country        |          avg          
    ----------------------+-----------------------
     Canada               | 2123.2173913043478261
     Iran                 | 3631.6956521739130435
     Iraq                 | 1780.4130434782608696
     Israel               |                      
     Kuwait               | 2083.6956521739130435
     Mexico               | 2359.5217391304347826
     Oman                 |  586.4545454545454545
     Other Middle East    |   74.2173913043478261
     Qatar                |  609.8695652173913043
     Saudi Arabien        | 7641.8260869565217391
     Syria                |  335.9767441860465116
     USA                  | 9141.3478260869565217
     United Arab Emirates | 1936.0434782608695652
     Yemen                |  307.2800000000000000
                          | 2607.5139860139860140
    (15 rows)
    

    The downside is that we have to read the data twice (once for the country list and once for the overall average).

    Takeaway:

    Emulating grouping sets with UNION ALL in a column store will still perform better than using grouping sets with a row store.

  11. Using triggers with Citus
  12. Often, people want to run triggers on their data. However, there is a catch.

    Let’s write a basic trigger and see what happens:

    CREATE FUNCTION trig_func() 
    RETURNS trigger AS
    $$
    	BEGIN
    		NEW.production := round(NEW.production, -1);
    		RAISE NOTICE 'rounded production: %', NEW.production;
    		RETURN NEW;
    	END;
    $$ LANGUAGE 'plpgsql';
    
    CREATE TRIGGER mytrig 
    	BEFORE INSERT 
    	ON t_oil
    	FOR EACH ROW 
    	EXECUTE PROCEDURE trig_func();
    
    ERROR:  triggers are not supported on distributed tables
    

    We CANNOT use triggers on distributed Citus tables.

    The takeaway here is:

    It’s not possible to use triggers on distributed tables.

  13. Running ALTER TABLE in Citus
  14. Once in a while, the data structure of a table must be changed. In SQL, the command to do that is ALTER TABLE. However, there are some implications which we have to keep in mind:

    postgres=# ALTER TABLE t_oil 
    ADD COLUMN data_verified boolean DEFAULT true;
    ALTER TABLE
    

    Adding columns is actually simple and can be easily done. However, dropping columns is more delicate:

    postgres=# ALTER TABLE t_oil DROP COLUMN country;
    ERROR:  cannot execute ALTER TABLE command involving partition column
    

    In case the partitioning column is touched, Citus will error out and PostgreSQL will not drop the column. This kind of behavior is totally expected and it is logical to prevent this operation from happening, because it would destroy the entire setup.

    What is possible is to rename the sharding column. Citus and PostgreSQL will handle this nicely for us:

    postgres=# ALTER TABLE t_oil RENAME COLUMN country TO data_country;
    ALTER TABLE
    

Takeaway:

There are some restrictions to changing the table structure, particularly when the sharding column is involved.

Finally …


In order to receive regular updates on important changes in PostgreSQL, subscribe to our newsletter, or follow us on Twitter, Facebook, or LinkedIn.