[prev in list] [next in list] [prev in thread] [next in thread]
List: linux-kernel
Subject: [PATCH,BETA] new pipe code
From: Manfred Spraul <manfreds () colorfullife ! com>
Date: 2000-02-29 21:52:48
[Download RAW message or body]
I've rewritten the pipe code:
the old code caused lots of reschedules under certain loads. The new
code is not yet fully tested (esp. O_NONBLOCK is untested), but I'm
really interested in what you think about it.
I've described the new locking in <linux/pipe_fs_i.h>.
--
Manfred
["patch-pipe" (text/plain)]
// $Header$
// Kernel Version:
// VERSION = 2
// PATCHLEVEL = 3
// SUBLEVEL = 48
// EXTRAVERSION =
--- 2.3/fs/pipe.c Sun Feb 27 08:57:11 2000
+++ build-2.3/fs/pipe.c Tue Feb 29 22:36:02 2000
@@ -29,10 +29,13 @@
*
* Reads with count = 0 should always return 0.
* -- Julian Bradfield 1999-06-07.
+ *
+ * new locking code
+ * (c) 2000 Manfred Spraul <manfreds@colorfullife.com>
*/
/* Drop the inode semaphore and wait for a pipe event, atomically */
-static void pipe_wait(struct inode * inode)
+void pipe_wait(struct inode * inode)
{
DECLARE_WAITQUEUE(wait, current);
current->state = TASK_INTERRUPTIBLE;
@@ -47,7 +50,8 @@
pipe_read(struct file *filp, char *buf, size_t count, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
- ssize_t size, read, ret;
+ struct pipe_readerdesc pr;
+ ssize_t read, ret;
/* Seeks are not allowed on pipes. */
ret = -ESPIPE;
@@ -59,82 +63,91 @@
ret = 0;
if (count == 0)
goto out_nolock;
+ pr.tsk = current;
- /* Get the pipe semaphore */
- ret = -ERESTARTSYS;
- if (down_interruptible(PIPE_SEM(*inode)))
- goto out_nolock;
-
- if (PIPE_EMPTY(*inode)) {
-do_more_read:
- ret = 0;
+ spin_lock(PIPE_LOCK(*inode));
+ list_add_tail(&pr.list, PIPE_RLIST(*inode));
+ for(;;) {
+ if(PIPE_RLIST(*inode)->next == &pr.list)
+ break;
+ if (filp->f_flags & O_NONBLOCK) {
+ /* FIXME: it's possible that data is available.*/
+ ret = -EAGAIN;
+ goto out_unlink;
+ }
+ current->state=TASK_INTERRUPTIBLE;
+ spin_unlock(PIPE_LOCK(*inode));
+ schedule();
+ spin_lock(PIPE_LOCK(*inode));
+ ret = -ERESTARTSYS;
+ if (signal_pending(current))
+ goto out_unlink;
+ }
+ for(;;) {
+ int avail=PIPE_SIZE-PIPE_FREE(*inode);
+ if(avail != 0) {
+ int start;
+ if(avail>count)
+ avail=count;
+ /* transfer data*/
+ start = PIPE_START(*inode);
+ spin_unlock(PIPE_LOCK(*inode));
+
+ if(start+avail<=PIPE_SIZE) {
+ ret = copy_to_user(buf, PIPE_BASE(*inode)+start, avail);
+ } else {
+ int p1=PIPE_SIZE-start;
+ ret = copy_to_user(buf, PIPE_BASE(*inode)+start, p1);
+ ret |= copy_to_user(buf+p1, PIPE_BASE(*inode), avail-p1);
+ }
+ spin_lock(PIPE_LOCK(*inode));
+ if(ret!=0) {
+ read=0;
+ ret = -EFAULT;
+ goto out_unlink;
+ }
+ read += avail;
+ buf += avail;
+ count-=avail;
+ PIPE_LEN(*inode) -= avail;
+ /* Cache behaviour optimization */
+ if (!PIPE_LEN(*inode) && list_empty(PIPE_WLIST(*inode))) {
+ PIPE_START(*inode) = 0;
+ } else {
+ PIPE_START(*inode) = (start+avail)&(PIPE_SIZE-1);
+ }
+ /* wake up polling threads*/
+ wake_up(PIPE_WAIT(*inode));
+ /* wake up writing threads*/
+ if(!list_empty(PIPE_WLIST(*inode)))
+ wake_up_process(list_entry(PIPE_WLIST(*inode)->next,struct pipe_writerdesc,list)->tsk);
+ }
+ if(count<=0)
+ break;
+ /* wait for more data*/
if (!PIPE_WRITERS(*inode))
- goto out;
-
- ret = -EAGAIN;
- if (filp->f_flags & O_NONBLOCK)
- goto out;
-
- for (;;) {
- PIPE_WAITING_READERS(*inode)++;
- pipe_wait(inode);
- PIPE_WAITING_READERS(*inode)--;
- ret = -ERESTARTSYS;
- if (signal_pending(current))
- goto out_nolock;
- if (down_interruptible(PIPE_SEM(*inode)))
- goto out_nolock;
- ret = 0;
- if (!PIPE_EMPTY(*inode))
- break;
- if (!PIPE_WRITERS(*inode))
- goto out;
+ break;
+ if (filp->f_flags & O_NONBLOCK) {
+ ret = -EAGAIN;
+ break;
}
+ current->state=TASK_INTERRUPTIBLE;
+ spin_unlock(PIPE_LOCK(*inode));
+ schedule();
+ spin_lock(PIPE_LOCK(*inode));
+ ret = -ERESTARTSYS;
+ if (signal_pending(current))
+ goto out_unlink;
}
- /* Read what data is available. */
- ret = -EFAULT;
- while (count > 0 && (size = PIPE_LEN(*inode))) {
- char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
- ssize_t chars = PIPE_MAX_RCHUNK(*inode);
-
- if (chars > count)
- chars = count;
- if (chars > size)
- chars = size;
-
- if (copy_to_user(buf, pipebuf, chars))
- goto out;
-
- read += chars;
- PIPE_START(*inode) += chars;
- PIPE_START(*inode) &= (PIPE_SIZE - 1);
- PIPE_LEN(*inode) -= chars;
- count -= chars;
- buf += chars;
- }
-
- /* Cache behaviour optimization */
- if (!PIPE_LEN(*inode))
- PIPE_START(*inode) = 0;
-
- if (count && PIPE_WAITING_WRITERS(*inode) && !(filp->f_flags & O_NONBLOCK)) {
- /*
- * We know that we are going to sleep: signal
- * writers synchronously that there is more
- * room.
- */
- wake_up_interruptible_sync(PIPE_WAIT(*inode));
- if (!PIPE_EMPTY(*inode))
- BUG();
- goto do_more_read;
+out_unlink:
+ if ((PIPE_RLIST(*inode)->next == &pr.list)&&
+ (PIPE_RLIST(*inode)->prev != &pr.list)) {
+ wake_up_process(list_entry(pr.list.next,struct pipe_readerdesc,list)->tsk);
}
- /* Signal writers asynchronously that there is more room. */
- wake_up_interruptible(PIPE_WAIT(*inode));
+ list_del(&pr.list);
- ret = read;
-out:
- up(PIPE_SEM(*inode));
+ spin_unlock(PIPE_LOCK(*inode));
out_nolock:
if (read)
ret = read;
@@ -145,7 +158,8 @@
pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
- ssize_t free, written, ret;
+ struct pipe_writerdesc pw;
+ ssize_t minfree, written, ret;
/* Seeks are not allowed on pipes. */
ret = -ESPIPE;
@@ -158,100 +172,95 @@
if (count == 0)
goto out_nolock;
- ret = -ERESTARTSYS;
- if (down_interruptible(PIPE_SEM(*inode)))
- goto out_nolock;
-
-do_more_write:
- /* No readers yields SIGPIPE. */
- if (!PIPE_READERS(*inode))
- goto sigpipe;
-
- /* If count <= PIPE_BUF, we have to make it atomic. */
- free = (count <= PIPE_BUF ? count : 1);
-
- /* Wait, or check for, available space. */
- if (filp->f_flags & O_NONBLOCK) {
- ret = -EAGAIN;
- if (PIPE_FREE(*inode) < free)
- goto out;
- } else {
- while (PIPE_FREE(*inode) < free) {
- PIPE_WAITING_WRITERS(*inode)++;
- pipe_wait(inode);
- PIPE_WAITING_WRITERS(*inode)--;
- ret = -ERESTARTSYS;
- if (signal_pending(current))
- goto out_nolock;
- if (down_interruptible(PIPE_SEM(*inode)))
- goto out_nolock;
-
- if (!PIPE_READERS(*inode))
- goto sigpipe;
+ pw.tsk = current;
+ minfree=1;
+ if (count <= PIPE_BUF)
+ minfree=count;
+
+ spin_lock(PIPE_LOCK(*inode));
+ list_add_tail(&pw.list, PIPE_WLIST(*inode));
+ for(;;) {
+ if (!PIPE_READERS(*inode))
+ goto sigpipe;
+ if(PIPE_WLIST(*inode)->next == &pw.list)
+ break;
+ if (filp->f_flags & O_NONBLOCK) {
+ /* FIXME: it's possible that free space is available.*/
+ ret = -EAGAIN;
+ goto out_unlink;
}
+ current->state=TASK_INTERRUPTIBLE;
+ spin_unlock(PIPE_LOCK(*inode));
+ schedule();
+ spin_lock(PIPE_LOCK(*inode));
+ ret = -ERESTARTSYS;
+ if (signal_pending(current))
+ goto out_unlink;
}
-
- /* Copy into available space. */
- ret = -EFAULT;
- while (count > 0) {
- int space;
- char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);
- ssize_t chars = PIPE_MAX_WCHUNK(*inode);
-
- if ((space = PIPE_FREE(*inode)) != 0) {
- if (chars > count)
- chars = count;
- if (chars > space)
- chars = space;
-
- if (copy_from_user(pipebuf, buf, chars))
- goto out;
-
- written += chars;
- PIPE_LEN(*inode) += chars;
- count -= chars;
- buf += chars;
- space = PIPE_FREE(*inode);
- continue;
+ for(;;) {
+ int free=PIPE_FREE(*inode);
+ if(free>=minfree) {
+ int start;
+ if(free>count)
+ free=count;
+ /* transfer data*/
+ start = PIPE_END(*inode);
+ spin_unlock(PIPE_LOCK(*inode));
+
+ if(start+free<=PIPE_SIZE) {
+ ret = copy_from_user(PIPE_BASE(*inode)+start, buf, free);
+ } else {
+ int p1=PIPE_SIZE-start;
+
+ ret = copy_from_user(PIPE_BASE(*inode)+start, buf, p1);
+ ret |= copy_from_user(PIPE_BASE(*inode), buf+p1, free-p1);
+ }
+
+ spin_lock(PIPE_LOCK(*inode));
+ if(ret!=0) {
+ written=0;
+ ret = -EFAULT;
+ goto out_unlink;
+ }
+ written += free;
+ buf += free;
+ count-=free;
+ PIPE_LEN(*inode) += free;
+ /* wake up polling threads*/
+ wake_up(PIPE_WAIT(*inode));
+ /* wake up reading threads*/
+ if(!list_empty(PIPE_RLIST(*inode)))
+ wake_up_process(list_entry(PIPE_RLIST(*inode)->next,struct pipe_readerdesc,list)->tsk);
}
-
- ret = written;
- if (filp->f_flags & O_NONBLOCK)
+ if(count<=0)
break;
-
- do {
- /*
- * Synchronous wake-up: it knows that this process
- * is going to give up this CPU, so it doesnt have
- * to do idle reschedules.
- */
- wake_up_interruptible_sync(PIPE_WAIT(*inode));
- PIPE_WAITING_WRITERS(*inode)++;
- pipe_wait(inode);
- PIPE_WAITING_WRITERS(*inode)--;
- if (signal_pending(current))
- goto out_nolock;
- if (down_interruptible(PIPE_SEM(*inode)))
- goto out_nolock;
- if (!PIPE_READERS(*inode))
- goto sigpipe;
- } while (!PIPE_FREE(*inode));
- ret = -EFAULT;
- }
-
- if (count && PIPE_WAITING_READERS(*inode) &&
- !(filp->f_flags & O_NONBLOCK)) {
- wake_up_interruptible_sync(PIPE_WAIT(*inode));
- goto do_more_write;
+ /* wait for more free space*/
+ if (!PIPE_READERS(*inode))
+ goto sigpipe;
+ if (filp->f_flags & O_NONBLOCK) {
+ ret = -EAGAIN;
+ break;
+ }
+ current->state=TASK_INTERRUPTIBLE;
+ spin_unlock(PIPE_LOCK(*inode));
+ schedule();
+ spin_lock(PIPE_LOCK(*inode));
+ ret = -ERESTARTSYS;
+ if (signal_pending(current))
+ goto out_unlink;
}
- /* Signal readers asynchronously that there is more data. */
- wake_up_interruptible(PIPE_WAIT(*inode));
-
- inode->i_ctime = inode->i_mtime = CURRENT_TIME;
- mark_inode_dirty(inode);
-
-out:
- up(PIPE_SEM(*inode));
+
+ if (written) {
+ inode->i_ctime = inode->i_mtime = CURRENT_TIME;
+ mark_inode_dirty(inode);
+ }
+out_unlink:
+ if ((PIPE_WLIST(*inode)->next == &pw.list) &&
+ (PIPE_WLIST(*inode)->prev != &pw.list)) {
+ wake_up_process(list_entry(pw.list.next,struct pipe_writerdesc,list)->tsk);
+ }
+ list_del(&pw.list);
+ spin_unlock(PIPE_LOCK(*inode));
out_nolock:
if (written)
ret = written;
@@ -259,10 +268,10 @@
sigpipe:
if (written)
- goto out;
- up(PIPE_SEM(*inode));
+ goto out_unlink;
+ ret=-EPIPE;
send_sig(SIGPIPE, current, 0);
- return -EPIPE;
+ goto out_unlink;
}
static loff_t
@@ -381,6 +390,40 @@
return mask;
}
+void cleanup_pipe_data(struct inode* inode)
+{
+ struct pipe_inode_info *info = inode->i_pipe;
+ inode->i_pipe = NULL;
+ free_page((unsigned long) info->base);
+ kfree(info);
+}
+
+void pipe_no_readers(struct inode* inode)
+{
+ wake_up_interruptible(PIPE_WAIT(*inode));
+ spin_lock(PIPE_LOCK(*inode));
+ if(!list_empty(PIPE_WLIST(*inode))) {
+ struct pipe_writerdesc* pw;
+
+ pw = list_entry(PIPE_WLIST(*inode)->next,struct pipe_writerdesc,list);
+ wake_up_process(pw->tsk);
+ }
+ spin_unlock(PIPE_LOCK(*inode));
+}
+
+void pipe_no_writers(struct inode* inode)
+{
+ wake_up_interruptible(PIPE_WAIT(*inode));
+ spin_lock(PIPE_LOCK(*inode));
+ if(!list_empty(PIPE_RLIST(*inode))) {
+ struct pipe_readerdesc* pr;
+
+ pr = list_entry(PIPE_RLIST(*inode)->next,struct pipe_readerdesc,list);
+ wake_up_process(pr->tsk);
+ }
+ spin_unlock(PIPE_LOCK(*inode));
+}
+
static int
pipe_release(struct inode *inode, int decr, int decw)
{
@@ -388,12 +431,15 @@
PIPE_READERS(*inode) -= decr;
PIPE_WRITERS(*inode) -= decw;
if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) {
- struct pipe_inode_info *info = inode->i_pipe;
- inode->i_pipe = NULL;
- free_page((unsigned long) info->base);
- kfree(info);
+ /* no readers and no writers means no one uses the pipe.
+ * FIXME: check if someone could poll the pipe.
+ */
+ cleanup_pipe_data(inode);
} else {
- wake_up_interruptible(PIPE_WAIT(*inode));
+ if(!PIPE_READERS(*inode))
+ pipe_no_readers(inode);
+ if(!PIPE_WRITERS(*inode))
+ pipe_no_writers(inode);
}
up(PIPE_SEM(*inode));
@@ -531,30 +577,43 @@
release: pipe_rdwr_release,
};
-static struct inode * get_pipe_inode(void)
+int init_pipe_data(struct inode* inode)
{
- struct inode *inode = get_empty_inode();
unsigned long page;
- if (!inode)
- goto fail_inode;
-
page = __get_free_page(GFP_USER);
if (!page)
- goto fail_iput;
+ return -ENOMEM;
inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);
- if (!inode->i_pipe)
- goto fail_page;
-
- inode->i_fop = &rdwr_pipe_fops;
+ if (!inode->i_pipe) {
+ free_page(page);
+ return -ENOMEM;
+ }
- init_waitqueue_head(PIPE_WAIT(*inode));
+ spin_lock_init(PIPE_LOCK(*inode));
+ INIT_LIST_HEAD(PIPE_RLIST(*inode));
+ INIT_LIST_HEAD(PIPE_WLIST(*inode));
PIPE_BASE(*inode) = (char *) page;
PIPE_START(*inode) = PIPE_LEN(*inode) = 0;
+ PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;
+ init_waitqueue_head(PIPE_WAIT(*inode));
+
+ return 0;
+}
+
+static struct inode * get_pipe_inode(void)
+{
+ struct inode *inode = get_empty_inode();
+
+ if (!inode)
+ return NULL;
+ if(init_pipe_data(inode) < 0)
+ goto fail_iput;
+
PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1;
- PIPE_WAITING_READERS(*inode) = PIPE_WAITING_WRITERS(*inode) = 0;
+ inode->i_fop = &rdwr_pipe_fops;
/*
* Mark the inode dirty from the very beginning,
* that way it will never be moved to the dirty
@@ -567,13 +626,11 @@
inode->i_gid = current->fsgid;
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
inode->i_blksize = PAGE_SIZE;
+
return inode;
-fail_page:
- free_page(page);
fail_iput:
iput(inode);
-fail_inode:
return NULL;
}
--- 2.3/fs/fifo.c Sun Feb 27 08:57:11 2000
+++ build-2.3/fs/fifo.c Tue Feb 29 20:31:40 2000
@@ -20,27 +20,10 @@
if (down_interruptible(PIPE_SEM(*inode)))
goto err_nolock_nocleanup;
- if (! inode->i_pipe) {
- unsigned long page;
- struct pipe_inode_info *info;
-
- info = kmalloc(sizeof(struct pipe_inode_info),GFP_KERNEL);
-
- ret = -ENOMEM;
- if (!info)
+ if (!inode->i_pipe) {
+ ret = init_pipe_data(inode);
+ if(ret < 0)
goto err_nocleanup;
- page = __get_free_page(GFP_KERNEL);
- if (!page) {
- kfree(info);
- goto err_nocleanup;
- }
-
- inode->i_pipe = info;
-
- init_waitqueue_head(PIPE_WAIT(*inode));
- PIPE_BASE(*inode) = (char *) page;
- PIPE_START(*inode) = PIPE_LEN(*inode) = 0;
- PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;
}
switch (filp->f_mode) {
@@ -58,8 +41,7 @@
while (!PIPE_WRITERS(*inode)) {
if (signal_pending(current))
goto err_rd;
- up(PIPE_SEM(*inode));
- interruptible_sleep_on(PIPE_WAIT(*inode));
+ pipe_wait(inode);
/* Note that using down_interruptible here
and similar places below is pointless,
@@ -90,8 +72,8 @@
while (!PIPE_READERS(*inode)) {
if (signal_pending(current))
goto err_wr;
- up(PIPE_SEM(*inode));
- interruptible_sleep_on(PIPE_WAIT(*inode));
+ pipe_wait(inode);
+
down(PIPE_SEM(*inode));
}
break;
@@ -122,22 +104,20 @@
err_rd:
if (!--PIPE_READERS(*inode))
- wake_up_interruptible(PIPE_WAIT(*inode));
+ pipe_no_readers(inode);
ret = -ERESTARTSYS;
goto err;
err_wr:
if (!--PIPE_WRITERS(*inode))
- wake_up_interruptible(PIPE_WAIT(*inode));
+ pipe_no_writers(inode);
ret = -ERESTARTSYS;
goto err;
err:
if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) {
- struct pipe_inode_info *info = inode->i_pipe;
- inode->i_pipe = NULL;
- free_page((unsigned long)info->base);
- kfree(info);
+ /* FIXME: could someone poll this filp? */
+ cleanup_pipe_data(inode);
}
err_nocleanup:
--- 2.3/include/linux/pipe_fs_i.h Tue Dec 7 10:48:22 1999
+++ build-2.3/include/linux/pipe_fs_i.h Tue Feb 29 15:53:39 2000
@@ -1,16 +1,34 @@
#ifndef _LINUX_PIPE_FS_I_H
#define _LINUX_PIPE_FS_I_H
+struct pipe_writerdesc {
+ struct list_head list;
+ struct task_struct* tsk;
+};
+
+struct pipe_readerdesc {
+ struct list_head list;
+ struct task_struct* tsk;
+};
+
struct pipe_inode_info {
- wait_queue_head_t wait;
+ spinlock_t lock;
+ struct list_head waiting_readers;
+ struct list_head waiting_writers;
char *base;
unsigned int start;
unsigned int readers;
unsigned int writers;
- unsigned int waiting_readers;
- unsigned int waiting_writers;
+
+ wait_queue_head_t wait;
};
+int init_pipe_data (struct inode* inode);
+void cleanup_pipe_data (struct inode* inode);
+void pipe_wait (struct inode* inode);
+void pipe_no_writers (struct inode* inode);
+void pipe_no_readers (struct inode* inode);
+
/* Differs from PIPE_BUF in that PIPE_SIZE is the length of the actual
memory allocation, whereas PIPE_BUF makes atomicity guarantees. */
#define PIPE_SIZE PAGE_SIZE
@@ -22,8 +40,9 @@
#define PIPE_LEN(inode) ((inode).i_size)
#define PIPE_READERS(inode) ((inode).i_pipe->readers)
#define PIPE_WRITERS(inode) ((inode).i_pipe->writers)
-#define PIPE_WAITING_READERS(inode) ((inode).i_pipe->waiting_readers)
-#define PIPE_WAITING_WRITERS(inode) ((inode).i_pipe->waiting_writers)
+#define PIPE_LOCK(inode) (&(inode).i_pipe->lock)
+#define PIPE_RLIST(inode) (&(inode).i_pipe->waiting_readers)
+#define PIPE_WLIST(inode) (&(inode).i_pipe->waiting_writers)
#define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0)
#define PIPE_FULL(inode) (PIPE_LEN(inode) == PIPE_SIZE)
@@ -32,4 +51,15 @@
#define PIPE_MAX_RCHUNK(inode) (PIPE_SIZE - PIPE_START(inode))
#define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode))
+/* synchronization:
+ * PIPE_SEM protects PIPE_READERS, PIPE_WRITERS and i_pipe.
+ * i_pipe is also changed before fd_install() and after the last
+ * frip().
+ * PIPE_LOCK protects PIPE_RLIST, PIPE_WLIST, PIPE_START, PIPE_LEN.
+ * new readers, writers are added to the tail of their lists.
+ * the head of PIPE_{R,W}LIST can {read,write} data without owning
+ * PIPE_LOCK.
+ * if a thread decreases PIPE_{READERS,WRITERS} to 0, then it must
+ * call pipe_no_{readers,writers}.
+ */
#endif
-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@vger.rutgers.edu
Please read the FAQ at http://www.tux.org/lkml/
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic