In the latest version of the CFEngine network protocol (filestream - v4), we leveraged librsync for efficient file copying using their Streaming API.
While implementing the file streaming in CFEngine, I found that the documentation on the Streaming API was a bit unclear. Thus, I created two example programs to experiment with how it works. I thought I’d share them in this blog post as a tutorial to help other developers get up to speed faster.
The complete files after following this tutorial should look more or less like these:
Implementing a server
To implement efficient file streaming, we’ll first need a client and a server to connect and communicate through sockets. Let’s start with implementing the server, given that a client cannot function without a server.
We’ll start by creating a file, server.c, with the entry point main() calling a function accept_connection() (I also included all the header files we’ll need throughout this blog post).
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <librsync.h>
#include "common.h"
static int accept_connection(void);
int main(int argc, char *argv[])
{
puts("Waiting for connection...");
int sock = accept_connection();
if (sock == -1) {
return EXIT_FAILURE;
}
close(sock);
return EXIT_SUCCESS;
}
static int accept_connection(void) {
/* Create socket */
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1) {
perror("Failed to create socket");
return -1;
}
/* Enable address reuse (to avoid "Address already in use" errors) */
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
/* Assign IP address and port */
struct sockaddr_in server_addr = { 0 };
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); /* Listen on all local interfaces */
server_addr.sin_port = htons(PORT);
/* Bind socket to given IP address */
int ret = bind(sock, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (ret == -1) {
perror("Failed to bind socket");
close(sock);
return -1;
}
/* Listen for incoming connections. In a real-world application you should
have a larger "connection request" queue. */
ret = listen(sock, 1);
if (ret == -1) {
perror("Failed to listen");
close(sock);
return -1;
}
/* Accept incoming connection */
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr); /* accept() reads and updates this */
int conn = accept(sock, (struct sockaddr *)&client_addr, &addr_len);
/* We don't expect any more connections in this example. In a real-world
application you should keep this socket open to accept more
connections. */
close(sock);
if (conn == -1) {
perror("Failed to accept");
return -1;
}
return conn;
}
I won’t go into great detail about network programming. If there is something you don’t understand, I would highly recommend reading Beej’s Guide to Network Programming. It’s free and written in a language mortal programmers can understand.
Nonetheless, here is a short description of what the code does. We create an endpoint for communication through TCP/IP using the socket() system call, which returns a file descriptor for reading and writing.
Next comes a little hack that can save you some time and frustration. Soon, we’ll bind our newly created socket to a port on our machine. However, when rerunning the server program, one might experience errors such as “Address already in use.” This is because parts of the socket may still linger in the kernel after exiting our program. We can either wait for the socket to clear or tell the kernel to allow the port to be reused.
Finally, we bind the server to a local IPv4 address and a port (that we will later specify using a macro definition), followed by listening for incoming connections.
In the call to listen(), I specified 1 as the backlog argument. You’d probably want a longer connection request queue in a real-world application. Otherwise, while the server is busy processing the first connection, further connection requests may be refused or dropped once the queue is full.
The program blocks until a connection request is received. At this point, we will have to decide whether to accept. Of course - we’ll give the nod - because “Happiness can exist only in acceptance” (George Orwell).
The call to accept() returns a new connected socket for talking to the client. Given that we don’t expect any more connections in our example server program, we will close our listening socket and return the connected socket.
We are almost ready to compile our server program. However, we still need to define the PORT macro. We will add the definition in a shared header file, common.h. This way, the client program will also be able to access it. I took the liberty of including all of the header files that we’ll need throughout this blog post here as well.
#ifndef COMMON_H
#define COMMON_H
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <arpa/inet.h>
#include <librsync.h>
/** The port the server will be listening on / connecting to */
#define PORT 5612
#endif /* COMMON_H */
We should now be able to compile the server program using the following command.
gcc -g -o server server.c
Run it with the following command.
./server
Waiting for connection...
^C
However, no client is connecting to it yet, so the program blocks forever. Press CTRL+C to stop it.
Implementing a client
A server without any clients ain’t no fun. Thus, let’s create a new file, client.c, also with the entry point main(), calling the function connect_to_server(). As always, I included all the header files we’ll need throughout.
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <assert.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <limits.h>
#include <librsync.h>
#include "common.h"
#define IP_ADDRESS "127.0.0.1"
static int connect_to_server(const char *ip_addr);
int main(int argc, char *argv[]) {
puts("Connecting to server...");
int sock = connect_to_server(IP_ADDRESS);
if (sock == -1) {
return EXIT_FAILURE;
}
close(sock);
return EXIT_SUCCESS;
}
static int connect_to_server(const char *ip_addr) {
/* Create socket */
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1) {
perror("Failed to create socket");
return -1;
}
/* Assign IP address and port */
struct sockaddr_in addr = { 0 };
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip_addr);
addr.sin_port = htons(PORT);
/* Connect to server */
int ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
if (ret == -1) {
perror("Failed to connect");
close(sock);
return -1;
}
return sock;
}
We create a new socket, just like in the server program. However, for the client, we call the connect() function using the loopback address 127.0.0.1. There is nothing more to it. It’s pretty straightforward.
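As an aside, inet_addr() returns INADDR_NONE for malformed input, which is indistinguishable from the valid address 255.255.255.255. If you want stricter parsing, a minimal sketch using inet_pton() could look like this (make_ipv4_addr() is a hypothetical helper, not part of the tutorial’s code):

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>

/* Hypothetical helper: fill in an IPv4 socket address from a dotted-quad
 * string and a port in host byte order. Returns 0 on success and -1 if
 * the string is not a valid IPv4 address. */
static int make_ipv4_addr(const char *ip_addr, uint16_t port,
                          struct sockaddr_in *out)
{
    memset(out, 0, sizeof(*out));
    out->sin_family = AF_INET;
    out->sin_port = htons(port);
    /* inet_pton() returns 1 on success and 0 for a malformed string,
     * so errors are never confused with a real address. */
    if (inet_pton(AF_INET, ip_addr, &out->sin_addr) != 1) {
        return -1;
    }
    return 0;
}
```

The rest of connect_to_server() would stay the same; only the address setup moves into the helper.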
By now, we are ready to compile the client. I can’t speak for everyone else, but I, for one, am tired of typing these gcc commands. Instead, let’s create a new file, Makefile, to take care of this once and for all.
CFLAGS = -g -Wall -Wextra -Wconversion
LDLIBS = -lrsync
.PHONY: all clean
all: server client
server: server.c common.h
client: client.c common.h
clean:
	rm -f server
	rm -f client
	rm -f *.o
Perfect! Now, we compile both our programs by simply running this one command:
make
cc -g -Wall -Wextra -Wconversion server.c common.h -lrsync -o server
---snip---
cc -g -Wall -Wextra -Wconversion client.c common.h -lrsync -o client
---snip---
Don’t worry about any warnings related to unused arguments. We will take care of these later.
Now, let’s open two terminal sessions. In the first one, we run the server:
./server
Waiting for connection...
In the second one, we run the client:
./client
Connecting to server...
It works! Although, there’s not much to it yet. The client connects to the server. Then, both ends close the connection and exit.
Implementing a communication protocol
By now, we should be able to send and receive messages. However, knowing when the other end is finished sending and ready to receive remains an issue for us to solve. We can solve it by building an application layer network protocol on top of TCP/IP. Most importantly, we need a cool name, like SNP (short for Simple Network Protocol). Yup, that name is definitely original and has never been used before. Trust me!
Network protocols usually send data in chunks called PDUs (Protocol Data Units). Furthermore, PDUs are built from a protocol header and a payload, often called an SDU (Service Data Unit). The SDU usually contains the actual data that you want to transmit, while the protocol header provides information such as the payload size and whether to expect more data (among other things).
We should aim for restricted bandwidth consumption and forward compatibility when designing the protocol. In other words, we want to pack the required information into as few bits as possible while reserving some for the future. Hence, I suggest a protocol header consisting of 16 bits distributed as follows:
| SDU Len | Reserved | EoF Flag |
|---|---|---|
| 12 bits | 3 bits | 1 bit |
- SDU Len: 12 bits indicate the payload length within the PDU, allowing payloads of up to 4095 bytes.
- Reserved: 3 bits are reserved for future use.
- EoF Flag: 1 bit determines whether the receiver should expect to receive more PDUs.
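To make the bit layout concrete, here is a minimal sketch of packing and unpacking the header in host byte order. The helper names snp_pack_header() and snp_unpack_header() are hypothetical; the tutorial’s own send_message() and recv_message() do the same shifts inline:

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helpers for the SNP header layout:
 *   bits 15..4  SDU Len  (12 bits, 0..4095)
 *   bits  3..1  Reserved (always 0 here)
 *   bit      0  EoF Flag
 * Values are in host byte order; use htons()/ntohs() at the socket.
 * Callers must ensure len <= 4095; the mask only keeps stray bits out
 * of the Reserved and EoF fields. */
static uint16_t snp_pack_header(size_t len, int eof)
{
    uint16_t header = (uint16_t)((len & 0xFFFu) << 4);
    if (eof) {
        header = (uint16_t)(header | 1u);
    }
    return header;
}

static void snp_unpack_header(uint16_t header, size_t *len, int *eof)
{
    *len = (size_t)(header >> 4);  /* top 12 bits */
    *eof = (int)(header & 1u);     /* lowest bit */
}
```

For example, a 5-byte payload without the EoF flag packs to 0x0050, and a full 4095-byte payload with the flag set packs to 0xFFF1.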
Now that we have a bulletproof design, we can implement two utility functions for sending and receiving messages. Add the following code to common.h:
/** It's the largest value we can represent with the 12 bits in the SDU Length
* header field (2^12-1 = 4095). Hence, it's the largest allowed payload.
*/
#define BUFFER_SIZE 4095
static int send_message(int sock, const char *msg, size_t len, int eof) {
assert(len <= BUFFER_SIZE);
uint16_t header = (uint16_t)(len << 4);
/* Set EoF flag */
if (eof != 0) {
header |= 1;
}
/* Send header */
header = htons(header);
size_t n_bytes = 0;
do {
/* Cast to char * so that the offset is counted in bytes */
ssize_t ret = write(sock, (const char *)&header + n_bytes, sizeof(header) - n_bytes);
if (ret < 0) {
if (errno == EINTR) {
/* The call was interrupted by a signal before any data was
written. It is safe to continue. */
continue;
}
perror("Failed to send message header");
return -1;
}
n_bytes += (size_t)ret;
} while (n_bytes < sizeof(header));
if (len > 0) {
/* Send payload */
n_bytes = 0;
do {
ssize_t ret = write(sock, msg + n_bytes, len - n_bytes);
if (ret < 0) {
if (errno == EINTR) {
/* The call was interrupted by a signal before any data was
written. It is safe to continue. */
continue;
}
perror("Failed to send message payload");
return -1;
}
n_bytes += (size_t)ret;
} while (n_bytes < len);
}
return 0;
}
static int recv_message(int sock, char *msg, size_t *len, int *eof) {
/* Receive header */
uint16_t header;
size_t n_bytes = 0;
do {
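/* Cast to char * so that the offset is counted in bytes */
ssize_t ret = read(sock, (char *)&header + n_bytes, sizeof(header) - n_bytes);
if (ret < 0) {
if (errno == EINTR) {
/* The call was interrupted by a signal before any data was
read. It is safe to continue. */
continue;
}
perror("Failed to receive message header");
return -1;
}
if (ret == 0) {
/* The peer closed the connection; retrying would loop forever */
fputs("Connection closed while receiving message header\n", stderr);
return -1;
}
n_bytes += (size_t)ret;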
} while (n_bytes < sizeof(header));
header = ntohs(header);
/* Extract EOF flag */
*eof = header & 1;
/* Extract message length */
*len = header >> 4;
if (*len > 0) {
/* Read payload */
n_bytes = 0;
do {
ssize_t ret = read(sock, msg + n_bytes, *len - n_bytes);
if (ret < 0) {
if (errno == EINTR) {
/* The call was interrupted by a signal before any data was
read. It is safe to continue. */
continue;
}
perror("Failed to receive message payload");
return -1;
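}
if (ret == 0) {
/* The peer closed the connection; retrying would loop forever */
fputs("Connection closed while receiving message payload\n", stderr);
return -1;
}
n_bytes += (size_t)ret;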
} while (n_bytes < *len);
}
return 0;
}
The function send_message() marshals and sends the protocol header, followed by the payload. The recv_message() function reads and unmarshals the protocol header first. Based on the header fields, it determines how many bytes to read to get the entire payload. Furthermore, it determines whether to expect more PDUs.
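Both loops in send_message() and recv_message() follow the same "keep going until everything is transferred" pattern. Here is a minimal sketch of that pattern as two hypothetical helpers, write_all() and read_all() (my names, not part of the tutorial’s code). They do the pointer arithmetic on a char * so the offset counts bytes, and read_all() treats a return value of 0 as the peer closing the connection, which a plain retry loop would otherwise spin on forever:

```c
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Write exactly len bytes. Returns 0 on success, -1 on error. */
static int write_all(int fd, const void *buf, size_t len)
{
    const char *p = buf; /* char * so that p + done advances by bytes */
    size_t done = 0;
    while (done < len) {
        ssize_t ret = write(fd, p + done, len - done);
        if (ret < 0) {
            if (errno == EINTR) {
                continue; /* interrupted before writing anything; retry */
            }
            return -1;
        }
        done += (size_t)ret;
    }
    return 0;
}

/* Read exactly len bytes. Returns 0 on success, -1 on error, and -2 if
 * the peer closed the connection before len bytes arrived. */
static int read_all(int fd, void *buf, size_t len)
{
    char *p = buf;
    size_t done = 0;
    while (done < len) {
        ssize_t ret = read(fd, p + done, len - done);
        if (ret < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (ret == 0) {
            return -2; /* end of stream */
        }
        done += (size_t)ret;
    }
    return 0;
}
```

With these, sending a PDU boils down to one write_all() for the 2-byte header and one for the payload.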
Different computer architectures can have different byte orders. Hence, we must convert the 16-bit header from host to network byte order and back. Otherwise, the header can be misinterpreted. Please note the use of htons() and ntohs() in send_message() and recv_message().
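Here is a small, hypothetical illustration of what htons() buys us: whatever the host’s byte order, the most significant byte of the header goes on the wire first (network byte order is big-endian). header_to_wire() and header_from_wire() are my own names for this sketch:

```c
#include <arpa/inet.h>
#include <stdint.h>
#include <string.h>

/* Serialize a 16-bit header into the two bytes sent on the wire. */
static void header_to_wire(uint16_t header, unsigned char wire[2])
{
    uint16_t net = htons(header); /* host -> network (big-endian) */
    memcpy(wire, &net, sizeof(net));
}

/* Deserialize two wire bytes back into a host-order header. */
static uint16_t header_from_wire(const unsigned char wire[2])
{
    uint16_t net;
    memcpy(&net, wire, sizeof(net));
    return ntohs(net); /* network -> host */
}
```

For a header of 0x0051 (SDU Len 5, EoF set), the bytes on the wire are 0x00 followed by 0x51 on both little- and big-endian machines.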
Let’s test the protocol by adding two calls to send_message() in client.c. In the second call to send_message(), we set the End-of-File flag.
int main(int argc, char *argv[]) {
...
const char *first_msg = "Hello";
if (send_message(sock, first_msg, strlen(first_msg), 0) == -1) {
close(sock);
return EXIT_FAILURE;
}
const char *second_msg = " from client\n";
if (send_message(sock, second_msg, strlen(second_msg), 1) == -1) {
close(sock);
return EXIT_FAILURE;
}
close(sock);
return EXIT_SUCCESS;
}
Similarly, in server.c, we repeatedly call the recv_message() function and print the contents received in the buffer until the End-of-File flag gets set.
int main(int argc, char *argv[])
{
...
int eof;
size_t len;
char buffer[BUFFER_SIZE + 1 /* 1 extra to add terminating null-byte */];
do {
int ret = recv_message(sock, buffer, &len, &eof);
if (ret == -1) {
close(sock);
return EXIT_FAILURE;
}
buffer[len] = '\0';
printf("%s", buffer);
} while (!eof);
close(sock);
return EXIT_SUCCESS;
}
Again, we can build the program with a simple command thanks to our Makefile:
make
cc -g -Wall -Wextra -Wconversion server.c common.h -lrsync -o server
---snip---
cc -g -Wall -Wextra -Wconversion client.c common.h -lrsync -o client
---snip---
Now, we can run the server and the client in two terminal sessions and see that messages are sent and received. Furthermore, we can see that the server knows when to exit, thanks to the End-of-File flag.
./server
Waiting for connection...
Hello from client
./client
Connecting to server...
With a fully functional and battle-tested network protocol, we are ready to start implementing efficient file streams. N.B. Remember to remove the temporary calls to send_message() and recv_message() before proceeding with the tutorial.
Computing & sending the signature
The first step in implementing efficient file streams using the rsync protocol involves computing a signature of the basis file and sending this to the server. The basis file is basically the client’s copy of the file. Furthermore, the signature lets the server determine which parts of the client’s copy are up-to-date.
In the main() function of client.c, we’ll add some code to extract the path of the basis file from the command-line arguments. If no arguments are passed, we will assume that the user wants to use the standard IO streams, i.e., we will read from stdin and write to stdout.
static int send_signature(int sock, const char *fname);
int main(int argc, char *argv[]) {
/* Parse arguments (use stdin and stdout if no argument) */
const char *fname = (argc >= 2) ? argv[1] : NULL;
puts("Connecting to server...");
int sock = connect_to_server(IP_ADDRESS);
if (sock == -1) {
return EXIT_FAILURE;
}
puts("Sending signature...");
int ret = send_signature(sock, fname);
if (ret == -1) {
close(sock);
return EXIT_FAILURE;
}
close(sock);
return EXIT_SUCCESS;
}
Furthermore, we’ll declare a function, send_signature(), in client.c and call it from main(). Now, let’s define it:
static int send_signature(int sock, const char *fname) {
/* Make sure the basis file exists, unless it is stdin */
const int use_io_stream = (fname == NULL) || (strcmp(fname, "-") == 0);
if (!use_io_stream) {
int fd = open(fname, O_RDONLY | O_CREAT, 0644); /* O_CREAT requires a mode */
if (fd != -1) {
close(fd);
}
}
/* Open basis file */
FILE *file = rs_file_open(fname, "rb", 0);
assert(file != NULL);
rs_file_close(file);
return 0;
}
To compute the signature of the basis file, we’ll first need to open it. Unfortunately, to my knowledge, the fopen(3) API offers no good way of opening a file in read-only mode while creating it if it does not exist. Hence, we will have to do this in two steps. First, we open it with open(2) and the O_CREAT flag (unless it’s stdin) to ensure the file exists while avoiding truncation. Next, we open it in read-only mode.
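A minimal sketch of the two-step approach in isolation, assuming plain fopen() in place of rs_file_open() so that it runs without librsync. Note the mode argument: open(2) reads a third argument whenever O_CREAT is given, and leaving it out gives a newly created file unpredictable permissions. open_or_create_for_reading() is a hypothetical name:

```c
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper: make sure `path` exists without truncating it,
 * then open it for reading. Returns NULL on failure. */
static FILE *open_or_create_for_reading(const char *path)
{
    /* Step 1: create the file if missing; existing content is kept
     * because O_TRUNC is not used. 0644 is subject to the umask. */
    int fd = open(path, O_RDONLY | O_CREAT, 0644);
    if (fd == -1) {
        return NULL;
    }
    close(fd);
    /* Step 2: open it read-only through the stdio API */
    return fopen(path, "rb");
}
```

If you don’t need a path-based API such as rs_file_open(), fdopen(fd, "rb") would avoid opening the file twice.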
The library librsync provides handy utility functions for opening and closing files. The function rs_file_open() provides a platform-independent way to open large binary files. Furthermore, it uses stdin for reading and stdout for writing if NULL or "-" is passed as the filename argument. The function rs_file_close() provides special handling to avoid actually closing stdin and stdout.
The Streaming API in librsync is designed around starting jobs, filling the input buffer, iterating the job, and draining the output buffer. Hence, we’ll need to allocate two buffers for our jobs. Add the following line just before the function declarations in client.c:
static char in_buf[BUFFER_SIZE * 2], out_buf[BUFFER_SIZE];
Our network protocol sends chunks of at most BUFFER_SIZE bytes, and there may still be leftovers in the input buffer after iterating a job. Hence, we need an input buffer of BUFFER_SIZE * 2 bytes to hold a new message in addition to the leftovers.
The file stream can be optimized through parameters given to the signature job. Thankfully, we don’t have to worry too much about them, because librsync provides a helper function, rs_sig_args(), that gives us the recommended arguments based on the input file’s size.
The documentation explicitly says to pass -1 as the file size argument of rs_sig_args() if the file size is unknown, as it would be if we read from stdin. Once again, librsync provides a utility function, rs_file_size(), to get the file size in a platform-independent way. It returns -1 if the size cannot be determined because the file is not a regular file.
static int send_signature(int sock, const char *fname) {
...
/* Get file size */
rs_long_t fsize = rs_file_size(file);
/* Get recommended arguments */
rs_magic_number sig_magic = 0;
size_t block_len = 0, strong_len = 0;
rs_result res = rs_sig_args(fsize, &sig_magic, &block_len, &strong_len);
if (res != RS_DONE) {
rs_file_close(file);
return -1;
}
rs_file_close(file);
return 0;
}
With what are hopefully close-to-optimal parameters, we can start the signature job with a call to rs_sig_begin(). Furthermore, we’ll set up an rs_buffers_t struct, pointing its next_out attribute to the output buffer. The avail_out attribute specifies the remaining free space in the output buffer before iterating the job. We’ll set it to BUFFER_SIZE, given the limitations of our network protocol. Bear in mind that this attribute will be updated to hold the size of the unused buffer space after iterating the job.
static int send_signature(int sock, const char *fname) {
...
/* Start generating signature */
rs_job_t *job = rs_sig_begin(block_len, strong_len, sig_magic);
assert(job != NULL);
/* Setup buffers */
rs_buffers_t bufs = { 0 };
bufs.next_out = out_buf;
bufs.avail_out = BUFFER_SIZE; /* We cannot send more in one message */
rs_job_free(job);
rs_file_close(file);
return 0;
}
Now comes the hard part. We will iterate the job in a do-while loop until rs_job_iter() returns RS_DONE. Unless End-of-File has been reached or the input buffer is already full, we try to fill it.
There is a potential for leftover input data after the last job iteration. If this is the case, we must move it to the front of the buffer.
Finally, we append to the input buffer with a call to fread(). Furthermore, we check whether End-of-File is reached and update the eof_in attribute in the rs_buffers_t struct to inform the rsync algorithm. Also, we set the next_in attribute to point to the start of the input buffer, in addition to updating avail_in with the number of bytes read.
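The refill step can be tried out in isolation if we swap rs_buffers_t for a small stand-in struct with the same three input fields. The struct input_state and the refill() function below are hypothetical, a sketch of the logic in send_signature() rather than librsync code:

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the input half of rs_buffers_t */
struct input_state {
    char *next_in;   /* first unconsumed byte */
    size_t avail_in; /* number of unconsumed bytes */
    int eof_in;      /* nonzero once the file has no more data */
};

/* Move leftovers to the front of buf, then append as much file data as
 * fits in cap bytes. Returns -1 on a read error, 0 otherwise. */
static int refill(struct input_state *s, char *buf, size_t cap, FILE *file)
{
    if (s->eof_in != 0 || s->avail_in >= cap) {
        return 0; /* at End-of-File or buffer already full */
    }
    if (s->avail_in > 0) {
        /* memmove, not memcpy: source and destination may overlap */
        memmove(buf, s->next_in, s->avail_in);
    }
    size_t n = fread(buf + s->avail_in, 1, cap - s->avail_in, file);
    if (n == 0) {
        if (ferror(file)) {
            return -1;
        }
        s->eof_in = feof(file);
    }
    s->next_in = buf;
    s->avail_in += n;
    return 0;
}
```

With an 8-byte buffer and a 10-byte file "abcdefghij", the first call loads "abcdefgh". If the job then consumes five bytes, the next call moves "fgh" to the front and appends "ij", and the call after that sets eof_in.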
static int send_signature(int sock, const char *fname) {
...
/* Generate signature */
do {
if ((bufs.eof_in == 0) && (bufs.avail_in < sizeof(in_buf))) {
if (bufs.avail_in > 0) {
/* Leftover tail data, move it to front */
memmove(in_buf, bufs.next_in, bufs.avail_in);
}
/* Fill input buffer */
size_t n_bytes = fread(in_buf + bufs.avail_in, 1, sizeof(in_buf) - bufs.avail_in, file);
if (n_bytes == 0) {
if (ferror(file)) {
perror("Failed to read file");
rs_file_close(file);
rs_job_free(job);
return -1;
}
/* End-of-File reached */
bufs.eof_in = feof(file);
assert(bufs.eof_in != 0);
}
bufs.next_in = in_buf;
bufs.avail_in += n_bytes;
}
} while (res != RS_DONE);
rs_job_free(job);
rs_file_close(file);
return 0;
}
With the input buffer filled, we can execute rs_job_iter() to run the rsync algorithm. The algorithm may drain the input buffer and fill the output buffer. However, this is not always the case. Sending a message without a payload does not make sense unless it is to signal the End-of-File. Hence, we check for available data before draining the output buffer and resetting the next_out and avail_out attributes.
static int send_signature(int sock, const char *fname) {
...
/* Generate signature */
do {
...
/* Iterate job */
res = rs_job_iter(job, &bufs);
if (res != RS_DONE && res != RS_BLOCKED) {
rs_file_close(file);
rs_job_free(job);
return -1;
}
assert(bufs.next_out >= out_buf);
size_t present = (size_t)(bufs.next_out - out_buf);
if (present > 0 || res == RS_DONE) {
/* Drain output buffer */
assert(present <= BUFFER_SIZE);
int ret = send_message(sock, out_buf, present, (res == RS_DONE) ? 1 : 0);
if (ret == -1) {
rs_file_close(file);
rs_job_free(job);
return -1;
}
bufs.next_out = out_buf;
bufs.avail_out = BUFFER_SIZE;
}
} while (res != RS_DONE);
rs_job_free(job);
rs_file_close(file);
return 0;
}
That’s it for sending signatures. Next, we’ll look at receiving signatures on the server side.
Receiving the signature
Currently, the client computes and sends the signature to the server, but the server does not even bother to receive it. Let’s change that by declaring a new function, recv_signature(), in server.c and calling it from main(). The recv_signature() function will take an additional output parameter to hold the received signature. Don’t forget to allocate the input and output buffers.
static char in_buf[BUFFER_SIZE * 2], out_buf[BUFFER_SIZE];
static int accept_connection(void);
static int recv_signature(int sock, rs_signature_t **sig);
int main(int argc, char *argv[])
{
...
puts("Receiving signature...");
rs_signature_t *sig;
int ret = recv_signature(sock, &sig);
if (ret == -1) {
close(sock);
return EXIT_FAILURE;
}
close(sock);
return EXIT_SUCCESS;
}
There isn’t much to explain in the definition of recv_signature(). One notable difference is that the job started by the call to rs_loadsig_begin() completely handles the draining of the output buffer for us. Ergo, one less worry on our minds. A more subtle difference is that the input buffer is filled with messages from the client instead of file contents. Thus, before receiving, we ensure that the buffer has at least BUFFER_SIZE bytes of remaining space.
static int recv_signature(int sock, rs_signature_t **sig) {
rs_job_t *job = rs_loadsig_begin(sig);
assert(job != NULL);
/* Setup buffers */
rs_buffers_t bufs = { 0 };
rs_result res;
do {
if ((bufs.eof_in == 0) && (bufs.avail_in <= BUFFER_SIZE)) {
if (bufs.avail_in > 0) {
/* Leftover tail data, move it to front */
memmove(in_buf, bufs.next_in, bufs.avail_in);
}
size_t n_bytes;
int ret = recv_message(sock, in_buf + bufs.avail_in, &n_bytes, &bufs.eof_in);
if (ret == -1) {
rs_job_free(job);
return -1;
}
bufs.next_in = in_buf;
bufs.avail_in += n_bytes;
}
/* Iterate job */
res = rs_job_iter(job, &bufs);
if (res != RS_DONE && res != RS_BLOCKED) {
rs_job_free(job);
return -1;
}
/* The job should take care of draining the buffers */
} while (res != RS_DONE);
rs_job_free(job);
return 0;
}
If you want to test the code thus far, you can compile it with make and run the server as follows:
./server
Waiting for connection...
Receiving signature...
For the client, you can either use stdin as illustrated below or provide a file path as a command-line argument:
echo "foo bar baz" | ./client
Connecting to server...
Sending signature...
Now, let’s move on to generating deltas.
Generating & sending the delta
From the signature and the up-to-date file (available on the server), we can now compute a delta that the client can use to patch its basis file.
You know the drill. We declare a new function, send_delta(), and call it from main(). Furthermore, we get the filename of the up-to-date file from the command-line arguments. This should probably be sent as an argument from the client through the network protocol. But, hey! We want to keep it simple.
static int accept_connection(void);
static int recv_signature(int sock, rs_signature_t **sig);
static int send_delta(int sock, rs_signature_t *sig, const char *fname);
int main(int argc, char *argv[])
{
/* Parse arguments (use stdin if no argument) */
const char *fname = (argc >= 2) ? argv[1] : NULL;
...
puts("Sending delta...");
ret = send_delta(sock, sig, fname);
rs_free_sumset(sig);
if (ret == -1) {
close(sock);
return EXIT_FAILURE;
}
close(sock);
puts("Success!");
return EXIT_SUCCESS;
}
We need to initialize the hash table with a call to rs_build_hash_table() before using the signature to calculate the delta. Furthermore, we’ll need to open the up-to-date file to fill the input buffer with its contents. The job is started with rs_delta_begin(). The remaining code is practically the same as what we’ve seen thus far.
static int send_delta(int sock, rs_signature_t *sig, const char *fname) {
/* Build hash table */
rs_result res = rs_build_hash_table(sig);
if (res != RS_DONE) {
return -1;
}
/* Open up-to-date file */
FILE *file = rs_file_open(fname, "rb", 0);
assert(file != NULL);
/* Start generating delta */
rs_job_t *job = rs_delta_begin(sig);
assert(job != NULL);
/* Setup buffers */
rs_buffers_t bufs = { 0 };
bufs.next_out = out_buf;
bufs.avail_out = BUFFER_SIZE; /* We cannot send more in one message */
do {
if ((bufs.eof_in == 0) && (bufs.avail_in < sizeof(in_buf))) {
if (bufs.avail_in > 0) {
/* Left over tail data, move to front */
memmove(in_buf, bufs.next_in, bufs.avail_in);
}
/* Fill input buffer */
size_t n_bytes = fread(in_buf + bufs.avail_in, 1, sizeof(in_buf) - bufs.avail_in, file);
if (n_bytes == 0) {
if (ferror(file)) {
perror("Failed to read file");
rs_file_close(file);
rs_job_free(job);
return -1;
}
bufs.eof_in = feof(file);
}
bufs.next_in = in_buf;
bufs.avail_in += n_bytes;
}
res = rs_job_iter(job, &bufs);
if (res != RS_DONE && res != RS_BLOCKED) {
rs_file_close(file);
rs_job_free(job);
return -1;
}
/* Drain output buffer, if there is data */
assert(bufs.next_out >= out_buf);
size_t present = (size_t)(bufs.next_out - out_buf);
if (present > 0 || res == RS_DONE) {
assert(present <= BUFFER_SIZE);
int ret = send_message(sock, out_buf, present, (res == RS_DONE) ? 1 : 0);
if (ret == -1) {
rs_file_close(file);
rs_job_free(job);
return -1;
}
bufs.next_out = out_buf;
bufs.avail_out = BUFFER_SIZE;
}
} while (res != RS_DONE);
rs_file_close(file);
rs_job_free(job);
return 0;
}
Receiving the delta & patching the file
Dare I say it? We declare a new function, recv_delta_and_patch_file(), and call it from main(). Shocking indeed!
static int connect_to_server(const char *ip_addr);
static int send_signature(int sock, const char *fname);
static int recv_delta_and_patch_file(int sock, const char *fname);
int main(int argc, char *argv[]) {
...
puts("Receiving delta and patching file...");
ret = recv_delta_and_patch_file(sock, fname);
if (ret == -1) {
close(sock);
return EXIT_FAILURE;
}
close(sock);
return EXIT_SUCCESS;
}
The definition of recv_delta_and_patch_file() has some new nuts and bolts. It’s worth noticing that rs_patch_begin() takes a callback function as an argument. This callback is supposed to retrieve content from the basis file at specific offsets during the job iterations. As we’ve seen multiple times throughout this tutorial, librsync provides a helper function, rs_file_copy_cb, to take care of this.
static int recv_delta_and_patch_file(int sock, const char *fname_old) {
const int use_io_stream = (fname_old == NULL) || (strcmp(fname_old, "-") == 0);
const char *fname_new = NULL;
char path[PATH_MAX];
if (!use_io_stream) {
int ret = snprintf(path, sizeof(path), "%s.new", fname_old);
if (ret < 0 || (size_t)ret >= sizeof(path)) {
fputs("Filename too long\n", stderr);
return -1;
}
fname_new = path;
}
/* Open new file */
FILE *new = rs_file_open(fname_new, "wb", 1);
assert(new != NULL);
/* Open basis file */
FILE *old = rs_file_open(fname_old, "rb", 0);
assert(old != NULL);
rs_job_t *job = rs_patch_begin(rs_file_copy_cb, old);
assert(job != NULL);
/* Setup RSYNC buffers */
rs_buffers_t bufs = { 0 };
bufs.next_out = out_buf;
bufs.avail_out = sizeof(out_buf);
rs_result res;
do {
if ((bufs.eof_in == 0) && (bufs.avail_in < BUFFER_SIZE)) {
if (bufs.avail_in > 0) {
/* Left over tail data, move to front */
memmove(in_buf, bufs.next_in, bufs.avail_in);
}
size_t n_bytes;
int ret = recv_message(sock, in_buf + bufs.avail_in, &n_bytes, &bufs.eof_in);
if (ret == -1) {
rs_file_close(new);
rs_file_close(old);
rs_job_free(job);
return -1;
}
bufs.next_in = in_buf;
bufs.avail_in += n_bytes;
}
res = rs_job_iter(job, &bufs);
if (res != RS_DONE && res != RS_BLOCKED) {
rs_file_close(new);
rs_file_close(old);
rs_job_free(job);
return -1;
}
/* Drain output buffer, if there is data */
assert(bufs.next_out >= out_buf);
size_t present = (size_t)(bufs.next_out - out_buf);
if (present > 0) {
size_t n_bytes = fwrite(out_buf, 1, present, new);
if (n_bytes != present) { /* a short write means an error occurred */
perror("Failed to write to file");
rs_file_close(new);
rs_file_close(old);
rs_job_free(job);
return -1;
}
bufs.next_out = out_buf;
bufs.avail_out = sizeof(out_buf);
}
} while (res != RS_DONE);
rs_file_close(new);
rs_file_close(old);
rs_job_free(job);
if (!use_io_stream) {
int ret = rename(fname_new, fname_old);
if (ret == -1) {
perror("Failed to swap basis file with patched file");
return -1;
}
}
return 0;
}
While running the job, we must be careful not to modify the basis file. Instead, we’ll write the patched content to a new file, <FILENAME>.new, and replace the basis file with it once the job is complete. We’ll make an exception when using the standard IO streams. In this case, we are already reading from and writing to different streams (stdin and stdout). Furthermore, attempting to replace stdout does not sound like a good idea.
Now, let’s compile our programs with make:
make
cc -g -Wall -Wextra -Wconversion server.c common.h -lrsync -o server
cc -g -Wall -Wextra -Wconversion client.c common.h -lrsync -o client
By now, all the warnings should have cleared. Let’s run the server by piping some text into stdin:
echo "foo bar baz" | ./server
Waiting for connection...
Receiving signature...
Sending delta...
Finally, let’s run the client by piping some slightly different text into stdin:
echo "foo baz bar" | ./client
Connecting to server...
Sending signature...
Receiving delta and patching file...
foo bar baz
Voila, the patched content is written to stdout.
Trial by fire: Let’s see if it holds up
We have proven that we can send a stream of bytes from one side to the other. However, is it efficient? To find out, let’s first add some code to print the number of bytes sent and received by the client.
Declare a variable to hold the total number of bytes sent just before the do-while loop in send_signature() in client.c:
size_t tot_bytes_sent = 0;
Next, add the number of bytes sent to the total just after the call to send_message() toward the end of the do-while loop:
tot_bytes_sent += present;
Finally, print the result just before returning from send_signature():
printf("Sent %zu bytes\n", tot_bytes_sent);
Similarly, in recv_delta_and_patch_file(), declare a variable to hold the total number of bytes received just before the do-while loop:
size_t tot_bytes_received = 0;
Next, add the number of bytes received to the total just after the call to recv_message() toward the start of the do-while loop:
tot_bytes_received += n_bytes;
And finally, print the result just before returning from recv_delta_and_patch_file():
printf("Received %zu bytes\n", tot_bytes_received);
Now let’s create the basis file by dumping 1 GB worth of random data into it:
head -c 1G /dev/urandom > bar.txt
Next, we create the up-to-date file by appending 500 MB of random data to the contents of the basis file:
head -c 500M /dev/urandom | cat bar.txt - > foo.txt
Finally, compile the program with make and run the server with foo.txt as the argument:
./server foo.txt
Waiting for connection...
Receiving signature...
Sending delta...
In another shell, run the client with bar.txt as an argument:
./client bar.txt
Connecting to server...
Sending signature...
Sent 1179660 bytes
Receiving delta and patching file...
Received 524312014 bytes
From the client’s output, we can see that the client sent approximately 1.2 MB and received around 524.3 MB, which adds up to 525.5 MB. This is considerably less than having to transmit the entire 1.5 GB file.
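For the curious, here is the arithmetic behind these numbers. GNU head interprets the 1G and 500M suffixes as binary units ($2^{30}$ and $500 \cdot 2^{20}$ bytes), and the counters we added only include payload bytes, not the 2-byte SNP headers:

```latex
\begin{aligned}
1\,179\,660 + 524\,312\,014 &= 525\,491\,674 \text{ bytes} \approx 525.5\ \text{MB} \\
2^{30} + 500 \cdot 2^{20} &= 1\,073\,741\,824 + 524\,288\,000 = 1\,598\,029\,824 \text{ bytes (size of foo.txt)} \\
\frac{525\,491\,674}{1\,598\,029\,824} &\approx 0.329 \\
524\,312\,014 - 524\,288\,000 &= 24\,014 \text{ bytes}
\end{aligned}
```

In other words, the client transferred roughly a third of the file, and the delta it received was only about 24 KB larger than the 500 MiB of data that was actually new.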
Now, let’s make sure the files are actually identical:
diff -s foo.txt bar.txt
Files foo.txt and bar.txt are identical
Which they indeed are!
Conclusion
Congratulations! You have successfully implemented efficient file streaming. One can argue that the “trial by fire” is synthetic and does not reflect real-world usage. However, in a future blog post, we plan to show the performance benefits of updating the CFEngine MPF (Masterfiles Policy Framework) from one version to another using CFEngine. CFEngine leverages the librsync Streaming API in the latest network protocol, and we saw a performance benefit of over 80%. If you cannot wait for the next blog post, this experiment is also featured in The agent is in - Episode 45.
Share your thoughts
You are now an expert user of the librsync Streaming API. If you choose to use this code as a launch pad for an upcoming project or have any questions about the code, let us know at GitHub Discussions. We are eager to hear from you.