From 32b56c6c9132920fac4d8bff76c9d97746f3d27f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophile=20Bastian?= Date: Sun, 27 Nov 2016 09:49:28 +0100 Subject: [PATCH] Aggregate packets before sending. --- main.cpp | 1 + nw_constants.h | 1 + protocol.cpp | 46 +++++++++++++++++++++++++++++++++------------- protocol.h | 14 ++++++++++++-- 4 files changed, 47 insertions(+), 15 deletions(-) diff --git a/main.cpp b/main.cpp index cf7fd25..dc0891f 100644 --- a/main.cpp +++ b/main.cpp @@ -83,6 +83,7 @@ int main(int argc, char** argv) { dataStore.update(); + proto.sendAllNow(); sleep(1); } diff --git a/nw_constants.h b/nw_constants.h index 5591e3d..8e39454 100644 --- a/nw_constants.h +++ b/nw_constants.h @@ -38,6 +38,7 @@ const int TIME_RESEND_FLOOD = 3; // s const int SYM_COUNT_BEFORE_PEEK = 5; const int NB_NEIGH_PER_NR = 5; +const int DFT_MAX_MTU = 1460; // bytes const int FLOOD_RETRIES = 3; diff --git a/protocol.cpp b/protocol.cpp index c06ed96..ba6b173 100644 --- a/protocol.cpp +++ b/protocol.cpp @@ -65,22 +65,18 @@ Bytes Protocol::getPacket(SockAddr* from) { return pck.data; } -void Protocol::sendBody(const Bytes& body, const SockAddr& to) { - Bytes pck; - pck << csts::MAGIC << csts::VERSION << (u16)body.size() << selfId - << body; - char buffer[MAX_MTU]; - pck.writeToBuffer(buffer, MAX_MTU); - ssize_t rc = - sendto(sock, buffer, pck.size(), 0, (struct sockaddr*)&to, sizeof(to)); - if(rc != (ssize_t)pck.size()) { - fprintf(stderr, "[WARNING] Whole packet not sent\n"); - } -} void Protocol::sendBody(const Bytes& body, const u64& id) { fprintf(stderr, "[INFO] sending packet [%d] to %lX\n", body.size() > 0 ? body[0] : -1, id); - return sendBody(body, addrOfId(id)); + + if(aggregatedTLVs.find(id) != aggregatedTLVs.end()) { // TLVs are waiting + if(aggregatedTLVs[id].size() + body.size() + 12 > csts::DFT_MAX_MTU) { + // Can't wait + sendNow(id); + } + } + + aggregatedTLVs[id] << body; } void Protocol::sendEmpty(u64 to) { @@ -132,6 +128,11 @@ void Protocol::sendIHave(u64 to, u64 datId, u32 seqno) { sendBody(pck, to); } +void Protocol::sendAllNow() { + for(auto& waiting : aggregatedTLVs) + sendNow(waiting.first); +} + void Protocol::startPollNetwork() { pollThread = std::thread([this] { this->pollNetwork(); }); } @@ -216,3 +217,22 @@ SockAddr Protocol::addrOfV4(const sockaddr_in& addrv4) { return out; } +void Protocol::sendNow(u64 id) { + if(aggregatedTLVs.find(id) == aggregatedTLVs.end()) + return; // Nothing to send. + Bytes aggregated = aggregatedTLVs[id]; + aggregatedTLVs.erase(id); + + Bytes pck; + pck << csts::MAGIC << csts::VERSION << (u16)aggregated.size() << selfId + << aggregated; + char buffer[MAX_MTU]; + pck.writeToBuffer(buffer, MAX_MTU); + SockAddr destAddr = addrOfId(id); + ssize_t rc = sendto(sock, buffer, pck.size(), 0, + (struct sockaddr*)&destAddr, sizeof(destAddr)); + if(rc != (ssize_t)pck.size()) { + fprintf(stderr, "[WARNING] Whole packet not sent\n"); + } +} + diff --git a/protocol.h b/protocol.h index d945e20..949f1f0 100644 --- a/protocol.h +++ b/protocol.h @@ -40,9 +40,10 @@ class Protocol { Bytes getPacket(SockAddr* from); /** Appends the body of a full packet to `out`. */ - void sendBody(const Bytes& body, const SockAddr& to); void sendBody(const Bytes& body, const u64& id); - /** Sends the given `body` (wrapped in headers) */ + /** Sends the given `body` (wrapped in headers) + * Actually, this puts the `body` in an aggregated TLVs bytes and + * waits up to 500ms before sending. */ void sendEmpty(u64 to); @@ -64,12 +65,19 @@ class Protocol { u64 getSelfId() const { return selfId; } /** Returns the ID of this node. */ + void sendAllNow(); + /** Notifies the object that it should send every waiting packet now. + */ + private: //meth void startPollNetwork(); void pollNetwork(); const SockAddr& addrOfId(u64 id); SockAddr addrOfV4(const sockaddr_in& addrv4); + void sendNow(u64 id); + /** Sends the aggregated TLVs right now. */ + private: struct AvailPacket { SockAddr from; @@ -85,5 +93,7 @@ class Protocol { std::mutex availPacketsMutex; std::queue availPackets; std::unordered_map addrMap; + + std::unordered_map aggregatedTLVs; };