]> git.zarvox.org Git - imoo.git/commitdiff
Length-coded framing for socket reads
authorDrew Fisher <drew.m.fisher@gmail.com>
Thu, 17 Apr 2014 04:35:32 +0000 (21:35 -0700)
committerDrew Fisher <drew.m.fisher@gmail.com>
Thu, 17 Apr 2014 04:42:30 +0000 (21:42 -0700)
Messages go on the wire as:

struct message {
    uint32_t length;
    uint8_t data[length];
}

and are correctly decoded.

ribbon/ribbonserver.cpp
ribbon/ribbonserver.h
ribbon/ribbonsocket.cpp
ribbon/ribbonsocket.h

index ec3d90264386a7a06b09d9d12be8ef3fb31e47b5..1c2ea15053abcb271deba81005d2bc96d8b5077f 100644 (file)
@@ -36,14 +36,24 @@ void RibbonServer::onNewConnection()
        }
        QObject::connect(rsock, SIGNAL(disconnected(RibbonSocket*)),
                        this, SLOT(onDisconnection(RibbonSocket*)));
+       QObject::connect(rsock, SIGNAL(messageReceived(RibbonSocket*,QByteArray)),
+                       this, SLOT(onMessageReceived(RibbonSocket*,QByteArray)));
 }
 
 void RibbonServer::onDisconnection(RibbonSocket* r)
 {
        sockets.removeOne(r);
        qDebug() << "Socket disconnected:" << r->s();
+       r->deleteLater();
        if (!server->isListening()) {
                server->listen(_addr, _port);
                qDebug() << "Server listening on" << _addr << ":" << _port;
        }
 }
+
+void RibbonServer::onMessageReceived(RibbonSocket* r, QByteArray buf)
+{
+       qDebug() << "server: got data from" << r->s()->peerAddress() << r->s()->peerPort();
+       qDebug() << "        message was:" << buf.constData();
+       // TODO: do something with the packet in buf.constData()
+}
index f5fd8f25930349f464c3b8ec2ceb2059d90ab27c..cadb6238d09a26558ee576d07739e33d93a69aa6 100644 (file)
@@ -28,6 +28,7 @@ private:
 private slots:
        void onNewConnection();
        void onDisconnection(RibbonSocket* r);
+       void onMessageReceived(RibbonSocket* r, QByteArray buf);
 };
 
 #endif /* RIBBONSERVER_H */
index 722cdd1f9409a504a38288d791115edaa81f46a0..8b7e2e5b6b0a51b1cb0f3d51fc156cfc4f60a7e8 100644 (file)
@@ -1,9 +1,12 @@
 #include "ribbonsocket.h"
+#include <QDebug>
 
-RibbonSocket::RibbonSocket(QTcpSocket* sock, QObject* parent) : QObject(parent), socket(sock)
+RibbonSocket::RibbonSocket(QTcpSocket* sock, QObject* parent) :
+       QObject(parent),
+       socket(sock),
+       state(WANT_LENGTH),
+       bytes_wanted(0)
 {
-       QObject::connect(socket, SIGNAL(connected()),
-                       this, SLOT(onConnect()));
        QObject::connect(socket, SIGNAL(disconnected()),
                        this, SLOT(onDisconnect()));
        QObject::connect(socket, SIGNAL(readyRead()),
@@ -19,17 +22,66 @@ const QTcpSocket* RibbonSocket::s()
        return socket;
 }
 
-void RibbonSocket::onConnect()
+void RibbonSocket::onDisconnect()
 {
-       emit connected(this);
+       emit disconnected(this);
 }
 
-void RibbonSocket::onDisconnect()
+/**
+ * Returns true if progress was made, false if no progress was made and we need
+ * more bytes from the socket
+ */
+bool RibbonSocket::tryProcess()
 {
-       emit disconnected(this);
+       switch(state) {
+               case WANT_LENGTH:
+                       qDebug() << "Want length, buf size before read:" << buf.length();
+                       if (readUntilBufHas(4)) {
+                               // Slice 4 octets off as big-endian integer
+                               QByteArray size_buf = buf.left(4);
+                               buf = buf.mid(4);
+                               QDataStream stream(size_buf);
+                               stream >> bytes_wanted;
+                               // Optimization: preallocate the right number of bytes
+                               // TODO: decide on a maximum acceptable packet length, to avoid
+                               // arbitrarily large packet buffer
+                               buf.reserve(bytes_wanted);
+                               qDebug() << "length:" << bytes_wanted;
+                               state = WANT_DATA;
+                               return true;
+                       }
+                       break;
+               case WANT_DATA:
+                       qDebug() << "Want data, buf size before read:" << buf.length();
+                       if (readUntilBufHas(bytes_wanted)) {
+                               QByteArray data = buf.left(bytes_wanted);
+                               buf = buf.mid(bytes_wanted);
+                               // Memory optimization: release the rest of the buffer
+                               buf.squeeze();
+                               qDebug() << "data:" << buf;
+                               emit messageReceived(this, data);
+                               state = WANT_LENGTH;
+                               return true;
+                       }
+                       break;
+       }
+       return false;
+}
+
+/**
+ * returns true if buf has enough bytes
+ * false otherwise
+ */
+bool RibbonSocket::readUntilBufHas(int desired_buffer_length)
+{
+       int size_to_read = desired_buffer_length - buf.length();
+       QByteArray tmp = socket->read(size_to_read);
+       qDebug() << "read" << tmp.length() << "from socket";
+       buf += tmp;
+       return buf.length() >= desired_buffer_length;
 }
 
 void RibbonSocket::onReadyRead()
 {
-       emit readyRead(this);
+       while (tryProcess()) {}
 }
index 32327ae2de6cfeda20cdd7c68f0ea5f036a5e9d6..e72ab6caaaca656ccdc3ac6c84ce566fcbdc3eb4 100644 (file)
@@ -1,23 +1,32 @@
 #ifndef RIBBONSOCKET_H
 #define RIBBONSOCKET_H
 
+#include <QtCore/QByteArray>
 #include <QtNetwork/QTcpSocket>
 
 class RibbonSocket : public QObject {
        Q_OBJECT
+       enum RibbonSocketState {
+               WANT_LENGTH = 0,
+               WANT_DATA = 1,
+       };
+
 public:
        RibbonSocket(QTcpSocket* socket, QObject* parent=0);
        // Consider s to be final.  Don't modify it.
        const QTcpSocket* s();
 signals:
-       void connected(RibbonSocket* s);
        void disconnected(RibbonSocket* s);
-       void readyRead(RibbonSocket* s);
+       void messageReceived(RibbonSocket* s, QByteArray buf);
 private:
        virtual ~RibbonSocket();
        QTcpSocket* socket;
+       QByteArray buf;
+       RibbonSocketState state;
+       quint32 bytes_wanted;
+       bool tryProcess();
+       bool readUntilBufHas(int desired_buffer_length);
 private slots:
-       void onConnect();
        void onDisconnect();
        void onReadyRead();
 };