Aggregate packets before sending.

This commit is contained in:
Théophile Bastian 2016-11-27 09:49:28 +01:00
parent 574bef34e2
commit 32b56c6c91
4 changed files with 47 additions and 15 deletions

View file

@ -83,6 +83,7 @@ int main(int argc, char** argv) {
dataStore.update(); dataStore.update();
proto.sendAllNow();
sleep(1); sleep(1);
} }

View file

@ -38,6 +38,7 @@ const int TIME_RESEND_FLOOD = 3; // s
const int SYM_COUNT_BEFORE_PEEK = 5; const int SYM_COUNT_BEFORE_PEEK = 5;
const int NB_NEIGH_PER_NR = 5; const int NB_NEIGH_PER_NR = 5;
const int DFT_MAX_MTU = 1460; // bytes
const int FLOOD_RETRIES = 3; const int FLOOD_RETRIES = 3;

View file

@ -65,22 +65,18 @@ Bytes Protocol::getPacket(SockAddr* from) {
return pck.data; return pck.data;
} }
void Protocol::sendBody(const Bytes& body, const SockAddr& to) {
Bytes pck;
pck << csts::MAGIC << csts::VERSION << (u16)body.size() << selfId
<< body;
char buffer[MAX_MTU];
pck.writeToBuffer(buffer, MAX_MTU);
ssize_t rc =
sendto(sock, buffer, pck.size(), 0, (struct sockaddr*)&to, sizeof(to));
if(rc != (ssize_t)pck.size()) {
fprintf(stderr, "[WARNING] Whole packet not sent\n");
}
}
void Protocol::sendBody(const Bytes& body, const u64& id) { void Protocol::sendBody(const Bytes& body, const u64& id) {
fprintf(stderr, "[INFO] sending packet [%d] to %lX\n", fprintf(stderr, "[INFO] sending packet [%d] to %lX\n",
body.size() > 0 ? body[0] : -1, id); body.size() > 0 ? body[0] : -1, id);
return sendBody(body, addrOfId(id));
if(aggregatedTLVs.find(id) != aggregatedTLVs.end()) { // TLVs are waiting
if(aggregatedTLVs[id].size() + body.size() + 12 > csts::DFT_MAX_MTU) {
// Can't wait
sendNow(id);
}
}
aggregatedTLVs[id] << body;
} }
void Protocol::sendEmpty(u64 to) { void Protocol::sendEmpty(u64 to) {
@ -132,6 +128,11 @@ void Protocol::sendIHave(u64 to, u64 datId, u32 seqno) {
sendBody(pck, to); sendBody(pck, to);
} }
void Protocol::sendAllNow() {
for(auto& waiting : aggregatedTLVs)
sendNow(waiting.first);
}
void Protocol::startPollNetwork() { void Protocol::startPollNetwork() {
pollThread = std::thread([this] { this->pollNetwork(); }); pollThread = std::thread([this] { this->pollNetwork(); });
} }
@ -216,3 +217,22 @@ SockAddr Protocol::addrOfV4(const sockaddr_in& addrv4) {
return out; return out;
} }
void Protocol::sendNow(u64 id) {
if(aggregatedTLVs.find(id) == aggregatedTLVs.end())
return; // Nothing to send.
Bytes aggregated = aggregatedTLVs[id];
aggregatedTLVs.erase(id);
Bytes pck;
pck << csts::MAGIC << csts::VERSION << (u16)aggregated.size() << selfId
<< aggregated;
char buffer[MAX_MTU];
pck.writeToBuffer(buffer, MAX_MTU);
SockAddr destAddr = addrOfId(id);
ssize_t rc = sendto(sock, buffer, pck.size(), 0,
(struct sockaddr*)&destAddr, sizeof(destAddr));
if(rc != (ssize_t)pck.size()) {
fprintf(stderr, "[WARNING] Whole packet not sent\n");
}
}

View file

@ -40,9 +40,10 @@ class Protocol {
Bytes getPacket(SockAddr* from); Bytes getPacket(SockAddr* from);
/** Appends the body of a full packet to `out`. */ /** Appends the body of a full packet to `out`. */
void sendBody(const Bytes& body, const SockAddr& to);
void sendBody(const Bytes& body, const u64& id); void sendBody(const Bytes& body, const u64& id);
/** Sends the given `body` (wrapped in headers) */ /** Sends the given `body` (wrapped in headers)
* Actually, this puts the `body` in an aggregated TLVs bytes and
* waits up to 500ms before sending. */
void sendEmpty(u64 to); void sendEmpty(u64 to);
@ -64,12 +65,19 @@ class Protocol {
u64 getSelfId() const { return selfId; } u64 getSelfId() const { return selfId; }
/** Returns the ID of this node. */ /** Returns the ID of this node. */
void sendAllNow();
/** Notifies the object that it should send every waiting packet now.
*/
private: //meth private: //meth
void startPollNetwork(); void startPollNetwork();
void pollNetwork(); void pollNetwork();
const SockAddr& addrOfId(u64 id); const SockAddr& addrOfId(u64 id);
SockAddr addrOfV4(const sockaddr_in& addrv4); SockAddr addrOfV4(const sockaddr_in& addrv4);
void sendNow(u64 id);
/** Sends the aggregated TLVs right now. */
private: private:
struct AvailPacket { struct AvailPacket {
SockAddr from; SockAddr from;
@ -85,5 +93,7 @@ class Protocol {
std::mutex availPacketsMutex; std::mutex availPacketsMutex;
std::queue<AvailPacket> availPackets; std::queue<AvailPacket> availPackets;
std::unordered_map<u64, SockAddr> addrMap; std::unordered_map<u64, SockAddr> addrMap;
std::unordered_map<u64, Bytes> aggregatedTLVs;
}; };