[prev in list] [next in list] [prev in thread] [next in thread] 

List:       linux-aio
Subject:    Re: [rfc/patch] implement IO_CMD_EPOLL_WAIT
From:       Jeff Moyer <jmoyer () redhat ! com>
Date:       2006-11-16 22:50:30
Message-ID: x49velfvweh.fsf () segfault ! boston ! devel ! redhat ! com
[Download RAW message or body]

==> On Wed, 15 Nov 2006 13:42:34 +0530, Suparna Bhattacharya <suparna@in.ibm.com> said:

Suparna> BTW, the other lockdep warning turned out to be the unusemm
Suparna> lock inversion problem that had been discussed a while
Suparna> ago. With that fix, I now no longer get these warnings and
Suparna> your test app runs fine.


Great.  I only get one warning, and it seems unrelated:

BUG: sleeping function called from invalid context at kernel/rwsem.c:20
in_atomic():0, irqs_disabled():1
no locks held by aio-epoll-examp/4817.
 [<c0404efd>] dump_trace+0x63/0x1cd
 [<c0405081>] show_trace_log_lvl+0x1a/0x2f
 [<c0405636>] show_trace+0x12/0x14
 [<c04056ba>] dump_stack+0x16/0x18
 [<c041ff51>] __might_sleep+0xb3/0xb9
 [<c043ad79>] down_read+0x18/0x51
 [<c043255e>] blocking_notifier_call_chain+0x11/0x2d
 [<c042781c>] profile_munmap+0x11/0x13
 [<c046970b>] sys_munmap+0x1c/0x3f
 [<c0403ed8>] syscall_call+0x7/0xb
DWARF2 unwinder stuck at syscall_call+0x7/0xb
Leftover inexact backtrace:
 =======================

Here's the program I conjured up for testing.  It's still pretty
rough, so if I've goofed please let me know.

I've only glanced at your cleanups, but thus far they look really
nice.  I'll take a closer look at everything tomorrow.

-Jeff

/*
 *  aio-epoll-example.c
 *
 *  Sample code to demonstrate how to use IOCB_CMD_EPOLL_WAIT to mix network
 *  and disk I/O in a single, asynchronous event loop.
 *
 *  author: Jeff Moyer <jmoyer@redhat.com>
 *
 *  Copyright 2006 Red Hat, Inc.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/un.h>
#include <netdb.h>
#include <signal.h>
#include <errno.h>
#include <libgen.h>
#include <time.h>
#include <fcntl.h>
#include <libaio.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <netinet/in.h>

/*
 * Debug tracing hook.  As defined here it expands to an empty statement,
 * so every debug() call in this file compiles to nothing and its
 * arguments are never evaluated.
 */
#define debug(fd, fmt, args...) do { ; } while (0)

/*
 * AIO opcode placed in iocb->aio_lio_opcode by io_prep_epoll_wait().
 * NOTE(review): presumably this must match the opcode value used by the
 * kernel patch under discussion -- confirm against that patch.
 */
#define IOCB_CMD_EPOLL_WAIT	9
/*
 * Size of the AIO context, of the completion array in main(), and the
 * basis for the per-call iocb array in read_network_data() (which keeps
 * one slot back for the epoll-wait request).
 */
#define MAX_AIO_QUEUE_DEPTH	1024 /* XXX this needs to be much higher */
/* Size in bytes of each client's staging buffer (see new_client()). */
#define MAX_BUF_SIZE		(1024 * 1024) /* this should be gating nr */
/* Maximum number of bytes requested from the socket per read() call. */
#define READ_BUF_SIZE		(1024 * 4)

/*
 * Set to 1 by exit_handler() (a signal handler) and polled by the main
 * loop.  An object written from a signal handler must be of type
 * volatile sig_atomic_t for the access to be well defined.
 */
static volatile sig_atomic_t exiting = 0;
static int epfd;               /* epoll instance shared by every client */
static int total_outstanding;  /* number of outstanding AIO operations */
static long total_written = 0; /* total number of bytes written to disk */
static int nr_polls = 0;       /* number of completed epoll-wait requests */
static struct iocb *epoll_iocb; /* the single, reused epoll-wait request */
char *basedir = NULL; /* base directory for output files */

struct client;

/*
 * One in-flight disk write.
 *
 * The iocb MUST remain the first member: completion handling and the
 * io_submit error path cast a struct iocb pointer straight back to a
 * struct io_unit pointer.
 */
struct io_unit {
	struct iocb iocb;	/* AIO request handed to io_submit() */
	struct client *client;	/* connection this write belongs to */
};

/*
 * Per-connection state: the network socket data is read from, the file
 * it is written to, and the staging buffer in between.
 */
struct client {
	int sock;		/* connected socket; set to -1 once closed */
	int outfd;		/* output file opened by new_client_file() */

	char *buffer;		/* staging buffer for data read from sock */
	int buffer_length;	/* size of buffer in bytes */
	int buffer_offset;	/* next free byte in buffer; reset to 0 when
				 * all of this client's writes complete */
	loff_t file_offset;	/* offset in outfd for the next write */
	int nr_ious;		/* writes submitted but not yet completed */
};

/*
 * Print a one-line usage synopsis to stderr, labelled with the final
 * path component of the program name.
 */
void
usage(char *prog)
{
	char *name = basename(prog);

	fprintf(stderr, "Usage: %s [-p <port>] -d <base directory>\n", name);
}

/*
 * Fill in an iocb describing an asynchronous epoll wait on epfd.
 *
 * The generic iocb fields are reused to carry the epoll_wait arguments:
 *   u.c.buf    - array that receives the ready events
 *   u.c.nbytes - capacity of that array (maxevents)
 *   u.c.offset - timeout
 * Every other field of the iocb is cleared.
 */
static void
io_prep_epoll_wait(struct iocb *iocb, int epfd,
		   struct epoll_event *events, int maxevents, int timeout)
{
	memset(iocb, 0, sizeof(*iocb));

	/* epoll_wait arguments, carried in the common request fields */
	iocb->u.c.buf = events;
	iocb->u.c.nbytes = maxevents;
	iocb->u.c.offset = timeout;

	/* request header */
	iocb->aio_reqprio = 0;
	iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
	iocb->aio_fildes = epfd;
}

/*
 * Signal handler: flag the main event loop to stop.  The signal number
 * is not used.
 */
void
exit_handler(int signo)
{
	(void)signo;	/* intentionally unused */
	exiting = 1;
}

/*
 * Allocate a zero-filled io_unit bound to the given client.
 *
 * Returns the new unit, or NULL (after reporting the failure with
 * perror) when memory is exhausted.  Release it with free_iou().
 */
struct io_unit *
alloc_iou(struct client *client)
{
	struct io_unit *unit = calloc(1, sizeof(*unit));

	if (unit == NULL) {
		perror("malloc");
		return NULL;
	}

	unit->client = client;
	return unit;
}

/*
 * Release an io_unit obtained from alloc_iou().  The memory is filled
 * with 0xee first so that a stale pointer is easy to spot in a debugger.
 */
void
free_iou(struct io_unit *iou)
{
	const size_t unit_size = sizeof(*iou);

	memset(iou, 0xee, unit_size);
	free(iou);
}

void
finish_one_io(struct io_event *event)
{
	struct io_unit *iou = (struct io_unit *)event->obj;
	struct iocb *iocb = &iou->iocb;
	struct client *client = iou->client;
	int ret;

	if ((long)event->res < 0) {
		fprintf(stderr, "%lu byte write to file at offset %lld "
			"failed with error %lu\n", iocb->u.c.nbytes,
			iocb->u.c.offset, event->res);
	}

	total_written += iocb->u.c.nbytes;
	/*
	 *  We wait for all I/O's to complete before scheduling any more
	 *  for a given client.
	 */
	if (--client->nr_ious == 0) {
		struct epoll_event new_event;

		client->buffer_offset = 0;
		/* Add this fd back to the poll set */
		if (client->sock >= 0) {
			memset(&new_event, 0, sizeof(new_event));
			new_event.events = EPOLLIN;
			new_event.data.ptr = iou->client;
			ret = epoll_ctl(epfd, EPOLL_CTL_ADD,
					client->sock, &new_event);
			if (ret < 0) {
				if (errno == EEXIST)
					debug(stderr, "fix this coding error");
				else
					perror("epoll_ctl");
			} else
				debug(stderr, "Adding network client back "
					"onto poll list.\n");
		}
	}
	free_iou(iou);
	total_outstanding--;

	return;
}

/*
 * Report whether the client's staging buffer can no longer accept a
 * full READ_BUF_SIZE read.
 *
 * Returns 1 when fewer than READ_BUF_SIZE bytes remain (including the
 * case where none remain), 0 otherwise.
 */
int
buffer_full(struct client *client)
{
	int room = client->buffer_length - client->buffer_offset;

	return room < READ_BUF_SIZE;
}

/*
 * Drain the client's socket into its staging buffer and queue one
 * asynchronous write to the output file per successful read().
 *
 * Reading stops when the socket would block, the staging buffer is full,
 * no AIO queue slots remain, the peer closes the connection, or an
 * error occurs.  If the buffer is full or the queue is exhausted the
 * socket is removed from the epoll set; finish_one_io() puts it back
 * once all of this client's writes have completed.
 *
 * Returns:
 *    1  the connection is still open
 *    0  the socket has been closed (EOF or read error)
 *   -1  io_submit failed; the socket has been closed
 * In the 0 and -1 cases the caller must treat client->sock as invalid.
 */
int
read_network_data(io_context_t ioctx, struct client *client)
{
	/* We need to save one slot in the queue for the epoll_wait */
	/* If the max queue depth is made tunable, then it may not be
	 * wise to make this a stack variable.
	 */
	struct iocb *iocbs[MAX_AIO_QUEUE_DEPTH-1], *iocb;
	int prepared = 0;
	int bytes_queued = 0;
	int bufsize = READ_BUF_SIZE, bytes_read;
	int iocbs_available;
	int ret, i, socket_closed = 0;
	struct io_unit *iou;

	/* read the data from the network and write it out to disk */
	iocbs_available = MAX_AIO_QUEUE_DEPTH - total_outstanding - 1;
	while (iocbs_available > 0 && !buffer_full(client)) {
		bytes_read = read(client->sock,
			client->buffer + client->buffer_offset, bufsize);
		if (bytes_read == -1) {
			/* Interrupted before any data arrived: just retry. */
			if (errno == EINTR)
				continue;
			/*
			 * The socket is non-blocking, so we should stop
			 * trying to read once we hit EAGAIN.
			 */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;

			/*
			 * Hard error.  Record that the socket is gone so
			 * that it is neither closed again nor used below,
			 * and so that the caller learns about it.
			 */
			perror("read");
			close(client->sock);
			socket_closed = 1;
			break;
		}

		if (bytes_read == 0) {
			/* Remote end closed the connection. */
			close(client->sock);
			fprintf(stdout, "remote host closed connection.\n");
			socket_closed = 1;
			break;
		}

		iou = alloc_iou(client);
		if (!iou) {
			fprintf(stderr, "Error: failed to allocate memory "
				"in the receive path.  Expect lost data.\n");
			break;
		}
		iocb = &iou->iocb;

		io_prep_pwrite(iocb, client->outfd,
			       client->buffer + client->buffer_offset,
			       bytes_read, client->file_offset);
		if (client->file_offset == -1) {
			/* Deliberate debugging trap: spin so a debugger can attach. */
			fprintf(stderr, "client file offset -1... how?\n");
			for (;;);
		}
		client->buffer_offset += bytes_read;
		client->file_offset += bytes_read;
		bytes_queued += bytes_read;

		iocbs[prepared] = iocb;

		prepared++;
		iocbs_available--;
	}

	if (prepared) {
		ret = io_submit(ioctx, prepared, iocbs);
		if (ret < 0) {
			fprintf(stderr, "io_submit returned error %s(%d)\n",
				strerror(-ret), ret);
			/* each iocb is the first member of its io_unit */
			for (i = 0; i < prepared; i++)
				free_iou((struct io_unit *)iocbs[i]);
			/* Do not close a socket the read loop already closed. */
			if (!socket_closed && client->sock >= 0)
				close(client->sock);
			client->buffer_offset -= bytes_queued;
			client->file_offset -= bytes_queued;
			return -1;
		}

		total_outstanding += ret;
		/*
		 *  To handle the case of io_submit returning that it
		 *  queued a partial number of our requests, we need to
		 *  update the offsets to reflect the actual amount of
		 *  data submitted.  We also need to clean up the iocbs
		 *  for which we will receive no completion.
		 */
		if (ret < prepared) {
			int unsubmitted = 0;

			fprintf(stderr, "Error: io_submit only submitted "
				"%d I/O's and we requested %d.  Some data "
				"will be lost!\n", ret, prepared);
			for (i = ret; i < prepared; i++) {
				unsubmitted += (int)iocbs[i]->u.c.nbytes;
				free_iou((struct io_unit *)iocbs[i]);
			}
			/*
			 * The rejected requests cover the tail of what was
			 * queued, so rewinding by their total size leaves
			 * the in-flight region of the buffer untouched and
			 * avoids a hole in the output file.
			 */
			client->buffer_offset -= unsubmitted;
			client->file_offset -= unsubmitted;
		}
		client->nr_ious += ret;
	}

	/*
	 *  If the buffer is full, then remove ourselves from the epoll
	 *  watch list.  We'll add the client fd back later, when the
	 *  I/O's have completed.  Skip this when the socket has already
	 *  been closed: its descriptor is no longer valid.
	 */
	if (!socket_closed &&
	    (buffer_full(client) || iocbs_available <= 0)) {
		struct epoll_event new_event;

		memset(&new_event, 0, sizeof(new_event));
		ret = epoll_ctl(epfd, EPOLL_CTL_DEL, client->sock, &new_event);
		if (ret < 0) {
			perror("read_network_data: epoll_ctl DEL");
		} else
			debug(stdout, "Removing network client from the poll list temporarily.\n");
	}

	return socket_closed ? 0 : 1;
}

/*
 *  Create a new file in the supplied base directory for a client.  All data
 *  from the client goes to this file.
 *
 *  Returns 0 on success, -1 on failure.
 */
int
new_client_file(struct client *client)
{
	struct sockaddr_in cliaddr;
	socklen_t clilen = sizeof(struct sockaddr_in);
	int fd;
	struct tm *tm;
	char date[128];
	time_t now;
	size_t namelen;
	char *filename;
	unsigned int ip;

	if (getpeername(client->sock,
			(struct sockaddr *) &cliaddr, &clilen) != 0) {
		perror("getpeername");
		return -1;
	}
	/* host byte order, so the octets can be extracted by shifting */
	ip = ntohl(cliaddr.sin_addr.s_addr);

	now = time(NULL);
	tm = localtime(&now);
	if (!tm) {
		perror("localtime");
		return -1;
	}
	if (strftime(date, sizeof(date), "%Y-%m-%d-%H:%M:%S", tm) == 0) {
		fprintf(stderr, "strftime: date does not fit in buffer\n");
		return -1;
	}

	/*
	 * The filename format is the provided base directory plus:
	 *   /111.111.111.111-2006-03-16-18:55:00
	 * i.e. "/" <ip address> "-" <date>.  Reserve 1 byte for the '/',
	 * up to 15 for the dotted-quad address, 1 for the '-' and 1 for
	 * the terminating '\0'.
	 */
	namelen = strlen(basedir) + 1 + 15 + 1 + strlen(date) + 1;
	filename = malloc(namelen);
	if (!filename) {
		perror("malloc");
		return -1;
	}

	snprintf(filename, namelen, "%s/%u.%u.%u.%u-%s", basedir,
		 (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff,
		 (ip) & 0xff, date);

	/* O_EXCL: never overwrite a file left behind by an earlier client */
	fd = open(filename, O_WRONLY | O_CREAT | O_EXCL, 0644);
	if (fd < 0)
		perror("open");
	/* The name is only needed for open(); release it on every path. */
	free(filename);
	if (fd < 0)
		return -1;

	client->outfd = fd;
	return 0;
}

/*
 * Create the bookkeeping for a freshly accepted connection: a zeroed
 * client structure, its output file, and a page-aligned staging buffer
 * of MAX_BUF_SIZE bytes.
 *
 * Returns the new client, or NULL on failure.  On failure connfd is NOT
 * closed; it remains the caller's responsibility.  On success the client
 * owns connfd and is released with destroy_client().
 */
struct client *
new_client(int connfd)
{
	struct client *client;
	void *buffer = NULL;
	int ret;

	/* calloc clears the whole structure, not just pointer-size bytes */
	client = calloc(1, sizeof(*client));
	if (!client) {
		perror("malloc");
		return NULL;
	}

	client->sock = connfd;
	client->outfd = -1;
	if (new_client_file(client) < 0) {
		free(client);
		return NULL;
	}

	/* posix_memalign returns 0 or a positive error number, never < 0 */
	ret = posix_memalign(&buffer, getpagesize(), MAX_BUF_SIZE);
	if (ret != 0) {
		fprintf(stderr, "posix_memalign failed with %d\n", ret);
		close(client->outfd);
		free(client);
		return NULL;
	}
	client->buffer = buffer;
	client->buffer_length = MAX_BUF_SIZE;

	return client;
}

/*
 * Release everything a client owns: the output file, the socket (when
 * still open, i.e. not -1), the staging buffer and the structure itself.
 * The structure is filled with 0xee before being freed so that a stale
 * pointer is easy to recognise.
 */
void
destroy_client(struct client *client)
{
	if (client->outfd >= 0)
		close(client->outfd);
	if (client->sock >= 0)
		close(client->sock);
	/* allocated in new_client(); must go before the struct is poisoned */
	free(client->buffer);
	memset(client, 0xee, sizeof(*client)); /* catch use after free */
	free(client);
}

/*
 * Act on the single event delivered by a completed epoll wait.
 *
 * An event whose data pointer is NULL belongs to the listening socket:
 * the pending connection is accepted, made non-blocking, wrapped in a
 * new client and registered with epoll.  Any other event carries a
 * pointer to an existing client.  In both cases the client's socket is
 * then drained via read_network_data(); if that reports the connection
 * closed, the client is destroyed immediately when it has no writes in
 * flight, otherwise it is only marked closed (sock = -1).
 */
void
process_epoll_event(io_context_t ioctx, struct epoll_event *epevent,
		    int listen_sock)
{
	struct epoll_event new_event;
	struct sockaddr_in cliaddr;
	socklen_t clilen;
	struct client *client;
	int connfd;
	int ret;

	if (!(epevent->events & EPOLLIN)) {
		/* complain only once to avoid flooding stderr */
		static int printed = 0;
		if (!printed) {
			fprintf(stderr, "Error: epoll_wait returned events "
				"ready, but not what we asked for!\n");
			printed = 1;
		}
		return;
	}

	/*
	 *  If the data pointer is NULL, it is the listen
	 *  socket that is ready for reading.  If the data
	 *  pointer is non-NULL, then it points to a client
	 *  structure for an existing connection.
	 */
	if (epevent->data.ptr == NULL) {
		clilen = sizeof(cliaddr);
		while ((connfd = accept(listen_sock,
				       (struct sockaddr *)&cliaddr,
					&clilen)) < 0) {
			if (errno != EINTR) {
				perror("accept");
				return;
			}
		}

		/* make sure that we don't block */
		if (fcntl(connfd, F_SETFL, O_NONBLOCK) != 0) {
			perror("fcntl");
			close(connfd);
			return;
		}
		client = new_client(connfd);
		if (!client) {
			/* new_client() does not take ownership on failure */
			close(connfd);
			return;
		}

		memset(&new_event, 0, sizeof(new_event));
		new_event.events = EPOLLIN;
		new_event.data.ptr = client;
		if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &new_event) < 0) {
			perror("process_epoll_event: epoll_ctl ADD");
			/*
			 * destroy_client() closes client->sock, which is
			 * connfd; closing it here as well would close the
			 * same descriptor twice.
			 */
			destroy_client(client);
			return;
		} else
			debug(stderr,
				"Added new network client to poll list.\n");
	} else
		client = (struct client *)epevent->data.ptr;

	/* a return of 0 means EOF */
	ret = read_network_data(ioctx, client);
	if (ret <= 0) {
		debug(stderr,
			"Removed network client from poll list.\n");
		/* read_network_data() has already closed the descriptor */
		client->sock = -1;
		if (client->nr_ious == 0)
			destroy_client(client);
	}

	return;
}

/*
 * Dispatch a batch of AIO completions.
 *
 * Write completions go to finish_one_io().  A completed epoll wait is
 * passed to process_epoll_event() and then immediately resubmitted so
 * that exactly one epoll wait stays queued; failing to requeue it is
 * fatal.  Completions with any other opcode are reported and skipped.
 */
void
process_events(io_context_t ioctx, struct io_event *events, int nr_events,
	       int listen_sock)
{
	int idx;

	for (idx = 0; idx < nr_events; idx++) {
		struct io_event *completion = &events[idx];
		struct iocb *done = completion->obj;
		struct epoll_event *ready;
		int submitted;

		if (done->aio_lio_opcode == IO_CMD_PWRITE) {
			finish_one_io(completion);
			continue;
		}

		if (done->aio_lio_opcode != IOCB_CMD_EPOLL_WAIT) {
			fprintf(stderr, "Error: event completion "
				"received for unknown opcode %d\n",
				done->aio_lio_opcode);
			continue;
		}

		/* The iocb's buffer is the epoll_event we handed in. */
		ready = (struct epoll_event *)done->u.c.buf;
		if (ready == NULL) {
			/* this should never happen */
			fprintf(stderr, "Error: epoll event returned "
				"with no epevent buffer!\n");
			continue;
		}

		process_epoll_event(ioctx, ready, listen_sock);
		nr_polls++;

		/* requeue the poll, reusing the same event buffer */
		memset(ready, 0, sizeof(*ready));
		io_prep_epoll_wait(epoll_iocb, epfd, ready, 1, -1);
		submitted = io_submit(ioctx, 1, &epoll_iocb);
		if (submitted != 1) {
			fprintf(stdout,
				"Error: io_submit returned %d\n", submitted);
			exit(1);
		}
	}
}

/*
 * Create a TCP socket bound to INADDR_ANY on the given port and put it
 * into the listening state.  With port 0 the system chooses the port
 * and the chosen number is printed to stdout.
 *
 * Returns the listening descriptor, or -1 on failure; no descriptor is
 * left open on any failure path.
 */
int
setup_listen_socket(short port)
{
	int sock;
	struct sockaddr_in servaddr;
	socklen_t servlen = sizeof(struct sockaddr_in);

	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0) {
		perror("socket");
		return -1;
	}

	memset(&servaddr, 0, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = INADDR_ANY;
	servaddr.sin_port = htons(port);

	if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
		perror("bind");
		goto fail;
	}
	if (listen(sock, 1) < 0) {
		perror("listen");
		goto fail;
	}

	/*
	 *  If no port was specified on the command line, report to the
	 *  user which one we picked.
	 */
	if (!port) {
		if (getsockname(sock,
				(struct sockaddr *)&servaddr, &servlen) < 0) {
			perror("getsockname");
			fprintf(stderr, "You might want to specify the port "
				"to use on the command line.\n");
			goto fail;
		}

		fprintf(stdout, "listening on port %d\n",
			ntohs(servaddr.sin_port));
	}

	return sock;

fail:
	/* the socket was created above, so release it before bailing out */
	close(sock);
	return -1;
}

/*
 * Entry point.
 *
 * Parses -p <port> (optional; 0 lets the system choose) and
 * -d <base directory> (required), creates the AIO context, the listening
 * socket and the epoll instance, queues the first asynchronous epoll
 * wait, and then dispatches AIO completions until SIGINT is received.
 * Exits with status 0 after printing a summary, or 1 on any setup or
 * runtime error.
 */
int
main(int argc, char **argv)
{
	short port = 0;
	int sock, ret;
	int c;
	io_context_t ioctx;
	struct io_event events[MAX_AIO_QUEUE_DEPTH];
	int nr_events;
	struct epoll_event epevent;
	char *end;
	long val;

	while ((c = getopt(argc, argv, "p:d:")) != -1) {
		switch (c) {
		case 'p':
			/* reject garbage and out-of-range values up front */
			errno = 0;
			val = strtol(optarg, &end, 10);
			if (errno != 0 || end == optarg || *end != '\0' ||
			    val < 0 || val > 65535) {
				fprintf(stderr, "Error: invalid port '%s'\n",
					optarg);
				usage(argv[0]);
				exit(1);
			}
			/*
			 * Ports above 32767 do not fit a short as such, but
			 * setup_listen_socket() passes the value to htons(),
			 * which converts it back to the same 16-bit pattern.
			 */
			port = (short)val;
			break;
		case 'd':
			free(basedir);	/* in case -d was given twice */
			basedir = strdup(optarg);
			if (!basedir) {
				perror("strdup");
				exit(1);
			}
			break;
		default:
			fprintf(stderr, "unknown option %c\n", c);
			usage(argv[0]);
			exit(1);
		}
	}

	if (!basedir) {
		fprintf(stderr, "Error: you must supply a base directory to "
			"store files\n");
		usage(argv[0]);
		exit(1);
	}

	ret = io_queue_init(MAX_AIO_QUEUE_DEPTH, &ioctx);
	if (ret != 0) {
		fprintf(stdout, "io_setup returned %s(%d)\n",
			strerror(-ret), ret);
		exit(1);
	}

	epoll_iocb = malloc(sizeof(*epoll_iocb));
	if (!epoll_iocb) {
		perror("malloc");
		exit(1);
	}

	sock = setup_listen_socket(port);
	if (sock < 0)
		exit(1);

	/* register the socket with epoll */
	epfd = epoll_create(MAX_AIO_QUEUE_DEPTH);
	if (epfd < 0) {
		perror("epoll_create");
		exit(1);
	}

	memset(&epevent, 0, sizeof(epevent));
	epevent.events = EPOLLIN;
	/* we use NULL here to signify that the event belongs to the listen
	 * socket.  Other data pointers will reference client structures.
	 */
	epevent.data.ptr = NULL;
	if (epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &epevent) < 0) {
		perror("main: epoll_ctl ADD");
		exit(1);
	}

	/*
	 * Queue the first epoll wait.  epevent doubles as the buffer that
	 * receives the ready event; it lives for the whole of main(), and
	 * process_events() keeps reusing it when requeueing the wait.
	 */
	memset(&epevent, 0, sizeof(epevent));
	io_prep_epoll_wait(epoll_iocb, epfd, &epevent, 1, -1);
	ret = io_submit(ioctx, 1, &epoll_iocb);
	if (ret != 1) {
		fprintf(stdout, "Error: io_submit returned %d\n", ret);
		exit(1);
	}

	signal(SIGINT, exit_handler);

	while (!exiting) {
		nr_events = io_getevents(ioctx, 1, MAX_AIO_QUEUE_DEPTH,
					 events, NULL);
		/* interrupted by a signal: go back and re-check exiting */
		if (nr_events == -EINTR)
			continue;
		if (nr_events <= 0) {
			fprintf(stdout,"Error: io_getevents returned %d\n",
				nr_events);
			exit(1);
		}

		process_events(ioctx, events, nr_events, sock);
	}

	fprintf(stdout, "Completed %d polls, wrote %ld bytes\n", nr_polls,
		total_written);
	exit(0);
}

--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org.  For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic