[prev in list] [next in list] [prev in thread] [next in thread]
List: kde-commits
Subject: extragear/network/ktorrent/libbtcore
From: Joris Guisson <joris.guisson () gmail ! com>
Date: 2010-01-20 19:08:07
Message-ID: 1264014487.758972.20236.nullmailer () svn ! kde ! org
[Download RAW message or body]
SVN commit 1077722 by guisson:
UTP: LocalWindow can now handle packet loss
M +2 -2 util/circularbuffer.h
M +12 -10 utp/connection.cpp
M +87 -2 utp/localwindow.cpp
M +39 -2 utp/localwindow.h
M +6 -1 utp/remotewindow.cpp
M +2 -1 utp/remotewindow.h
M +140 -11 utp/tests/localwindowtest.cpp
M +1 -1 utp/utpprotocol.h
--- trunk/extragear/network/ktorrent/libbtcore/util/circularbuffer.h #1077721:1077722
@@ -42,7 +42,7 @@
@param max_len Maximum amount to read
@return The amount read
*/
- bt::Uint32 read(bt::Uint8* data,bt::Uint32 max_len);
+ virtual bt::Uint32 read(bt::Uint8* data,bt::Uint32 max_len);
/**
Write up to len bytes from data and store it in the window.
@@ -50,7 +50,7 @@
@param max_len Amount to write
@return The amount written
*/
- bt::Uint32 write(const bt::Uint8* data,bt::Uint32 len);
+ virtual bt::Uint32 write(const bt::Uint8* data,bt::Uint32 len);
/// Get the buffer capacity
bt::Uint32 capacity() const {return buffer_capacity;}
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.cpp #1077721:1077722
@@ -85,7 +85,7 @@
DumpHeader(*hdr);
updateDelayMeasurement(hdr);
- remote_wnd->packetReceived(hdr,this);
+ remote_wnd->packetReceived(hdr,sack,this);
ack_nr = hdr->seq_nr;
last_ack_nr = hdr->ack_nr;
switch (state)
@@ -96,6 +96,7 @@
{
// connection estabished
state = CS_CONNECTED;
+ local_wnd->setLastSeqNr(hdr->seq_nr);
Out(SYS_CON|LOG_NOTICE) << "UTP: established connection with " << remote.toString() << endl;
connected.wakeAll();
}
@@ -111,6 +112,7 @@
// Send back a state packet
sendState();
state = CS_CONNECTED;
+ local_wnd->setLastSeqNr(hdr->seq_nr);
Out(SYS_CON|LOG_NOTICE) << "UTP: established connection with " << remote.toString() << endl;
}
else
@@ -124,7 +126,7 @@
{
// push data into local window
int s = packet.size() - data_off;
- local_wnd->write((const bt::Uint8*)packet.data() + data_off,s);
+ local_wnd->packetReceived(hdr,(const bt::Uint8*)packet.data() + data_off,s);
data_ready.wakeAll();
// send back an ACK
@@ -152,7 +154,7 @@
// Check if we need to go to the closed state
// We can do this if all our packets have been acked and the local window
// has been fully read
- if (remote_wnd->allPacketsAcked() && local_wnd->currentWindow() == 0)
+ if (remote_wnd->allPacketsAcked() && local_wnd->isEmpty())
{
state = CS_CLOSED;
data_ready.wakeAll();
@@ -197,7 +199,7 @@
hdr->connection_id = recv_connection_id;
hdr->timestamp_microseconds = tv.tv_usec;
hdr->timestamp_difference_microseconds = reply_micro;
- hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+ hdr->wnd_size = local_wnd->availableSpace();
hdr->seq_nr = seq_nr;
hdr->ack_nr = ack_nr;
@@ -217,7 +219,7 @@
hdr->connection_id = send_connection_id;
hdr->timestamp_microseconds = tv.tv_usec;
hdr->timestamp_difference_microseconds = reply_micro;
- hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+ hdr->wnd_size = local_wnd->availableSpace();
hdr->seq_nr = seq_nr;
hdr->ack_nr = ack_nr;
@@ -237,7 +239,7 @@
hdr->connection_id = send_connection_id;
hdr->timestamp_microseconds = tv.tv_usec;
hdr->timestamp_difference_microseconds = reply_micro;
- hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+ hdr->wnd_size = local_wnd->availableSpace();
hdr->seq_nr = seq_nr;
hdr->ack_nr = ack_nr;
@@ -257,7 +259,7 @@
hdr->connection_id = send_connection_id;
hdr->timestamp_microseconds = tv.tv_usec;
hdr->timestamp_difference_microseconds = reply_micro;
- hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+ hdr->wnd_size = local_wnd->availableSpace();
hdr->seq_nr = seq_nr;
hdr->ack_nr = ack_nr;
@@ -350,7 +352,7 @@
hdr->connection_id = send_connection_id;
hdr->timestamp_microseconds = now.microseconds;
hdr->timestamp_difference_microseconds = reply_micro;
- hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+ hdr->wnd_size = local_wnd->availableSpace();
hdr->seq_nr = seq_nr + 1;
hdr->ack_nr = ack_nr;
@@ -366,7 +368,7 @@
bt::Uint32 Connection::bytesAvailable() const
{
QMutexLocker lock(&mutex);
- return local_wnd->currentWindow();
+ return local_wnd->fill();
}
Uint32 Connection::recv(Uint8* buf, Uint32 max_len)
@@ -390,7 +392,7 @@
{
mutex.lock();
data_ready.wait(&mutex);
- bool ret = local_wnd->currentWindow() > 0;
+ bool ret = local_wnd->fill() > 0;
mutex.unlock();
return ret;
}
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.cpp #1077721:1077722
@@ -19,22 +19,107 @@
***************************************************************************/
#include "localwindow.h"
+#include "utpprotocol.h"
+#include <QtAlgorithms>
namespace utp
{
+ FuturePacket::FuturePacket(bt::Uint16 seq_nr, const bt::Uint8* data, bt::Uint32 size)
+ : seq_nr(seq_nr),data((const char*)data,size)
+ {
+ }
+
+ FuturePacket::~FuturePacket()
+ {
+ }
- LocalWindow::LocalWindow(bt::Uint32 cap) : bt::CircularBuffer(cap)
+ LocalWindow::LocalWindow(bt::Uint32 cap) : bt::CircularBuffer(cap),window_space(cap)
{
}
LocalWindow::~LocalWindow()
{
-
+ qDeleteAll(future_packets);
}
+
+
+ void LocalWindow::setLastSeqNr(bt::Uint16 lsn)
+ {
+ last_seq_nr = lsn;
+ }
+ bt::Uint32 LocalWindow::read(bt::Uint8* data, bt::Uint32 max_len)
+ {
+ bt::Uint32 ret = CircularBuffer::read(data, max_len);
+ window_space += ret;
+ checkFuturePackets();
+ return ret;
+ }
+
+
+ void LocalWindow::checkFuturePackets()
+ {
+ QLinkedList<FuturePacket*>::iterator itr = future_packets.begin();
+ while (itr != future_packets.end())
+ {
+ FuturePacket* pkt = *itr;
+ if (pkt->seq_nr == last_seq_nr + 1)
+ {
+ last_seq_nr = pkt->seq_nr;
+ write((const bt::Uint8*)pkt->data.data(),pkt->data.size());
+ delete pkt;
+ itr = future_packets.erase(itr);
+ }
+ else
+ break;
+ }
+ }
+
+ bool LocalWindow::packetReceived(const utp::Header* hdr,const bt::Uint8* data,bt::Uint32 size)
+ {
+ if (availableSpace() < size)
+ return false;
+
+ if (window_space < size || hdr->seq_nr != last_seq_nr + 1)
+ {
+ // insert the packet into the future_packets list
+ QLinkedList<FuturePacket*>::iterator itr = future_packets.begin();
+ while (itr != future_packets.end())
+ {
+ FuturePacket* pkt = *itr;
+ if (pkt->seq_nr <= hdr->seq_nr)
+ {
+ itr++;
+ }
+ else
+ {
+ // we have found a packet with a higher sequence number
+ // so insert
+ future_packets.insert(itr,new FuturePacket(hdr->seq_nr,data,size));
+ break;
+ }
+ }
+
+ // at the end and not inserted yet, so just append
+ if (itr == future_packets.end())
+ future_packets.append(new FuturePacket(hdr->seq_nr,data,size));
+
+ window_space -= size;
+ checkFuturePackets();
+ }
+ else
+ {
+ last_seq_nr = hdr->seq_nr;
+ write(data,size);
+ window_space -= size;
+ checkFuturePackets();
+ }
+
+ return true;
+ }
}
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.h #1077721:1077722
@@ -25,11 +25,24 @@
#include <btcore_export.h>
#include <util/constants.h>
#include <util/circularbuffer.h>
+#include <QLinkedList>
+#include <QByteArray>
namespace utp
{
+ struct Header;
+
const bt::Uint32 DEFAULT_CAPACITY = 64*1024;
+ struct FuturePacket
+ {
+ FuturePacket(bt::Uint16 seq_nr,const bt::Uint8* data,bt::Uint32 size);
+ ~FuturePacket();
+
+ bt::Uint16 seq_nr;
+ QByteArray data;
+ };
+
/**
Manages the local window of a UTP connection.
This is a circular buffer.
@@ -40,8 +53,32 @@
LocalWindow(bt::Uint32 cap = DEFAULT_CAPACITY);
virtual ~LocalWindow();
- bt::Uint32 maxWindow() const {return buffer_capacity;}
- bt::Uint32 currentWindow() const {return size;}
+ /// Get back the available space
+ bt::Uint32 availableSpace() const {return window_space;}
+
+ /// Get back how large the window is
+ bt::Uint32 currentWindow() const {return capacity() - window_space;}
+
+ /// A packet was received
+ bool packetReceived(const Header* hdr,const bt::Uint8* data,bt::Uint32 size);
+
+ /// Set the last sequence number
+ void setLastSeqNr(bt::Uint16 lsn);
+
+ /// Is the window empty
+ bool isEmpty() const {return future_packets.isEmpty() && fill() == 0;}
+
+ virtual bt::Uint32 read(bt::Uint8* data,bt::Uint32 max_len);
+
+ private:
+ void checkFuturePackets();
+
+ private:
+ bt::Uint16 last_seq_nr;
+ // all the packets which have been received but we can yet write to the output buffer
+ // due to either missing packets or lack of space
+ QLinkedList<FuturePacket*> future_packets;
+ bt::Uint32 window_space;
};
}
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.cpp #1077721:1077722
@@ -25,6 +25,7 @@
namespace utp
{
+
UnackedPacket::UnackedPacket(const QByteArray& data, bt::Uint16 seq_nr, const TimeValue& send_time)
: data(data),seq_nr(seq_nr),send_time(send_time)
{
@@ -45,7 +46,7 @@
qDeleteAll(unacked_packets);
}
- void RemoteWindow::packetReceived(const utp::Header* hdr,Connection* conn)
+ void RemoteWindow::packetReceived(const utp::Header* hdr,const SelectiveAck* sack,Connection* conn)
{
wnd_size = hdr->wnd_size;
@@ -65,6 +66,10 @@
else
break;
}
+
+ if (sack)
+ {
+ }
}
void RemoteWindow::addPacket(const QByteArray& data,bt::Uint16 seq_nr,const TimeValue & send_time)
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.h #1077721:1077722
@@ -30,6 +30,7 @@
namespace utp
{
+ struct SelectiveAck;
class Connection;
struct Header;
@@ -53,7 +54,7 @@
virtual ~RemoteWindow();
/// A packet was received (update window size and check for acks)
- void packetReceived(const Header* hdr,Connection* conn);
+ void packetReceived(const Header* hdr,const SelectiveAck* sack,Connection* conn);
/// Add a packet to the remote window (should include headers)
void addPacket(const QByteArray & data,bt::Uint16 seq_nr,const TimeValue & send_time);
--- trunk/extragear/network/ktorrent/libbtcore/utp/tests/localwindowtest.cpp #1077721:1077722
@@ -21,6 +21,7 @@
#include <QtTest>
#include <QObject>
#include <utp/localwindow.h>
+#include <utp/utpprotocol.h>
using namespace utp;
@@ -38,43 +39,171 @@
void testWrite()
{
- LocalWindow wnd(20);
- QVERIFY(wnd.maxWindow() == 20);
+ bt::CircularBuffer wnd(20);
+ QVERIFY(wnd.capacity() == 20);
QVERIFY(wnd.write(data,13) == 13);
- QVERIFY(wnd.currentWindow() == 13);
+ QVERIFY(wnd.fill() == 13);
QVERIFY(wnd.write(data2,6) == 6);
- QVERIFY(wnd.currentWindow() == 19);
+ QVERIFY(wnd.fill() == 19);
QVERIFY(wnd.write(data2,6) == 1);
- QVERIFY(wnd.currentWindow() == 20);
+ QVERIFY(wnd.fill() == 20);
}
void testRead()
{
- LocalWindow wnd(20);
- QVERIFY(wnd.maxWindow() == 20);
+ bt::CircularBuffer wnd(20);
+ QVERIFY(wnd.capacity() == 20);
QVERIFY(wnd.write(data,13) == 13);
QVERIFY(wnd.write(data2,6) == 6);
bt::Uint8 ret[19];
QVERIFY(wnd.read(ret,19) == 19);
- QVERIFY(wnd.currentWindow() == 0);
+ QVERIFY(wnd.fill() == 0);
QVERIFY(memcmp(ret,data,13) == 0);
QVERIFY(memcmp(ret+13,data2,6) == 0);
QVERIFY(wnd.write(data,13) == 13);
- QVERIFY(wnd.currentWindow() == 13);
+ QVERIFY(wnd.fill() == 13);
QVERIFY(wnd.write(data2,6) == 6);
- QVERIFY(wnd.currentWindow() == 19);
+ QVERIFY(wnd.fill() == 19);
QVERIFY(wnd.read(ret,19) == 19);
- QVERIFY(wnd.currentWindow() == 0);
+ QVERIFY(wnd.fill() == 0);
QVERIFY(memcmp(ret,data,13) == 0);
QVERIFY(memcmp(ret+13,data2,6) == 0);
}
+
+ void testLocalWindow()
+ {
+ bt::Uint8 wdata[1000];
+ memset(wdata,0,1000);
+
+ utp::Header hdr;
+ utp::LocalWindow wnd(1000);
+ wnd.setLastSeqNr(1);
+
+ // write 500 bytes to it
+ hdr.seq_nr = 2;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,500) == true);
+ QVERIFY(wnd.availableSpace() == 500);
+ QVERIFY(wnd.currentWindow() == 500);
+
+ // write another 100 to it
+ hdr.seq_nr++;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,100) == true);
+ QVERIFY(wnd.availableSpace() == 400);
+ QVERIFY(wnd.currentWindow() == 600);
+
+ // read 300 from it
+ QVERIFY(wnd.read(wdata,300) == 300);
+ QVERIFY(wnd.availableSpace() == 700);
+ QVERIFY(wnd.currentWindow() == 300);
+ }
+
+ void testPacketLoss()
+ {
+ bt::Uint8 wdata[1000];
+ memset(wdata,0,1000);
+
+ utp::Header hdr;
+ utp::LocalWindow wnd(1000);
+ wnd.setLastSeqNr(1);
+
+ // write 500 bytes to it
+ hdr.seq_nr = 2;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,500) == true);
+ QVERIFY(wnd.availableSpace() == 500);
+ QVERIFY(wnd.currentWindow() == 500);
+
+ // write 100 bytes to it bit with the wrong sequence number
+ hdr.seq_nr = 4;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,100) == true);
+ QVERIFY(wnd.availableSpace() == 400);
+ QVERIFY(wnd.currentWindow() == 600);
+ QVERIFY(wnd.fill() == 500);
+
+ // Try to read all of it, but we should only get back 500
+ QVERIFY(wnd.read(wdata,600) == 500);
+ QVERIFY(wnd.availableSpace() == 900);
+ QVERIFY(wnd.currentWindow() == 100);
+ QVERIFY(wnd.fill() == 0);
+
+ // write the missing packet
+ hdr.seq_nr = 3;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,100) == true);
+ QVERIFY(wnd.availableSpace() == 800);
+ QVERIFY(wnd.currentWindow() == 200);
+ QVERIFY(wnd.fill() == 200);
+ }
+
+ void testPacketLoss2()
+ {
+ bt::Uint8 wdata[1000];
+ memset(wdata,0,1000);
+
+ utp::Header hdr;
+ utp::LocalWindow wnd(1000);
+ wnd.setLastSeqNr(0);
+
+ // first write first and last packet
+ bt::Uint32 step = 200;
+ hdr.seq_nr = 1;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - step);
+ QVERIFY(wnd.currentWindow() == step);
+ hdr.seq_nr = 5;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - 2*step);
+ QVERIFY(wnd.currentWindow() == 2*step);
+ QVERIFY(wnd.fill() == step);
+
+ // Now write 4
+ hdr.seq_nr = 4;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - 3*step);
+ QVERIFY(wnd.currentWindow() == 3*step);
+ QVERIFY(wnd.fill() == step);
+
+ // And then 3
+ hdr.seq_nr = 3;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - 4*step);
+ QVERIFY(wnd.currentWindow() == 4*step);
+ QVERIFY(wnd.fill() == step);
+
+ // And then 2
+ hdr.seq_nr = 2;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+ QVERIFY(wnd.availableSpace() == wnd.capacity() - 5*step);
+ QVERIFY(wnd.currentWindow() == 5*step);
+ QVERIFY(wnd.fill() == 5*step);
+ }
+
+ void testToMuchData()
+ {
+ bt::Uint8 wdata[1000];
+ memset(wdata,0,1000);
+
+ utp::Header hdr;
+ utp::LocalWindow wnd(500);
+ wnd.setLastSeqNr(1);
+
+ // write 500 bytes to it
+ hdr.seq_nr = 2;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,500) == true);
+ QVERIFY(wnd.availableSpace() == 0);
+ QVERIFY(wnd.currentWindow() == 500);
+
+ // writing more data should now have no effect at all
+ hdr.seq_nr = 3;
+ QVERIFY(wnd.packetReceived(&hdr,wdata,500) == false);
+ QVERIFY(wnd.availableSpace() == 0);
+ QVERIFY(wnd.currentWindow() == 500);
+ }
private:
bt::Uint8 data[13];
--- trunk/extragear/network/ktorrent/libbtcore/utp/utpprotocol.h #1077721:1077722
@@ -66,7 +66,7 @@
{
quint8 extension;
quint8 length;
- quint32 bitmask;
+ quint8 bitmask[4];
};
struct ExtensionBits
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic