
List:       zeromq-dev
Subject:    Re: [zeromq-dev] Reconnect REQ socket with REQ_RELAXED option
From:       Christian Kamm <kamm () incasoftware ! de>
Date:       2014-01-24 17:02:24
Message-ID: 52E29CA0.3040809 () incasoftware ! de

It's indeed a bug with REQ_RELAXED. :/

The test2() function in the attached file models Alexey's report. I
don't have time to take care of this right now, unfortunately.

A brief debugging session tells me that when the second send() on the
REQ calls terminate() on the pipe, that pipe doesn't seem to come up
again. The symptom is send() returning EAGAIN when it shouldn't: a send
on a connected socket should never fail with EAGAIN unless we hit the
HWM.
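
To make the symptom concrete, here is a toy model, not libzmq code. The
names toy_socket_t, attach_pipe and terminate_pipe are made up for
illustration, and the model assumes a single outbound pipe. It only
shows the two ways a send can end in EAGAIN: the queue has reached the
HWM, or there is no pipe at all.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <deque>
#include <string>

//  Toy socket with at most one outbound pipe. Hypothetical type for
//  illustration only; it does not mirror libzmq's internals.
struct toy_socket_t
{
    bool has_pipe = false;
    std::size_t hwm = 0;
    std::deque<std::string> queue;

    //  Returns 0 on success, or -1 with errno set to EAGAIN when there
    //  is no pipe or the queue has reached the high-water mark.
    int send (const std::string &msg)
    {
        if (!has_pipe || queue.size () >= hwm) {
            errno = EAGAIN;
            return -1;
        }
        queue.push_back (msg);
        return 0;
    }

    //  Models terminate() on the pipe with nothing attached afterwards.
    void terminate_pipe ()
    {
        has_pipe = false;
        queue.clear ();
    }

    //  Models the reconnect that was expected to follow.
    void attach_pipe (std::size_t hwm_)
    {
        has_pipe = true;
        hwm = hwm_;
        queue.clear ();
    }
};

//  Usage: the expected EAGAIN (HWM reached) versus the one seen in the
//  bug (pipe gone and never replaced).
void check_toy_socket ()
{
    toy_socket_t s;
    s.attach_pipe (2);
    assert (s.send ("A") == 0);
    assert (s.send ("B") == 0);
    assert (s.send ("C") == -1 && errno == EAGAIN); //  HWM reached

    s.terminate_pipe ();
    assert (s.send ("D") == -1 && errno == EAGAIN); //  no pipe

    s.attach_pipe (2);
    assert (s.send ("E") == 0); //  works again once a pipe exists
}
```

In this model the bug corresponds to terminate_pipe() being called
without attach_pipe() ever following it.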

So maybe terminate()ing that pipe is a bad idea? We wanted to terminate
the underlying connection to cause a reconnect. Maybe there's a better
way to do it. See the original discussion here:
https://github.com/zeromq/libzmq/pull/619/
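
For context, the behaviour the two options are meant to give can be
sketched without any sockets. This is a simplified model under my own
assumptions, not the code from that pull request: toy_req_t and its
members are hypothetical, request ids are plain counters, and the
transport is left out entirely. A strict REQ refuses a second send
while a reply is pending (libzmq reports EFSM there), a relaxed one
abandons the old request, and with correlation only the reply carrying
the current request id is delivered.

```cpp
#include <cassert>

//  Toy model of REQ send/recv bookkeeping. Hypothetical class for
//  illustration only.
class toy_req_t
{
  public:
    explicit toy_req_t (bool relaxed_) :
        relaxed (relaxed_), awaiting_reply (false), current_id (0)
    {
    }

    //  Starts a new request and returns its id. Returns -1 when a
    //  strict socket is still waiting for a reply.
    long send ()
    {
        if (awaiting_reply && !relaxed)
            return -1;
        awaiting_reply = true;
        return ++current_id;
    }

    //  Offers an incoming reply tagged with a request id. Returns true
    //  if it is delivered to the caller, false if it is discarded.
    bool deliver (long request_id)
    {
        if (!awaiting_reply || request_id != current_id)
            return false;
        awaiting_reply = false;
        return true;
    }

  private:
    bool relaxed;
    bool awaiting_reply;
    long current_id;
};

//  Usage: the second send on a relaxed socket supersedes the first, so
//  only the reply to the second request gets through.
void check_toy_req ()
{
    toy_req_t strict (false);
    assert (strict.send () == 1);
    assert (strict.send () == -1);

    toy_req_t relaxed (true);
    long first = relaxed.send ();
    long second = relaxed.send ();
    assert (!relaxed.deliver (first));  //  stale reply is dropped
    assert (relaxed.deliver (second));  //  current reply is delivered
}
```

The model says nothing about how the stale connection should be torn
down, which is the part that seems to go wrong here.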

Christian

On 01/24/2014 03:27 PM, Pieter Hintjens wrote:
> The REQ_RELAXED code is newish and may have things to fix. Could you
> make a minimal test case (see the zeromq/issues repo for examples),
> and log an issue?
> 
> Thanks
> Pieter
> 
> On Fri, Jan 24, 2014 at 6:09 AM, Alexey Melnichuk <mimir@newmail.ru> wrote:
>> Alexey Melnichuk <mimir <at> newmail.ru> writes:
>>
>>>
>>> Server ROUTER socket (e.g. basic echo server)
>>> Client REQ socket with REQ_RELAXED and REQ_CORRELATE.
>>>
>>> 1) Server and client start.
>>> 2) Client sends a request and receives the response.
>>> 3) Server stops.
>>> 4) Client sends a request and gets EAGAIN on recv.
>>> 5) Client gets EAGAIN on send.
>>> 6) Server starts.
>>> 7) Client continues to get EAGAIN on send.
>>>
>>
>> I also get the same result with the inproc transport.
>> Is it a bug, or is this just not supposed to work?
>>
>>
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> zeromq-dev@lists.zeromq.org
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev


["test_req_relaxed.cpp" (text/x-c++src)]

/*
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"

void test1 (void)
{
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void *req = zmq_socket (ctx, ZMQ_REQ);
    assert (req);

    int enabled = 1;
    int rc = zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int));
    assert (rc == 0);

    rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
    assert (rc == 0);

    rc = zmq_bind (req, "tcp://127.0.0.1:5555");
    assert (rc == 0);

    const size_t services = 5;
    void *rep [services];
    for (size_t peer = 0; peer < services; peer++) {
        rep [peer] = zmq_socket (ctx, ZMQ_REP);
        assert (rep [peer]);

        int timeout = 100;
        rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
        assert (rc == 0);

        rc = zmq_connect (rep [peer], "tcp://localhost:5555");
        assert (rc == 0);
    }
    //  We have to give the connects time to finish otherwise the requests
    //  will not properly round-robin. We could alternatively connect the
    //  REQ sockets to the REP sockets.
    msleep (SETTLE_TIME);

    //  Case 1: Second send() before a reply arrives in a pipe.

    //  Send a request, ensure it arrives, don't send a reply
    s_send_seq (req, "A", "B", SEQ_END);
    s_recv_seq (rep [0], "A", "B", SEQ_END);

    //  Send another request on the REQ socket
    s_send_seq (req, "C", "D", SEQ_END);
    s_recv_seq (rep [1], "C", "D", SEQ_END);

    //  Send a reply to the first request - that should be discarded by the REQ
    s_send_seq (rep [0], "WRONG", SEQ_END);

    //  Send the expected reply
    s_send_seq (rep [1], "OK", SEQ_END);
    s_recv_seq (req, "OK", SEQ_END);


    //  Another standard req-rep cycle, just to check
    s_send_seq (req, "E", SEQ_END);
    s_recv_seq (rep [2], "E", SEQ_END);
    s_send_seq (rep [2], "F", "G", SEQ_END);
    s_recv_seq (req, "F", "G", SEQ_END);


    //  Case 2: Second send() after a reply is already in a pipe on the REQ.

    //  Send a request, ensure it arrives, send a reply
    s_send_seq (req, "H", SEQ_END);
    s_recv_seq (rep [3], "H", SEQ_END);
    s_send_seq (rep [3], "BAD", SEQ_END);

    // Wait for message to be there.
    rc = zmq_poll (0, 0, 100);
    assert (rc == 0);

    //  Without receiving that reply, send another request on the REQ socket
    s_send_seq (req, "I", SEQ_END);
    s_recv_seq (rep [4], "I", SEQ_END);

    //  Send the expected reply
    s_send_seq (rep [4], "GOOD", SEQ_END);
    s_recv_seq (req, "GOOD", SEQ_END);


    close_zero_linger (req);
    for (size_t peer = 0; peer < services; peer++)
        close_zero_linger (rep [peer]);

    // Wait for disconnects.
    rc = zmq_poll (0, 0, 100);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}

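//  Models the reported scenario: the REP peer goes away and comes back
//  while a REQ socket with REQ_RELAXED and REQ_CORRELATE keeps sending.
void test2 (void)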
{
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    //  Setup REQ
    void *req = zmq_socket (ctx, ZMQ_REQ);
    assert (req);

    int enabled = 1;
    int rc = zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int));
    assert (rc == 0);

    rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
    assert (rc == 0);

    int timeout = 100;
    rc = zmq_setsockopt (req, ZMQ_RCVTIMEO, &timeout, sizeof (int));
    assert (rc == 0);

    rc = zmq_connect (req, "tcp://127.0.0.1:5556");
    assert (rc == 0);

    //  Setup REP
    void *rep = zmq_socket (ctx, ZMQ_REP);
    assert (rep);

    rc = zmq_setsockopt (rep, ZMQ_RCVTIMEO, &timeout, sizeof (int));
    assert (rc == 0);

    rc = zmq_bind (rep, "tcp://127.0.0.1:5556");
    assert (rc == 0);

    //  REQ sends request, reads response (base case)
    s_send_seq (req, "A", "REQUEST", SEQ_END);
    s_recv_seq (rep, "A", "REQUEST", SEQ_END);
    s_send_seq (rep, "A", "REPLY", SEQ_END);
    s_recv_seq (req, "A", "REPLY", SEQ_END);

    //  REP stops
    rc = zmq_unbind (rep, "tcp://127.0.0.1:5556");
    assert (rc == 0);

    //  REQ sends request, receives nothing
    s_send_seq (req, "B", "REQUEST", SEQ_END);
    char buffer[1];
    rc = zmq_recv (req, buffer, 1, 0);
    assert (rc == -1);
    assert (zmq_errno () == EAGAIN);

    //  REQ can still queue new requests
    s_send_seq (req, "C", "REQUEST", SEQ_END);

    //  REP comes up again
    rc = zmq_bind (rep, "tcp://127.0.0.1:5556");
    assert (rc == 0);

    //  REP gets the queued messages
    s_recv_seq (rep, "B", "REQUEST", SEQ_END);
    s_send_seq (rep, "B", "REPLY", SEQ_END);
    s_recv_seq (rep, "C", "REQUEST", SEQ_END);
    s_send_seq (rep, "C", "REPLY", SEQ_END);

    //  REQ only shows the latest reply
    s_recv_seq (req, "C", "REPLY", SEQ_END);

    //  REQ sends request, reads response (base case again)
    s_send_seq (req, "D", "REQUEST", SEQ_END);
    s_recv_seq (rep, "D", "REQUEST", SEQ_END);
    s_send_seq (rep, "D", "REPLY", SEQ_END);
    s_recv_seq (req, "D", "REPLY", SEQ_END);

    close_zero_linger (req);
    close_zero_linger (rep);

    // Wait for disconnects.
    rc = zmq_poll (0, 0, 100);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}

int main (void)
{
    setup_test_environment ();
    test1 ();
    test2 ();
    return 0;
}

