[prev in list] [next in list] [prev in thread] [next in thread]
List: kde-commits
Subject: extragear/network/ktorrent/libbtcore
From: Joris Guisson <joris.guisson () gmail ! com>
Date: 2010-01-21 18:19:39
Message-ID: 1264097979.483680.5416.nullmailer () svn ! kde ! org
[Download RAW message or body]
SVN commit 1078186 by guisson:
UTP: Add selective ack extension support
M +1 -0 CMakeLists.txt
M +56 -69 utp/connection.cpp
M +3 -1 utp/connection.h
M +25 -1 utp/localwindow.cpp
M +11 -1 utp/localwindow.h
M +13 -5 utp/remotewindow.cpp
M +77 -0 utp/tests/localwindowtest.cpp
A utp/utpprotocol.cpp [License: GPL (v2+)]
M +10 -0 utp/utpprotocol.h
--- trunk/extragear/network/ktorrent/libbtcore/CMakeLists.txt #1078185:1078186
@@ -201,6 +201,7 @@
utp/remotewindow.cpp
utp/utpsocket.cpp
utp/timevalue.cpp
+ utp/utpprotocol.cpp
btversion.cpp
)
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.cpp #1078185:1078186
@@ -110,9 +110,9 @@
if (hdr->type == ST_SYN)
{
// Send back a state packet
+ local_wnd->setLastSeqNr(hdr->seq_nr);
sendState();
state = CS_CONNECTED;
- local_wnd->setLastSeqNr(hdr->seq_nr);
Out(SYS_CON|LOG_NOTICE) << "UTP: established connection with " << remote.toString() << endl;
}
else
@@ -127,7 +127,8 @@
// push data into local window
int s = packet.size() - data_off;
local_wnd->packetReceived(hdr,(const bt::Uint8*)packet.data() + data_off,s);
- data_ready.wakeAll();
+ if (local_wnd->fill() > 0)
+ data_ready.wakeAll();
// send back an ACK
sendStateOrData();
@@ -181,89 +182,62 @@
timeout = qMin(rtt + rtt_var * 4, (bt::Uint32)500);
}
-
- void Connection::sendSYN()
+
+ int Connection::sendPacket(Uint32 type,Uint16 p_ack_nr)
{
- seq_nr = 1;
- ack_nr = 0;
- state = CS_SYN_SENT;
-
struct timeval tv;
gettimeofday(&tv,NULL);
- QByteArray ba(sizeof(Header),0);
+ bt::Uint32 extension_length = 0;
+ bt::Uint32 sack_bits = local_wnd->selectiveAckBits();
+ if (sack_bits > 0)
+ extension_length += 2 + qMin(sack_bits / 8,(bt::Uint32)4);
+
+ QByteArray ba(sizeof(Header) + extension_length,0);
Header* hdr = (Header*)ba.data();
hdr->version = 1;
- hdr->type = ST_SYN;
- hdr->extension = 0;
- hdr->connection_id = recv_connection_id;
+ hdr->type = type;
+ hdr->extension = extension_length == 0 ? 0 : SELECTIVE_ACK_ID;
+ hdr->connection_id = type == ST_SYN ? recv_connection_id : send_connection_id;
hdr->timestamp_microseconds = tv.tv_usec;
hdr->timestamp_difference_microseconds = reply_micro;
hdr->wnd_size = local_wnd->availableSpace();
hdr->seq_nr = seq_nr;
- hdr->ack_nr = ack_nr;
+ hdr->ack_nr = p_ack_nr;
- srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
+ if (extension_length > 0)
+ {
+ SelectiveAck* sack = (SelectiveAck*)(ba.data() + sizeof(Header));
+ sack->extension = 0;
+ sack->length = extension_length - 2;
+ local_wnd->fillSelectiveAck(sack);
+ }
+
+ return srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
}
+
+
+ void Connection::sendSYN()
+ {
+ seq_nr = 1;
+ ack_nr = 0;
+ state = CS_SYN_SENT;
+ sendPacket(ST_SYN,0);
+ }
void Connection::sendState()
{
- struct timeval tv;
- gettimeofday(&tv,NULL);
-
- QByteArray ba(sizeof(Header),0);
- Header* hdr = (Header*)ba.data();
- hdr->version = 1;
- hdr->type = ST_STATE;
- hdr->extension = 0;
- hdr->connection_id = send_connection_id;
- hdr->timestamp_microseconds = tv.tv_usec;
- hdr->timestamp_difference_microseconds = reply_micro;
- hdr->wnd_size = local_wnd->availableSpace();
- hdr->seq_nr = seq_nr;
- hdr->ack_nr = ack_nr;
-
- srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
+ sendPacket(ST_STATE,local_wnd->lastSeqNr());
}
void Connection::sendFIN()
{
- struct timeval tv;
- gettimeofday(&tv,NULL);
-
- QByteArray ba(sizeof(Header),0);
- Header* hdr = (Header*)ba.data();
- hdr->version = 1;
- hdr->type = ST_FIN;
- hdr->extension = 0;
- hdr->connection_id = send_connection_id;
- hdr->timestamp_microseconds = tv.tv_usec;
- hdr->timestamp_difference_microseconds = reply_micro;
- hdr->wnd_size = local_wnd->availableSpace();
- hdr->seq_nr = seq_nr;
- hdr->ack_nr = ack_nr;
-
- srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
+ sendPacket(ST_FIN,local_wnd->lastSeqNr());
}
void Connection::sendReset()
{
- struct timeval tv;
- gettimeofday(&tv,NULL);
-
- QByteArray ba(sizeof(Header),0);
- Header* hdr = (Header*)ba.data();
- hdr->version = 1;
- hdr->type = ST_RESET;
- hdr->extension = 0;
- hdr->connection_id = send_connection_id;
- hdr->timestamp_microseconds = tv.tv_usec;
- hdr->timestamp_difference_microseconds = reply_micro;
- hdr->wnd_size = local_wnd->availableSpace();
- hdr->seq_nr = seq_nr;
- hdr->ack_nr = ack_nr;
-
- srv->sendTo(ba,remote);
+ sendPacket(ST_RESET,local_wnd->lastSeqNr());
}
void Connection::waitForSYN()
@@ -294,7 +268,7 @@
while (data_off < packet.size() && ptr->extension != 0)
{
ptr = (UnknownExtension*)packet.data() + data_off;
- if (ext_id == 1)
+ if (ext_id == SELECTIVE_ACK_ID)
*selective_ack = (SelectiveAck*)ptr;
data_off += 2 + ptr->length;
@@ -327,7 +301,7 @@
QByteArray packet(to_read,0);
output_buffer.read((bt::Uint8*)packet.data(),to_read);
- doSend(packet);
+ sendDataPacket(packet);
}
}
@@ -339,24 +313,37 @@
sendState();
}
- int Connection::doSend(const QByteArray& packet)
+ int Connection::sendDataPacket(const QByteArray& packet)
{
bt::Uint32 to_send = packet.size();
TimeValue now;
- QByteArray ba(sizeof(Header) + packet.size(),0);
+ bt::Uint32 extension_length = 0;
+ bt::Uint32 sack_bits = local_wnd->selectiveAckBits();
+ if (sack_bits > 0)
+ extension_length += 2 + qMin(sack_bits / 8,(bt::Uint32)4);
+
+ QByteArray ba(sizeof(Header) + extension_length + packet.size(),0);
Header* hdr = (Header*)ba.data();
hdr->version = 1;
hdr->type = ST_DATA;
- hdr->extension = 0;
+ hdr->extension = extension_length == 0 ? 0 : SELECTIVE_ACK_ID;
hdr->connection_id = send_connection_id;
hdr->timestamp_microseconds = now.microseconds;
hdr->timestamp_difference_microseconds = reply_micro;
hdr->wnd_size = local_wnd->availableSpace();
hdr->seq_nr = seq_nr + 1;
- hdr->ack_nr = ack_nr;
+ hdr->ack_nr = local_wnd->lastSeqNr();
- memcpy(ba.data() + sizeof(Header),packet.data(),to_send);
+ if (extension_length > 0)
+ {
+ SelectiveAck* sack = (SelectiveAck*)(ba.data() + sizeof(Header));
+ sack->extension = 0;
+ sack->length = extension_length - 2;
+ local_wnd->fillSelectiveAck(sack);
+ }
+
+ memcpy(ba.data() + sizeof(Header) + extension_length,packet.data(),to_send);
if (!srv->sendTo(ba,remote))
return -1;
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.h #1078185:1078186
@@ -96,8 +96,9 @@
void sendFIN();
void updateDelayMeasurement(const Header* hdr);
void sendStateOrData();
- int doSend(const QByteArray & packet);
void sendPackets();
+ int sendPacket(bt::Uint32 type,bt::Uint16 p_ack_nr);
+ int sendDataPacket(const QByteArray & packet);
/**
Parses the packet, and retrieves pointer to the header, the SelectiveAck extension (if present)
@@ -108,6 +109,7 @@
*/
int parsePacket(const QByteArray & packet,Header** hdr,SelectiveAck** selective_ack);
+
private:
Type type;
UTPServer* srv;
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.cpp #1078185:1078186
@@ -83,7 +83,7 @@
if (availableSpace() < size)
return false;
- if (window_space < size || hdr->seq_nr != last_seq_nr + 1)
+ if (hdr->seq_nr != last_seq_nr + 1)
{
// insert the packet into the future_packets list
QLinkedList<FuturePacket*>::iterator itr = future_packets.begin();
@@ -121,5 +121,29 @@
return true;
}
+
+ bt::Uint32 LocalWindow::selectiveAckBits() const
+ {
+ if (future_packets.isEmpty())
+ return 0;
+ else
+ return future_packets.last()->seq_nr - last_seq_nr - 1;
+ }
+
+
+ void LocalWindow::fillSelectiveAck(SelectiveAck* sack)
+ {
+ // First turn off all bits
+ bt::Uint8* bitset = (bt::Uint8*)sack + 2;
+ memset(bitset,0,sack->length);
+
+ QLinkedList<FuturePacket*>::iterator itr = future_packets.begin();
+ while (itr != future_packets.end())
+ {
+ Acked(sack,(*itr)->seq_nr - last_seq_nr);
+ itr++;
+ }
+ }
+
}
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.h #1078185:1078186
@@ -30,6 +30,7 @@
namespace utp
{
+ struct SelectiveAck;
struct Header;
const bt::Uint32 DEFAULT_CAPACITY = 64*1024;
@@ -65,18 +66,27 @@
/// Set the last sequence number
void setLastSeqNr(bt::Uint16 lsn);
+ /// Get the last sequence number we can safely ack
+ bt::Uint16 lastSeqNr() const {return last_seq_nr;}
+
/// Is the window empty
bool isEmpty() const {return future_packets.isEmpty() && fill() == 0;}
virtual bt::Uint32 read(bt::Uint8* data,bt::Uint32 max_len);
+ /// Get the number of selective ack bits needed when sending a packet
+ bt::Uint32 selectiveAckBits() const;
+
+ /// Fill a SelectiveAck structure
+ void fillSelectiveAck(SelectiveAck* sack);
+
private:
void checkFuturePackets();
private:
bt::Uint16 last_seq_nr;
// all the packets which have been received but we cannot yet write to the output buffer
- // due to either missing packets or lack of space
+ // due to missing packets
QLinkedList<FuturePacket*> future_packets;
bt::Uint32 window_space;
};
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.cpp #1078185:1078186
@@ -51,25 +51,33 @@
wnd_size = hdr->wnd_size;
TimeValue now;
- // everything up until the ack_nr in the header should now have been acked
QList<UnackedPacket*>::iterator i = unacked_packets.begin();
while (i != unacked_packets.end())
{
UnackedPacket* up = *i;
if (up->seq_nr <= hdr->ack_nr)
{
+ // everything up until the ack_nr in the header is acked
conn->updateRTT(hdr,now - up->send_time);
cur_window -= up->data.size();
delete up;
i = unacked_packets.erase(i);
}
+ else if (sack)
+ {
+ if (Acked(sack,up->seq_nr - hdr->ack_nr))
+ {
+ conn->updateRTT(hdr,now - up->send_time);
+ cur_window -= up->data.size();
+ delete up;
+ i = unacked_packets.erase(i);
+ }
+ else
+ i++;
+ }
else
break;
}
-
- if (sack)
- {
- }
}
void RemoteWindow::addPacket(const QByteArray& data,bt::Uint16 seq_nr,const TimeValue & send_time)
--- trunk/extragear/network/ktorrent/libbtcore/utp/tests/localwindowtest.cpp #1078185:1078186
@@ -204,6 +204,83 @@
QVERIFY(wnd.availableSpace() == 0);
QVERIFY(wnd.currentWindow() == 500);
}
+
+ void testSelectiveAck()
+ {
+ bt::Uint8 wdata[1000];
+ memset(wdata,0,1000);
+
+ utp::Header hdr;
+ utp::LocalWindow wnd(1000);
+ wnd.setLastSeqNr(0);
+
+ // first write first and last packet
+ bt::Uint32 step = 200;
+ hdr.seq_nr = 1;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - step);
+ QVERIFY(wnd.currentWindow() == step);
+ hdr.seq_nr = 5;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - 2*step);
+ QVERIFY(wnd.currentWindow() == 2*step);
+ QVERIFY(wnd.fill() == step);
+
+ // Check SelectiveAck generation
+ QVERIFY(wnd.selectiveAckBits() == 3);
+ bt::Uint8 sack_data[6];
+ SelectiveAck* sack = (SelectiveAck*)sack_data;
+ sack->length = 4;
+ sack->extension = 0;
+ wnd.fillSelectiveAck(sack);
+ QVERIFY(sack_data[2] == 0x4);
+ QVERIFY(sack_data[3] == 0x0);
+ QVERIFY(sack_data[4] == 0x0);
+ QVERIFY(sack_data[5] == 0x0);
+
+ // Now write 4
+ hdr.seq_nr = 4;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - 3*step);
+ QVERIFY(wnd.currentWindow() == 3*step);
+ QVERIFY(wnd.fill() == step);
+
+ // Check selective ack again
+ QVERIFY(wnd.selectiveAckBits() == 3);
+ sack->length = 4;
+ sack->extension = 0;
+ wnd.fillSelectiveAck(sack);
+ QVERIFY(sack_data[2] == 0x6);
+ QVERIFY(sack_data[3] == 0x0);
+ QVERIFY(sack_data[4] == 0x0);
+ QVERIFY(sack_data[5] == 0x0);
+
+ // Now write 3
+ hdr.seq_nr = 3;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - 4*step);
+ QVERIFY(wnd.currentWindow() == 4*step);
+ QVERIFY(wnd.fill() == step);
+
+ // Check selective ack again
+ QVERIFY(wnd.selectiveAckBits() == 3);
+ sack->length = 4;
+ sack->extension = 0;
+ wnd.fillSelectiveAck(sack);
+ QVERIFY(sack_data[2] == 0x7);
+ QVERIFY(sack_data[3] == 0x0);
+ QVERIFY(sack_data[4] == 0x0);
+ QVERIFY(sack_data[5] == 0x0);
+
+ // And then 2
+ hdr.seq_nr = 2;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - 5*step);
+ QVERIFY(wnd.currentWindow() == 5*step);
+ QVERIFY(wnd.fill() == 5*step);
+ // selective ack should now be unnecessary
+ QVERIFY(wnd.selectiveAckBits() == 0);
+ }
private:
bt::Uint8 data[13];
--- trunk/extragear/network/ktorrent/libbtcore/utp/utpprotocol.h #1078185:1078186
@@ -22,6 +22,7 @@
#define UTP_UTPPROTOCOL_H
#include <QtGlobal>
+#include <util/constants.h>
namespace utp
{
@@ -82,6 +83,9 @@
quint8 length;
};
+ const bt::Uint8 SELECTIVE_ACK_ID = 1;
+ const bt::Uint8 EXTENSION_BITS_ID = 2;
+
// type field values
const quint8 ST_DATA = 0;
const quint8 ST_FIN = 1;
@@ -100,6 +104,12 @@
};
const quint32 MIN_PACKET_SIZE = 150;
+
+ // Test if a bit is acked
+ bool Acked(const SelectiveAck* sack,bt::Uint16 bit);
+
+ // Turn on a bit in the SelectiveAck
+ void Acked(SelectiveAck* sack,bt::Uint16 bit);
}
#endif // UTP_UTPPROTOCOL_H
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic