While sharding is often advertised as “THE solution to PostgreSQL scalability”, it is necessary to keep some technical aspects in consideration in terms of performance. The rule is: Sharding should not be used without a deeper awareness of what it is you are actually doing to the data. It’s important to keep in mind that sharding has to be applied in a clever and thoughtful manner. One of the most common mistakes is to ignore the concept of “data locality”. It’s important for many IT problems, but crucial in the context of database sharding. Citus is one of the most sophisticated sharding solutions in the PostgreSQL world. It can help you to achieve maximum scalability and allows for efficient analytics as well as OLTP. Citus is available on-premise or as part of the Microsoft Azure cloud.

What is data locality? Let’s take a look together.

Preparing data for sharding

To demonstrate the concept, we first have to create two tables. For the sake of simplicity, we’ll use customers and sales:

postgres=# CREATE TABLE t_customer (
id   int, 
name text
postgres=# CREATE TABLE t_sales (
id          int, 
customer_id int, 
whatever    text

The data model is really straightforward. In this scenario, the typical way to analyse the data is to join the customer with the sales table. Why is this relevant?

To understand it, first let’s distribute the table and add some data:

postgres=# SELECT create_distributed_table('t_customer', 'id');
(1 row)

postgres=# SELECT create_distributed_table('t_sales', 'id');
(1 row)

Note that the data is sharded using the “id” which is not the join criteria.

In the next step, we can load some data:

postgres=# INSERT INTO t_customer 
SELECT 	*, 'dummy' 
FROM 		generate_series(1, 1000000);
INSERT 0 1000000

postgres=# INSERT INTO t_sales 
SELECT 	id, random()*100000, 'dummy' 
FROM 		generate_series(1, 1000000) AS id;
INSERT 0 1000000

Running queries across PostgreSQL shards

Once the data has been loaded, it’s time to execute a query. All we want to do is to join and count some data. Here’s what happens:

postgres=# SELECT count(*) 
FROM 	  t_customer AS c, t_sales AS s 
WHERE   c.id = s.customer_id;
ERROR:  the query contains a join that requires repartitioning
HINT:  Set citus.enable_repartition_joins to on to enable repartitioning

Oops! Citus refuses to run the query because the join doesn’t use the sharding criteria in the data model.

Citus will send an error message by default and refuse to run the query. Why is that the case? Imagine we want to join two rows on the same machine: PostgreSQL will do some magic in RAM locally and voila, two rows has been join (it is not as simple as you can see in our blog post about PostgreSQL join strategies but one can understand the logic). Local joins are efficient but if all the data has to go through an expensive network stack? Things will be orders of magnitude more expensive and in case of large data sets this strategy becomes totally impractical and doomed to fail. Therefore it is better to error out than to lead the user directly into a serious performance problem.

In some cases, it might still be necessary to force a join. However, this requires a configuration variable which can be set locally. Note that this is not something you should do in real life on large data sets:

postgres=# SET citus.enable_repartition_joins = on;
postgres=# SELECT count(*) 
FROM 	 t_customer AS c, t_sales AS s 
WHERE	 c.id = s.customer_id;
(1 row)

Due to the small data set we are using here (for the sake of simplicity), the query does indeed return in reasonable time. But as stated before, if you do this on large data sets which are not on the same machine as my little test, this query might not end at all.

To understand what’s going on, we can take a look at the execution plan of the query:

postgres=# explain analyze SELECT count(*) 
FROM 	t_customer AS c, t_sales AS s 
WHERE 	c.id = s.customer_id;
                                                         QUERY PLAN                                                         
 Aggregate  (cost=250.00..250.02 rows=1 width=8) (actual time=1283.486..1283.486 rows=1 loops=1)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=1283.478..1283.480 rows=20 loops=1)
         Task Count: 20
         Tuple data received from nodes: 160 bytes
         Tasks Shown: None, not supported for re-partition queries
         ->  MapMergeJob
               Map Task Count: 32
               Merge Task Count: 20
         ->  MapMergeJob
               Map Task Count: 32
               Merge Task Count: 20
 Planning Time: 10.282 ms
 Execution Time: 1283.670 ms
(13 rows)

We need around 1.3 seconds to run the query. If the shards reside on different machines, and not just on different ports, this execution time can easily increase by many times.

Joining sharded data with Citus properly

To fix the problem, we have to introduce the concept of “data locality”. The importance of this idea cannot be underestimated. The core idea is that data which has to be joined should stay in the same shard. Do not move data between shards and avoid large operations across shards as much as you can.

In my example, what we have to do is to structure our tables in a way that the same sharding criteria is used for BOTH tables:

postgres=# CREATE TABLE t_sales_proper AS 
SELECT * FROM t_sales;
SELECT 1000000

postgres=# SELECT create_distributed_table(
colocate_with => 't_customer'
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.t_sales_proper$$)
(1 row)

Fortunately Citus is nice enough to warn us that we have just created a local table with all the data. The create_distributed_table function DOES NOT delete local data for safety reasons so we have to truncate it locally to avoid storing it twice:

postgres=# SELECT truncate_local_data_after_distributing_table($$public.t_sales_proper$$);
(1 row)

Storing data locally is usually a bad idea in any case, since you can easily run out of space. Sharding and Citus is all about scalability, so moving data to a central place might not be the best of ideas, due to space constraints and other limitations which are diametrically opposed to the core ideas of sharding and scalability.

Once the data has been properly sharded, we can run the query again and see what happens:

postgres=# explain analyze SELECT count(*) 
FROM 	t_customer AS c, t_sales_proper AS s 
WHERE 	c.id = s.customer_id;
                                                                        QUERY PLAN                                                                         
 Aggregate  (cost=250.00..250.02 rows=1 width=8) (actual time=98.915..98.915 rows=1 loops=1)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=98.907..98.909 rows=32 loops=1)
         Task Count: 32
         Tuple data received from nodes: 256 bytes
         Tasks Shown: One of 32
         ->  Task
               Tuple data received from node: 8 bytes
               Node: host=localhost port=6003 dbname=postgres
               ->  Aggregate  (cost=1851.57..1851.58 rows=1 width=8) (actual time=25.905..25.907 rows=1 loops=1)
                     ->  Hash Join  (cost=858.76..1773.21 rows=31345 width=0) (actual time=8.929..23.526 rows=31345 loops=1)
                           Hash Cond: (s.customer_id = c.id)
                           ->  Seq Scan on t_sales_proper_102142 s  (cost=0.00..483.45 rows=31345 width=4) (actual time=0.007..2.387 rows=31345 loops=1)
                           ->  Hash  (cost=474.45..474.45 rows=30745 width=4) (actual time=8.863..8.863 rows=30745 loops=1)
                                 Buckets: 32768  Batches: 1  Memory Usage: 1337kB
                                 ->  Seq Scan on t_customer_102078 c  (cost=0.00..474.45 rows=30745 width=4) (actual time=0.005..3.701 rows=30745 loops=1)
                   Planning Time: 0.138 ms
                   Execution Time: 26.038 ms
 Planning Time: 2.678 ms
 Execution Time: 98.964 ms

Wow! The query is 15 times faster than before.

In reality, the performance difference is often even larger, but still: This is important and should be taken seriously.

Finally …

If you’re new to sharding and Citus for PostgreSQL, you might want to check out some of our other blog posts related to scalability.


In order to receive regular updates on important changes in PostgreSQL, subscribe to our newsletter, or follow us on Twitter, Facebook, or LinkedIn.