[prev in list] [next in list] [prev in thread] [next in thread]
List: kde-commits
Subject: extragear/network/ktorrent/libbtcore
From: Joris Guisson <joris.guisson () gmail ! com>
Date: 2010-01-20 17:37:14
Message-ID: 1264009034.661584.11727.nullmailer () svn ! kde ! org
[Download RAW message or body]
SVN commit 1077693 by guisson:
Further work on UTP:
- Add RTT calculations
- Add output_buffer and use that to store the data before sending
- Send packets according a packet size
M +1 -0 CMakeLists.txt
M +11 -11 util/circularbuffer.cpp
M +9 -3 util/circularbuffer.h
M +1 -0 utp/CMakeLists.txt
M +57 -48 utp/connection.cpp
M +16 -5 utp/connection.h
M +1 -5 utp/localwindow.h
M +22 -7 utp/remotewindow.cpp
M +16 -3 utp/remotewindow.h
A utp/timevalue.cpp [License: GPL (v2+)]
A utp/timevalue.h [License: GPL (v2+)]
M +2 -0 utp/utpprotocol.h
M +1 -5 utp/utpsocket.cpp
--- trunk/extragear/network/ktorrent/libbtcore/CMakeLists.txt #1077692:1077693
@@ -200,6 +200,7 @@
utp/localwindow.cpp
utp/remotewindow.cpp
utp/utpsocket.cpp
+ utp/timevalue.cpp
btversion.cpp
)
--- trunk/extragear/network/ktorrent/libbtcore/util/circularbuffer.cpp #1077692:1077693
@@ -25,9 +25,9 @@
namespace bt
{
- CircularBuffer::CircularBuffer(Uint32 cap) : window(0),capacity(cap),start(0),size(0)
+ CircularBuffer::CircularBuffer(Uint32 cap) : window(0),buffer_capacity(cap),start(0),size(0)
{
- window = new bt::Uint8[capacity];
+ window = new bt::Uint8[buffer_capacity];
}
CircularBuffer::~CircularBuffer()
@@ -41,7 +41,7 @@
return 0;
bt::Uint32 to_read = size < max_len ? size : max_len;
- if (start + to_read < capacity)
+ if (start + to_read < buffer_capacity)
{
// we are not going past the end of the data
memcpy(data,window + start,to_read);
@@ -52,8 +52,8 @@
else
{
// read until the end of the window
- memcpy(data,window + start,capacity - start);
- bt::Uint32 ar = capacity - start;
+ memcpy(data,window + start,buffer_capacity - start);
+ bt::Uint32 ar = buffer_capacity - start;
if (to_read > ar) // read the rest
memcpy(data + ar,window,to_read - ar);
@@ -65,13 +65,13 @@
bt::Uint32 CircularBuffer::write(const bt::Uint8* data, bt::Uint32 len)
{
- if (size == capacity)
+ if (size == buffer_capacity)
return 0;
- bt::Uint32 free_space = capacity - size;
+ bt::Uint32 free_space = buffer_capacity - size;
bt::Uint32 to_write = free_space < len ? free_space : len;
- bt::Uint32 off = (start + size) % capacity;
- if (off + to_write < capacity)
+ bt::Uint32 off = (start + size) % buffer_capacity;
+ if (off + to_write < buffer_capacity)
{
// everything will go in one go
memcpy(window + off,data,to_write);
@@ -80,8 +80,8 @@
}
else
{
- memcpy(window + off,data,capacity - off);
- bt::Uint32 aw = capacity - off;
+ memcpy(window + off,data,buffer_capacity - off);
+ bt::Uint32 aw = buffer_capacity - off;
if (to_write > aw)
memcpy(window,data + aw,to_write - aw);
--- trunk/extragear/network/ktorrent/libbtcore/util/circularbuffer.h #1077692:1077693
@@ -26,14 +26,14 @@
namespace bt
{
-
+
/**
Circular buffer class
*/
class BTCORE_EXPORT CircularBuffer
{
public:
- CircularBuffer(bt::Uint32 cap);
+ CircularBuffer(bt::Uint32 cap = 64 * 1024);
virtual ~CircularBuffer();
/**
@@ -52,9 +52,15 @@
*/
bt::Uint32 write(const bt::Uint8* data,bt::Uint32 len);
+ /// Get the buffer capacity
+ bt::Uint32 capacity() const {return buffer_capacity;}
+
+ /// Get how much is used
+ bt::Uint32 fill() const {return size;}
+
protected:
bt::Uint8* window;
- bt::Uint32 capacity;
+ bt::Uint32 buffer_capacity;
bt::Uint32 start;
bt::Uint32 size;
};
--- trunk/extragear/network/ktorrent/libbtcore/utp/CMakeLists.txt #1077692:1077693
@@ -5,6 +5,7 @@
utpsocket.h
connection.h
localwindow.h
+ timevalue.h
)
install(FILES ${utp_HDR} DESTINATION ${INCLUDE_INSTALL_DIR}/libbtcore/utp COMPONENT Devel)
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.cpp #1077692:1077693
@@ -36,6 +36,11 @@
eof_seq_nr = -1;
local_wnd = new LocalWindow();
remote_wnd = new RemoteWindow();
+ rtt = 100;
+ rtt_var = 0;
+ last_ack_nr = 0;
+ timeout = 1000;
+ packet_size = 1400;
if (type == OUTGOING)
{
send_connection_id = recv_connection_id + 1;
@@ -80,8 +85,9 @@
DumpHeader(*hdr);
updateDelayMeasurement(hdr);
- remote_wnd->packetReceived(hdr);
+ remote_wnd->packetReceived(hdr,this);
ack_nr = hdr->seq_nr;
+ last_ack_nr = hdr->ack_nr;
switch (state)
{
case CS_SYN_SENT:
@@ -121,8 +127,8 @@
local_wnd->write((const bt::Uint8*)packet.data() + data_off,s);
data_ready.wakeAll();
- // send back an ACK
- sendState();
+ // send back an ACK
+ sendStateOrData();
}
else if (hdr->type == ST_STATE)
{
@@ -163,7 +169,17 @@
return state;
}
+
+
+ void Connection::updateRTT(const utp::Header* hdr,bt::Uint32 packet_rtt)
+ {
+ int delta = rtt - packet_rtt;
+ rtt_var += (qAbs(delta) - rtt_var) / 4;
+ rtt += (packet_rtt - rtt) / 8;
+ timeout = qMin(rtt + rtt_var * 4, (bt::Uint32)500);
+ }
+
void Connection::sendSYN()
{
seq_nr = 1;
@@ -185,7 +201,6 @@
hdr->seq_nr = seq_nr;
hdr->ack_nr = ack_nr;
- remote_wnd->addPacket(ba);
srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
}
@@ -206,7 +221,6 @@
hdr->seq_nr = seq_nr;
hdr->ack_nr = ack_nr;
- remote_wnd->addPacket(ba);
srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
}
@@ -227,7 +241,6 @@
hdr->seq_nr = seq_nr;
hdr->ack_nr = ack_nr;
- remote_wnd->addPacket(ba);
srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
}
@@ -289,68 +302,64 @@
return data_off;
}
- bool Connection::send(const QByteArray & packet)
+ int Connection::send(const bt::Uint8* data, Uint32 len)
{
QMutexLocker lock(&mutex);
- if (state != CS_CONNECTED || !remote_wnd->allowedToSend(packet.size()))
- return false;
+ if (state != CS_CONNECTED)
+ return -1;
- struct timeval tv;
- gettimeofday(&tv,NULL);
-
- QByteArray ba(sizeof(Header) + packet.size(),0);
- Header* hdr = (Header*)ba.data();
- hdr->version = 1;
- hdr->type = ST_DATA;
- hdr->extension = 0;
- hdr->connection_id = send_connection_id;
- hdr->timestamp_microseconds = tv.tv_usec;
- hdr->timestamp_difference_microseconds = reply_micro;
- hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
- hdr->seq_nr = seq_nr + 1;
- hdr->ack_nr = ack_nr;
-
- memcpy(ba.data() + sizeof(Header),packet.data(),packet.size());
- if (!srv->sendTo(ba,remote))
- return false;
-
- seq_nr++;
- remote_wnd->addPacket(packet);
- return true;
+ // first put data in the output buffer then send packets
+ bt::Uint32 ret = output_buffer.write(data,len);
+ sendPackets();
+ return ret;
}
- Uint32 Connection::send(const bt::Uint8* data, Uint32 len)
+ void Connection::sendPackets()
{
- QMutexLocker lock(&mutex);
- if (state != CS_CONNECTED)
- return 0;
+ // chop output_buffer data in packets and keep sending
+ // until we are no longer allowed or the buffer is empty
+ while (output_buffer.fill() > 0 && remote_wnd->availableSpace() > 0)
+ {
+ bt::Uint32 to_read = qMin(output_buffer.fill(),remote_wnd->availableSpace());
+ to_read = qMin(to_read,packet_size);
+
+ QByteArray packet(to_read,0);
+ output_buffer.read((bt::Uint8*)packet.data(),to_read);
+ doSend(packet);
+ }
+ }
+
+ void Connection::sendStateOrData()
+ {
+ if (output_buffer.fill() > 0 && remote_wnd->availableSpace() > 0)
+ sendPackets();
+ else
+ sendState();
+ }
+
+ int Connection::doSend(const QByteArray& packet)
+ {
+ bt::Uint32 to_send = packet.size();
+ TimeValue now;
- bt::Uint32 space = remote_wnd->availableSpace();
- if (space == 0)
- return 0;
-
- bt::Uint32 to_send = space <= len ? space : len;
- struct timeval tv;
- gettimeofday(&tv,NULL);
-
- QByteArray ba(sizeof(Header) + to_send,0);
+ QByteArray ba(sizeof(Header) + packet.size(),0);
Header* hdr = (Header*)ba.data();
hdr->version = 1;
hdr->type = ST_DATA;
hdr->extension = 0;
hdr->connection_id = send_connection_id;
- hdr->timestamp_microseconds = tv.tv_usec;
+ hdr->timestamp_microseconds = now.microseconds;
hdr->timestamp_difference_microseconds = reply_micro;
hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
hdr->seq_nr = seq_nr + 1;
hdr->ack_nr = ack_nr;
- memcpy(ba.data() + sizeof(Header),data,to_send);
+ memcpy(ba.data() + sizeof(Header),packet.data(),to_send);
if (!srv->sendTo(ba,remote))
- return 0;
+ return -1;
seq_nr++;
- remote_wnd->addPacket(QByteArray((const char*)data,to_send));
+ remote_wnd->addPacket(packet,seq_nr,now);
return to_send;
}
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.h #1077692:1077693
@@ -26,6 +26,7 @@
#include <btcore_export.h>
#include <net/address.h>
#include "utpprotocol.h"
+#include <util/circularbuffer.h>
@@ -58,12 +59,9 @@
/// Get the receive connection id
bt::Uint16 receiveConnectionID() const {return recv_connection_id;}
- /// Send a packet, will return false if there is no room in the remote window
- bool send(const QByteArray & packet);
+ /// Send some data, returns the amount of bytes sent (or -1 on error)
+ int send(const bt::Uint8* data,bt::Uint32 len);
- /// Send some data, returns the amount of bytes sent
- bt::Uint32 send(const bt::Uint8* data,bt::Uint32 len);
-
/// Read available data from local window, returns the amount of bytes read
bt::Uint32 recv(bt::Uint8* buf,bt::Uint32 max_len);
@@ -88,12 +86,18 @@
/// Close the socket
void close();
+ /// Update the RTT time
+ void updateRTT(const Header* hdr,bt::Uint32 packet_rtt);
+
private:
void sendSYN();
void waitForSYN();
void sendState();
void sendFIN();
void updateDelayMeasurement(const Header* hdr);
+ void sendStateOrData();
+ int doSend(const QByteArray & packet);
+ void sendPackets();
/**
Parses the packet, and retrieves pointer to the header, the SelectiveAck extension (if present)
@@ -116,11 +120,18 @@
bt::Uint16 recv_connection_id;
LocalWindow* local_wnd;
RemoteWindow* remote_wnd;
+ bt::CircularBuffer output_buffer;
bt::Uint16 seq_nr;
bt::Uint16 ack_nr;
+ bt::Uint16 last_ack_nr;
int eof_seq_nr;
+ bt::Uint32 timeout;
+ bt::Uint32 rtt;
+ bt::Uint32 rtt_var;
+ bt::Uint32 packet_size;
+
mutable QMutex mutex;
QWaitCondition connected;
QWaitCondition data_ready;
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.h #1077692:1077693
@@ -40,12 +40,8 @@
LocalWindow(bt::Uint32 cap = DEFAULT_CAPACITY);
virtual ~LocalWindow();
- bt::Uint32 maxWindow() const {return capacity;}
+ bt::Uint32 maxWindow() const {return buffer_capacity;}
bt::Uint32 currentWindow() const {return size;}
-
-
-
-
};
}
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.cpp #1077692:1077693
@@ -20,10 +20,21 @@
#include "remotewindow.h"
#include "utpprotocol.h"
+#include "connection.h"
namespace utp
{
+ UnackedPacket::UnackedPacket(const QByteArray& data, bt::Uint16 seq_nr, const TimeValue& send_time)
+ : data(data),seq_nr(seq_nr),send_time(send_time)
+ {
+ }
+
+ UnackedPacket::~UnackedPacket()
+ {
+ }
+
+
RemoteWindow::RemoteWindow() : cur_window(0),max_window(64 * 1024),wnd_size(0)
{
@@ -31,20 +42,24 @@
RemoteWindow::~RemoteWindow()
{
-
+ qDeleteAll(unacked_packets);
}
- void RemoteWindow::packetReceived(const utp::Header* hdr)
+ void RemoteWindow::packetReceived(const utp::Header* hdr,Connection* conn)
{
wnd_size = hdr->wnd_size;
+ TimeValue now;
// everything up until the ack_nr in the header should now have been acked
- QList<QByteArray>::iterator i = unacked_packets.begin();
+ QList<UnackedPacket*>::iterator i = unacked_packets.begin();
while (i != unacked_packets.end())
{
- if (((utp::Header*)i->data())->seq_nr <= hdr->ack_nr)
+ UnackedPacket* up = *i;
+ if (up->seq_nr <= hdr->ack_nr)
{
- cur_window -= i->size();
+ conn->updateRTT(hdr,now - up->send_time);
+ cur_window -= up->data.size();
+ delete up;
i = unacked_packets.erase(i);
}
else
@@ -52,10 +67,10 @@
}
}
- void RemoteWindow::addPacket(const QByteArray& data)
+ void RemoteWindow::addPacket(const QByteArray& data,bt::Uint16 seq_nr,const TimeValue & send_time)
{
cur_window += data.size();
- unacked_packets.append(data);
+ unacked_packets.append(new UnackedPacket(data,seq_nr,send_time));
}
}
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.h #1077692:1077693
@@ -26,10 +26,23 @@
#include <QMutex>
#include <btcore_export.h>
#include <util/constants.h>
+#include <utp/timevalue.h>
namespace utp
{
+ class Connection;
struct Header;
+
+ struct UnackedPacket
+ {
+ UnackedPacket(const QByteArray & data,bt::Uint16 seq_nr,const TimeValue & send_time);
+ ~UnackedPacket();
+
+ QByteArray data;
+ bt::Uint16 seq_nr;
+ TimeValue send_time;
+ };
+
/**
Keeps track of the remote sides window including all packets inflight.
*/
@@ -40,10 +53,10 @@
virtual ~RemoteWindow();
/// A packet was received (update window size and check for acks)
- void packetReceived(const Header* hdr);
+ void packetReceived(const Header* hdr,Connection* conn);
/// Add a packet to the remote window (should include headers)
- void addPacket(const QByteArray & data);
+ void addPacket(const QByteArray & data,bt::Uint16 seq_nr,const TimeValue & send_time);
/// Are we allowed to send
bool allowedToSend(bt::Uint32 packet_size) const
@@ -68,7 +81,7 @@
bt::Uint32 cur_window;
bt::Uint32 max_window;
bt::Uint32 wnd_size; // advertised window size from the other side
- QList<QByteArray> unacked_packets;
+ QList<UnackedPacket*> unacked_packets;
};
}
--- trunk/extragear/network/ktorrent/libbtcore/utp/utpprotocol.h #1077692:1077693
@@ -98,6 +98,8 @@
CS_FINISHED,
CS_CLOSED
};
+
+ const quint32 MIN_PACKET_SIZE = 150;
}
#endif // UTP_UTPPROTOCOL_H
--- trunk/extragear/network/ktorrent/libbtcore/utp/utpsocket.cpp #1077692:1077693
@@ -129,11 +129,7 @@
if (!conn)
return -1;
- bt::Uint32 ret = conn->send(buf,len);
- if (!ret)
- return -1;
-
- return ret;
+ return conn->send(buf,len);
}
void UTPSocket::setBlocking(bool on)
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic