From: Drew Fisher <drew.m.fisher@gmail.com>
Date: Thu, 17 Apr 2014 04:35:32 +0000 (-0700)
Subject: Length-coded framing for socket reads
X-Git-Url: http://git.zarvox.org/shortlog/static/%24c%5B2%5D?a=commitdiff_plain;h=ddfb23ddb57d31558937d531677ad86ecca58d11;p=imoo.git

Length-coded framing for socket reads

Messages go on the wire as:

struct message {
    uint32_t length;
    uint8_t data[length];
}

and are correctly decoded.
---

diff --git a/ribbon/ribbonserver.cpp b/ribbon/ribbonserver.cpp
index ec3d902..1c2ea15 100644
--- a/ribbon/ribbonserver.cpp
+++ b/ribbon/ribbonserver.cpp
@@ -36,14 +36,24 @@ void RibbonServer::onNewConnection()
 	}
 	QObject::connect(rsock, SIGNAL(disconnected(RibbonSocket*)),
 			this, SLOT(onDisconnection(RibbonSocket*)));
+	QObject::connect(rsock, SIGNAL(messageReceived(RibbonSocket*,QByteArray)),
+			this, SLOT(onMessageReceived(RibbonSocket*,QByteArray)));
 }
 
 void RibbonServer::onDisconnection(RibbonSocket* r)
 {
 	sockets.removeOne(r);
 	qDebug() << "Socket disconnected:" << r->s();
+	r->deleteLater();
 	if (!server->isListening()) {
 		server->listen(_addr, _port);
 		qDebug() << "Server listening on" << _addr << ":" << _port;
 	}
 }
+
+void RibbonServer::onMessageReceived(RibbonSocket* r, QByteArray buf)
+{
+	qDebug() << "server: got data from" << r->s()->peerAddress() << r->s()->peerPort();
+	qDebug() << "        message was:" << buf.constData();
+	// TODO: do something with the packet in buf.constData()
+}
diff --git a/ribbon/ribbonserver.h b/ribbon/ribbonserver.h
index f5fd8f2..cadb623 100644
--- a/ribbon/ribbonserver.h
+++ b/ribbon/ribbonserver.h
@@ -28,6 +28,7 @@ private:
 private slots:
 	void onNewConnection();
 	void onDisconnection(RibbonSocket* r);
+	void onMessageReceived(RibbonSocket* r, QByteArray buf);
 };
 
 #endif /* RIBBONSERVER_H */
diff --git a/ribbon/ribbonsocket.cpp b/ribbon/ribbonsocket.cpp
index 722cdd1..8b7e2e5 100644
--- a/ribbon/ribbonsocket.cpp
+++ b/ribbon/ribbonsocket.cpp
@@ -1,9 +1,12 @@
 #include "ribbonsocket.h"
+#include <QDebug>
 
-RibbonSocket::RibbonSocket(QTcpSocket* sock, QObject* parent) : QObject(parent), socket(sock)
+RibbonSocket::RibbonSocket(QTcpSocket* sock, QObject* parent) :
+	QObject(parent),
+	socket(sock),
+	state(WANT_LENGTH),
+	bytes_wanted(0)
 {
-	QObject::connect(socket, SIGNAL(connected()),
-			this, SLOT(onConnect()));
 	QObject::connect(socket, SIGNAL(disconnected()),
 			this, SLOT(onDisconnect()));
 	QObject::connect(socket, SIGNAL(readyRead()),
@@ -19,17 +22,66 @@ const QTcpSocket* RibbonSocket::s()
 	return socket;
 }
 
-void RibbonSocket::onConnect()
+void RibbonSocket::onDisconnect()
 {
-	emit connected(this);
+	emit disconnected(this);
 }
 
-void RibbonSocket::onDisconnect()
+/**
+ * Returns true if progress was made, false if no progress was made and we need
+ * more bytes from the socket
+ */
+bool RibbonSocket::tryProcess()
 {
-	emit disconnected(this);
+	switch(state) {
+		case WANT_LENGTH:
+			qDebug() << "Want length, buf size before read:" << buf.length();
+			if (readUntilBufHas(4)) {
+				// Slice 4 octets off as big-endian integer
+				QByteArray size_buf = buf.left(4);
+				buf = buf.mid(4);
+				QDataStream stream(size_buf);
+				stream >> bytes_wanted;
+				// Optimization: preallocate the right number of bytes
+				// TODO: decide on a maximum acceptable packet length, to avoid
+				// arbitrarily large packet buffer
+				buf.reserve(bytes_wanted);
+				qDebug() << "length:" << bytes_wanted;
+				state = WANT_DATA;
+				return true;
+			}
+			break;
+		case WANT_DATA:
+			qDebug() << "Want data, buf size before read:" << buf.length();
+			if (readUntilBufHas(bytes_wanted)) {
+				QByteArray data = buf.left(bytes_wanted);
+				buf = buf.mid(bytes_wanted);
+				// Memory optimization: release the rest of the buffer
+				buf.squeeze();
+				qDebug() << "data:" << buf;
+				emit messageReceived(this, data);
+				state = WANT_LENGTH;
+				return true;
+			}
+			break;
+	}
+	return false;
+}
+
+/**
+ * returns true if buf has enough bytes
+ * false otherwise
+ */
+bool RibbonSocket::readUntilBufHas(int desired_buffer_length)
+{
+	int size_to_read = desired_buffer_length - buf.length();
+	QByteArray tmp = socket->read(size_to_read);
+	qDebug() << "read" << tmp.length() << "from socket";
+	buf += tmp;
+	return buf.length() >= desired_buffer_length;
 }
 
 void RibbonSocket::onReadyRead()
 {
-	emit readyRead(this);
+	while (tryProcess()) {}
 }
diff --git a/ribbon/ribbonsocket.h b/ribbon/ribbonsocket.h
index 32327ae..e72ab6c 100644
--- a/ribbon/ribbonsocket.h
+++ b/ribbon/ribbonsocket.h
@@ -1,23 +1,32 @@
 #ifndef RIBBONSOCKET_H
 #define RIBBONSOCKET_H
 
+#include <QtCore/QByteArray>
 #include <QtNetwork/QTcpSocket>
 
 class RibbonSocket : public QObject {
 	Q_OBJECT
+	enum RibbonSocketState {
+		WANT_LENGTH = 0,
+		WANT_DATA = 1,
+	};
+
 public:
 	RibbonSocket(QTcpSocket* socket, QObject* parent=0);
 	// Consider s to be final.  Don't modify it.
 	const QTcpSocket* s();
 signals:
-	void connected(RibbonSocket* s);
 	void disconnected(RibbonSocket* s);
-	void readyRead(RibbonSocket* s);
+	void messageReceived(RibbonSocket* s, QByteArray buf);
 private:
 	virtual ~RibbonSocket();
 	QTcpSocket* socket;
+	QByteArray buf;
+	RibbonSocketState state;
+	quint32 bytes_wanted;
+	bool tryProcess();
+	bool readUntilBufHas(int desired_buffer_length);
 private slots:
-	void onConnect();
 	void onDisconnect();
 	void onReadyRead();
 };