[prev in list] [next in list] [prev in thread] [next in thread] 

List:       suse-programming-e
Subject:    [suse-programming-e]  EAGAIN on a blocking socket?
From:       Per Jessen <per () computer ! org>
Date:       2006-03-02 13:29:04
Message-ID: du6rv0$htf$2 () saturn ! local ! net
[Download RAW message or body]

OK, for the benefit of others listening in, this is what I've briefly
described to Steve and Anders on suse-linux-e - who both said "it's
impossible" - which I agree with.  Reality however seems to be of a
different opinion. 

I'm writing a simple, multi-threaded SMTP proxy.  New connections are
received/accepted, packed into a unit-of-work, then queued for one of X
worker-threads to pick up.  Each UOW has the socket returned by the
accept call.  I refer to this as the "left side".  When a UOW is picked
up for processing, the thread will connect to the outbound side (the
"right side"), and then start processing. 

Here's roughly what happens:  (code attached)

check that the socket is still valid - the client could have gone away
whilst the UOW was queued. 

connect to right side (create socket etc.)
create a copy of the socket - new one is called right2. 
The original is set nonblocking. 
Now create streams, one on each socket. 

Similar thing for the left side. 

The main code (not attached) now uses select() on the left and right
inbound, nonblocking sockets, and when e.g. the left side has some data
for us,:

        if ( FD_ISSET(left,&readfds) )
        {
            closed=1;
            while( NULL!=fgets( buffer, sizeof(buffer), left_in ) )
            {
                closed=0;
                while( EOF==(rc=fputs( buffer, right_out )) )
                {
                    if ( errno!=EAGAIN ) break;
                    log_error( "[%02u.%04u] %d.2: unable to write to
right; error %d: %s\n", c->id, work->id, stage, errno,
strerror(errno) );
                    sleep(1);
                }
                if ( EOF==rc )
                {
                    stage=99;
                    break;
                }
            }
            fflush( right_out );
            
            // if the left side closed the connection
            if ( closed ) 
            {
                log_error( "right side closed??\n");
                stage=99;
            }
        }



/Per Jessen, Zürich

["ibwd_wrk_stage0.c" (text/x-objcsrc)]

// stage 0

        a=inet_ntoa( work->addr.sin_addr);
        if ( 0==(rc=getaddrinfo( a, NULL, &hint, &ai )) )
        {
            strcpy( hostname, ai->ai_canonname );
            freeaddrinfo( ai );
        }
        else
        {
            strcpy( hostname, a );
            log_error("[%02d] warning: unable to determine hostname of %s, error %d", \
c->id, a, rc);  }

        // there is a possibility that the client has gone away because we took to \
                long
        // to get started on the connection - work was queued for too long.
        if ( -1==fstat( work->soc, &s ) || !S_ISSOCK(s.st_mode) )
        {
            log_error( "\x1b[1m[%02d] client %s [%s:%d] went away (%lu seconds wait), \
                skipping\x1b[0m\n", 
            c->id, hostname, a, ntohs(work->addr.sin_port), *(c->t)-work->t );
            free(work);
            continue;
        }


        log_error( "\x1b[1m[%02u.%04u] connection %s [%s:%d] accepted.\x1b[0m\n", \
c->id, work->id, hostname, a, ntohs(work->addr.sin_port) );  
        // check statsu of our connection to right hand side, if not there, open it.
        // we'll need a timeout setting? 
        // to reuse a connection, we send a RSET instead of QUIT, and start over for \
the next  // inbound client request. On timeout we send a QUIT to right.
        
        if ( -1==(right=socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto \
)) )  {
            log_error("\x1b[1m[%02d] unable to create socket for right \
side.\x1b[0m\n", c->id );  //writefputs( "437 blah\r\n", work->soc );
            close(work->soc);
            free(work);
            continue;
        }

        memset( &addr_right, 0, sizeof(addr_right));

        inet_aton( "127.0.0.1", &addr_right.sin_addr );
        addr_right.sin_family=AF_INET;
        addr_right.sin_port=htons(10025);

        if( 0!=connect( right, (struct sockaddr*)&addr_right, sizeof(addr_right)) )
        {
            log_error( "[%02d] connect() failed; error %d: %s\n", c->id, errno, \
strerror(errno) );  close(work->soc);
            free(work);
            continue;
        }
        log_error( "\x1b[1m[%02u.%04u] connected to %s [%s:%d]\x1b[0m\n", c->id, \
work->id, "localhost", "127.0.0.1", 10025 );  
        //log_error( "\x1b[%d;1m[%02u.%04u] proxying [%s:%d] to [%s:%d]\x1b[0;37m\n", \
logcolor[c->id], c->id, work->id,   //a, ntohs(work->addr.sin_port), "127.0.0.1", \
10025 );  
        right2=dup(right);
        fcntl( right, F_SETFL, O_NONBLOCK );
                
        if ( NULL==(right_out=fdopen( right2, "w")) )
        {
            log_error("unable to open right_out stream; error %d: %s\n", errno, \
strerror(errno) );  }
        
        if ( NULL==(right_in=fdopen( right, "r")) )
        {
            log_error("unable to open right_in stream; error %d: %s\n", errno, \
strerror(errno) );  }

        left=work->soc;
        left2=dup(left);
        fcntl( left, F_SETFL, O_NONBLOCK );
        
        if ( NULL==(left_in=fdopen( left, "r" )) )
        {
            log_error("unable to open smtp_in stream; error %d: %s\n", errno, \
strerror(errno) );  free(work);
            continue;
        }
        
        if ( NULL==(left_out=fdopen( left2, "w")) )
        {
            log_error("unable to open smtp_out stream; error %d: %s\n", errno, \
strerror(errno) );  fclose(left_in);
            free(work);
            continue;
        }

        
    stage=1;



-- 
To unsubscribe, email: suse-programming-e-unsubscribe@suse.com
For additional commands, email: suse-programming-e-help@suse.com
Archives can be found at: http://lists.suse.com/archive/suse-programming-e

[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic