Speeding up PostgreSQL ETL pipeline with the help of GODS

30 Sep 2019

Problem to solve

When working on the new Federated Reporting feature for CFEngine, we had to solve the problem of collecting data from multiple CFEngine hubs (feeders) on a single hub (superhub). CFEngine hubs use PostgreSQL to store data, so, more specifically, the problem was how to collect data from multiple PostgreSQL databases in one PostgreSQL database. With ~1 GiB of SQL data per feeder hub and, for example, 10 feeders connected to a superhub, the initial and trivial solution, essentially this ETL (Extract Transform Load) pipeline: pg_dump | gzip | ssh | gunzip | psql, performed really poorly. The problem was in the last part of the pipeline: importing the data using psql. Reading and writing 10 GiB of data of course takes a while, but we soon realized that I/O speed was not the bottleneck in this case.

Part of the problem is that the feeder hubs and the superhub are potentially using different versions of CFEngine and thus PostgreSQL. Their schemas can also be a little different, with the differences being backwards compatible (new columns having default values, etc.). This means that the --inserts option has to be used when pg_dump is run. Worse, the --column-inserts option has to be used because columns could be in different orders on the feeders and the superhub. And this is what man pg_dump has to say about these two options:

--inserts
    Dump data as INSERT commands (rather than COPY). This will make restoration
    very slow; it is mainly useful for making dumps that can be loaded into
    non-PostgreSQL databases. However, since this option generates a separate
    command for each row, an error in reloading a row causes only that row to be
    lost rather than the entire table contents. Note that the restore might fail
    altogether if you have rearranged column order. The --column-inserts option
    is safe against column order changes, though even slower.

Given the amount of data, "generates a separate command for each row" in our case actually means ~40 million commands/rows. And that is a lot of processing PostgreSQL has to do.

Solution with the help of GODS

When in trouble, GODS, or Good Old Damn Scripts, come to help.

Go parallel

The first strategy we chose to make things faster was to run the import (gunzip | psql) in parallel. Because I/O was not the bottleneck, utilizing multiple cores to handle the processing of all the SQL commands seemed promising. First we had to change the superhub’s database schema to make it possible to import the new (fresh) data from a feeder next to the older (previous) data from the same feeder. Running DELETE FROM some_table WHERE source = the_feeder in a transaction together with the data import is much slower than importing the data into a new table, then running TRUNCATE old_table and replacing the references to the old table with references to the new table. We also wanted to avoid the case where a failed import of the fresh data would delete the old data.

We made use of PostgreSQL partitioning to put together data from individual feeders’ tables and this also allowed us to swap the old and new data behind the scenes atomically.
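To illustrate the idea, here is a minimal sketch of such a swap. It is not the actual Federated Reporting schema: it assumes a hypothetical parent table that is list-partitioned by feeder, with hypothetical partition names ending in _old and _new. The swap_partition_sql function is also made up for this example and only prints SQL that could be piped to psql.

```bash
#!/usr/bin/env bash
# Hypothetical sketch: print the SQL that swaps a feeder's old partition for
# a freshly imported one. Table and partition names are assumptions.
# Usage: swap_partition_sql PARENT_TABLE FEEDER | psql ...
swap_partition_sql() {
    local parent="$1" feeder="$2"
    cat <<EOF
BEGIN;
ALTER TABLE ${parent} DETACH PARTITION ${parent}_${feeder}_old;
ALTER TABLE ${parent} ATTACH PARTITION ${parent}_${feeder}_new FOR VALUES IN ('${feeder}');
COMMIT;
TRUNCATE ${parent}_${feeder}_old;
EOF
}
```

Because the DETACH and ATTACH happen in one transaction, readers of the parent table see either the old data or the new data, never a mix, and the old data is only truncated after the new data is in place.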

The next question was how to run things in parallel in a shell. Probably the best choice is to use the parallel utility which takes care of many issues related to running things in parallel like interleaving output etc. But we didn’t want to force CFEngine users to install GNU Parallel if they wanted to use Federated Reporting. Another option, a bit worse, is xargs -P, but even xargs is not available by default on some systems. Last resort – just use a for loop in shell backgrounding the individual tasks and keeping track of their numbers and statuses.

We could have just chosen the last option, which gives the worst results (interleaved output from the parallel tasks/commands), but since the other two options are better, we wrote a small Bash "library", parallel.sh, providing the run_in_parallel() function, which automatically chooses the best available way to run a given command on given inputs in parallel with up to a given number of concurrent processes. With this we can simply do

echo "$dump_files" | run_in_parallel "$(dirname "$0")/import_file.sh" - $CFE_FR_IMPORT_NJOBS

and we know that the best possible option to run import_file.sh on all the dump files in parallel will be chosen and the result (exit code, $?) will always be 0 in case of success and 1 otherwise.
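For a feel of what the "last resort" strategy involves, here is a minimal sketch of a plain Bash loop that backgrounds tasks in batches. It is not the real run_in_parallel() from parallel.sh; the run_limited name and its arguments are assumptions made for this example. It runs tasks in fixed-size batches, which is simpler than keeping exactly N jobs running at all times.

```bash
#!/usr/bin/env bash
# Hypothetical helper, NOT the real run_in_parallel() from parallel.sh.
# Reads one input per line from stdin and runs COMMAND on each of them,
# with at most NJOBS tasks running at the same time.
# Usage: run_limited COMMAND NJOBS < inputs
run_limited() {
    local cmd="$1" njobs="$2" failed=0 pids=() pid item
    while IFS= read -r item; do
        # Start the task in the background; keep it away from our stdin.
        "$cmd" "$item" < /dev/null &
        pids+=("$!")
        if [ "${#pids[@]}" -ge "$njobs" ]; then
            # The batch is full: collect the exit status of every task in it.
            for pid in "${pids[@]}"; do
                wait "$pid" || failed=1
            done
            pids=()
        fi
    done
    # Collect whatever is left from the last, possibly partial, batch.
    for pid in "${pids[@]}"; do
        wait "$pid" || failed=1
    done
    # 0 if every task succeeded, 1 otherwise.
    return "$failed"
}
```

The output of the tasks can still interleave here, which is exactly the drawback mentioned above and the reason to prefer GNU Parallel or xargs -P when they are available.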

Reduce the number of SQL commands

The next step was to reduce the number of SQL commands (statements) PostgreSQL had to execute. As the documentation of the --inserts option says, inserting data as one row per SQL command is very slow and when the command specifies column order (--column-inserts), it is even slower. However, as explained above, we cannot drop the --column-inserts option or use --inserts instead. Fortunately, PostgreSQL supports a little trick: multiple VALUES tuples (rows) can be specified with a single INSERT INTO statement, which can significantly speed up the import process. The upcoming new major release of PostgreSQL, version 12, supports a new option for the pg_dump command, --rows-per-insert, but older versions of pg_dump have no such option and thus we cannot rely on it.

We need something that takes consecutive lines inserting data into the same table, with one INSERT INTO command per row, and merges them into a single INSERT INTO statement followed by multiple VALUES rows. For example:

INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=701e89c9db156196ef97a65189f575ca6ff36ecf1d373b7df128567744bb8265', 325, '2019-07-02 10:54:32+00', 'OK');
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=5963e41506ec205fe877ea5acbc5db728af41a601b0bcc45c358d00ceea11fb0', 315, '2019-07-02 10:52:05+00', 'OK');
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=4a2de5b0ac9f91de4899f1a65508e6408ddc773f3adbc981ff9fc08f0fe7eea8', 310, '2019-07-02 10:52:05+00', 'OK');
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=8818e5251f608b06804b6862fe601b0fa3d9fa57d3be8c00b92f14c8ab783631', 281, '2019-07-02 10:53:49+00', 'OK');
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=a51bfbd7e062205766b8fb682269345c0d9282b590557bfdf29a3ae138e504bf', 310, '2019-07-02 10:50:57+00', 'OK');

should be transformed into:

INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES
 ('SHA=701e89c9db156196ef97a65189f575ca6ff36ecf1d373b7df128567744bb8265', 325, '2019-07-02 10:54:32+00', 'OK'),
 ('SHA=5963e41506ec205fe877ea5acbc5db728af41a601b0bcc45c358d00ceea11fb0', 315, '2019-07-02 10:52:05+00', 'OK'),
 ('SHA=4a2de5b0ac9f91de4899f1a65508e6408ddc773f3adbc981ff9fc08f0fe7eea8', 310, '2019-07-02 10:52:05+00', 'OK'),
 ('SHA=8818e5251f608b06804b6862fe601b0fa3d9fa57d3be8c00b92f14c8ab783631', 281, '2019-07-02 10:53:49+00', 'OK'),
 ('SHA=a51bfbd7e062205766b8fb682269345c0d9282b590557bfdf29a3ae138e504bf', 310, '2019-07-02 10:50:57+00', 'OK');

which is easier to parse even for humans.
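The transformation above can be sketched in a few lines of AWK wrapped in a shell function. This is only an illustration, not the real merge_sql_inserts.awk: the merge_inserts name and the max_rows limit are assumptions made for this example, and the sketch assumes that every INSERT statement sits on a single line ending with ");" (so values containing newlines are not handled).

```bash
#!/usr/bin/env bash
# Hypothetical sketch, NOT the real merge_sql_inserts.awk.
# Assumes one complete INSERT statement per line, as in the example above.
# Usage: merge_inserts [MAX_ROWS] < dump.sql
merge_inserts() {
    awk -v max_rows="${1:-1000}" '
    # Finish the statement in progress, if any, by ending its last row with ";".
    function flush_rows() {
        if (pending != "") { print " " pending ";"; pending = "" }
        head = ""; n = 0
    }
    /^INSERT INTO / && /\);$/ {
        pos = index($0, ") VALUES (")
        if (pos > 0) {
            h = substr($0, 1, pos + 7)             # "INSERT INTO ... VALUES"
            row = substr($0, pos + 9)              # "(...);"
            row = substr(row, 1, length(row) - 1)  # drop the trailing ";"
            if (h != head || n >= max_rows) {
                flush_rows(); print h; head = h    # start a new statement
            } else {
                print " " pending ","              # more rows follow
            }
            pending = row; n++
            next
        }
    }
    # Any other line ends the current statement and is passed through as is.
    { flush_rows(); print }
    END { flush_rows() }
    '
}
```

Each row is held back until the next line is seen, because only then is it known whether the row must end with a comma or with a semicolon. That keeps the script a pure stream filter with no temporary files.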

We want to do this using stream processing instead of dumping data into a temporary file and then reading the file to do the transformation because we want to keep the whole pipeline busy to get the best results (shortest times). And similarly to running things in parallel in this case we also don’t want to force users to install any extra dependencies. Again, GODS are here to help. In particular, the merge_sql_inserts.awk AWK script which does exactly the transformation described above. Running 10 GiB of data through AWK of course takes a while, 100% utilizing 1 CPU core. However, the transformation is not specific to a particular version of PostgreSQL or anything else specific to the superhub and thus can be done on the feeder hubs in parallel, each processing "only" 1 GiB of data.

Plus, it is the total time that matters. And when running data through a pipeline, the effect of adding or slowing down one step can range from slowing the whole pipeline down to speeding it up depending on what the bottleneck (or resource shortage) is. For example, when reading data from disk and sending it over network, adding compression (on the sender) and decompression (on the target) can speed things up if network speed is the limiting factor and the compression is effective and fast enough. Which is something our pipeline is doing. On the other hand, if the compression throughput is much lower than the network speed and/or the compression ratio is low (1:1), the effect of adding it to the pipeline is negative (or negligible at best).

In our case, the throughput of GNU AWK processing data with merge_sql_inserts.awk is only a little lower than the throughput of GZIP following it in the pipeline, so on a multi-core system (recommended for a CFEngine hub) the negative impact of this extra step is very small. This contrasts with the very big positive impact on the throughput of PostgreSQL, which imports the data with far fewer commands/statements.

For example, on a smaller data set (~2 GiB) and a low-end VM, the overhead of adding merge_sql_inserts.awk into the dumping part of the pipeline makes it take 14 seconds instead of 8 seconds. The importing part of the pipeline, however, goes from 9 minutes to 1.5 minutes!

Good Old Damn Scripts (GODS)

Sometimes, actually quite often, some may even say always, simple solutions using good old technologies can be very helpful and very powerful. In our particular case on the setup we use for testing, utilizing GNU Bash, GNU AWK and optionally GNU Parallel resulted in a speedup of the ETL (Extract Transform Load) pipeline from ~20 minutes to ~5 minutes. From the user’s perspective this means that the data they are looking at on the superhub is 5 minutes old at worst instead of 20 minutes old at worst which is quite a difference.

If you want some more examples of how good old technologies can help solve modern problems, search for things like "bash big data", "awk big data" with your favorite search engine.

And because Open Source and UNIX principles are part of our nature at Northern.tech, we are proud of using such simple yet powerful solutions and want to make them available for others. This is how the new GODS repository was born. At the time of writing this blog post, the repository only contains the two scripts mentioned here, but more are coming soon. They are licensed under the MIT license and we encourage everybody to use them, report any kinds of issues and ideally, submit improvements or even new GOD-grade scripts.

Vratislav Podzimek