[prev in list] [next in list] [prev in thread] [next in thread] 

List:       kde-commits
Subject:    extragear/network/ktorrent/libbtcore
From:       Joris Guisson <joris.guisson () gmail ! com>
Date:       2010-01-20 19:08:07
Message-ID: 1264014487.758972.20236.nullmailer () svn ! kde ! org
[Download RAW message or body]

SVN commit 1077722 by guisson:

UTP: LocalWindow can now handle packet loss

 M  +2 -2      util/circularbuffer.h  
 M  +12 -10    utp/connection.cpp  
 M  +87 -2     utp/localwindow.cpp  
 M  +39 -2     utp/localwindow.h  
 M  +6 -1      utp/remotewindow.cpp  
 M  +2 -1      utp/remotewindow.h  
 M  +140 -11   utp/tests/localwindowtest.cpp  
 M  +1 -1      utp/utpprotocol.h  


--- trunk/extragear/network/ktorrent/libbtcore/util/circularbuffer.h #1077721:1077722
@@ -42,7 +42,7 @@
 			@param max_len Maximum amount to read
 			@return The amount read
 		*/
-		bt::Uint32 read(bt::Uint8* data,bt::Uint32 max_len);
+		virtual bt::Uint32 read(bt::Uint8* data,bt::Uint32 max_len);
 		
 		/**
 			Write up to len bytes from data and store it in the window.
@@ -50,7 +50,7 @@
 			@param max_len Amount to write
 			@return The amount written
 		*/
-		bt::Uint32 write(const bt::Uint8* data,bt::Uint32 len);
+		virtual bt::Uint32 write(const bt::Uint8* data,bt::Uint32 len);
 		
 		/// Get the buffer capacity
 		bt::Uint32 capacity() const {return buffer_capacity;}
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.cpp #1077721:1077722
@@ -85,7 +85,7 @@
 		DumpHeader(*hdr);
 		
 		updateDelayMeasurement(hdr);
-		remote_wnd->packetReceived(hdr,this);
+		remote_wnd->packetReceived(hdr,sack,this);
 		ack_nr = hdr->seq_nr;
 		last_ack_nr = hdr->ack_nr;
 		switch (state)
@@ -96,6 +96,7 @@
 				{
 					// connection estabished
 					state = CS_CONNECTED;
+					local_wnd->setLastSeqNr(hdr->seq_nr);
 					Out(SYS_CON|LOG_NOTICE) << "UTP: established connection with " << remote.toString() << endl;
 					connected.wakeAll();
 				}
@@ -111,6 +112,7 @@
 					// Send back a state packet
 					sendState();
 					state = CS_CONNECTED;
+					local_wnd->setLastSeqNr(hdr->seq_nr);
 					Out(SYS_CON|LOG_NOTICE) << "UTP: established connection with " << remote.toString() << endl;
 				}
 				else
@@ -124,7 +126,7 @@
 				{
 					// push data into local window
 					int s = packet.size() - data_off;
-					local_wnd->write((const bt::Uint8*)packet.data() + data_off,s);
+					local_wnd->packetReceived(hdr,(const bt::Uint8*)packet.data() + data_off,s);
 					data_ready.wakeAll();
 					
 					// send back an ACK 
@@ -152,7 +154,7 @@
 					// Check if we need to go to the closed state
 					// We can do this if all our packets have been acked and the local window
 					// has been fully read
-					if (remote_wnd->allPacketsAcked() && local_wnd->currentWindow() == 0)
+					if (remote_wnd->allPacketsAcked() && local_wnd->isEmpty())
 					{
 						state = CS_CLOSED;
 						data_ready.wakeAll();
@@ -197,7 +199,7 @@
 		hdr->connection_id = recv_connection_id;
 		hdr->timestamp_microseconds = tv.tv_usec;
 		hdr->timestamp_difference_microseconds = reply_micro;
-		hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+		hdr->wnd_size = local_wnd->availableSpace();
 		hdr->seq_nr = seq_nr;
 		hdr->ack_nr = ack_nr;
 		
@@ -217,7 +219,7 @@
 		hdr->connection_id = send_connection_id;
 		hdr->timestamp_microseconds = tv.tv_usec;
 		hdr->timestamp_difference_microseconds = reply_micro;
-		hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+		hdr->wnd_size = local_wnd->availableSpace();
 		hdr->seq_nr = seq_nr;
 		hdr->ack_nr = ack_nr;
 		
@@ -237,7 +239,7 @@
 		hdr->connection_id = send_connection_id;
 		hdr->timestamp_microseconds = tv.tv_usec;
 		hdr->timestamp_difference_microseconds = reply_micro;
-		hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+		hdr->wnd_size = local_wnd->availableSpace();
 		hdr->seq_nr = seq_nr;
 		hdr->ack_nr = ack_nr;
 		
@@ -257,7 +259,7 @@
 		hdr->connection_id = send_connection_id;
 		hdr->timestamp_microseconds = tv.tv_usec;
 		hdr->timestamp_difference_microseconds = reply_micro;
-		hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+		hdr->wnd_size = local_wnd->availableSpace();
 		hdr->seq_nr = seq_nr;
 		hdr->ack_nr = ack_nr;
 		
@@ -350,7 +352,7 @@
 		hdr->connection_id = send_connection_id;
 		hdr->timestamp_microseconds = now.microseconds;
 		hdr->timestamp_difference_microseconds = reply_micro;
-		hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
+		hdr->wnd_size = local_wnd->availableSpace();
 		hdr->seq_nr = seq_nr + 1;
 		hdr->ack_nr = ack_nr;
 		
@@ -366,7 +368,7 @@
 	bt::Uint32 Connection::bytesAvailable() const
 	{
 		QMutexLocker lock(&mutex);
-		return local_wnd->currentWindow();
+		return local_wnd->fill();
 	}
 
 	Uint32 Connection::recv(Uint8* buf, Uint32 max_len)
@@ -390,7 +392,7 @@
 	{
 		mutex.lock();
 		data_ready.wait(&mutex);
-		bool ret = local_wnd->currentWindow() > 0;
+		bool ret = local_wnd->fill() > 0;
 		mutex.unlock();
 		return ret;
 	}
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.cpp #1077721:1077722
@@ -19,22 +19,107 @@
  ***************************************************************************/
 
 #include "localwindow.h"
+#include "utpprotocol.h"
+#include <QtAlgorithms>
 
 namespace utp
 {
 
+	FuturePacket::FuturePacket(bt::Uint16 seq_nr, const bt::Uint8* data, bt::Uint32 size) 
+		: seq_nr(seq_nr),data((const char*)data,size)
+	{
+	}
+
+	FuturePacket::~FuturePacket()
+	{
+	}
 	
-	LocalWindow::LocalWindow(bt::Uint32 cap) : bt::CircularBuffer(cap)
+	LocalWindow::LocalWindow(bt::Uint32 cap) : bt::CircularBuffer(cap),window_space(cap)
 	{
 		
 	}
 
 	LocalWindow::~LocalWindow()
 	{
-		
+		qDeleteAll(future_packets);
 	}
+	
+	
+	void LocalWindow::setLastSeqNr(bt::Uint16 lsn)
+	{
+		last_seq_nr = lsn;
+	}
 
+	bt::Uint32 LocalWindow::read(bt::Uint8* data, bt::Uint32 max_len)
+	{
+		bt::Uint32 ret = CircularBuffer::read(data, max_len);
+		window_space += ret;
+		checkFuturePackets();
+		return ret;
+	}
+
+
+	void LocalWindow::checkFuturePackets()
+	{
+		QLinkedList<FuturePacket*>::iterator itr = future_packets.begin();
+		while (itr != future_packets.end())
+		{
+			FuturePacket* pkt = *itr;
+			if (pkt->seq_nr == last_seq_nr + 1)
+			{
+				last_seq_nr = pkt->seq_nr;
+				write((const bt::Uint8*)pkt->data.data(),pkt->data.size());
+				delete pkt;
+				itr = future_packets.erase(itr);
+			}
+			else
+				break;
+		}
+	}
+
 	
+	bool LocalWindow::packetReceived(const utp::Header* hdr,const bt::Uint8* data,bt::Uint32 size)
+	{
+		if (availableSpace() < size)
+			return false;
+		
+		if (window_space < size || hdr->seq_nr != last_seq_nr + 1)
+		{
+			// insert the packet into the future_packets list
+			QLinkedList<FuturePacket*>::iterator itr = future_packets.begin();
+			while (itr != future_packets.end())
+			{
+				FuturePacket* pkt = *itr;
+				if (pkt->seq_nr <= hdr->seq_nr)
+				{
+					itr++;
+				}
+				else
+				{
+					// we have found a packet with a higher sequence number
+					// so insert
+					future_packets.insert(itr,new FuturePacket(hdr->seq_nr,data,size));
+					break;
+				}
+			}
+			
+			// at the end and not inserted yet, so just append
+			if (itr == future_packets.end())
+				future_packets.append(new FuturePacket(hdr->seq_nr,data,size));
+		
+			window_space -= size;
+			checkFuturePackets();
+		}
+		else
+		{
+			last_seq_nr = hdr->seq_nr;
+			write(data,size);
+			window_space -= size;
+			checkFuturePackets();
+		}
+		
+		return true;
+	}
 
 }
 
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.h #1077721:1077722
@@ -25,11 +25,24 @@
 #include <btcore_export.h>
 #include <util/constants.h>
 #include <util/circularbuffer.h>
+#include <QLinkedList>
+#include <QByteArray>
 
 namespace utp
 {
+	struct Header;
+	
 	const bt::Uint32 DEFAULT_CAPACITY = 64*1024;
 	
+	struct FuturePacket
+	{
+		FuturePacket(bt::Uint16 seq_nr,const bt::Uint8* data,bt::Uint32 size);
+		~FuturePacket();
+		
+		bt::Uint16 seq_nr;
+		QByteArray data;
+	};
+	
 	/**
 		Manages the local window of a UTP connection.
 		This is a circular buffer.
@@ -40,8 +53,32 @@
 		LocalWindow(bt::Uint32 cap = DEFAULT_CAPACITY);
 		virtual ~LocalWindow();
 		
-		bt::Uint32 maxWindow() const {return buffer_capacity;}
-		bt::Uint32 currentWindow() const {return size;}
+		/// Get back the available space
+		bt::Uint32 availableSpace() const {return window_space;}
+		
+		/// Get back how large the window is
+		bt::Uint32 currentWindow() const {return capacity() - window_space;}
+	
+		/// A packet was received
+		bool packetReceived(const Header* hdr,const bt::Uint8* data,bt::Uint32 size);
+		
+		/// Set the last sequence number
+		void setLastSeqNr(bt::Uint16 lsn);
+		
+		/// Is the window empty
+		bool isEmpty() const {return future_packets.isEmpty() && fill() == 0;}
+		
+		virtual bt::Uint32 read(bt::Uint8* data,bt::Uint32 max_len);
+		
+	private:
+		void checkFuturePackets();
+		
+	private:
+		bt::Uint16 last_seq_nr;
+		// all the packets which have been received but which we cannot yet write to the output buffer
+		// due to either missing packets or lack of space
+		QLinkedList<FuturePacket*> future_packets;
+		bt::Uint32 window_space;
 	};
 
 }
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.cpp #1077721:1077722
@@ -25,6 +25,7 @@
 namespace utp
 {
 	
+	
 	UnackedPacket::UnackedPacket(const QByteArray& data, bt::Uint16 seq_nr, const TimeValue& send_time) 
 		: data(data),seq_nr(seq_nr),send_time(send_time)
 	{
@@ -45,7 +46,7 @@
 		qDeleteAll(unacked_packets);
 	}
 
-	void RemoteWindow::packetReceived(const utp::Header* hdr,Connection* conn)
+	void RemoteWindow::packetReceived(const utp::Header* hdr,const SelectiveAck* sack,Connection* conn)
 	{
 		wnd_size = hdr->wnd_size;
 		
@@ -65,6 +66,10 @@
 			else
 				break;
 		}
+		
+		if (sack)
+		{
+		}
 	}
 
 	void RemoteWindow::addPacket(const QByteArray& data,bt::Uint16 seq_nr,const TimeValue & send_time)
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.h #1077721:1077722
@@ -30,6 +30,7 @@
 
 namespace utp
 {
+	struct SelectiveAck;
 	class Connection;
 	struct Header;
 	
@@ -53,7 +54,7 @@
 		virtual ~RemoteWindow();
 		
 		/// A packet was received (update window size and check for acks)
-		void packetReceived(const Header* hdr,Connection* conn);
+		void packetReceived(const Header* hdr,const SelectiveAck* sack,Connection* conn);
 		
 		/// Add a packet to the remote window (should include headers)
 		void addPacket(const QByteArray & data,bt::Uint16 seq_nr,const TimeValue & send_time);
--- trunk/extragear/network/ktorrent/libbtcore/utp/tests/localwindowtest.cpp #1077721:1077722
@@ -21,6 +21,7 @@
 #include <QtTest>
 #include <QObject>
 #include <utp/localwindow.h>
+#include <utp/utpprotocol.h>
 
 using namespace utp;
 
@@ -38,43 +39,171 @@
 	
 	void testWrite()
 	{
-		LocalWindow wnd(20);
-		QVERIFY(wnd.maxWindow() == 20);
+		bt::CircularBuffer wnd(20);
+		QVERIFY(wnd.capacity() == 20);
 		
 		QVERIFY(wnd.write(data,13) == 13);
-		QVERIFY(wnd.currentWindow() == 13);
+		QVERIFY(wnd.fill() == 13);
 		
 		QVERIFY(wnd.write(data2,6) == 6);
-		QVERIFY(wnd.currentWindow() == 19);
+		QVERIFY(wnd.fill() == 19);
 		
 		QVERIFY(wnd.write(data2,6) == 1);
-		QVERIFY(wnd.currentWindow() == 20);
+		QVERIFY(wnd.fill() == 20);
 	}
 	
 	void testRead()
 	{
-		LocalWindow wnd(20);
-		QVERIFY(wnd.maxWindow() == 20);
+		bt::CircularBuffer wnd(20);
+		QVERIFY(wnd.capacity() == 20);
 		QVERIFY(wnd.write(data,13) == 13);
 		QVERIFY(wnd.write(data2,6) == 6);
 		
 		bt::Uint8 ret[19];
 		QVERIFY(wnd.read(ret,19) == 19);
-		QVERIFY(wnd.currentWindow() == 0);
+		QVERIFY(wnd.fill() == 0);
 		QVERIFY(memcmp(ret,data,13) == 0);
 		QVERIFY(memcmp(ret+13,data2,6) == 0);
 		
 		QVERIFY(wnd.write(data,13) == 13);
-		QVERIFY(wnd.currentWindow() == 13);
+		QVERIFY(wnd.fill() == 13);
 		
 		QVERIFY(wnd.write(data2,6) == 6);
-		QVERIFY(wnd.currentWindow() == 19);
+		QVERIFY(wnd.fill() == 19);
 		
 		QVERIFY(wnd.read(ret,19) == 19);
-		QVERIFY(wnd.currentWindow() == 0);
+		QVERIFY(wnd.fill() == 0);
 		QVERIFY(memcmp(ret,data,13) == 0);
 		QVERIFY(memcmp(ret+13,data2,6) == 0);
 	}
+	
+	void testLocalWindow()
+	{
+		bt::Uint8 wdata[1000];
+		memset(wdata,0,1000);
+		
+		utp::Header hdr;
+		utp::LocalWindow wnd(1000);
+		wnd.setLastSeqNr(1);
+		
+		// write 500 bytes to it
+		hdr.seq_nr = 2;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,500) == true);
+		QVERIFY(wnd.availableSpace() == 500);
+		QVERIFY(wnd.currentWindow() == 500);
+		
+		// write another 100 to it
+		hdr.seq_nr++;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,100) == true);
+		QVERIFY(wnd.availableSpace() == 400);
+		QVERIFY(wnd.currentWindow() == 600);
+		
+		// read 300 from it
+		QVERIFY(wnd.read(wdata,300) == 300);
+		QVERIFY(wnd.availableSpace() == 700);
+		QVERIFY(wnd.currentWindow() == 300);
+	}
+	
+	void testPacketLoss()
+	{
+		bt::Uint8 wdata[1000];
+		memset(wdata,0,1000);
+		
+		utp::Header hdr;
+		utp::LocalWindow wnd(1000);
+		wnd.setLastSeqNr(1);
+		
+		// write 500 bytes to it
+		hdr.seq_nr = 2;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,500) == true);
+		QVERIFY(wnd.availableSpace() == 500);
+		QVERIFY(wnd.currentWindow() == 500);
+		
+		// write 100 bytes to it but with the wrong sequence number
+		hdr.seq_nr = 4;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,100) == true);
+		QVERIFY(wnd.availableSpace() == 400);
+		QVERIFY(wnd.currentWindow() == 600);
+		QVERIFY(wnd.fill() == 500);
+		
+		// Try to read all of it, but we should only get back 500
+		QVERIFY(wnd.read(wdata,600) == 500);
+		QVERIFY(wnd.availableSpace() == 900);
+		QVERIFY(wnd.currentWindow() == 100);
+		QVERIFY(wnd.fill() == 0);
+		
+		// write the missing packet
+		hdr.seq_nr = 3;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,100) == true);
+		QVERIFY(wnd.availableSpace() == 800);
+		QVERIFY(wnd.currentWindow() == 200);
+		QVERIFY(wnd.fill() == 200);
+	}
+	
+	void testPacketLoss2()
+	{
+		bt::Uint8 wdata[1000];
+		memset(wdata,0,1000);
+		
+		utp::Header hdr;
+		utp::LocalWindow wnd(1000);
+		wnd.setLastSeqNr(0);
+		
+		// first write first and last packet
+		bt::Uint32 step = 200;
+		hdr.seq_nr = 1;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - step);
+		QVERIFY(wnd.currentWindow() == step);
+		hdr.seq_nr = 5;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - 2*step);
+		QVERIFY(wnd.currentWindow() == 2*step);
+		QVERIFY(wnd.fill() == step);
+		
+		// Now write 4
+		hdr.seq_nr = 4;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - 3*step);
+		QVERIFY(wnd.currentWindow() == 3*step);
+		QVERIFY(wnd.fill() == step);
+		
+		// And then 3
+		hdr.seq_nr = 3;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - 4*step);
+		QVERIFY(wnd.currentWindow() == 4*step);
+		QVERIFY(wnd.fill() == step);
+		
+		// And then 2
+		hdr.seq_nr = 2;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - 5*step);
+		QVERIFY(wnd.currentWindow() == 5*step);
+		QVERIFY(wnd.fill() == 5*step);
+	}
+	
+	void testToMuchData()
+	{
+		bt::Uint8 wdata[1000];
+		memset(wdata,0,1000);
+		
+		utp::Header hdr;
+		utp::LocalWindow wnd(500);
+		wnd.setLastSeqNr(1);
+		
+		// write 500 bytes to it
+		hdr.seq_nr = 2;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,500) == true);
+		QVERIFY(wnd.availableSpace() == 0);
+		QVERIFY(wnd.currentWindow() == 500);
+		
+		// writing more data should now have no effect at all
+		hdr.seq_nr = 3;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,500) == false);
+		QVERIFY(wnd.availableSpace() == 0);
+		QVERIFY(wnd.currentWindow() == 500);
+	}
 
 private:
 	bt::Uint8 data[13];
--- trunk/extragear/network/ktorrent/libbtcore/utp/utpprotocol.h #1077721:1077722
@@ -66,7 +66,7 @@
 	{
 		quint8 extension;
 		quint8 length;
-		quint32 bitmask;
+		quint8 bitmask[4];
 	};
 	
 	struct ExtensionBits
[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic