Efficient file streams with librsync

Posted by Lars Erik Wik
March 6, 2025

In the latest version of the CFEngine network protocol (filestream - v4), we leveraged librsync and its Streaming API for efficient file copying.

While implementing the file streaming in CFEngine, I found that the documentation on the Streaming API was a bit unclear. Thus, I created two example programs to experiment with how it works. I thought I’d share them in this blog post as a tutorial to help other developers get up to speed faster.

The complete files after following this tutorial should look more or less like these:

Implementing a server

To implement efficient file streaming, we’ll first need a client and a server to connect and communicate through sockets. Let’s start with implementing the server, given that a client cannot function without a server.

We’ll start by creating a file server.c with the entry point main() calling a function accept_connection() (I also included all the header files we’ll need throughout this blog post).

server.c
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <librsync.h>

#include "common.h"

static int accept_connection(void);

int main(int argc, char *argv[])
{
    puts("Waiting for connection...");
    int sock = accept_connection();
    if (sock == -1) {
        return EXIT_FAILURE;
    }

    close(sock);
    return EXIT_SUCCESS;
}

static int accept_connection(void) {
    /* Create socket */
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1) {
        perror("Failed to create socket");
        return -1;
    }

    /* Enable reuse address (to avoid "Address already in use" errors) */
    int opt = 1;
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    /* Assign IP address and port */
    struct sockaddr_in server_addr = { 0 };
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY); /* Listen on all local interfaces */
    server_addr.sin_port = htons(PORT);

    /* Bind socket to given IP address */
    int ret = bind(sock, (struct sockaddr *)&server_addr, sizeof(server_addr));
    if (ret == -1) {
        perror("Failed to bind socket");
        close(sock);
        return -1;
    }

    /* Listen for incoming connections. In a real-world application you should
       have a larger "connection request" queue. */
    ret = listen(sock, 1);
    if (ret == -1) {
        perror("Failed to listen");
        close(sock);
        return -1;
    }

    /* Accept incoming connection */
    struct sockaddr_in client_addr;
    socklen_t addr_len = sizeof(client_addr); /* Must hold the buffer size on input */
    int conn = accept(sock, (struct sockaddr *)&client_addr, &addr_len);

    /* We don't expect any more connections in this example. In a real-world
       application you should keep this socket open to accept more
       connections. */
    close(sock);

    if (conn == -1) {
        perror("Failed to accept");
        return -1;
    }

    return conn;
}

I won’t go into great detail about network programming. If there is something you don’t understand, I would highly recommend reading Beej’s Guide to Network Programming. It’s free and written in a language mortal programmers can understand.

Nonetheless, here is a short description of what the code does. We create an endpoint for communication through TCP/IP using the “system call” socket() which provides a file descriptor for reading and writing.

Next comes a little hack that can save you some time and frustration. Soon, we’ll bind our newly created socket to a port on our machine. However, when rerunning the server program, one might experience errors such as “Address already in use.” This is because parts of the socket may still linger in the kernel after exiting our program. We can either wait for the socket to clear or tell the kernel to allow the port to be reused.
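The listing above ignores the return value of setsockopt(). A minimal sketch of the same call with error checking could look like the following. The helper name enable_reuse_addr() is hypothetical and not part of the tutorial's files.

```c
#include <assert.h>
#include <stdio.h>
#include <sys/socket.h>

/* Hypothetical helper: allow the port to be reused right after a restart.
 * Returns 0 on success, -1 if the option could not be set. */
static int enable_reuse_addr(int sock) {
    assert(sock >= 0);

    int opt = 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
        perror("Failed to set SO_REUSEADDR");
        return -1;
    }
    return 0;
}
```

Calling it right after socket() and before bind() keeps the order used in accept_connection().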

Finally, we bind the server to a local IPv4 address and a port (that we will later specify using a macro definition), followed by listening for incoming connections.

In the call to listen(), I specified 1 as the backlog argument. You’d probably want a grander connection request queue in a real-world application. Otherwise, the server would accept the first connection and refuse all others while processing the first one.

The program blocks until a connection request is received. At this point, we will have to decide whether to accept. Of course - we’ll give the nod - because “Happiness can exist only in acceptance” (George Orwell).

The call to accept() returns a new connected socket for talking to the client. Given that we don’t expect any more connections in our example server program, we will close our listening socket and return the connected socket.

We are almost ready to compile our server program. However, we still need to define the PORT macro. We will add the definition in a shared header file, common.h. This way, the client program will also be able to access it. I took the liberty to include all of the header files that we’ll need throughout this blog post here as well.

common.h
#ifndef COMMON_H
#define COMMON_H

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>  /* errno, EINTR */
#include <stdint.h> /* uint16_t */
#include <arpa/inet.h>
#include <librsync.h>

/** The port the server will be listening on / connecting to */
#define PORT 5612

#endif /* COMMON_H */

We should now be able to compile the server program using the following command.

command
gcc -g -o server server.c

Run it with the following command.

command
./server
output
Waiting for connection...
^C

However, no clients are connecting to it. Thus, the program blocks forever. Type CTRL+C to stop it.

Implementing a client

A server without any clients ain’t no fun. Thus, let’s create a new file client.c - also with the entry point main() calling the function connect_to_server(). As always, I included all the header files we’ll need throughout.

client.c
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <limits.h>
#include <librsync.h>

#include "common.h"

#define IP_ADDRESS "127.0.0.1"

static int connect_to_server(const char *ip_addr);

int main(int argc, char *argv[]) {
    puts("Connecting to server...");
    int sock = connect_to_server(IP_ADDRESS);
    if (sock == -1) {
        return EXIT_FAILURE;
    }

    close(sock);
    return EXIT_SUCCESS;
}

static int connect_to_server(const char *ip_addr) {
    /* Create socket */
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1) {
        perror("Failed to create socket");
        return -1;
    }

    /* Assign IP address and port */
    struct sockaddr_in addr = { 0 };
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr(ip_addr);
    addr.sin_port = htons(PORT);

    /* Connect to server */
    int ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
    if (ret == -1) {
        perror("Failed to connect");
        close(sock);
        return -1;
    }

    return sock;
}

We create a new socket, just like in the server program. However, for the client, we call the connect() function using the loop-back address 127.0.0.1. There is nothing more to it. It’s pretty straightforward.
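One caveat with inet_addr() is that it signals failure by returning INADDR_NONE, which is indistinguishable from the valid address 255.255.255.255. As a hedged alternative, the sketch below uses inet_pton() so that a malformed address can be rejected explicitly. The helper make_ipv4_addr() is a hypothetical name and is not used elsewhere in this tutorial.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: fill addr from a dotted-quad IPv4 string and a port.
 * Returns 0 on success, -1 if ip_addr is not a valid IPv4 address. */
static int make_ipv4_addr(const char *ip_addr, uint16_t port,
                          struct sockaddr_in *addr) {
    assert(ip_addr != NULL && addr != NULL);

    memset(addr, 0, sizeof(*addr));
    addr->sin_family = AF_INET;
    addr->sin_port = htons(port);

    /* inet_pton() returns 1 on success and 0 for an invalid string */
    return (inet_pton(AF_INET, ip_addr, &addr->sin_addr) == 1) ? 0 : -1;
}
```

The resulting struct can be passed to connect() exactly like addr in connect_to_server().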

By now, we are ready to compile the client. I cannot speak for everyone else, but I, for one, am tired of typing these gcc commands. Instead, let’s create a new file, Makefile, to take care of this once and for all.

Makefile
CFLAGS = -g -Wall -Wextra -Wconversion
LDLIBS = -lrsync
.PHONY: all clean

all: server client

server: server.c common.h

client: client.c common.h

clean:
	rm -f server
	rm -f client
	rm -f *.o

Perfect! Now, we compile both our programs by simply running this one command:

command
make
output
cc -g -Wall -Wextra -Wconversion    server.c common.h  -lrsync -o server
 ---snip---
cc -g -Wall -Wextra -Wconversion    client.c common.h  -lrsync -o client
 ---snip---

Don’t worry about any warnings related to unused arguments. We will take care of these later.

Now, let’s open two terminal sessions. In the first one, we run the server:

command
./server
output
Waiting for connection...

In the second one, we run the client:

command
./client
output
Connecting to server...

It works! Although, there’s not much to it yet. The client connects to the server. Then, both ends close the connection and exit.

Implementing a communication protocol

By now, we should be able to send and receive messages. However, knowing when the other end is finished sending and ready to receive remains an issue for us to solve. We can solve it by building an application layer network protocol on top of TCP/IP. Most importantly, we need a cool name, like SNP (short for Simple Network Protocol.) Yup, that name is definitely original and has never been used before. Trust me!

Network protocols usually send data in chunks called PDUs (Protocol Data Units). Furthermore, PDUs are built from a protocol header and a payload, often called an SDU (Service Data Unit). The SDU usually contains the actual data that you want to transmit, while the protocol header provides information such as the payload size and whether to expect more data (among other things).

We should aim for restricted bandwidth consumption and forward compatibility when designing the protocol. In other words, we want to pack the required information into as few bits as possible while reserving some for the future. Hence, I suggest a protocol header consisting of 16 bits distributed as follows:

SDU Len  | Reserved | EoF Flag
12 bits  | 3 bits   | 1 bit
  • SDU Len: 12 bits are allocated to indicate the payload length within the PDU, allowing payloads of up to 4095 bytes (2^12 - 1).
  • Reserved: 3 bits are reserved for future use.
  • EoF Flag: 1 bit determines whether the receiver should expect to receive more PDUs.
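To make the bit layout concrete, here is a minimal sketch of packing and unpacking the 16-bit header in host byte order. The names snp_pack_header(), snp_unpack_header(), SNP_MAX_SDU_LEN and SNP_EOF_FLAG are hypothetical and only serve this illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SNP_MAX_SDU_LEN 4095u /* 2^12 - 1, largest value in 12 bits */
#define SNP_EOF_FLAG 0x0001u  /* Lowest bit of the header */

/* Pack the payload length (upper 12 bits) and the EoF flag (lowest bit) into
 * a host-order header. The 3 reserved bits in between are left as zero. */
static uint16_t snp_pack_header(size_t len, int eof) {
    assert(len <= SNP_MAX_SDU_LEN);

    uint16_t header = (uint16_t)(len << 4);
    if (eof != 0) {
        header |= SNP_EOF_FLAG;
    }
    return header;
}

/* Split a host-order header back into payload length and EoF flag. */
static void snp_unpack_header(uint16_t header, size_t *len, int *eof) {
    assert(len != NULL && eof != NULL);

    *len = (size_t)(header >> 4);
    *eof = (header & SNP_EOF_FLAG) != 0;
}
```

For example, a 5-byte payload with the EoF flag set packs to 0x0051: the length 5 shifted left by 4 bits gives 0x0050, and the flag sets the lowest bit.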

Now that we have a bulletproof design, we can implement two utility functions for sending and receiving messages. Add the following code to common.h:

common.h
/** It's the largest value we can represent with the 12 bits in the SDU Length
 * header field (2^12-1 = 4095). Hence, it's the largest allowed payload.
 */
#define BUFFER_SIZE 4095


static int send_message(int sock, const char *msg, size_t len, int eof) {
    assert(len <= BUFFER_SIZE);

    uint16_t header = (uint16_t)len << 4;

    /* Set EoF flag */
    if (eof != 0) {
        header |= 1;
    }

    /* Send header */
    header = htons(header);
    size_t n_bytes = 0;
    do {
        /* Cast to char * so that the offset is counted in bytes */
        ssize_t ret = write(sock, (const char *)&header + n_bytes, sizeof(header) - n_bytes);
        if (ret < 0) {
            if (errno == EINTR) {
                /* The call was interrupted by a signal before any data was
                   written. It is safe to continue. */
                continue;
            }
            perror("Failed to send message header");
            return -1;
        }
        n_bytes += (size_t)ret;
    } while (n_bytes < sizeof(header));

    if (len > 0) {
        /* Send payload */
        n_bytes = 0;
        do {
            ssize_t ret = write(sock, msg + n_bytes, len - n_bytes);
            if (ret < 0) {
                if (errno == EINTR) {
                    /* The call was interrupted by a signal before any data was
                       written. It is safe to continue. */
                    continue;
                }
                perror("Failed to send message payload");
                return -1;
            }
            n_bytes += (size_t)ret;
        } while (n_bytes < len);
    }

    return 0;
}

static int recv_message(int sock, char *msg, size_t *len, int *eof) {
    /* Receive header */
    uint16_t header;
    size_t n_bytes = 0;
    do {
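        /* Cast to char * so that the offset is counted in bytes */
        ssize_t ret = read(sock, (char *)&header + n_bytes, sizeof(header) - n_bytes);
        if (ret < 0) {
            if (errno == EINTR) {
                /* The call was interrupted by a signal before any data was
                   read. It is safe to continue. */
                continue;
            }
            perror("Failed to receive message header");
            return -1;
        }
        if (ret == 0) {
            /* The peer closed the connection before the header arrived */
            fputs("Connection closed while receiving message header\n", stderr);
            return -1;
        }
        n_bytes += (size_t)ret;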
    } while (n_bytes < sizeof(header));
    header = ntohs(header);

    /* Extract EOF flag */
    *eof = header & 1;

    /* Extract message length */
    *len = header >> 4;

    if (*len > 0) {
        /* Read payload */
        n_bytes = 0;
        do {
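            ssize_t ret = read(sock, msg + n_bytes, *len - n_bytes);
            if (ret < 0) {
                if (errno == EINTR) {
                    /* The call was interrupted by a signal before any data was
                       read. It is safe to continue. */
                    continue;
                }
                perror("Failed to receive message payload");
                return -1;
            }
            if (ret == 0) {
                /* The peer closed the connection before the payload arrived */
                fputs("Connection closed while receiving message payload\n", stderr);
                return -1;
            }
            n_bytes += (size_t)ret;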
        } while (n_bytes < *len);
    }

    return 0;
}

The function send_message() marshals and sends the protocol header, followed by the payload. The recv_message() function reads and unmarshals the protocol header first. Based on the header fields, it determines how many bytes to read to get the entire payload. Furthermore, it determines whether to expect more PDUs.
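Both the header and the payload are read with the same kind of loop, because read() may return fewer bytes than requested. It also returns 0 once the peer has closed the connection, and a loop that only checks for negative values would spin forever in that case. A minimal sketch of a reusable helper that covers both cases is shown below. The name read_full() is hypothetical and not part of the tutorial's files.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: read exactly len bytes from fd into buf.
 * Returns 0 on success, -1 on error or if the stream ends early. */
static int read_full(int fd, void *buf, size_t len) {
    assert(buf != NULL || len == 0);

    char *dst = buf;
    size_t n_bytes = 0;
    while (n_bytes < len) {
        ssize_t ret = read(fd, dst + n_bytes, len - n_bytes);
        if (ret < 0) {
            if (errno == EINTR) {
                continue; /* Interrupted before any data was read, retry */
            }
            return -1;
        }
        if (ret == 0) {
            return -1; /* End of stream: the other end closed early */
        }
        n_bytes += (size_t)ret;
    }
    return 0;
}
```

A matching write_full() could follow the same pattern around write().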

Different computer architectures can have different byte orders. Hence, we must convert the 16-bit header from host to network byte order and back. Otherwise, the header can be misinterpreted. Please note the use of htons() and ntohs() in send_message() and recv_message().
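As a small illustration of the conversion, the sketch below serializes a header into two bytes and reads it back. Regardless of the host's byte order, the most significant byte ends up first on the wire. The function names are hypothetical.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Serialize a host-order header into two bytes in network byte order. */
static void snp_header_to_wire(uint16_t header, unsigned char wire[2]) {
    assert(wire != NULL);

    uint16_t net = htons(header);
    memcpy(wire, &net, sizeof(net));
}

/* Read two bytes in network byte order back into a host-order header. */
static uint16_t snp_header_from_wire(const unsigned char wire[2]) {
    assert(wire != NULL);

    uint16_t net;
    memcpy(&net, wire, sizeof(net));
    return ntohs(net);
}
```

For the header value 0x0051, the bytes on the wire are 0x00 followed by 0x51 on both little-endian and big-endian machines.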

Let’s test the protocol by adding two calls to send_message() in client.c. In the second call to send_message(), we set the End-of-File flag.

client.c
int main(int argc, char *argv[]) {
    ...

    const char *first_msg = "Hello";
    if (send_message(sock, first_msg, strlen(first_msg), 0) == -1) {
        close(sock);
        return EXIT_FAILURE;
    }

    const char *second_msg = " from client\n";
    if (send_message(sock, second_msg, strlen(second_msg), 1) == -1) {
        close(sock);
        return EXIT_FAILURE;
    }

    close(sock);
    return EXIT_SUCCESS;
}

Similarly, we repeat calling the recv_message() function in server.c and print the contents received in the buffer until the End-of-File flag gets set.

server.c
int main(int argc, char *argv[])
{
    ...

    int eof;
    size_t len;
    char buffer[BUFFER_SIZE + 1 /* 1 extra to add terminating null-byte */];
    do {
        int ret = recv_message(sock, buffer, &len, &eof);
        if (ret == -1) {
            close(sock);
            return EXIT_FAILURE;
        }
        buffer[len] = '\0';
        printf("%s", buffer);
    } while (!eof);

    close(sock);
    return EXIT_SUCCESS;
}

Again, we can build the program with a simple command thanks to our Makefile:

command
make
output
cc -g -Wall -Wextra -Wconversion    server.c common.h  -lrsync -o server
 ---snip---
cc -g -Wall -Wextra -Wconversion    client.c common.h  -lrsync -o client
 ---snip---

Now, we can run the server and the client in two terminal sessions and see that messages are sent and received. Furthermore, we can see that the server knows when to exit, thanks to the End-of-File flag.

command
./server
output
Waiting for connection...
Hello from client
command
./client
output
Connecting to server...

With a fully functional and battle-tested network protocol, we are ready to start implementing efficient file streams. N.B. Remember to remove the temporary calls to send_message() and recv_message() before proceeding with the tutorial.

Computing & sending the signature

The first step in implementing efficient file streams using the rsync protocol involves computing a signature of the basis file and sending this to the server. The basis file is basically the client’s copy of the file. Furthermore, the signature lets the server determine which parts of the client’s copy are up-to-date.

In the main() function of client.c, we’ll add some code to extract the path of the basis file from the command-line arguments. We will assume that the user wants to use the IO streams if no arguments are passed. I.e., we will read from stdin and write to stdout.

client.c
static int send_signature(int sock, const char *fname);

int main(int argc, char *argv[]) {
    /* Parse arguments (use stdin and stdout if no argument) */
    const char *fname = (argc >= 2) ? argv[1] : NULL;

    puts("Connecting to server...");
    int sock = connect_to_server(IP_ADDRESS);
    if (sock == -1) {
        return EXIT_FAILURE;
    }

    puts("Sending signature...");
    int ret = send_signature(sock, fname);
    if (ret == -1) {
        close(sock);
        return EXIT_FAILURE;
    }

    close(sock);
    return EXIT_SUCCESS;
}

Furthermore, we’ll declare a function, send_signature(), in client.c and call it from main(). Now, let’s define it:

client.c
static int send_signature(int sock, const char *fname) {
    /* Make sure the basis file exists, unless it is stdin */
    const int use_io_stream = (fname == NULL) || (strcmp(fname, "-") == 0);
    if (!use_io_stream) {
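        /* O_CREAT requires a mode argument. 0644 is an arbitrary but common
           choice for regular files. */
        int fd = open(fname, O_RDONLY | O_CREAT, 0644);
        if (fd == -1) {
            perror("Failed to create basis file");
            return -1;
        }
        close(fd);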
    }

    /* Open basis file */
    FILE *file = rs_file_open(fname, "rb", 0);
    assert(file != NULL);

    rs_file_close(file);
    return 0;
}

To compute the signature of the basis file, we’ll first need to open it. Unfortunately, to my knowledge, the fopen(3) API offers no good way of opening a file in read-only mode while creating it if it does not exist. Hence, we will have to do this in two steps. First, we call open(2) with the O_CREAT flag (unless the input is stdin) to ensure the file exists without truncating it. Next, we open it in read-only mode.
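The first of the two steps can be isolated into a small helper. This is a minimal sketch, assuming a permission mode of 0644 for newly created files. The name ensure_file_exists() is hypothetical. Note that open(2) requires the mode argument whenever O_CREAT is given.

```c
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: create path if it is missing, without truncating
 * existing content. Returns 0 on success, -1 on failure. */
static int ensure_file_exists(const char *path) {
    assert(path != NULL);

    /* The third argument is only used when the file gets created */
    int fd = open(path, O_RDONLY | O_CREAT, 0644);
    if (fd == -1) {
        return -1;
    }
    close(fd);
    return 0;
}
```

After a successful call, the file can be opened in read-only mode with fopen() or rs_file_open() without risking a "No such file or directory" error.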

The library librsync provides handy utility functions for opening and closing files. The function rs_file_open() provides a platform-independent way to open large binary files. Furthermore, it uses stdin for reading and stdout for writing if NULL or "-" is passed as the filename argument. The function rs_file_close() provides special handling to avoid actually closing stdin and stdout.

The Streaming API in librsync is designed around starting jobs, filling the input buffer, iterating the job, and draining the output buffer. Hence, we’ll need to allocate two buffers for our jobs. Add the following line just before the function declarations in client.c:

client.c
static char in_buf[BUFFER_SIZE * 2], out_buf[BUFFER_SIZE];

Our network protocol sends chunks of at most BUFFER_SIZE bytes, and there may still be leftovers in the input buffer after iterating a job. Hence, we need an input buffer of BUFFER_SIZE * 2 bytes to hold a new message in addition to the leftovers.

The file stream can be optimized through parameters given to the signature job. Thankfully, we don’t have to worry too much about them. The reason is that librsync provides a helper function rs_sig_args() that can be used to get the recommended arguments based on the input file’s size.

The documentation explicitly says to use -1 in the input argument of rs_sig_args() if the file size is unknown, as it would be if we read from stdin. Once again, librsync provides a utility function rs_file_size() to get the file size in a platform-independent way and return -1 if the size cannot be determined due to not being a regular file.

client.c
static int send_signature(int sock, const char *fname) {
    ...

    /* Get file size */
    rs_long_t fsize = rs_file_size(file);

    /* Get recommended arguments */
    rs_magic_number sig_magic = 0;
    size_t block_len = 0, strong_len = 0;
    rs_result res = rs_sig_args(fsize, &sig_magic, &block_len, &strong_len);
    if (res != RS_DONE) {
        rs_file_close(file);
        return -1;
    }

    rs_file_close(file);
    return 0;
}

With what are hopefully close to optimal parameters, we can start the signature job with a call to rs_sig_begin(). Furthermore, we’ll set up an rs_buffers_t struct, pointing the next_out attribute to the output buffer. The avail_out attribute specifies the remaining free space in the output buffer before iterating the job. We’ll set that to BUFFER_SIZE, given the limitations of our network protocol. Bear in mind that the job updates this attribute on each iteration to hold the space still unused in the output buffer.

client.c
static int send_signature(int sock, const char *fname) {
    ...

    /* Start generating signature */
    rs_job_t *job = rs_sig_begin(block_len, strong_len, sig_magic);
    assert(job != NULL);

    /* Setup buffers */
    rs_buffers_t bufs = { 0 };
    bufs.next_out = out_buf;
    bufs.avail_out = BUFFER_SIZE; /* We cannot send more in one message */

    rs_job_free(job);
    rs_file_close(file);
    return 0;
}

Now comes the hard part. We will iterate the job in a do-while loop until rs_job_iter() returns RS_DONE. Unless End-of-File has been reached or the input buffer is already full, we try to fill it.

There is a potential for leftover input data after the last job iteration. If this is the case, we must move it to the front of the buffer.
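The move itself is independent of librsync. The sketch below shows it on a plain character buffer, with a hypothetical helper compact_leftover() that returns how much free space remains behind the leftover data. memmove() is used rather than memcpy() because the source and destination regions may overlap.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: move the avail leftover bytes starting at next to the
 * front of buf (capacity cap) and return the free space that follows them. */
static size_t compact_leftover(char *buf, size_t cap, const char *next,
                               size_t avail) {
    assert(buf != NULL && next >= buf);
    assert((size_t)(next - buf) + avail <= cap);

    if (avail > 0 && next != buf) {
        memmove(buf, next, avail); /* Regions may overlap */
    }
    return cap - avail;
}
```

New input is then appended at buf + avail, and at most the returned number of bytes may be written there.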

Finally, we fill the input buffer by appending to any leftovers with a call to fread(). Furthermore, we check if the End-of-File is reached and update the eof_in attribute in the rs_buffers_t struct to inform the rsync algorithm. Also, we set the next_in attribute to point to the start of the input buffer, in addition to updating avail_in with the number of bytes read.

client.c
static int send_signature(int sock, const char *fname) {
    ...

    /* Generate signature */
    do {
        if ((bufs.eof_in == 0) && (bufs.avail_in < sizeof(in_buf))) {
            if (bufs.avail_in > 0) {
                /* Leftover tail data, move it to front */
                memmove(in_buf, bufs.next_in, bufs.avail_in);
            }

            /* Fill input buffer */
            size_t n_bytes = fread(in_buf + bufs.avail_in, 1, sizeof(in_buf) - bufs.avail_in, file);
            if (n_bytes == 0) {
                if (ferror(file)) {
                    perror("Failed to read file");
                    rs_file_close(file);
                    rs_job_free(job);
                    return -1;
                }

                /* End-of-File reached */
                bufs.eof_in = feof(file);
                assert(bufs.eof_in != 0);
            }

            bufs.next_in = in_buf;
            bufs.avail_in += n_bytes;
        }

    } while (res != RS_DONE);

    rs_job_free(job);
    rs_file_close(file);
    return 0;
}

With the input buffer filled, we can execute rs_job_iter() to run the rsync algorithm. The algorithm may drain the input buffer and fill the output buffer. However, this is not always the case. Sending a message without a payload does not make sense unless it is to signal the End-of-File. Hence, we check for available data before draining the output buffer and updating the next_out and avail_out attributes.

client.c
static int send_signature(int sock, const char *fname) {
    ...

    /* Generate signature */
    do {
        ...

        /* Iterate job */
        res = rs_job_iter(job, &bufs);
        if (res != RS_DONE && res != RS_BLOCKED) {
            rs_file_close(file);
            rs_job_free(job);
            return -1;
        }

        assert(bufs.next_out >= out_buf);
        size_t present = (size_t)(bufs.next_out - out_buf);
        if (present > 0 || res == RS_DONE) {
            /* Drain output buffer */
            assert(present <= BUFFER_SIZE);
            int ret = send_message(sock, out_buf, present, (res == RS_DONE) ? 1 : 0);
            if (ret == -1) {
                rs_file_close(file);
                rs_job_free(job);
                return -1;
            }

            bufs.next_out = out_buf;
            bufs.avail_out = BUFFER_SIZE;
        }
    } while (res != RS_DONE);

    rs_job_free(job);
    rs_file_close(file);
    return 0;
}

That’s it for sending signatures. Next, we’ll look at receiving signatures on the server side.

Receiving the signature

Currently, the client computes and sends the signature to the server, but the server does not even bother to receive it. Let’s change that by declaring a new function recv_signature() in server.c and calling it from main(). The recv_signature() function will take an additional output parameter to hold the received signature. Don’t forget to allocate the input and output buffers.

server.c
static char in_buf[BUFFER_SIZE * 2], out_buf[BUFFER_SIZE];

static int accept_connection(void);
static int recv_signature(int sock, rs_signature_t **sig);

int main(int argc, char *argv[])
{
    ...

    puts("Receiving signature...");
    rs_signature_t *sig;
    int ret = recv_signature(sock, &sig);
    if (ret == -1) {
        close(sock);
        return EXIT_FAILURE;
    }

    close(sock);
    return EXIT_SUCCESS;
}

There isn’t much that needs to be explained about the definition of recv_signature(). One notable difference is that the job started by the call to rs_loadsig_begin() completely handles the draining of the output buffer for us. Ergo, one less worry on our minds. A more subtle difference is that the input buffer is filled with messages from a client instead of file contents. Thus, we ensure the buffer has at least BUFFER_SIZE remaining space before receiving a message.

server.c
static int recv_signature(int sock, rs_signature_t **sig) {
    rs_job_t *job = rs_loadsig_begin(sig);
    assert(job != NULL);

    /* Setup buffers */
    rs_buffers_t bufs = { 0 };

    rs_result res;
    do {
        if ((bufs.eof_in == 0) && (bufs.avail_in <= BUFFER_SIZE)) {
            if (bufs.avail_in > 0) {
                /* Leftover tail data, move it to front */
                memmove(in_buf, bufs.next_in, bufs.avail_in);
            }

            size_t n_bytes;
            int ret = recv_message(sock, in_buf + bufs.avail_in, &n_bytes, &bufs.eof_in);
            if (ret == -1) {
                rs_job_free(job);
                return -1;
            }

            bufs.next_in = in_buf;
            bufs.avail_in += n_bytes;
        }

        /* Iterate job */
        res = rs_job_iter(job, &bufs);
        if (res != RS_DONE && res != RS_BLOCKED) {
            rs_job_free(job);
            return -1;
        }

        /* The job should take care of draining the buffers */
    } while (res != RS_DONE);

    rs_job_free(job);
    return 0;
}

If you want to test the code thus far, you can compile it with make and run the server as follows:

command
./server
output
Waiting for connection...
Receiving signature...

For the client, you can either use stdin as illustrated below or provide a file path as a command-line argument:

command
echo "foo bar baz" | ./client
output
Connecting to server...
Sending signature...

Now, let’s move on to generating deltas.

Generating & sending the delta

From the received signature and the up-to-date file (available on the server), we can now compute a delta that the client can use to patch its basis file.

You know the drill. We declare a new function, send_delta(), and call it from main(). Furthermore, we get the filename of the up-to-date file from the command-line arguments. This should probably be sent as an argument from the client through the network protocol. But, hey! We want to keep it simple.

server.c
static int accept_connection(void);
static int recv_signature(int sock, rs_signature_t **sig);
static int send_delta(int sock, rs_signature_t *sig, const char *fname);

int main(int argc, char *argv[])
{
    /* Parse arguments (use stdin if no argument) */
    const char *fname = (argc >= 2) ? argv[1] : NULL;

    ...

    puts("Sending delta...");
    ret = send_delta(sock, sig, fname);
    rs_free_sumset(sig);
    if (ret == -1) {
        close(sock);
        return EXIT_FAILURE;
    }

    close(sock);
    puts("Success!");
    return EXIT_SUCCESS;
}

We need to initialize the hash table with a call to rs_build_hash_table() before using the signature to calculate the delta. Furthermore, we’ll need to open the “up-to-date” file to fill the input buffer with its contents. The job is started with rs_delta_begin(). The remaining code is practically the same as what we’ve seen thus far.

server.c
static int send_delta(int sock, rs_signature_t *sig, const char *fname) {
    /* Build hash table */
    rs_result res = rs_build_hash_table(sig);
    if (res != RS_DONE) {
        return -1;
    }

    /* Open up-to-date file */
    FILE *file = rs_file_open(fname, "rb", 0);
    assert(file != NULL);

    /* Start generating delta */
    rs_job_t *job = rs_delta_begin(sig);
    assert(job != NULL);

    /* Setup buffers */
    rs_buffers_t bufs = { 0 };
    bufs.next_out = out_buf;
    bufs.avail_out = BUFFER_SIZE; /* We cannot send more in one message */

    do {
        if ((bufs.eof_in == 0) && (bufs.avail_in < sizeof(in_buf))) {
            if (bufs.avail_in > 0) {
                /* Left over tail data, move to front */
                memmove(in_buf, bufs.next_in, bufs.avail_in);
            }

            /* Fill input buffer */
            size_t n_bytes = fread(in_buf + bufs.avail_in, 1, sizeof(in_buf) - bufs.avail_in, file);
            if (n_bytes == 0) {
                if (ferror(file)) {
                    perror("Failed to read file");
                    rs_file_close(file);
                    rs_job_free(job);
                    return -1;
                }
                bufs.eof_in = feof(file);
            }

            bufs.next_in = in_buf;
            bufs.avail_in += n_bytes;
        }

        res = rs_job_iter(job, &bufs);
        if (res != RS_DONE && res != RS_BLOCKED) {
            rs_file_close(file);
            rs_job_free(job);
            return -1;
        }

        /* Drain output buffer, if there is data */
        assert(bufs.next_out >= out_buf);
        size_t present = (size_t)(bufs.next_out - out_buf);
        if (present > 0 || res == RS_DONE) {
            assert(present <= BUFFER_SIZE);
            int ret = send_message(sock, out_buf, present, (res == RS_DONE) ? 1 : 0);
            if (ret == -1) {
                rs_file_close(file);
                rs_job_free(job);
                return -1;
            }

            bufs.next_out = out_buf;
            bufs.avail_out = BUFFER_SIZE;
        }
    } while (res != RS_DONE);

    rs_file_close(file);
    rs_job_free(job);
    return 0;
}

Receiving the delta & patching the file

Dare I say it? We declare a new function, recv_delta_and_patch_file(), and call it from main(). Shocking indeed!

client.c
static int connect_to_server(const char *ip_addr);
static int send_signature(int sock, const char *fname);
static int recv_delta_and_patch_file(int sock, const char *fname);

int main(int argc, char *argv[]) {
    ...

    puts("Receiving delta and patching file...");
    ret = recv_delta_and_patch_file(sock, fname);
    if (ret == -1) {
        close(sock);
        return EXIT_FAILURE;
    }

    close(sock);
    return EXIT_SUCCESS;
}

The definition of recv_delta_and_patch_file() has some new nuts and bolts. It’s worth noticing that rs_patch_begin() takes a callback function as an argument. This callback is supposed to retrieve content from the basis file at specific offsets during the job iterations. As we’ve seen multiple times throughout this tutorial, librsync provides a helper function, rs_file_copy_cb, to take care of this.
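To illustrate what such a callback has to do, here is a minimal sketch using plain stdio. It is not librsync's implementation of rs_file_copy_cb, and its signature is simplified. The function name copy_from_basis() is hypothetical. The idea is to seek to the requested offset in the basis file, read up to the requested number of bytes, and report how many bytes were actually read.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical illustration of a copy callback: read up to *len bytes from
 * basis at offset pos into buf. On return, *len holds the number of bytes
 * actually read. Returns 0 on success, -1 on seek or read errors. */
static int copy_from_basis(FILE *basis, long pos, size_t *len, void *buf) {
    assert(basis != NULL && len != NULL && buf != NULL);

    if (fseek(basis, pos, SEEK_SET) != 0) {
        return -1;
    }

    *len = fread(buf, 1, *len, basis);
    if (*len == 0 && ferror(basis)) {
        return -1;
    }
    return 0;
}
```

A request that runs past the end of the file is not treated as an error here. The caller simply sees a shorter length.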

client.c
static int recv_delta_and_patch_file(int sock, const char *fname_old) {
    const int use_io_stream = (fname_old == NULL) || (strcmp(fname_old, "-") == 0);

    const char *fname_new = NULL;
    char path[PATH_MAX];
    if (!use_io_stream) {
        int ret = snprintf(path, sizeof(path), "%s.new", fname_old);
        if (ret < 0 || (size_t)ret >= sizeof(path)) {
            fputs("Filename too long\n", stderr);
            return -1;
        }
        fname_new = path;
    }

    /* Open new file */
    FILE *new = rs_file_open(fname_new, "wb", 1);
    assert(new != NULL);

    /* Open basis file */
    FILE *old = rs_file_open(fname_old, "rb", 0);
    assert(old != NULL);

    rs_job_t *job = rs_patch_begin(rs_file_copy_cb, old);
    assert(job != NULL);

    /* Setup RSYNC buffers */
    rs_buffers_t bufs = { 0 };
    bufs.next_out = out_buf;
    bufs.avail_out = sizeof(out_buf);

    rs_result res;
    do {
        if ((bufs.eof_in == 0) && (bufs.avail_in < BUFFER_SIZE)) {
            if (bufs.avail_in > 0) {
                /* Left over tail data, move to front */
                memmove(in_buf, bufs.next_in, bufs.avail_in);
            }

            size_t n_bytes;
            int ret = recv_message(sock, in_buf + bufs.avail_in, &n_bytes, &bufs.eof_in);
            if (ret == -1) {
                rs_file_close(new);
                rs_file_close(old);
                rs_job_free(job);
                return -1;
            }

            bufs.next_in = in_buf;
            bufs.avail_in += n_bytes;
        }

        res = rs_job_iter(job, &bufs);
        if (res != RS_DONE && res != RS_BLOCKED) {
            rs_file_close(new);
            rs_file_close(old);
            rs_job_free(job);
            return -1;
        }

        /* Drain output buffer, if there is data */
        assert(bufs.next_out >= out_buf);
        size_t present = (size_t)(bufs.next_out - out_buf);
        if (present > 0) {
            size_t n_bytes = fwrite(out_buf, 1, present, new);
            if (n_bytes != present) {
                perror("Failed to write to file");
                rs_file_close(new);
                rs_file_close(old);
                rs_job_free(job);
                return -1;
            }

            bufs.next_out = out_buf;
            bufs.avail_out = sizeof(out_buf);
        }
    } while (res != RS_DONE);

    rs_file_close(new);
    rs_file_close(old);
    rs_job_free(job);

    if (!use_io_stream) {
        int ret = rename(fname_new, fname_old);
        if (ret == -1) {
            perror("Failed to swap basis file with patched file");
            return -1;
        }
    }

    return 0;
}

While running the job, we must be careful not to modify the basis file. Instead, we’ll write the patched content to a new file, <FILENAME>.new, and replace the basis file with it once the job is complete. We’ll make an exception when using the standard streams. In this case, we are already reading from and writing to different files (stdin and stdout). Furthermore, attempting to replace stdout does not sound like a good idea.
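The write-then-rename step described above can be sketched in isolation as follows. This is a simplified illustration, not the tutorial's code: the helper name replace_file_contents() and the fixed 4096-byte path buffer are assumptions, and it relies on POSIX behavior, where rename() replaces an existing destination.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper: write len bytes to "<path>.new", then rename that
 * file over path. Returns 0 on success, -1 on failure. */
int replace_file_contents(const char *path, const void *data, size_t len)
{
    assert(path != NULL && data != NULL);

    char tmp_path[4096]; /* Assumed upper bound on the path length */
    int ret = snprintf(tmp_path, sizeof(tmp_path), "%s.new", path);
    if (ret < 0 || (size_t)ret >= sizeof(tmp_path)) {
        return -1; /* Filename too long */
    }

    FILE *tmp = fopen(tmp_path, "wb");
    if (tmp == NULL) {
        return -1;
    }

    size_t written = fwrite(data, 1, len, tmp);
    /* fclose() flushes buffered data, so its result must be checked too */
    if (fclose(tmp) != 0 || written != len) {
        remove(tmp_path);
        return -1;
    }

    /* The original file has not been touched up to this point */
    if (rename(tmp_path, path) != 0) {
        remove(tmp_path);
        return -1;
    }
    return 0;
}
```

If anything fails before the rename, the original file is left as it was and only the temporary file needs to be cleaned up.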

Now, let’s compile our programs with make:

command
make
output
cc -g -Wall -Wextra -Wconversion    server.c common.h  -lrsync -o server
cc -g -Wall -Wextra -Wconversion    client.c common.h  -lrsync -o client

By now, all the warnings should have cleared. Let’s run the server by piping some text into stdin:

command
echo "foo bar baz" | ./server
output
Waiting for connection...
Receiving signature...
Sending delta...

Finally, let’s run the client by piping some slightly different text into stdin:

command
echo "foo baz bar" | ./client
output
Connecting to server...
Sending signature...
Receiving delta and patching file...
foo bar baz

Voila, the patched content is written to stdout.

Trial by fire: Let’s see if it holds up

We have proven that we can send a stream of bytes from one side to the other. However, is it efficient? To find out, let’s first add some code to print the number of bytes sent and received by the client.

Declare a variable to hold the total number of bytes sent just before the do-while loop in send_signature() in client.c:

client.c
size_t tot_bytes_sent = 0;

Next, add the number of bytes sent to the total just after the call to send_message() toward the end of the do-while loop:

client.c
tot_bytes_sent += present;

Finally, print the result just before returning from send_signature():

client.c
printf("Sent %zu bytes\n", tot_bytes_sent);

Similarly, in recv_delta_and_patch_file(), declare a variable to hold the total number of bytes received just before the do-while loop:

client.c
size_t tot_bytes_received = 0;

Next, add the number of bytes received to the total just after the call to recv_message() toward the start of the do-while loop:

client.c
tot_bytes_received += n_bytes;

And finally, print the result just before returning from recv_delta_and_patch_file():

client.c
printf("Received %zu bytes\n", tot_bytes_received);

Now let’s create the basis file by dumping 1 GB worth of random data into it:

command
head -c 1G /dev/urandom > bar.txt

Next, we create the up-to-date file by appending 500 MB of random data to the basis file:

command
head -c 500M /dev/urandom | cat bar.txt - > foo.txt

Finally, compile the program with make and run the server with foo.txt as the argument:

command
./server foo.txt
output
Waiting for connection...
Receiving signature...
Sending delta...

In another shell, run the client with bar.txt as an argument:

command
./client bar.txt
output
Connecting to server...
Sending signature...
Sent 1179660 bytes
Receiving delta and patching file...
Received 524312014 bytes

From the client’s output, we can see that the client sent approximately 1.2 MB and received around 524.3 MB, which adds up to 525.5 MB. This is considerably less than having to transmit the entire 1.5 GB file.
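The arithmetic behind that comparison can be checked with a few lines of C. The byte counts are taken from the client output above. The full file size is an assumption: it is computed on the premise that head interprets the 1G and 500M suffixes as powers of 1024. The function names are made up for this sketch.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Fraction of the full file size that did NOT have to be transferred */
double fraction_saved(uint64_t sent, uint64_t received, uint64_t full_size)
{
    assert(full_size > 0);
    return 1.0 - (double)(sent + received) / (double)full_size;
}

/* Print a summary using the figures from the run above */
void print_transfer_summary(void)
{
    const uint64_t sent = 1179660;       /* Signature, from the client output */
    const uint64_t received = 524312014; /* Delta, from the client output */
    /* Assumes 1G = 1024^3 bytes and 500M = 500 * 1024^2 bytes */
    const uint64_t full_size = 1073741824ULL + 524288000ULL;

    printf("Transferred %llu of %llu bytes (%.1f%% saved)\n",
           (unsigned long long)(sent + received),
           (unsigned long long)full_size,
           100.0 * fraction_saved(sent, received, full_size));
}
```

Under those assumptions, roughly two thirds of the file never had to cross the network, since the client already held the first gigabyte.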

Now, let’s make sure the files are actually identical:

command
diff -s foo.txt bar.txt
output
Files foo.txt and bar.txt are identical

Which they indeed are!

Conclusion

Congratulations! You have successfully implemented efficient file streaming. One can argue that the “trial by fire” is synthetic and does not reflect real-world usage. However, in a future blog post, we plan to show the performance benefits of updating the CFEngine MPF (Masterfiles Policy Framework) from one version to another using CFEngine. CFEngine leverages the librsync Streaming API in the latest network protocol, and we saw a performance benefit of over 80%. If you cannot wait for the next blog post, this experiment is also featured in The agent is in - Episode 45.

Share your thoughts

You are now an expert user of the librsync Streaming API. If you choose to use this code as a launch pad for an upcoming project or have any questions about the code, let us know on GitHub Discussions. We are eager to hear from you.