From: Drew Fisher Date: Thu, 17 Apr 2014 04:35:32 +0000 (-0700) Subject: Length-coded framing for socket reads X-Git-Url: http://git.zarvox.org/shortlog/2013?a=commitdiff_plain;h=ddfb23ddb57d31558937d531677ad86ecca58d11;p=imoo.git Length-coded framing for socket reads Messages go on the wire as: struct message { uint32_t length; uint8_t data[length]; } and are correctly decoded. --- diff --git a/ribbon/ribbonserver.cpp b/ribbon/ribbonserver.cpp index ec3d902..1c2ea15 100644 --- a/ribbon/ribbonserver.cpp +++ b/ribbon/ribbonserver.cpp @@ -36,14 +36,24 @@ void RibbonServer::onNewConnection() } QObject::connect(rsock, SIGNAL(disconnected(RibbonSocket*)), this, SLOT(onDisconnection(RibbonSocket*))); + QObject::connect(rsock, SIGNAL(messageReceived(RibbonSocket*,QByteArray)), + this, SLOT(onMessageReceived(RibbonSocket*,QByteArray))); } void RibbonServer::onDisconnection(RibbonSocket* r) { sockets.removeOne(r); qDebug() << "Socket disconnected:" << r->s(); + r->deleteLater(); if (!server->isListening()) { server->listen(_addr, _port); qDebug() << "Server listening on" << _addr << ":" << _port; } } + +void RibbonServer::onMessageReceived(RibbonSocket* r, QByteArray buf) +{ + qDebug() << "server: got data from" << r->s()->peerAddress() << r->s()->peerPort(); + qDebug() << " message was:" << buf.constData(); + // TODO: do something with the packet in buf.constData() +} diff --git a/ribbon/ribbonserver.h b/ribbon/ribbonserver.h index f5fd8f2..cadb623 100644 --- a/ribbon/ribbonserver.h +++ b/ribbon/ribbonserver.h @@ -28,6 +28,7 @@ private: private slots: void onNewConnection(); void onDisconnection(RibbonSocket* r); + void onMessageReceived(RibbonSocket* r, QByteArray buf); }; #endif /* RIBBONSERVER_H */ diff --git a/ribbon/ribbonsocket.cpp b/ribbon/ribbonsocket.cpp index 722cdd1..8b7e2e5 100644 --- a/ribbon/ribbonsocket.cpp +++ b/ribbon/ribbonsocket.cpp @@ -1,9 +1,12 @@ #include "ribbonsocket.h" +#include -RibbonSocket::RibbonSocket(QTcpSocket* sock, QObject* parent) : QObject(parent), socket(sock) +RibbonSocket::RibbonSocket(QTcpSocket* sock, QObject* parent) : + QObject(parent), + socket(sock), + state(WANT_LENGTH), + bytes_wanted(0) { - QObject::connect(socket, SIGNAL(connected()), - this, SLOT(onConnect())); QObject::connect(socket, SIGNAL(disconnected()), this, SLOT(onDisconnect())); QObject::connect(socket, SIGNAL(readyRead()), @@ -19,17 +22,66 @@ const QTcpSocket* RibbonSocket::s() return socket; } -void RibbonSocket::onConnect() +void RibbonSocket::onDisconnect() { - emit connected(this); + emit disconnected(this); } -void RibbonSocket::onDisconnect() +/** + * Returns true if progress was made, false if no progress was made and we need + * more bytes from the socket + */ +bool RibbonSocket::tryProcess() { - emit disconnected(this); + switch(state) { + case WANT_LENGTH: + qDebug() << "Want length, buf size before read:" << buf.length(); + if (readUntilBufHas(4)) { + // Slice 4 octets off as big-endian integer + QByteArray size_buf = buf.left(4); + buf = buf.mid(4); + QDataStream stream(size_buf); + stream >> bytes_wanted; + // Optimization: preallocate the right number of bytes + // TODO: decide on a maximum acceptable packet length, to avoid + // arbitrarily large packet buffer + buf.reserve(bytes_wanted); + qDebug() << "length:" << bytes_wanted; + state = WANT_DATA; + return true; + } + break; + case WANT_DATA: + qDebug() << "Want data, buf size before read:" << buf.length(); + if (readUntilBufHas(bytes_wanted)) { + QByteArray data = buf.left(bytes_wanted); + buf = buf.mid(bytes_wanted); + // Memory optimization: release the rest of the buffer + buf.squeeze(); + qDebug() << "data:" << buf; + emit messageReceived(this, data); + state = WANT_LENGTH; + return true; + } + break; + } + return false; +} + +/** + * returns true if buf has enough bytes + * false otherwise + */ +bool RibbonSocket::readUntilBufHas(int desired_buffer_length) +{ + int size_to_read = desired_buffer_length - buf.length(); + QByteArray tmp = socket->read(size_to_read); + qDebug() << "read" << tmp.length() << "from socket"; + buf += tmp; + return buf.length() >= desired_buffer_length; } void RibbonSocket::onReadyRead() { - emit readyRead(this); + while (tryProcess()) {} } diff --git a/ribbon/ribbonsocket.h b/ribbon/ribbonsocket.h index 32327ae..e72ab6c 100644 --- a/ribbon/ribbonsocket.h +++ b/ribbon/ribbonsocket.h @@ -1,23 +1,32 @@ #ifndef RIBBONSOCKET_H #define RIBBONSOCKET_H +#include #include class RibbonSocket : public QObject { Q_OBJECT + enum RibbonSocketState { + WANT_LENGTH = 0, + WANT_DATA = 1, + }; + public: RibbonSocket(QTcpSocket* socket, QObject* parent=0); // Consider s to be final. Don't modify it. const QTcpSocket* s(); signals: - void connected(RibbonSocket* s); void disconnected(RibbonSocket* s); - void readyRead(RibbonSocket* s); + void messageReceived(RibbonSocket* s, QByteArray buf); private: virtual ~RibbonSocket(); QTcpSocket* socket; + QByteArray buf; + RibbonSocketState state; + quint32 bytes_wanted; + bool tryProcess(); + bool readUntilBufHas(int desired_buffer_length); private slots: - void onConnect(); void onDisconnect(); void onReadyRead(); };