COMS 4995 Advanced Systems Programming

Nonblocking I/O and I/O multiplexing

Nonblocking I/O

Two ways to make “slow” systems calls nonblocking:

  1. call open() with O_NONBLOCK
  2. call fcntl() to turn on O_NONBLOCK file status flag
    • file status flag is part of file table entry (the middle layer)

Nonblocking slow system call returns -1 with errno set to EAGAIN if it would have blocked

APUE presents set_fl() and clr_fl() wrapper functions on fcnt() for convenience. These wrappers let you set and clear a specified set of flags for the file descriptor without affecting the rest of the file status flags:

#include "apue.h"
#include <fcntl.h>

void set_fl(int fd, int flags) /* flags are file status flags to turn on */
{
        int		val;

        if ((val = fcntl(fd, F_GETFL, 0)) < 0)
                err_sys("fcntl F_GETFL error");

        val |= flags;		/* turn on flags */

        if (fcntl(fd, F_SETFL, val) < 0)
                err_sys("fcntl F_SETFL error");
}

void clr_fl(int fd, int flags) /* flags are file status flags to turn off */
{
        int		val;

        if ((val = fcntl(fd, F_GETFL, 0)) < 0)
                err_sys("fcntl F_GETFL error");

        val &= ~flags;		/* turn flags off */

        if (fcntl(fd, F_SETFL, val) < 0)
                err_sys("fcntl F_SETFL error");
}

Consider the following example program from APUE Figure 14.1 which reads 500,000 bytes from stdin and writes them to stdout. The stdout is set to non-blocking to demonstrate that, in some situations, write() may not be able to write the entire chunk at once:

char buf[500000];

int main(void)
{
    int     ntowrite, nwrite;
    char    *ptr;

    ntowrite = read(STDIN_FILENO, buf, sizeof(buf));
    fprintf(stderr, "read %d bytes\n", ntowrite);

    set_fl(STDOUT_FILENO, O_NONBLOCK);  /* set nonblocking */

    ptr = buf;
    while (ntowrite > 0) {
        errno = 0;
        nwrite = write(STDOUT_FILENO, ptr, ntowrite);
        fprintf(stderr, "nwrite = %d, errno = %d\n", nwrite, errno);

        if (nwrite > 0) {
            ptr += nwrite;
            ntowrite -= nwrite;
        }
    }

    clr_fl(STDOUT_FILENO, O_NONBLOCK);  /* clear nonblocking */
}

Below, we run the program with its stdin redirected to read from some large file and its stdout redirected to write to a file on disk named out. The write() system call succeeded in writing the entire chunk at once.

$ ./nonblockw <  /boot/config-6.2.0-1019-gcp > out
read 275150 bytes
nwrite = 275150, errno = 0

Now, instead of writing to a normal file, we write to a named pipe named mypipe. To accomplish this, we’ll have to read from the named pipe from another terminal window. (Recall that open() on a named pipe blocks until both ends are opened.)

$ # Run this in terminal 1
$ mkfifo mypipe
$ cat mypipe

<output redacted for brevity>

For convenience, redirect the stderr of the program to a file named err. We redact repeated error lines below for brevity.

$ # Run this in terminal 2
$ ./nonblockw <  /boot/config-6.2.0-1019-gcp > mypipe 2> err
$ cat err
read 275150 bytes
nwrite = 65536, errno = 0
nwrite = -1, errno = 11
nwrite = -1, errno = 11
nwrite = -1, errno = 11
nwrite = -1, errno = 11
nwrite = 65536, errno = 0
nwrite = -1, errno = 11
...
nwrite = -1, errno = 11
nwrite = 65536, errno = 0
nwrite = -1, errno = 11
...
nwrite = -1, errno = 11
nwrite = 65536, errno = 0
nwrite = -1, errno = 11
...
nwrite = -1, errno = 11
nwrite = 13006, errno = 0

The program’s write loop ran 100s of times, most of which were EAGAIN errors. Recall that pipes are fixed-size buffers and that writes block if the pipe is full.

I/O multiplexing

Consider the read-write loop in cat:

while ((n = read(STDIN_FILENO, buf, BUFSIZ)) > 0)
    if (write(STDOUT_FILENO, buf, n) != n)
        err_sys("write error");

netcat (nc) would have to perform two such loops at the same time:

Can netcat perform blocking I/O on two file descriptors at the same time without forking?

Use select() for I/O multiplexing! The API is shown below:

#include <sys/select.h>

int select(int maxfdp1, // max fd plus 1, or simply pass FD_SETSIZE

           fd_set *restrict readfds,   // see if they're ready for reading
           fd_set *restrict writefds,  // see if they're ready for writing
           fd_set *restrict exceptfds, // see if exceptional condition occurred
                                       // ex) urgent out-of-band data in TCP

           struct timeval *restrict tvptr); // timeout

        // Returns: count of ready descriptors, 0 on timeout, –1 on error

int FD_ISSET(int fd, fd_set *fdset);

        // Returns: nonzero if fd is in set, 0 otherwise

void FD_CLR(int fd, fd_set *fdset);
void FD_SET(int fd, fd_set *fdset);
void FD_ZERO(fd_set *fdset);

Things to note on select() API:

The example program below implements a select() server with the following features:

int init_server_socket(unsigned short port)
{
    int servsock;
    if ((servsock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        die("socket() failed");

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);
    if (bind(servsock, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0)
        die("bind() failed");

    if (listen(servsock, 5) < 0)
        die("listen() failed");

    return servsock;
}

static void sig_handler(int sig) {}

int main(int argc, char **argv)
{
    int i;
    int sock;
    char buf[100];
    socklen_t size;
    struct sockaddr_in client_name;
    fd_set active_fd_set, read_fd_set;

    if (argc != 2) {
        fprintf(stderr, "%s <port>\n", argv[0]);
        exit(1);
    }
    sock = init_server_socket(atoi(argv[1]));

    // catch SIGINT
    struct sigaction act, oact;
    act.sa_handler = sig_handler;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_flags |= SA_RESTART;
    if (sigaction(SIGINT, &act, &oact) < 0)
        die("sigaction failed");

    FD_ZERO(&active_fd_set);
    FD_SET(sock, &active_fd_set);
    FD_SET(STDIN_FILENO, &active_fd_set);

    while (1) {
        read_fd_set = active_fd_set;

        if (select(FD_SETSIZE, &read_fd_set, NULL, NULL, NULL) < 0) {
            if (errno == EINTR) {
                fprintf(stderr, "%s\n", "select interrupted");
                continue;
            }
            die("select failed");
        }

        for (i = 0; i < FD_SETSIZE; ++i) {
            if (FD_ISSET(i, &read_fd_set)) {
                if (i == sock) {
                    size = sizeof(client_name);
                    int new = accept(sock,
                                     (struct sockaddr *) &client_name, &size);
                    if (new < 0) {
                        die("accept failed");
                    } else {
                        fprintf(stderr, "new connection from %s (fd:%d)\n",
                                inet_ntoa(client_name.sin_addr), new);
                        FD_SET(new, &active_fd_set);
                    }
                }
                else if (i == STDIN_FILENO) {
                    buf[0] = '\0';
                    read(i, buf, sizeof(buf));
                    if (strncmp(buf, "quit\n", 5) == 0) {
                        exit(0);
                    } else {
                        fprintf(stderr, "unknown command\n");
                    }
                }
                else{
                    int r = read(i, buf, sizeof(buf));
                    if (r < 0) {
                        die("read failed");
                    } else if (r == 0) {
                        fprintf(stderr, "connection closed (fd:%d)\n", i);
                        close(i);
                        FD_CLR(i, &active_fd_set);
                    } else {
                        write(STDOUT_FILENO, buf, r);
                    }
                }
            }
        }
    }
}

Last updated: 2024-02-20