[prev in list] [next in list] [prev in thread] [next in thread] 

List:       kde-commits
Subject:    extragear/network/ktorrent/libbtcore
From:       Joris Guisson <joris.guisson () gmail ! com>
Date:       2010-01-20 17:37:14
Message-ID: 1264009034.661584.11727.nullmailer () svn ! kde ! org
[Download RAW message or body]

SVN commit 1077693 by guisson:

Further work on UTP:
- Add RTT calculations
- Add output_buffer and use that to store the data before sending
- Send packets according to a packet size

 M  +1 -0      CMakeLists.txt  
 M  +11 -11    util/circularbuffer.cpp  
 M  +9 -3      util/circularbuffer.h  
 M  +1 -0      utp/CMakeLists.txt  
 M  +57 -48    utp/connection.cpp  
 M  +16 -5     utp/connection.h  
 M  +1 -5      utp/localwindow.h  
 M  +22 -7     utp/remotewindow.cpp  
 M  +16 -3     utp/remotewindow.h  
 A             utp/timevalue.cpp   [License: GPL (v2+)]
 A             utp/timevalue.h   [License: GPL (v2+)]
 M  +2 -0      utp/utpprotocol.h  
 M  +1 -5      utp/utpsocket.cpp  


--- trunk/extragear/network/ktorrent/libbtcore/CMakeLists.txt #1077692:1077693
@@ -200,6 +200,7 @@
 	utp/localwindow.cpp
 	utp/remotewindow.cpp
 	utp/utpsocket.cpp
+	utp/timevalue.cpp
 	
 	btversion.cpp
 )
--- trunk/extragear/network/ktorrent/libbtcore/util/circularbuffer.cpp #1077692:1077693
@@ -25,9 +25,9 @@
 namespace bt
 {
 	
-	CircularBuffer::CircularBuffer(Uint32 cap) : window(0),capacity(cap),start(0),size(0)
+	CircularBuffer::CircularBuffer(Uint32 cap) : window(0),buffer_capacity(cap),start(0),size(0)
 	{
-		window = new bt::Uint8[capacity];
+		window = new bt::Uint8[buffer_capacity];
 	}
 
 	CircularBuffer::~CircularBuffer()
@@ -41,7 +41,7 @@
 			return 0;
 		
 		bt::Uint32 to_read = size < max_len ? size : max_len;
-		if (start + to_read < capacity)
+		if (start + to_read < buffer_capacity)
 		{
 			// we are not going past the end of the data
 			memcpy(data,window + start,to_read);
@@ -52,8 +52,8 @@
 		else
 		{
 			// read until the end of the window
-			memcpy(data,window + start,capacity - start);
-			bt::Uint32 ar = capacity - start;
+			memcpy(data,window + start,buffer_capacity - start);
+			bt::Uint32 ar = buffer_capacity - start;
 			if (to_read > ar) // read the rest
 				memcpy(data + ar,window,to_read - ar);
 			
@@ -65,13 +65,13 @@
 	
 	bt::Uint32 CircularBuffer::write(const bt::Uint8* data, bt::Uint32 len)
 	{
-		if (size == capacity)
+		if (size == buffer_capacity)
 			return 0;
 		
-		bt::Uint32 free_space = capacity - size;
+		bt::Uint32 free_space = buffer_capacity - size;
 		bt::Uint32 to_write = free_space < len ? free_space : len;
-		bt::Uint32 off = (start + size) % capacity;
-		if (off + to_write < capacity)
+		bt::Uint32 off = (start + size) % buffer_capacity;
+		if (off + to_write < buffer_capacity)
 		{
 			// everything will go in one go
 			memcpy(window + off,data,to_write);
@@ -80,8 +80,8 @@
 		}
 		else
 		{
-			memcpy(window + off,data,capacity - off);
-			bt::Uint32 aw = capacity - off;
+			memcpy(window + off,data,buffer_capacity - off);
+			bt::Uint32 aw = buffer_capacity - off;
 			if (to_write > aw)
 				memcpy(window,data + aw,to_write - aw);
 			
--- trunk/extragear/network/ktorrent/libbtcore/util/circularbuffer.h #1077692:1077693
@@ -26,14 +26,14 @@
 
 namespace bt
 {
-
+	
 	/**
 		Circular buffer class
 	*/
 	class BTCORE_EXPORT CircularBuffer
 	{
 	public:
-		CircularBuffer(bt::Uint32 cap);
+		CircularBuffer(bt::Uint32 cap = 64 * 1024);
 		virtual ~CircularBuffer();
 		
 		/**
@@ -52,9 +52,15 @@
 		*/
 		bt::Uint32 write(const bt::Uint8* data,bt::Uint32 len);
 		
+		/// Get the buffer capacity
+		bt::Uint32 capacity() const {return buffer_capacity;}
+		
+		/// Get how much is used
+		bt::Uint32 fill() const {return size;}
+		
 	protected:
 		bt::Uint8* window;
-		bt::Uint32 capacity;
+		bt::Uint32 buffer_capacity;
 		bt::Uint32 start;
 		bt::Uint32 size;
 	};
--- trunk/extragear/network/ktorrent/libbtcore/utp/CMakeLists.txt #1077692:1077693
@@ -5,6 +5,7 @@
 	utpsocket.h
 	connection.h
 	localwindow.h
+	timevalue.h
 )
 
 install(FILES ${utp_HDR} DESTINATION ${INCLUDE_INSTALL_DIR}/libbtcore/utp COMPONENT Devel)
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.cpp #1077692:1077693
@@ -36,6 +36,11 @@
 		eof_seq_nr = -1;
 		local_wnd = new LocalWindow();
 		remote_wnd = new RemoteWindow();
+		rtt = 100;
+		rtt_var = 0;
+		last_ack_nr = 0;
+		timeout = 1000;
+		packet_size = 1400;
 		if (type == OUTGOING)
 		{
 			send_connection_id = recv_connection_id + 1;
@@ -80,8 +85,9 @@
 		DumpHeader(*hdr);
 		
 		updateDelayMeasurement(hdr);
-		remote_wnd->packetReceived(hdr);
+		remote_wnd->packetReceived(hdr,this);
 		ack_nr = hdr->seq_nr;
+		last_ack_nr = hdr->ack_nr;
 		switch (state)
 		{
 			case CS_SYN_SENT:
@@ -121,8 +127,8 @@
 					local_wnd->write((const bt::Uint8*)packet.data() + data_off,s);
 					data_ready.wakeAll();
 					
-					// send back an ACK
-					sendState();
+					// send back an ACK 
+					sendStateOrData();
 				}
 				else if (hdr->type == ST_STATE)
 				{
@@ -163,7 +169,17 @@
 		
 		return state;
 	}
+	
+	
+	void Connection::updateRTT(const utp::Header* hdr,bt::Uint32 packet_rtt)
+	{
+		int delta = rtt - packet_rtt;
+		rtt_var += (qAbs(delta) - rtt_var) / 4;
+		rtt += (packet_rtt - rtt) / 8;
+		timeout = qMin(rtt + rtt_var * 4, (bt::Uint32)500);
+	}
 
+
 	void Connection::sendSYN()
 	{
 		seq_nr = 1;
@@ -185,7 +201,6 @@
 		hdr->seq_nr = seq_nr;
 		hdr->ack_nr = ack_nr;
 		
-		remote_wnd->addPacket(ba);
 		srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
 	}
 	
@@ -206,7 +221,6 @@
 		hdr->seq_nr = seq_nr;
 		hdr->ack_nr = ack_nr;
 		
-		remote_wnd->addPacket(ba);
 		srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
 	}
 
@@ -227,7 +241,6 @@
 		hdr->seq_nr = seq_nr;
 		hdr->ack_nr = ack_nr;
 		
-		remote_wnd->addPacket(ba);
 		srv->sendTo((const bt::Uint8*)ba.data(),ba.size(),remote);
 	}
 
@@ -289,68 +302,64 @@
 		return data_off;
 	}
 
-	bool Connection::send(const QByteArray & packet)
+	int Connection::send(const bt::Uint8* data, Uint32 len)
 	{
 		QMutexLocker lock(&mutex);
-		if (state != CS_CONNECTED || !remote_wnd->allowedToSend(packet.size()))
-			return false;
+		if (state != CS_CONNECTED)
+			return -1;
 		
-		struct timeval tv;
-		gettimeofday(&tv,NULL);
-		
-		QByteArray ba(sizeof(Header) + packet.size(),0);
-		Header* hdr = (Header*)ba.data();
-		hdr->version = 1;
-		hdr->type = ST_DATA;
-		hdr->extension = 0;
-		hdr->connection_id = send_connection_id;
-		hdr->timestamp_microseconds = tv.tv_usec;
-		hdr->timestamp_difference_microseconds = reply_micro;
-		hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
-		hdr->seq_nr = seq_nr + 1;
-		hdr->ack_nr = ack_nr;
-		
-		memcpy(ba.data() + sizeof(Header),packet.data(),packet.size());
-		if (!srv->sendTo(ba,remote))
-			return false;
-		
-		seq_nr++;
-		remote_wnd->addPacket(packet);
-		return true;
+		// first put data in the output buffer then send packets
+		bt::Uint32 ret = output_buffer.write(data,len);
+		sendPackets();
+		return ret;
 	}
 	
-	Uint32 Connection::send(const bt::Uint8* data, Uint32 len)
+	void Connection::sendPackets()
 	{
-		QMutexLocker lock(&mutex);
-		if (state != CS_CONNECTED)
-			return 0;
+		// chop output_buffer data in packets and keep sending
+		// until we are no longer allowed or the buffer is empty
+		while (output_buffer.fill() > 0 && remote_wnd->availableSpace() > 0)
+		{
+			bt::Uint32 to_read = qMin(output_buffer.fill(),remote_wnd->availableSpace());
+			to_read = qMin(to_read,packet_size);
+			
+			QByteArray packet(to_read,0);
+			output_buffer.read((bt::Uint8*)packet.data(),to_read);
+			doSend(packet);
+		}
+	}
+
+	void Connection::sendStateOrData()
+	{
+		if (output_buffer.fill() > 0 && remote_wnd->availableSpace() > 0)
+			sendPackets();
+		else
+			sendState();
+	}
+
+	int Connection::doSend(const QByteArray& packet)
+	{
+		bt::Uint32 to_send = packet.size();
+		TimeValue now;
 		
-		bt::Uint32 space = remote_wnd->availableSpace();
-		if (space == 0)
-			return 0;
-		
-		bt::Uint32 to_send = space <= len ? space : len;
-		struct timeval tv;
-		gettimeofday(&tv,NULL);
-		
-		QByteArray ba(sizeof(Header) + to_send,0);
+		QByteArray ba(sizeof(Header) + packet.size(),0);
 		Header* hdr = (Header*)ba.data();
 		hdr->version = 1;
 		hdr->type = ST_DATA;
 		hdr->extension = 0;
 		hdr->connection_id = send_connection_id;
-		hdr->timestamp_microseconds = tv.tv_usec;
+		hdr->timestamp_microseconds = now.microseconds;
 		hdr->timestamp_difference_microseconds = reply_micro;
 		hdr->wnd_size = local_wnd->maxWindow() - local_wnd->currentWindow();
 		hdr->seq_nr = seq_nr + 1;
 		hdr->ack_nr = ack_nr;
 		
-		memcpy(ba.data() + sizeof(Header),data,to_send);
+		memcpy(ba.data() + sizeof(Header),packet.data(),to_send);
 		if (!srv->sendTo(ba,remote))
-			return 0;
+			return -1;
 		
 		seq_nr++;
-		remote_wnd->addPacket(QByteArray((const char*)data,to_send));
+		remote_wnd->addPacket(packet,seq_nr,now);
 		return to_send;
 	}
 
--- trunk/extragear/network/ktorrent/libbtcore/utp/connection.h #1077692:1077693
@@ -26,6 +26,7 @@
 #include <btcore_export.h>
 #include <net/address.h>
 #include "utpprotocol.h"
+#include <util/circularbuffer.h>
 
 
 
@@ -58,12 +59,9 @@
 		/// Get the receive connection id
 		bt::Uint16 receiveConnectionID() const {return recv_connection_id;}
 		
-		/// Send a packet, will return false if there is no room in the remote window
-		bool send(const QByteArray & packet);
+		/// Send some data, returns the amount of bytes sent (or -1 on error)
+		int send(const bt::Uint8* data,bt::Uint32 len);
 		
-		/// Send some data, returns the amount of bytes sent
-		bt::Uint32 send(const bt::Uint8* data,bt::Uint32 len);
-		
 		/// Read available data from local window, returns the amount of bytes read
 		bt::Uint32 recv(bt::Uint8* buf,bt::Uint32 max_len);
 		
@@ -88,12 +86,18 @@
 		/// Close the socket
 		void close();
 		
+		/// Update the RTT time
+		void updateRTT(const Header* hdr,bt::Uint32 packet_rtt);
+		
 	private:
 		void sendSYN();
 		void waitForSYN();
 		void sendState();
 		void sendFIN();
 		void updateDelayMeasurement(const Header* hdr);
+		void sendStateOrData();
+		int doSend(const QByteArray & packet);
+		void sendPackets();
 		
 		/** 
 			Parses the packet, and retrieves pointer to the header, the SelectiveAck extension (if present)
@@ -116,11 +120,18 @@
 		bt::Uint16 recv_connection_id;
 		LocalWindow* local_wnd;
 		RemoteWindow* remote_wnd;
+		bt::CircularBuffer output_buffer;
 		
 		bt::Uint16 seq_nr;
 		bt::Uint16 ack_nr;
+		bt::Uint16 last_ack_nr;
 		int eof_seq_nr;
+		bt::Uint32 timeout;
 		
+		bt::Uint32 rtt;
+		bt::Uint32 rtt_var;
+		bt::Uint32 packet_size;
+		
 		mutable QMutex mutex;
 		QWaitCondition connected;
 		QWaitCondition data_ready;
--- trunk/extragear/network/ktorrent/libbtcore/utp/localwindow.h #1077692:1077693
@@ -40,12 +40,8 @@
 		LocalWindow(bt::Uint32 cap = DEFAULT_CAPACITY);
 		virtual ~LocalWindow();
 		
-		bt::Uint32 maxWindow() const {return capacity;}
+		bt::Uint32 maxWindow() const {return buffer_capacity;}
 		bt::Uint32 currentWindow() const {return size;}
-		
-		
-		
-	
 	};
 
 }
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.cpp #1077692:1077693
@@ -20,10 +20,21 @@
 
 #include "remotewindow.h"
 #include "utpprotocol.h"
+#include "connection.h"
 
 namespace utp
 {
 	
+	UnackedPacket::UnackedPacket(const QByteArray& data, bt::Uint16 seq_nr, const TimeValue& send_time) 
+		: data(data),seq_nr(seq_nr),send_time(send_time)
+	{
+	}
+
+	UnackedPacket::~UnackedPacket()
+	{
+	}
+
+	
 	RemoteWindow::RemoteWindow() : cur_window(0),max_window(64 * 1024),wnd_size(0)
 	{
 
@@ -31,20 +42,24 @@
 
 	RemoteWindow::~RemoteWindow()
 	{
-
+		qDeleteAll(unacked_packets);
 	}
 
-	void RemoteWindow::packetReceived(const utp::Header* hdr)
+	void RemoteWindow::packetReceived(const utp::Header* hdr,Connection* conn)
 	{
 		wnd_size = hdr->wnd_size;
 		
+		TimeValue now;
 		// everything up until the ack_nr in the header should now have been acked
-		QList<QByteArray>::iterator i = unacked_packets.begin();
+		QList<UnackedPacket*>::iterator i = unacked_packets.begin();
 		while (i != unacked_packets.end())
 		{
-			if (((utp::Header*)i->data())->seq_nr <= hdr->ack_nr)
+			UnackedPacket* up = *i;
+			if (up->seq_nr <= hdr->ack_nr)
 			{
-				cur_window -= i->size();
+				conn->updateRTT(hdr,now - up->send_time);
+				cur_window -= up->data.size();
+				delete up;
 				i = unacked_packets.erase(i);
 			}
 			else
@@ -52,10 +67,10 @@
 		}
 	}
 
-	void RemoteWindow::addPacket(const QByteArray& data)
+	void RemoteWindow::addPacket(const QByteArray& data,bt::Uint16 seq_nr,const TimeValue & send_time)
 	{
 		cur_window += data.size();
-		unacked_packets.append(data);
+		unacked_packets.append(new UnackedPacket(data,seq_nr,send_time));
 	}
 
 }
--- trunk/extragear/network/ktorrent/libbtcore/utp/remotewindow.h #1077692:1077693
@@ -26,10 +26,23 @@
 #include <QMutex>
 #include <btcore_export.h>
 #include <util/constants.h>
+#include <utp/timevalue.h>
 
 namespace utp
 {
+	class Connection;
 	struct Header;
+	
+	struct UnackedPacket
+	{
+		UnackedPacket(const QByteArray & data,bt::Uint16 seq_nr,const TimeValue & send_time);
+		~UnackedPacket();
+					  
+		QByteArray data;
+		bt::Uint16 seq_nr;
+		TimeValue send_time;
+	};
+	
 	/**
 		Keeps track of the remote sides window including all packets inflight.
 	*/
@@ -40,10 +53,10 @@
 		virtual ~RemoteWindow();
 		
 		/// A packet was received (update window size and check for acks)
-		void packetReceived(const Header* hdr);
+		void packetReceived(const Header* hdr,Connection* conn);
 		
 		/// Add a packet to the remote window (should include headers)
-		void addPacket(const QByteArray & data);
+		void addPacket(const QByteArray & data,bt::Uint16 seq_nr,const TimeValue & send_time);
 		
 		/// Are we allowed to send
 		bool allowedToSend(bt::Uint32 packet_size) const
@@ -68,7 +81,7 @@
 		bt::Uint32 cur_window;
 		bt::Uint32 max_window;
 		bt::Uint32 wnd_size; // advertised window size from the other side
-		QList<QByteArray> unacked_packets;
+		QList<UnackedPacket*> unacked_packets;
 	};
 
 }
--- trunk/extragear/network/ktorrent/libbtcore/utp/utpprotocol.h #1077692:1077693
@@ -98,6 +98,8 @@
 		CS_FINISHED,
 		CS_CLOSED
 	};
+	
+	const quint32 MIN_PACKET_SIZE = 150;
 }
 
 #endif // UTP_UTPPROTOCOL_H
--- trunk/extragear/network/ktorrent/libbtcore/utp/utpsocket.cpp #1077692:1077693
@@ -129,11 +129,7 @@
 		if (!conn)
 			return -1;
 		
-		bt::Uint32 ret = conn->send(buf,len);
-		if (!ret)
-			return -1;
-		
-		return ret;
+		return conn->send(buf,len);
 	}
 
 	void UTPSocket::setBlocking(bool on)
[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic