Problem to solve
When working on the new Federated Reporting feature for CFEngine we had to solve the problem of collecting data from multiple CFEngine hubs (feeders) on a single hub (superhub). CFEngine hubs use PostgreSQL to store data, so, more specifically, the problem was how to collect data from multiple PostgreSQL databases into one PostgreSQL database. Because we are talking about ~1 GiB of SQL data per feeder hub and, for example, 10 feeders connected to a superhub, the initial, trivial solution based on this ETL (Extract Transform Load) pipeline - pg_dump | gzip | ssh | gunzip | psql - performed really poorly. The problem was in the last part of the pipeline - importing data using psql. Reading and writing 10 GiB of data of course takes a while, but we soon realized that I/O speed was not the bottleneck in this case.
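For illustration, the naive approach boils down to something like this, run on a feeder hub (a minimal sketch; the superhub host name and the database name are placeholders, not the actual Federated Reporting code):

# Dump on the feeder, compress, ship over SSH, decompress and import on the superhub.
# 'superhub.example.com' and the 'cfdb' database name are illustrative placeholders.
pg_dump --dbname=cfdb | gzip | ssh superhub.example.com 'gunzip | psql --dbname=cfdb'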
Part of the problem is that the feeder hubs and the superhub are potentially running different versions of CFEngine and thus of PostgreSQL. Their schemas can also be a little different, with the differences being backwards compatible (new columns having default values, etc.). This means that the --inserts option has to be used when pg_dump is run.
Worse, the --column-inserts option has to be used because columns could be in a different order on the feeders and the superhub. And this is what man pg_dump has to say about these two options:
--inserts
Dump data as INSERT commands (rather than COPY). This will make restoration
very slow; it is mainly useful for making dumps that can be loaded into
non-PostgreSQL databases. However, since this option generates a separate
command for each row, an error in reloading a row causes only that row to be
lost rather than the entire table contents. Note that the restore might fail
altogether if you have rearranged column order. The --column-inserts option
is safe against column order changes, though even slower.
Given the amount of data, “generates a separate command for each row” in our case actually means ~40 million commands/rows. And that is a lot of processing for PostgreSQL to do.
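In practice, the feeder-side dump therefore has to look roughly like this (a sketch; 'cfdb' is a placeholder database name and the exact options used by Federated Reporting may differ):

# --column-inserts emits one INSERT with explicit column names per row, which
# tolerates column-order and schema differences but is the slowest format to restore.
pg_dump --dbname=cfdb --column-inserts | gzip > feeder_dump.sql.gz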
Solution with the help of GODS
When in trouble, GODS, or Good Old Damn Scripts, come to help.
Go parallel
The first strategy we chose to make things faster was to run the import (gunzip | psql) in parallel. Because I/O was not the bottleneck, utilizing multiple cores to handle the processing of all the SQL commands seemed promising. First we had to change the superhub’s database schema to make it possible to import the new (fresh) data from a feeder next to the older (previous) data from the same feeder. Running DELETE FROM some_table WHERE source = the_feeder in a transaction together with the data import is much slower than importing the data into a new table, then running TRUNCATE old_table and replacing the references to the old table with references to the new table. And we wanted to avoid the case where a failed import of the fresh data would delete the old data.
We made use of PostgreSQL partitioning to combine data from the individual feeders’ tables, and this also allowed us to swap the old and new data atomically behind the scenes.
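A rough sketch of the idea, using declarative partitioning and made-up table names (the real schema and swapping mechanism are more involved than this):

# Swap a feeder's freshly imported table in for its previous one inside a single
# transaction; all names here are illustrative, not the real CFEngine schema.
psql --dbname=cfdb <<'EOF'
BEGIN;
-- detach the partition holding the feeder's previous data
ALTER TABLE agentstatus DETACH PARTITION agentstatus_feeder1_old;
-- attach the freshly imported table as the new partition for this feeder
ALTER TABLE agentstatus ATTACH PARTITION agentstatus_feeder1_new
    FOR VALUES IN ('feeder1');
COMMIT;
-- the previous data can now be discarded without touching the fresh data
TRUNCATE agentstatus_feeder1_old;
EOF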
The next question was how to run things in parallel in a shell. Probably the best choice is to use the parallel utility, which takes care of many issues related to running things in parallel, like interleaved output, etc. But we didn’t want to force CFEngine users to install GNU Parallel if they wanted to use Federated Reporting. Another option, a bit worse, is xargs -P, but even xargs is not available by default on some systems. The last resort is to just use a for loop in shell, backgrounding the individual tasks and keeping track of their numbers and statuses.
We could have just chosen the last option, which provides the worst results (interleaved outputs from the parallel tasks/commands), but since the other two options are better, we wrote a small Bash “library” - parallel.sh - providing the run_in_parallel() function which automatically chooses the best possible way to run a given command on given inputs in parallel with up to a given number of concurrent processes. With this we can simply do

echo "$dump_files" | run_in_parallel "$(dirname "$0")/import_file.sh" - $CFE_FR_IMPORT_NJOBS

and we know that the best possible option to run import_file.sh on all the dump files in parallel will be chosen and the result (exit code, $?) will always be 0 in case of success and 1 otherwise.
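The rough idea behind run_in_parallel() can be sketched like this (a simplified illustration with a reduced signature; the real parallel.sh in the GODS repository is more careful about output interleaving, argument passing and error reporting):

# run_in_parallel COMMAND NJOBS
# Reads items (e.g. dump file names) from stdin, one per line, and runs
# COMMAND on each of them with at most NJOBS concurrent processes,
# picking the best tool available on the system.
run_in_parallel() {
  local cmd="$1" njobs="$2" failed=0 pids=()
  if command -v parallel >/dev/null 2>&1; then
    parallel -j "$njobs" "$cmd" || failed=1        # GNU Parallel, if installed
  elif command -v xargs >/dev/null 2>&1; then
    xargs -n 1 -P "$njobs" "$cmd" || failed=1      # xargs -P as a fallback
  else
    while read -r item; do                         # last resort: background jobs
      "$cmd" "$item" &
      pids+=("$!")
      if [ "${#pids[@]}" -ge "$njobs" ]; then      # cap the number of concurrent jobs
        wait "${pids[0]}" || failed=1
        pids=("${pids[@]:1}")
      fi
    done
    for pid in "${pids[@]}"; do wait "$pid" || failed=1; done
  fi
  return "$failed"
}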
Reduce the number of SQL commands
The next step was to reduce the number of SQL commands (statements) PostgreSQL had to execute. As the documentation of the --inserts option says, inserting data as one row per SQL command is very slow, and when the command also specifies column order (--column-inserts), it is even slower. However, as explained above, we cannot drop the --column-inserts option or use plain --inserts instead. Fortunately, PostgreSQL supports a little trick - multiple VALUES tuples (rows) can be specified with a single INSERT INTO statement, which can significantly speed up the import process. The upcoming new major release of PostgreSQL, version 12, adds a new option for the pg_dump command - --rows-per-insert - but older versions of pg_dump have no such option and thus we cannot rely on it.
We need something that transforms multiple lines, each inserting data into the same table with its own INSERT INTO command, into a single INSERT INTO statement followed by multiple VALUES rows. For example:
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=701e89c9db156196ef97a65189f575ca6ff36ecf1d373b7df128567744bb8265', 325, '2019-07-02 10:54:32+00', 'OK');
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=5963e41506ec205fe877ea5acbc5db728af41a601b0bcc45c358d00ceea11fb0', 315, '2019-07-02 10:52:05+00', 'OK');
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=4a2de5b0ac9f91de4899f1a65508e6408ddc773f3adbc981ff9fc08f0fe7eea8', 310, '2019-07-02 10:52:05+00', 'OK');
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=8818e5251f608b06804b6862fe601b0fa3d9fa57d3be8c00b92f14c8ab783631', 281, '2019-07-02 10:53:49+00', 'OK');
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES ('SHA=a51bfbd7e062205766b8fb682269345c0d9282b590557bfdf29a3ae138e504bf', 310, '2019-07-02 10:50:57+00', 'OK');
should be transformed into:
INSERT INTO public.__agentstatus (hostkey, agentexecutioninterval, lastagentlocalexecutiontimestamp, lastagentexecutionstatus) VALUES
('SHA=701e89c9db156196ef97a65189f575ca6ff36ecf1d373b7df128567744bb8265', 325, '2019-07-02 10:54:32+00', 'OK'),
('SHA=5963e41506ec205fe877ea5acbc5db728af41a601b0bcc45c358d00ceea11fb0', 315, '2019-07-02 10:52:05+00', 'OK'),
('SHA=4a2de5b0ac9f91de4899f1a65508e6408ddc773f3adbc981ff9fc08f0fe7eea8', 310, '2019-07-02 10:52:05+00', 'OK'),
('SHA=8818e5251f608b06804b6862fe601b0fa3d9fa57d3be8c00b92f14c8ab783631', 281, '2019-07-02 10:53:49+00', 'OK'),
('SHA=a51bfbd7e062205766b8fb682269345c0d9282b590557bfdf29a3ae138e504bf', 310, '2019-07-02 10:50:57+00', 'OK');
which is easier to parse even for humans.
We want to do this using stream processing instead of dumping data into a temporary file and then reading the file to do the transformation, because we want to keep the whole pipeline busy to get the best results (shortest times). And, as with running things in parallel, we also don’t want to force users to install any extra dependencies. Again, GODS are here to help - in particular, the merge_sql_inserts.awk AWK script, which does exactly the transformation described above.
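The core of such a transformation can be sketched in a few lines of AWK driven from the shell (a simplified illustration that assumes one INSERT per line; the real merge_sql_inserts.awk in the GODS repository handles more corner cases):

# Merge consecutive single-row INSERTs into the same table into one multi-row INSERT.
awk '
/^INSERT INTO .* VALUES \(/ {
    pos = index($0, " VALUES ")
    prefix = substr($0, 1, pos + 6)          # "INSERT INTO <table> (<columns>) VALUES"
    tuple  = substr($0, pos + 8)             # "(<values>);"
    sub(/;$/, "", tuple)                     # drop the trailing semicolon
    if (prefix == prev) {
        printf(",\n%s", tuple)               # same table: append another tuple
    } else {
        if (prev != "") print ";"            # close the previous statement
        printf("%s\n%s", prefix, tuple)      # start a new multi-row INSERT
        prev = prefix
    }
    next
}
{
    if (prev != "") { print ";"; prev = "" } # flush before any non-INSERT line
    print
}
END { if (prev != "") print ";" }
' dump.sql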
Running 10 GiB of data through AWK of course takes a while, fully utilizing one CPU core. However, the transformation is not specific to a particular version of PostgreSQL or to anything else specific to the superhub, and thus it can be done on the feeder hubs in parallel, each processing “only” 1 GiB of data.
Plus, it is the total time that matters. And when running data through a pipeline, the effect of adding or slowing down one step can range from slowing the whole pipeline down to speeding it up, depending on what the bottleneck (or resource shortage) is. For example, when reading data from disk and sending it over the network, adding compression (on the sender) and decompression (on the target) can speed things up if network speed is the limiting factor and the compression is effective and fast enough - which is exactly what our pipeline does. On the other hand, if the compression throughput is much lower than the network speed and/or the compression ratio is low (1:1), the effect of adding it to the pipeline is negative (or negligible at best).
In our case, the throughput of GNU AWK processing data with merge_sql_inserts.awk is only a little bit lower than the throughput of gzip following it in the pipeline, so on a multi-core system (recommended for a CFEngine hub) the negative impact of this extra step in the pipeline is very small - in contrast to the very big positive impact on the throughput of PostgreSQL, which then imports the data with far fewer commands/statements.
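Putting it together, the dump side of the pipeline then looks roughly like this (again a sketch with placeholder names rather than the exact Federated Reporting code):

# Feeder side: dump with per-row INSERTs, merge them into multi-row INSERTs,
# then compress the result for transfer to the superhub.
pg_dump --dbname=cfdb --column-inserts \
  | awk -f merge_sql_inserts.awk \
  | gzip > feeder_dump.sql.gz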
For example, on a smaller data set (~2 GiB) and a low-end VM, the
overhead of adding merge_sql_inserts.awk
into the dumping part of the
pipeline makes it take 14 seconds instead of 8 seconds. The
importing part of the pipeline, however, goes from 9 minutes to
1.5 minutes!
Good Old Damn Scripts (GODS)
Sometimes, actually quite often, some may even say always, simple solutions using good old technologies can be very helpful and very powerful. In our particular case, on the setup we use for testing, utilizing GNU Bash, GNU AWK and optionally GNU Parallel resulted in a speedup of the ETL (Extract Transform Load) pipeline from ~20 minutes to ~5 minutes. From the user’s perspective this means that the data they are looking at on the superhub is at worst 5 minutes old instead of at worst 20 minutes old, which is quite a difference.
If you want some more examples of how good old technologies can help solve modern problems, search for things like “bash big data”, “awk big data” with your favorite search engine.
And because Open Source and UNIX principles are part of our nature at Northern.tech, we are proud of using such simple yet powerful solutions and want to make them available for others. This is how the new GODS repository was born. At the time of writing this blog post, the repository only contains the two scripts mentioned here, but more are coming soon. They are licensed under the MIT license and we encourage everybody to use them, report any kind of issues and, ideally, submit improvements or even new GOD-grade scripts.