[prev in list] [next in list] [prev in thread] [next in thread] 

List:       pidgin-commits
Subject:    /pidgin/main: 56d191003b34: facebook: Port from sslconn to Gio
From:       Mike Ruprecht <cmaiku () gmail ! com>
Date:       2016-09-30 14:52:02
Message-ID: hg.56d191003b34.1475247122.-874508059 () rock ! pidgin ! im
[Download RAW message or body]

Changeset: 56d191003b34845f5dbc27b82defa665b644fdd5
Author:	 Mike Ruprecht <cmaiku@gmail.com>
Date:	 2016-09-15 13:31 -0500
Branch:	 default
URL: https://hg.pidgin.im/pidgin/main/rev/56d191003b34

Description:

facebook: Port from sslconn to Gio

This patch ports usage of libpurple's sslconn.[ch] to native Gio.
There is potential to improve this by allowing for MqttMessages to
use external bytes without copying (provided they're consumed soon
enough) and to "steal" the bytes from a MqttMessage for sending.
Unfortunately GByteArray doesn't have all the memory management API
GBytes has.

diffstat:

 libpurple/protocols/facebook/facebook.c |    4 +-
 libpurple/protocols/facebook/mqtt.c     |  334 ++++++++++++++++++-------------
 libpurple/protocols/facebook/mqtt.h     |   17 -
 3 files changed, 198 insertions(+), 157 deletions(-)

diffs (truncated from 535 to 300 lines):

diff --git a/libpurple/protocols/facebook/facebook.c b/libpurple/protocols/facebook/facebook.c
--- a/libpurple/protocols/facebook/facebook.c
+++ b/libpurple/protocols/facebook/facebook.c
@@ -332,8 +332,8 @@ fb_cb_api_error(FbApi *api, GError *erro
 
 	gc = fb_data_get_connection(fata);
 
-	if (error->domain == FB_MQTT_SSL_ERROR) {
-		purple_connection_ssl_error(gc, error->code);
+	if (error->domain == G_IO_ERROR) {
+		purple_connection_g_error(gc, error);
 		return;
 	}
 
diff --git a/libpurple/protocols/facebook/mqtt.c b/libpurple/protocols/facebook/mqtt.c
--- a/libpurple/protocols/facebook/mqtt.c
+++ b/libpurple/protocols/facebook/mqtt.c
@@ -28,7 +28,8 @@
 #include "account.h"
 #include "eventloop.h"
 #include "glibcompat.h"
-#include "sslconn.h"
+#include "purple-gio.h"
+#include "queuedoutputstream.h"
 
 #include "marshal.h"
 #include "mqtt.h"
@@ -37,17 +38,17 @@
 struct _FbMqttPrivate
 {
 	PurpleConnection *gc;
-	PurpleSslConnection *gsc;
+	GIOStream *conn;
+	GBufferedInputStream *input;
+	PurpleQueuedOutputStream *output;
+	GCancellable *cancellable;
 	gboolean connected;
 	guint16 mid;
 
 	GByteArray *rbuf;
-	GByteArray *wbuf;
 	gsize remz;
 
 	gint tev;
-	gint rev;
-	gint wev;
 };
 
 struct _FbMqttMessagePrivate
@@ -65,6 +66,8 @@ struct _FbMqttMessagePrivate
 G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
 G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
 
+static void fb_mqtt_read_packet(FbMqtt *mqtt);
+
 static void
 fb_mqtt_dispose(GObject *obj)
 {
@@ -73,7 +76,6 @@ fb_mqtt_dispose(GObject *obj)
 
 	fb_mqtt_close(mqtt);
 	g_byte_array_free(priv->rbuf, TRUE);
-	g_byte_array_free(priv->wbuf, TRUE);
 }
 
 static void
@@ -161,7 +163,6 @@ fb_mqtt_init(FbMqtt *mqtt)
 	mqtt->priv = priv;
 
 	priv->rbuf = g_byte_array_new();
-	priv->wbuf = g_byte_array_new();
 }
 
 static void
@@ -205,18 +206,6 @@ fb_mqtt_error_quark(void)
 	return q;
 }
 
-GQuark
-fb_mqtt_ssl_error_quark(void)
-{
-	static GQuark q = 0;
-
-	if (G_UNLIKELY(q == 0)) {
-		q = g_quark_from_static_string("fb-mqtt-ssl-error-quark");
-	}
-
-	return q;
-}
-
 FbMqtt *
 fb_mqtt_new(PurpleConnection *gc)
 {
@@ -240,33 +229,47 @@ fb_mqtt_close(FbMqtt *mqtt)
 	g_return_if_fail(FB_IS_MQTT(mqtt));
 	priv = mqtt->priv;
 
-	if (priv->wev > 0) {
-		purple_input_remove(priv->wev);
-		priv->wev = 0;
-	}
-
-	if (priv->rev > 0) {
-		purple_input_remove(priv->rev);
-		priv->rev = 0;
-	}
-
 	if (priv->tev > 0) {
 		purple_timeout_remove(priv->tev);
 		priv->tev = 0;
 	}
 
-	if (priv->gsc != NULL) {
-		purple_ssl_close(priv->gsc);
-		priv->gsc = NULL;
+	if (priv->cancellable != NULL) {
+		g_cancellable_cancel(priv->cancellable);
+		g_clear_object(&priv->cancellable);
 	}
 
-	if (priv->wbuf->len > 0) {
-		fb_util_debug_warning("Closing with unwritten data");
+	if (priv->conn != NULL) {
+		purple_gio_graceful_close(priv->conn,
+				G_INPUT_STREAM(priv->input),
+				G_OUTPUT_STREAM(priv->output));
+		g_clear_object(&priv->input);
+		g_clear_object(&priv->output);
+		g_clear_object(&priv->conn);
 	}
 
 	priv->connected = FALSE;
 	g_byte_array_set_size(priv->rbuf, 0);
-	g_byte_array_set_size(priv->wbuf, 0);
+}
+
+static void
+fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix)
+{
+	if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+		/* Return as cancelled means the connection is closing */
+		g_error_free(err);
+		return;
+	}
+
+	/* Now we can check for programming errors */
+	g_return_if_fail(FB_IS_MQTT(mqtt));
+
+	if (prefix != NULL) {
+		g_prefix_error(&err, "%s: ", prefix);
+	}
+
+	g_signal_emit_by_name(mqtt, "error", err);
+	g_error_free(err);
 }
 
 void
@@ -344,74 +347,130 @@ fb_mqtt_ping(FbMqtt *mqtt)
 }
 
 static void
-fb_mqtt_cb_read(gpointer data, gint fd, PurpleInputCondition cond)
+fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data)
+{
+	GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source);
+	FbMqtt *mqtt = data;
+	gssize ret;
+	GError *err = NULL;
+
+	ret = g_buffered_input_stream_fill_finish(input, res, &err);
+
+	if (ret < 1) {
+		if (ret == 0) {
+			err = g_error_new_literal(G_IO_ERROR,
+					G_IO_ERROR_CONNECTION_CLOSED,
+					_("Connection closed"));
+		}
+
+		fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header"));
+		return;
+	}
+
+	fb_mqtt_read_packet(mqtt);
+}
+
+static void
+fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data)
 {
 	FbMqtt *mqtt = data;
+	FbMqttPrivate *priv;
+	gssize ret;
 	FbMqttMessage *msg;
-	FbMqttPrivate *priv = mqtt->priv;
-	gint res;
-	guint mult;
-	guint8 buf[1024];
-	guint8 byte;
-	gsize size;
-	gssize rize;
+	GError *err = NULL;
 
-	if (priv->remz < 1) {
-		/* Reset the read buffer */
-		g_byte_array_set_size(priv->rbuf, 0);
+	ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err);
 
-		res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
-		g_byte_array_append(priv->rbuf, &byte, sizeof byte);
-
-		if (res != sizeof byte) {
-			fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-			              _("Failed to read fixed header"));
-			return;
+	if (ret < 1) {
+		if (ret == 0) {
+			err = g_error_new_literal(G_IO_ERROR,
+					G_IO_ERROR_CONNECTION_CLOSED,
+					_("Connection closed"));
 		}
 
-		mult = 1;
-
-		do {
-			res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
-			g_byte_array_append(priv->rbuf, &byte, sizeof byte);
-
-			if (res != sizeof byte) {
-				fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-				              _("Failed to read packet size"));
-				return;
-			}
-
-			priv->remz += (byte & 127) * mult;
-			mult *= 128;
-		} while ((byte & 128) != 0);
+		fb_mqtt_take_error(mqtt, err, _("Failed to read packet data"));
+		return;
 	}
 
+	priv = mqtt->priv;
+	priv->remz -= ret;
+
 	if (priv->remz > 0) {
-		size = MIN(priv->remz, sizeof buf);
-		rize = purple_ssl_read(priv->gsc, buf, size);
+		g_input_stream_read_async(G_INPUT_STREAM(source),
+				priv->rbuf->data +
+				priv->rbuf->len - priv->remz, priv->remz,
+				G_PRIORITY_DEFAULT, priv->cancellable,
+				fb_mqtt_cb_read_packet, mqtt);
+		return;
+	}
 
-		if (rize < 1) {
-			fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-			              _("Failed to read packet data"));
-			return;
+	msg = fb_mqtt_message_new_bytes(priv->rbuf);
+
+	if (G_UNLIKELY(msg == NULL)) {
+		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
+		              _("Failed to parse message"));
+		return;
+	}
+
+	fb_mqtt_read(mqtt, msg);
+	g_object_unref(msg);
+
+	/* Read another packet if connection wasn't reset in fb_mqtt_read() */
+	if (fb_mqtt_connected(mqtt, FALSE)) {
+		fb_mqtt_read_packet(mqtt);
+	}
+}
+
+static void
+fb_mqtt_read_packet(FbMqtt *mqtt)
+{
+	FbMqttPrivate *priv = mqtt->priv;
+	const guint8 const *buf;
+	gsize count = 0;
+	gsize pos;
+	guint mult = 1;
+	guint8 byte;
+	gsize size = 0;
+
+	buf = g_buffered_input_stream_peek_buffer(priv->input, &count);
+
+	/* Start at 1 to skip the first byte */
+	pos = 1;
+
+	do {
+		if (pos >= count) {
+			/* Not enough data yet, try again later */
+			size = 0;
+			break;
 		}
 
-		g_byte_array_append(priv->rbuf, buf, rize);
-		priv->remz -= rize;
-	}

_______________________________________________
Commits mailing list
Commits@pidgin.im
https://pidgin.im/cgi-bin/mailman/listinfo/commits
[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic