[prev in list] [next in list] [prev in thread] [next in thread]
List: pidgin-commits
Subject: /pidgin/main: 56d191003b34: facebook: Port from sslconn to Gio
From: Mike Ruprecht <cmaiku () gmail ! com>
Date: 2016-09-30 14:52:02
Message-ID: hg.56d191003b34.1475247122.-874508059 () rock ! pidgin ! im
[Download RAW message or body]
Changeset: 56d191003b34845f5dbc27b82defa665b644fdd5
Author: Mike Ruprecht <cmaiku@gmail.com>
Date: 2016-09-15 13:31 -0500
Branch: default
URL: https://hg.pidgin.im/pidgin/main/rev/56d191003b34
Description:
facebook: Port from sslconn to Gio
This patch ports usage of libpurple's sslconn.[ch] to native Gio.
There is potential to improve this by allowing for MqttMessages to
use external bytes without copying (provided they're consumed soon
enough) and to "steal" the bytes from a MqttMessage for sending.
Unfortunately GByteArray doesn't have all the memory management API
GBytes has.
diffstat:
libpurple/protocols/facebook/facebook.c | 4 +-
libpurple/protocols/facebook/mqtt.c | 334 ++++++++++++++++++-------------
libpurple/protocols/facebook/mqtt.h | 17 -
3 files changed, 198 insertions(+), 157 deletions(-)
diffs (truncated from 535 to 300 lines):
diff --git a/libpurple/protocols/facebook/facebook.c b/libpurple/protocols/facebook/facebook.c
--- a/libpurple/protocols/facebook/facebook.c
+++ b/libpurple/protocols/facebook/facebook.c
@@ -332,8 +332,8 @@ fb_cb_api_error(FbApi *api, GError *erro
gc = fb_data_get_connection(fata);
- if (error->domain == FB_MQTT_SSL_ERROR) {
- purple_connection_ssl_error(gc, error->code);
+ if (error->domain == G_IO_ERROR) {
+ purple_connection_g_error(gc, error);
return;
}
diff --git a/libpurple/protocols/facebook/mqtt.c b/libpurple/protocols/facebook/mqtt.c
--- a/libpurple/protocols/facebook/mqtt.c
+++ b/libpurple/protocols/facebook/mqtt.c
@@ -28,7 +28,8 @@
#include "account.h"
#include "eventloop.h"
#include "glibcompat.h"
-#include "sslconn.h"
+#include "purple-gio.h"
+#include "queuedoutputstream.h"
#include "marshal.h"
#include "mqtt.h"
@@ -37,17 +38,17 @@
struct _FbMqttPrivate
{
PurpleConnection *gc;
- PurpleSslConnection *gsc;
+ GIOStream *conn;
+ GBufferedInputStream *input;
+ PurpleQueuedOutputStream *output;
+ GCancellable *cancellable;
gboolean connected;
guint16 mid;
GByteArray *rbuf;
- GByteArray *wbuf;
gsize remz;
gint tev;
- gint rev;
- gint wev;
};
struct _FbMqttMessagePrivate
@@ -65,6 +66,8 @@ struct _FbMqttMessagePrivate
G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
+static void fb_mqtt_read_packet(FbMqtt *mqtt);
+
static void
fb_mqtt_dispose(GObject *obj)
{
@@ -73,7 +76,6 @@ fb_mqtt_dispose(GObject *obj)
fb_mqtt_close(mqtt);
g_byte_array_free(priv->rbuf, TRUE);
- g_byte_array_free(priv->wbuf, TRUE);
}
static void
@@ -161,7 +163,6 @@ fb_mqtt_init(FbMqtt *mqtt)
mqtt->priv = priv;
priv->rbuf = g_byte_array_new();
- priv->wbuf = g_byte_array_new();
}
static void
@@ -205,18 +206,6 @@ fb_mqtt_error_quark(void)
return q;
}
-GQuark
-fb_mqtt_ssl_error_quark(void)
-{
- static GQuark q = 0;
-
- if (G_UNLIKELY(q == 0)) {
- q = g_quark_from_static_string("fb-mqtt-ssl-error-quark");
- }
-
- return q;
-}
-
FbMqtt *
fb_mqtt_new(PurpleConnection *gc)
{
@@ -240,33 +229,47 @@ fb_mqtt_close(FbMqtt *mqtt)
g_return_if_fail(FB_IS_MQTT(mqtt));
priv = mqtt->priv;
- if (priv->wev > 0) {
- purple_input_remove(priv->wev);
- priv->wev = 0;
- }
-
- if (priv->rev > 0) {
- purple_input_remove(priv->rev);
- priv->rev = 0;
- }
-
if (priv->tev > 0) {
purple_timeout_remove(priv->tev);
priv->tev = 0;
}
- if (priv->gsc != NULL) {
- purple_ssl_close(priv->gsc);
- priv->gsc = NULL;
+ if (priv->cancellable != NULL) {
+ g_cancellable_cancel(priv->cancellable);
+ g_clear_object(&priv->cancellable);
}
- if (priv->wbuf->len > 0) {
- fb_util_debug_warning("Closing with unwritten data");
+ if (priv->conn != NULL) {
+ purple_gio_graceful_close(priv->conn,
+ G_INPUT_STREAM(priv->input),
+ G_OUTPUT_STREAM(priv->output));
+ g_clear_object(&priv->input);
+ g_clear_object(&priv->output);
+ g_clear_object(&priv->conn);
}
priv->connected = FALSE;
g_byte_array_set_size(priv->rbuf, 0);
- g_byte_array_set_size(priv->wbuf, 0);
+}
+
+static void
+fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix)
+{
+ if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ /* Return as cancelled means the connection is closing */
+ g_error_free(err);
+ return;
+ }
+
+ /* Now we can check for programming errors */
+ g_return_if_fail(FB_IS_MQTT(mqtt));
+
+ if (prefix != NULL) {
+ g_prefix_error(&err, "%s: ", prefix);
+ }
+
+ g_signal_emit_by_name(mqtt, "error", err);
+ g_error_free(err);
}
void
@@ -344,74 +347,130 @@ fb_mqtt_ping(FbMqtt *mqtt)
}
static void
-fb_mqtt_cb_read(gpointer data, gint fd, PurpleInputCondition cond)
+fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data)
+{
+ GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source);
+ FbMqtt *mqtt = data;
+ gssize ret;
+ GError *err = NULL;
+
+ ret = g_buffered_input_stream_fill_finish(input, res, &err);
+
+ if (ret < 1) {
+ if (ret == 0) {
+ err = g_error_new_literal(G_IO_ERROR,
+ G_IO_ERROR_CONNECTION_CLOSED,
+ _("Connection closed"));
+ }
+
+ fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header"));
+ return;
+ }
+
+ fb_mqtt_read_packet(mqtt);
+}
+
+static void
+fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data)
{
FbMqtt *mqtt = data;
+ FbMqttPrivate *priv;
+ gssize ret;
FbMqttMessage *msg;
- FbMqttPrivate *priv = mqtt->priv;
- gint res;
- guint mult;
- guint8 buf[1024];
- guint8 byte;
- gsize size;
- gssize rize;
+ GError *err = NULL;
- if (priv->remz < 1) {
- /* Reset the read buffer */
- g_byte_array_set_size(priv->rbuf, 0);
+ ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err);
- res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
- g_byte_array_append(priv->rbuf, &byte, sizeof byte);
-
- if (res != sizeof byte) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to read fixed header"));
- return;
+ if (ret < 1) {
+ if (ret == 0) {
+ err = g_error_new_literal(G_IO_ERROR,
+ G_IO_ERROR_CONNECTION_CLOSED,
+ _("Connection closed"));
}
- mult = 1;
-
- do {
- res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
- g_byte_array_append(priv->rbuf, &byte, sizeof byte);
-
- if (res != sizeof byte) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to read packet size"));
- return;
- }
-
- priv->remz += (byte & 127) * mult;
- mult *= 128;
- } while ((byte & 128) != 0);
+ fb_mqtt_take_error(mqtt, err, _("Failed to read packet data"));
+ return;
}
+ priv = mqtt->priv;
+ priv->remz -= ret;
+
if (priv->remz > 0) {
- size = MIN(priv->remz, sizeof buf);
- rize = purple_ssl_read(priv->gsc, buf, size);
+ g_input_stream_read_async(G_INPUT_STREAM(source),
+ priv->rbuf->data +
+ priv->rbuf->len - priv->remz, priv->remz,
+ G_PRIORITY_DEFAULT, priv->cancellable,
+ fb_mqtt_cb_read_packet, mqtt);
+ return;
+ }
- if (rize < 1) {
- fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
- _("Failed to read packet data"));
- return;
+ msg = fb_mqtt_message_new_bytes(priv->rbuf);
+
+ if (G_UNLIKELY(msg == NULL)) {
+ fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
+ _("Failed to parse message"));
+ return;
+ }
+
+ fb_mqtt_read(mqtt, msg);
+ g_object_unref(msg);
+
+ /* Read another packet if connection wasn't reset in fb_mqtt_read() */
+ if (fb_mqtt_connected(mqtt, FALSE)) {
+ fb_mqtt_read_packet(mqtt);
+ }
+}
+
+static void
+fb_mqtt_read_packet(FbMqtt *mqtt)
+{
+ FbMqttPrivate *priv = mqtt->priv;
+ const guint8 const *buf;
+ gsize count = 0;
+ gsize pos;
+ guint mult = 1;
+ guint8 byte;
+ gsize size = 0;
+
+ buf = g_buffered_input_stream_peek_buffer(priv->input, &count);
+
+ /* Start at 1 to skip the first byte */
+ pos = 1;
+
+ do {
+ if (pos >= count) {
+ /* Not enough data yet, try again later */
+ size = 0;
+ break;
}
- g_byte_array_append(priv->rbuf, buf, rize);
- priv->remz -= rize;
- }
_______________________________________________
Commits mailing list
Commits@pidgin.im
https://pidgin.im/cgi-bin/mailman/listinfo/commits
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic