[prev in list] [next in list] [prev in thread] [next in thread] 

List:       kde-commits
Subject:    extragear/network/ktorrent/libbtcore
From:       Joris Guisson <joris.guisson () gmail ! com>
Date:       2010-01-21 18:19:39
Message-ID: 1264097979.483680.5416.nullmailer () svn ! kde ! org
[Download RAW message or body]

SVN commit 1078186 by guisson:

UTP: Add selective ack extension support

 M  +1 -0      CMakeLists.txt  
 M  +56 -69    utp/connection.cpp  
 M  +3 -1      utp/connection.h  
 M  +25 -1     utp/localwindow.cpp  
 M  +11 -1     utp/localwindow.h  
 M  +13 -5     utp/remotewindow.cpp  
 M  +77 -0     utp/tests/localwindowtest.cpp  
 A             utp/utpprotocol.cpp   [License: GPL (v2+)]
 M  +10 -0     utp/utpprotocol.h  


--- trunk/extragear/network/ktorrent/libbtcore/CMakeLists.txt #1078185:1078186
@@ -201,6 +201,7 @@
 	utp/remotewindow.cpp
 	utp/utpsocket.cpp
 	utp/timevalue.cpp
+	utp/utpprotocol.cpp
 	
 	btversion.cpp
 )
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.cpp #1078185:1078186
@@ -110,9 +110,9 @@
 				if (hdr->type == ST_SYN)
 				{
 					// Send back a state packet
+					local_wnd->setLastSeqNr(hdr->seq_nr);
 					sendState();
 					state = CS_CONNECTED;
-					local_wnd->setLastSeqNr(hdr->seq_nr);
 					Out(SYS_CON|LOG_NOTICE) << "UTP: established connection with " << remote.toString() << endl;
 				}
 				else
@@ -127,7 +127,8 @@
 					// push data into local window
 					int s = packet.size() - data_off;
 					local_wnd->packetReceived(hdr,(const bt::Uint8*)packet.data() + data_off,s);
-					data_ready.wakeAll();
+					if (local_wnd->fill() > 0)
+						data_ready.wakeAll();
 					
 					// send back an ACK 
 					sendStateOrData();
@@ -181,89 +182,62 @@
 		timeout = qMin(rtt + rtt_var * 4, (bt::Uint32)500);
 	}
 
-
-	void Connection::sendSYN()
+	
+	int Connection::sendPacket(Uint32 type,Uint16 p_ack_nr)
 	{
-		seq_nr = 1;
-		ack_nr = 0;
-		state = CS_SYN_SENT;
-		
 		struct timeval tv;
 		gettimeofday(&tv,NULL);
 		
-		QByteArray ba(sizeof(Header),0);
+		bt::Uint32 extension_length = 0;
+		bt::Uint32 sack_bits = local_wnd->selectiveAckBits();
+		if (sack_bits > 0)
+			extension_length += 2 + qMin(sack_bits / 8,(bt::Uint32)4);
+		
+		QByteArray ba(sizeof(Header) + extension_length,0);
 		Header* hdr = (Header*)ba.data();
 		hdr->version = 1;
-		hdr->type = ST_SYN;
-		hdr->extension = 0;
-		hdr->connection_id = recv_connection_id;
+		hdr->type = type;
+		hdr->extension = extension_length == 0 ? 0 : SELECTIVE_ACK_ID;
+		hdr->connection_id = type == ST_SYN ? recv_connection_id : send_connection_id;
 		hdr->timestamp_microseconds = tv.tv_usec;
 		hdr->timestamp_difference_microseconds = reply_micro;
 		hdr->wnd_size = local_wnd->availableSpace();
 		hdr->seq_nr = seq_nr;
-		hdr->ack_nr = ack_nr;
+		hdr->ack_nr = p_ack_nr;
 		
-		srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
+		if (extension_length > 0)
+		{
+			SelectiveAck* sack = (SelectiveAck*)(ba.data() + sizeof(Header));
+			sack->extension = 0;
+			sack->length = extension_length - 2;
+			local_wnd->fillSelectiveAck(sack);
+		}
+		
+		return srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
 	}
+
+
+	void Connection::sendSYN()
+	{
+		seq_nr = 1;
+		ack_nr = 0;
+		state = CS_SYN_SENT;
+		sendPacket(ST_SYN,0);
+	}
 	
 	void Connection::sendState()
 	{
-		struct timeval tv;
-		gettimeofday(&tv,NULL);
-		
-		QByteArray ba(sizeof(Header),0);
-		Header* hdr = (Header*)ba.data();
-		hdr->version = 1;
-		hdr->type = ST_STATE;
-		hdr->extension = 0;
-		hdr->connection_id = send_connection_id;
-		hdr->timestamp_microseconds = tv.tv_usec;
-		hdr->timestamp_difference_microseconds = reply_micro;
-		hdr->wnd_size = local_wnd->availableSpace();
-		hdr->seq_nr = seq_nr;
-		hdr->ack_nr = ack_nr;
-		
-		srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
+		sendPacket(ST_STATE,local_wnd->lastSeqNr());
 	}
 
 	void Connection::sendFIN()
 	{
-		struct timeval tv;
-		gettimeofday(&tv,NULL);
-		
-		QByteArray ba(sizeof(Header),0);
-		Header* hdr = (Header*)ba.data();
-		hdr->version = 1;
-		hdr->type = ST_FIN;
-		hdr->extension = 0;
-		hdr->connection_id = send_connection_id;
-		hdr->timestamp_microseconds = tv.tv_usec;
-		hdr->timestamp_difference_microseconds = reply_micro;
-		hdr->wnd_size = local_wnd->availableSpace();
-		hdr->seq_nr = seq_nr;
-		hdr->ack_nr = ack_nr;
-		
-		srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
+		sendPacket(ST_FIN,local_wnd->lastSeqNr());
 	}
 
 	void Connection::sendReset()
 	{
-		struct timeval tv;
-		gettimeofday(&tv,NULL);
-		
-		QByteArray ba(sizeof(Header),0);
-		Header* hdr = (Header*)ba.data();
-		hdr->version = 1;
-		hdr->type = ST_RESET;
-		hdr->extension = 0;
-		hdr->connection_id = send_connection_id;
-		hdr->timestamp_microseconds = tv.tv_usec;
-		hdr->timestamp_difference_microseconds = reply_micro;
-		hdr->wnd_size = local_wnd->availableSpace();
-		hdr->seq_nr = seq_nr;
-		hdr->ack_nr = ack_nr;
-		
-		srv->sendTo(ba,remote);
+		sendPacket(ST_RESET,local_wnd->lastSeqNr());
 	}
 
 	void Connection::waitForSYN()
@@ -294,7 +268,7 @@
 		while (data_off < packet.size() && ptr->extension != 0)
 		{
 			ptr = (UnknownExtension*)packet.data() + data_off;
-			if (ext_id == 1)
+			if (ext_id == SELECTIVE_ACK_ID)
 				*selective_ack = (SelectiveAck*)ptr;
 				
 			data_off += 2 + ptr->length;
@@ -327,7 +301,7 @@
 			
 			QByteArray packet(to_read,0);
 			output_buffer.read((bt::Uint8*)packet.data(),to_read);
-			doSend(packet);
+			sendDataPacket(packet);
 		}
 	}
 
@@ -339,24 +313,37 @@
 			sendState();
 	}
 
-	int Connection::doSend(const QByteArray& packet)
+	int Connection::sendDataPacket(const QByteArray& packet)
 	{
 		bt::Uint32 to_send = packet.size();
 		TimeValue now;
 		
-		QByteArray ba(sizeof(Header) + packet.size(),0);
+		bt::Uint32 extension_length = 0;
+		bt::Uint32 sack_bits = local_wnd->selectiveAckBits();
+		if (sack_bits > 0)
+			extension_length += 2 + qMin(sack_bits / 8,(bt::Uint32)4);
+		
+		QByteArray ba(sizeof(Header) + extension_length + packet.size(),0);
 		Header* hdr = (Header*)ba.data();
 		hdr->version = 1;
 		hdr->type = ST_DATA;
-		hdr->extension = 0;
+		hdr->extension = extension_length == 0 ? 0 : SELECTIVE_ACK_ID;
 		hdr->connection_id = send_connection_id;
 		hdr->timestamp_microseconds = now.microseconds;
 		hdr->timestamp_difference_microseconds = reply_micro;
 		hdr->wnd_size = local_wnd->availableSpace();
 		hdr->seq_nr = seq_nr + 1;
-		hdr->ack_nr = ack_nr;
+		hdr->ack_nr = local_wnd->lastSeqNr();
 		
-		memcpy(ba.data() + sizeof(Header),packet.data(),to_send);
+		if (extension_length > 0)
+		{
+			SelectiveAck* sack = (SelectiveAck*)(ba.data() + sizeof(Header));
+			sack->extension = 0;
+			sack->length = extension_length - 2;
+			local_wnd->fillSelectiveAck(sack);
+		}
+		
+		memcpy(ba.data() + sizeof(Header) + extension_length,packet.data(),to_send);
 		if (!srv->sendTo(ba,remote))
 			return -1;
 		
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.h #1078185:1078186
@@ -96,8 +96,9 @@
 		void sendFIN();
 		void updateDelayMeasurement(const Header* hdr);
 		void sendStateOrData();
-		int doSend(const QByteArray & packet);
 		void sendPackets();
+		int sendPacket(bt::Uint32 type,bt::Uint16 p_ack_nr);
+		int sendDataPacket(const QByteArray & packet);
 		
 		/** 
 			Parses the packet, and retrieves pointer to the header, the SelectiveAck extension (if present)
@@ -108,6 +109,7 @@
 		*/
 		int parsePacket(const QByteArray & packet,Header** hdr,SelectiveAck** selective_ack);
 		
+		
 	private:
 		Type type;
 		UTPServer* srv;
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.cpp #1078185:1078186
@@ -83,7 +83,7 @@
 		if (availableSpace() < size)
 			return false;
 		
-		if (window_space < size || hdr->seq_nr != last_seq_nr + 1)
+		if (hdr->seq_nr != last_seq_nr + 1)
 		{
 			// insert the packet into the future_packets list
 			QLinkedList<FuturePacket*>::iterator itr = future_packets.begin();
@@ -121,5 +121,29 @@
 		return true;
 	}
 
+	
+	bt::Uint32 LocalWindow::selectiveAckBits() const
+	{
+		if (future_packets.isEmpty())
+			return 0;
+		else
+			return future_packets.last()->seq_nr - last_seq_nr - 1;
+	}
+
+
+	void LocalWindow::fillSelectiveAck(SelectiveAck* sack)
+	{
+		// First turn off all bits
+		bt::Uint8* bitset = (bt::Uint8*)sack + 2;
+		memset(bitset,0,sack->length);
+		
+		QLinkedList<FuturePacket*>::iterator itr = future_packets.begin();
+		while (itr != future_packets.end())
+		{
+			Acked(sack,(*itr)->seq_nr - last_seq_nr);
+			itr++;
+		}
+	}
+
 }
 
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.h #1078185:1078186
@@ -30,6 +30,7 @@
 
 namespace utp
 {
+	struct SelectiveAck;
 	struct Header;
 	
 	const bt::Uint32 DEFAULT_CAPACITY = 64*1024;
@@ -65,18 +66,27 @@
 		/// Set the last sequence number
 		void setLastSeqNr(bt::Uint16 lsn);
 		
+		/// Get the last sequence number we can safely ack
+		bt::Uint16 lastSeqNr() const {return last_seq_nr;}
+		
 		/// Is the window empty
 		bool isEmpty() const {return future_packets.isEmpty() && fill() == 0;}
 		
 		virtual bt::Uint32 read(bt::Uint8* data,bt::Uint32 max_len);
 		
+		/// Get the number of selective ack bits needed when sending a packet
+		bt::Uint32 selectiveAckBits() const;
+		
+		/// Fill a SelectiveAck structure
+		void fillSelectiveAck(SelectiveAck* sack);
+		
 	private:
 		void checkFuturePackets();
 		
 	private:
 		bt::Uint16 last_seq_nr;
 		// all the packets which have been received but we can yet write to the output buffer
-		// due to either missing packets or lack of space
+		// due to missing packets
 		QLinkedList<FuturePacket*> future_packets;
 		bt::Uint32 window_space;
 	};
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.cpp #1078185:1078186
@@ -51,25 +51,33 @@
 		wnd_size = hdr->wnd_size;
 		
 		TimeValue now;
-		// everything up until the ack_nr in the header should now have been acked
 		QList<UnackedPacket*>::iterator i = unacked_packets.begin();
 		while (i != unacked_packets.end())
 		{
 			UnackedPacket* up = *i;
 			if (up->seq_nr <= hdr->ack_nr)
 			{
+				// everything up until the ack_nr in the header is acked
 				conn->updateRTT(hdr,now - up->send_time);
 				cur_window -= up->data.size();
 				delete up;
 				i = unacked_packets.erase(i);
 			}
+			else if (sack)
+			{
+				if (Acked(sack,up->seq_nr - hdr->ack_nr))
+				{
+					conn->updateRTT(hdr,now - up->send_time);
+					cur_window -= up->data.size();
+					delete up;
+					i = unacked_packets.erase(i);
+				}
+				else
+					i++;
+			}
 			else
 				break;
 		}
-		
-		if (sack)
-		{
-		}
 	}
 
 	void RemoteWindow::addPacket(const QByteArray& data,bt::Uint16 seq_nr,const TimeValue & send_time)
--- trunk/extragear/network/ktorrent/libbtcore/utp/tests/localwindowtest.cpp #1078185:1078186
@@ -204,6 +204,83 @@
 		QVERIFY(wnd.availableSpace() == 0);
 		QVERIFY(wnd.currentWindow() == 500);
 	}
+	
+	void testSelectiveAck()
+	{
+		bt::Uint8 wdata[1000];
+		memset(wdata,0,1000);
+		
+		utp::Header hdr;
+		utp::LocalWindow wnd(1000);
+		wnd.setLastSeqNr(0);
+		
+		// first write first and last packet
+		bt::Uint32 step = 200;
+		hdr.seq_nr = 1;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - step);
+		QVERIFY(wnd.currentWindow() == step);
+		hdr.seq_nr = 5;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - 2*step);
+		QVERIFY(wnd.currentWindow() == 2*step);
+		QVERIFY(wnd.fill() == step);
+		
+		// Check SelectiveAck generation
+		QVERIFY(wnd.selectiveAckBits() == 3);
+		bt::Uint8 sack_data[6];
+		SelectiveAck* sack = (SelectiveAck*)sack_data;
+		sack->length = 4;
+		sack->extension = 0;
+		wnd.fillSelectiveAck(sack);
+		QVERIFY(sack_data[2] == 0x4);
+		QVERIFY(sack_data[3] == 0x0);
+		QVERIFY(sack_data[4] == 0x0);
+		QVERIFY(sack_data[5] == 0x0);
+		
+		// Now write 4
+		hdr.seq_nr = 4;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - 3*step);
+		QVERIFY(wnd.currentWindow() == 3*step);
+		QVERIFY(wnd.fill() == step);
+		
+		// Check selective ack again
+		QVERIFY(wnd.selectiveAckBits() == 3);
+		sack->length = 4;
+		sack->extension = 0;
+		wnd.fillSelectiveAck(sack);
+		QVERIFY(sack_data[2] == 0x6);
+		QVERIFY(sack_data[3] == 0x0);
+		QVERIFY(sack_data[4] == 0x0);
+		QVERIFY(sack_data[5] == 0x0);
+		
+		// Now write 3
+		hdr.seq_nr = 3;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - 4*step);
+		QVERIFY(wnd.currentWindow() == 4*step);
+		QVERIFY(wnd.fill() == step);
+		
+		// Check selective ack again
+		QVERIFY(wnd.selectiveAckBits() == 3);
+		sack->length = 4;
+		sack->extension = 0;
+		wnd.fillSelectiveAck(sack);
+		QVERIFY(sack_data[2] == 0x7);
+		QVERIFY(sack_data[3] == 0x0);
+		QVERIFY(sack_data[4] == 0x0);
+		QVERIFY(sack_data[5] == 0x0);
+		
+		// And then 2
+		hdr.seq_nr = 2;
+		QVERIFY(wnd.packetReceived(&hdr,wdata,step) == true);
+		QVERIFY(wnd.availableSpace() == wnd.capacity() - 5*step);
+		QVERIFY(wnd.currentWindow() == 5*step);
+		QVERIFY(wnd.fill() == 5*step);
+		// selective ack should now be unnecessary
+		QVERIFY(wnd.selectiveAckBits() == 0);
+	}
 
 private:
 	bt::Uint8 data[13];
--- trunk/extragear/network/ktorrent/libbtcore/utp/utpprotocol.h #1078185:1078186
@@ -22,6 +22,7 @@
 #define UTP_UTPPROTOCOL_H
 
 #include <QtGlobal>
+#include <util/constants.h>
 
 namespace utp
 {
@@ -82,6 +83,9 @@
 		quint8 length;
 	};
 	
+	const bt::Uint8 SELECTIVE_ACK_ID = 1;
+	const bt::Uint8 EXTENSION_BITS_ID = 2;
+	
 	// type field values
 	const quint8 ST_DATA = 0;
 	const quint8 ST_FIN = 1;
@@ -100,6 +104,12 @@
 	};
 	
 	const quint32 MIN_PACKET_SIZE = 150;
+	
+	// Test if a bit is acked
+	bool Acked(const SelectiveAck* sack,bt::Uint16 bit);
+	
+	// Turn on a bit in the SelectiveAck
+	void Acked(SelectiveAck* sack,bt::Uint16 bit);
 }
 
 #endif // UTP_UTPPROTOCOL_H
[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic