From 98924f4ca6f0dd6eaa3c1a8276f9b74198d1acec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophile=20Bastian?= Date: Sat, 26 Nov 2016 16:20:20 +0100 Subject: [PATCH] =?UTF-8?q?Now=20flooding=20data=20=E2=80=94=20a=20bit=20t?= =?UTF-8?q?oo=20much?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Bytes.cpp | 12 +++++-- Bytes.h | 5 +++ Makefile | 5 +-- main.cpp | 13 ++++--- neighbours.cpp | 94 ++++++++++++++++++++++++++++++++++++++++-------- neighbours.h | 24 ++++++++++--- nw_constants.h | 7 ++++ packetParser.cpp | 68 ++++++++++++++++++++++++++--------- packetParser.h | 14 +++++--- protocol.cpp | 25 ++++++------- protocol.h | 5 ++- 11 files changed, 204 insertions(+), 68 deletions(-) diff --git a/Bytes.cpp b/Bytes.cpp index d26a4ee..1cb794a 100644 --- a/Bytes.cpp +++ b/Bytes.cpp @@ -53,12 +53,12 @@ Bytes& Bytes::operator<<(u8 v) { return *this; } Bytes& Bytes::operator<<(u16 v) { - insertData(htons(v)); + insertData(v); // insertData(htons(v)); return *this; } Bytes& Bytes::operator<<(u32 v) { - insertData(htonl(v)); + insertData(v); // insertData(htonl(v)); return *this; } @@ -103,6 +103,12 @@ Bytes& Bytes::operator>>(char& v) { return *this; } +Bytes Bytes::extract(size_t len) { + Bytes out = sub(0, len); // Handles out of range + firstIndex += len; + return out; +} + void Bytes::operator=(const Bytes& oth) { firstIndex=0; data.clear(); @@ -112,7 +118,7 @@ void Bytes::operator=(const Bytes& oth) { } Bytes Bytes::sub(size_t beg, size_t len) const { - if(beg+len >= size()) + if(beg+len > size()) throw OutOfRange(); Bytes out; for(size_t byte=0; byte < len; byte++) diff --git a/Bytes.h b/Bytes.h index 474c74d..a74d105 100644 --- a/Bytes.h +++ b/Bytes.h @@ -55,6 +55,11 @@ class Bytes { /// Extracts the given data type from the vector. Returns *this to /// allow chaining. + Bytes extract(size_t len); + /** Extracts the first `len` bytes and returns them. Throws + * `OutOfRange` if `len` > `size()`. + */ + void operator=(const Bytes& oth); /// Copies the given Bytes object into its own data. diff --git a/Makefile b/Makefile index 952266b..9d48549 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,9 @@ CXX=g++ -CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O2 +CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O0 -g CXXLIBS=-lpthread -OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o +OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o \ + dataStore.o flooder.o TARGET = jeanhubert all: $(TARGET) diff --git a/main.cpp b/main.cpp index 4c81bd9..597bf17 100644 --- a/main.cpp +++ b/main.cpp @@ -10,6 +10,7 @@ #include "neighbours.h" #include "packetParser.h" #include "configFile.h" +#include "dataStore.h" #include #include #include @@ -43,7 +44,9 @@ int main(int argc, char** argv) { Protocol proto(addr, cfg.getSelfId()); - Neighbours neighboursManager(&proto); + DataStore dataStore(&proto); + Neighbours neighboursManager(&proto, &dataStore); + for(const Neighbour& nei : cfg.getBootstrapNodes()) { char addr[54]; inet_ntop(AF_INET6, &nei.addr.sin6_addr, addr, 54); @@ -52,7 +55,7 @@ int main(int argc, char** argv) { neighboursManager.addPotentialNei(nei); } - PacketParser pckParser(&neighboursManager, &proto); + PacketParser pckParser(&neighboursManager, &proto, &dataStore); while(true) { neighboursManager.fullUpdate(); @@ -60,10 +63,12 @@ int main(int argc, char** argv) { while(proto.readyRead()) { SockAddr fromAddr; Bytes pck = proto.getPacket(&fromAddr); - pckParser.parse(pck); + pckParser.parse(pck, fromAddr); } - sleep(2); + dataStore.update(); + + sleep(1); } return 0; diff --git a/neighbours.cpp b/neighbours.cpp index a75a1ff..311f4a4 100644 --- a/neighbours.cpp +++ b/neighbours.cpp @@ -11,8 +11,8 @@ using namespace std; -Neighbours::Neighbours(Protocol* proto) : - proto(proto), lastPeerPeek(0) +Neighbours::Neighbours(Protocol* proto, DataStore* dataStore) : + proto(proto), dataStore(dataStore), lastPeerPeek(0), lastSentNR(0) {} void Neighbours::fullCheck() { @@ -46,17 +46,46 @@ void Neighbours::fullUpdate() { if(potentialNei.size() > 0 && symNei.size() < csts::SYM_COUNT_BEFORE_PEEK - && time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK) { - int nPeerId = rand() % potentialNei.size(); - auto it = potentialNei.begin(); - for(int at=0; at < nPeerId; ++at, ++it); + && time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK) + { + auto it = randPeer(&potentialNei); proto->sendEmpty(it->id); lastPckSent[it->id] = time(NULL); lastPeerPeek = time(NULL); } + + + if(symNei.size() > 0 && potentialNei.size() < 5 + && time(NULL) - lastSentNR >= csts::TIME_SEND_NR) + { + auto it = randPeer(&symNei); + proto->sendNReq(it->id); + lastPckSent[it->id] = time(NULL); + lastSentNR = time(NULL); + } + + for(auto flooder=dataFlooder.begin(); flooder != dataFlooder.end(); ) + { + if(flooder->second.done()) + flooder = dataFlooder.erase(flooder); + else { + flooder->second.update(); + ++flooder; + } + } + + for(u64 datId : dataStore->toFlood()) { + dataFlooder.insert({datId, + Flooder( + (*dataStore)[datId], datId, dataStore->getSeqno(datId), + proto, symNei)}); + dataStore->setFlooded(datId); + } } void Neighbours::addPotentialNei(const Neighbour& nei) { + if(neiType.find(nei.id) != neiType.end()) + return; // We already know him potentialNei.push_back(nei); lastRecv.insert({nei.id, 0}); lastIHU.insert({nei.id, 0}); @@ -64,7 +93,10 @@ void Neighbours::addPotentialNei(const Neighbour& nei) { proto->addIdAddr(nei.addr, nei.id); } -void Neighbours::receivedFrom(u64 id) { +void Neighbours::receivedFrom(u64 id, const SockAddr& addr) { + if(neiType.find(id) == neiType.end()) + addPotentialNei(Neighbour(id, addr)); + NeiType typ = neiType[id]; lastRecv[id] = time(NULL); @@ -72,7 +104,13 @@ void Neighbours::receivedFrom(u64 id) { changeNeiType(id, NEI_UNIDIR); } -void Neighbours::hadIHU(u64 id) { +void Neighbours::hadIHU(u64 id, const SockAddr& addr) { + if(neiType.find(id) == neiType.end()) { + fprintf(stderr, "[WARNING] %lX heard us — yet we did not send " + "anything?\n", id); + addPotentialNei(Neighbour(id, addr)); + } + NeiType typ = neiType[id]; lastRecv[id] = time(NULL); lastIHU[id] = time(NULL); @@ -80,6 +118,26 @@ void Neighbours::hadIHU(u64 id) { changeNeiType(id, NEI_SYM); } +void Neighbours::getNeighbours(vector& out, u64 except, int count) { + vector::iterator > permutation; + for(auto it = symNei.begin(); it != symNei.end(); ++it) + permutation.push_back(it); + for(size_t i=0; i < permutation.size(); i++) { + int swapWith = rand() % permutation.size(); + auto tmp = permutation[i]; + permutation[i] = permutation[swapWith]; + permutation[swapWith] = tmp; + } + + for(int cur=0, pos=0; cur < count && (size_t)pos < permutation.size(); + pos++) { + if(permutation[pos]->id != except) { + out.push_back(*permutation[pos]); + cur++; + } + } +} + list* Neighbours::listOfType(NeiType typ) { switch(typ) { case NEI_POTENTIAL: @@ -94,8 +152,13 @@ list* Neighbours::listOfType(NeiType typ) { throw WrongNeiType(); } +void Neighbours::gotIHave(u64 from, u64 datId, u32 seqno) { + if(dataFlooder.find(datId) != dataFlooder.end()) { + dataFlooder.at(datId).gotIHave(from, seqno); + } +} + void Neighbours::changeNeiType(u64 id, NeiType nType) { - printf("TYPE %lX %d\n", id, neiType[id]); NeiType cType = neiType[id]; if(cType == nType) return; @@ -106,11 +169,8 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) { bool wasSpliced=false; for(auto it=fromList->begin(); it != fromList->end(); ++it) { if(it->id == id) { - printf("%ld %ld\n", fromList->size(), toList->size()); toList->push_back(*it); // splice() doesn't work?! fromList->erase(it); - printf("%ld %ld %d\n", fromList->size(), toList->size(), - toList == &unidirNei); wasSpliced=true; break; } @@ -118,7 +178,6 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) { if(!wasSpliced) { fprintf(stderr, "[ERROR] Node %lX wasn't found (type change)\n", id); } - printf("TYPE %lX %d\n", id, neiType[id]); } void Neighbours::updateSendPackets(const Neighbour& nei) { @@ -128,7 +187,6 @@ void Neighbours::updateSendPackets(const Neighbour& nei) { bool Neighbours::sendEmpty(u64 id) { if(time(NULL) - lastPckSent[id] >= csts::TIME_RESEND_EMPTY) { - printf("[DBG] sending empty packet to %lX.\n", id); lastPckSent[id] = time(NULL); proto->sendEmpty(id); return true; @@ -138,7 +196,6 @@ bool Neighbours::sendEmpty(u64 id) { bool Neighbours::sendIHU(u64 id) { if(time(NULL) - lastIHUSent[id] >= csts::TIME_RESEND_IHU) { - printf("[DBG] sending IHU packet to %lX.\n", id); lastIHUSent[id] = time(NULL); lastPckSent[id] = time(NULL); proto->sendIHU(id); @@ -147,3 +204,10 @@ bool Neighbours::sendIHU(u64 id) { return false; } +list::iterator Neighbours::randPeer(list* list) { + int nPeerId = rand() % list->size(); + auto it = list->begin(); + for(int at=0; at < nPeerId; ++at, ++it); + return it; +} + diff --git a/neighbours.h b/neighbours.h index b10f7f1..2efc540 100644 --- a/neighbours.h +++ b/neighbours.h @@ -7,14 +7,17 @@ #pragma once #include +#include #include #include #include "data.h" #include "protocol.h" +#include "flooder.h" +#include "dataStore.h" class Neighbours { public: - Neighbours(Protocol* proto); + Neighbours(Protocol* proto, DataStore* dataStore); void fullCheck(); /** Cleans the peers lists by removing the expired entries. */ @@ -28,16 +31,26 @@ class Neighbours { void addPotentialNei(const Neighbour& nei); /** Adds a `Neighbour` to the list of potential neighbours. */ - void receivedFrom(u64 id); + void receivedFrom(u64 id, const SockAddr& addr); /** Signals that a packet was received from `id`, performs the * appropriate bookkeeping actions. */ - void hadIHU(u64 id); + void hadIHU(u64 id, const SockAddr& addr); /** Signals that a IHU was received from `id`, performs the * appropriate bookkeeping actions. */ + void getNeighbours(std::vector& out, u64 except, int count); + /** Fills `out` with at most `count` symetric neighbours, not + * including `except`. + */ + + void gotIHave(u64 from, u64 datId, u32 seqno); + /** Notifies the flooders that a `IHave` packet has been received for + * this data from the peer `from`. + */ + private: //meth class WrongNeiType : public std::exception {}; enum NeiType { @@ -50,13 +63,16 @@ class Neighbours { bool sendEmpty(u64 id); bool sendIHU(u64 id); + std::list::iterator randPeer(std::list* list); private: Protocol* proto; + DataStore* dataStore; std::list potentialNei, unidirNei, symNei; std::unordered_map lastRecv, lastIHU; std::unordered_map lastPckSent, lastIHUSent; std::unordered_map neiType; - time_t lastPeerPeek; + std::unordered_map dataFlooder; + time_t lastPeerPeek, lastSentNR; }; diff --git a/nw_constants.h b/nw_constants.h index 7d3b5e1..5591e3d 100644 --- a/nw_constants.h +++ b/nw_constants.h @@ -28,11 +28,18 @@ const u8 TLV_DATA_JPG = 34; const int TIMEOUT_UNIDIR = 100; // s const int TIMEOUT_SYM_RECV = 150; // s const int TIMEOUT_SYM_IHU = 300; // s +const int TIMEOUT_DATA = 35*60; // s const int TIME_RESEND_IHU = 90; // s const int TIME_RESEND_EMPTY = 30; // s +const int TIME_REPUBLISH_DATA = 25*60; // s const int TIME_PEER_PEEK = 30; // s +const int TIME_SEND_NR = 60; // s +const int TIME_RESEND_FLOOD = 3; // s const int SYM_COUNT_BEFORE_PEEK = 5; +const int NB_NEIGH_PER_NR = 5; + +const int FLOOD_RETRIES = 3; const u16 DEFAULT_PORT = 1192; diff --git a/packetParser.cpp b/packetParser.cpp index 2ee9224..cdd9c90 100644 --- a/packetParser.cpp +++ b/packetParser.cpp @@ -7,24 +7,35 @@ #include "packetParser.h" #include -PacketParser::PacketParser(Neighbours* nei, Protocol* proto) : - neighbours(nei), protocol(proto) +PacketParser::PacketParser(Neighbours* nei, Protocol* proto, + DataStore* dataStore) : + neighbours(nei), protocol(proto), dataStore(dataStore) {} -void PacketParser::parse(Bytes pck) { +void PacketParser::parse(Bytes pck, const SockAddr& addr) { u64 peerId; pck >> peerId; - neighbours->receivedFrom(peerId); + neighbours->receivedFrom(peerId, addr); while(pck.size() > 0) { - readTLV(pck, peerId); + readTLV(pck, peerId, addr); } } -void PacketParser::readTLV(Bytes& pck, u64 nei) { +void PacketParser::readTLV(Bytes& pck, u64 nei, const SockAddr& addr) { u8 type, len; pck >> type >> len; + printf("[INFO] Analyzing %d\n", type); + + if(pck.size() < len) { + fprintf(stderr, "[WARNING] Advertised TLV does not fit in the packet" + ".\n"); + pck.extract(pck.size()); + return; + } + Bytes subpacket = pck.extract(len); + switch(type) { case csts::TLV_PAD1: case csts::TLV_PADN: @@ -32,34 +43,34 @@ void PacketParser::readTLV(Bytes& pck, u64 nei) { case csts::TLV_IHU: { u64 ihuId; - pck >> ihuId; + subpacket >> ihuId; if(ihuId != protocol->getSelfId()) break; - neighbours->hadIHU(nei); + neighbours->hadIHU(nei, addr); break; } case csts::TLV_NR: - //TODO + handleNR(nei); break; case csts::TLV_NEIGH: - receiveNeigh(pck, len); + receiveNeigh(subpacket); break; case csts::TLV_DATA: - receiveData(pck, len, nei); + receiveData(subpacket, nei); break; case csts::TLV_IHAVE: - //TODO + receiveIHave(subpacket, nei); break; } } -void PacketParser::receiveNeigh(Bytes& pck, u8 length) { - while(length >= 8+16+2) { /* enough to read one peer */ +void PacketParser::receiveNeigh(Bytes& pck) { + while(pck.size() >= 8+16+2) { /* enough to read one peer */ u64 id; u16 port; SockAddr addr; @@ -72,17 +83,40 @@ void PacketParser::receiveNeigh(Bytes& pck, u8 length) { pck >> port; addr.sin6_port = htons(port); + if(id == protocol->getSelfId()) + continue; + + fprintf(stderr, "[INFO] Adding neighbour %lX\n", id); + Neighbour nei(id, addr); neighbours->addPotentialNei(nei); } } -void PacketParser::receiveData(Bytes& pck, u8 length, u64 from) { +void PacketParser::receiveData(Bytes& pck, u64 from) { u32 seqNo; u64 datId; pck >> seqNo >> datId; + neighbours->gotIHave(from, datId, seqNo); protocol->sendIHave(from, datId, seqNo); - //TODO - length++; + dataStore->addData(pck, seqNo, datId, false); +} + +void PacketParser::receiveIHave(Bytes& pck, u64 from) { + u32 seqno; + u64 datId; + if(pck.size() < 8+4) { + fprintf(stderr, "[WARNING] IHave too short from %lX\n", from); + return; + } + pck >> seqno >> datId; + neighbours->gotIHave(from, datId, seqno); +} + +void PacketParser::handleNR(u64 from) { + std::vector neigh; + neighbours->getNeighbours(neigh, from, csts::NB_NEIGH_PER_NR); + + protocol->sendNeighbours(from, neigh); } diff --git a/packetParser.h b/packetParser.h index cacf50c..7fff987 100644 --- a/packetParser.h +++ b/packetParser.h @@ -8,20 +8,24 @@ #include "neighbours.h" #include "protocol.h" +#include "dataStore.h" class PacketParser { public: - PacketParser(Neighbours* nei, Protocol* proto); + PacketParser(Neighbours* nei, Protocol* proto, DataStore* dataStore); - void parse(Bytes pck); + void parse(Bytes pck, const SockAddr& addr); private: //meth - void readTLV(Bytes& pck, u64 nei); - void receiveNeigh(Bytes& pck, u8 length); - void receiveData(Bytes& pck, u8 length, u64 from); + void readTLV(Bytes& pck, u64 nei, const SockAddr& addr); + void receiveNeigh(Bytes& pck); + void receiveData(Bytes& pck, u64 from); + void receiveIHave(Bytes& pck, u64 from); + void handleNR(u64 from); private: Neighbours* neighbours; Protocol* protocol; + DataStore* dataStore; }; diff --git a/protocol.cpp b/protocol.cpp index 83bb1d7..392b67c 100644 --- a/protocol.cpp +++ b/protocol.cpp @@ -79,7 +79,8 @@ void Protocol::sendBody(const Bytes& body, const SockAddr& to) { if(rc != (ssize_t)pck.size()) { //TODO log warning. } - fprintf(stderr, "[INFO] sending packet to port %hu\n", ntohs(to.sin6_port)); + fprintf(stderr, "[INFO] sending packet [%d] to port %hu\n", + body.size() > 0 ? body[0] : -1, ntohs(to.sin6_port)); } void Protocol::sendBody(const Bytes& body, const u64& id) { return sendBody(body, addrOfId(id)); @@ -130,20 +131,20 @@ void Protocol::sendNeighbours(u64 id, const std::vector& neigh) { sendNeighbours(addrOfId(id), neigh); } -void Protocol::sendData(const SockAddr& to, const char* data, size_t len, +void Protocol::sendData(const SockAddr& to, const Bytes& data, u32 seqno, u64 datId) { Bytes pck; - if(len + 12 >= 1<<8) + if(data.size() + 12 >= 1<<8) throw DataTooLongError(); - pck << csts::TLV_DATA << (u8) (12 + len) << seqno << datId - << Bytes(data, len); + pck << csts::TLV_DATA << (u8) (12 + data.size()) << seqno << datId + << data; sendBody(pck, to); } -void Protocol::sendData(u64 id, const char* data, size_t len, u32 seqno, +void Protocol::sendData(u64 id, const Bytes& data, u32 seqno, u64 datId) { - sendData(addrOfId(id), data, len, seqno, datId); + sendData(addrOfId(id), data, seqno, datId); } void Protocol::sendIHave(const SockAddr& to, u64 datId, u32 seqno) { @@ -205,17 +206,11 @@ void Protocol::pollNetwork() { else { fprintf(stderr, "[ERROR] Unknown address family %d.\n", fromAddr.sa_family); - fprintf(stderr, "\t%hu\n", ((SockAddr*)&fromAddr)->sin6_port); - while(data.size() > 0) { - u8 c; - data >> c; - fprintf(stderr, "%02X|", c); - } - fprintf(stderr, "\n"); continue; } - puts("Received packet"); + fprintf(stderr, "[INFO] Received packet [%d]\n", + data.size() > 8 ? data[8] : -1); AvailPacket pck; pck.from = convFromAddr; pck.data = data; diff --git a/protocol.h b/protocol.h index 6e74194..15778d8 100644 --- a/protocol.h +++ b/protocol.h @@ -61,10 +61,9 @@ class Protocol { const std::vector& neigh); /** Sends a neighbours list packet */ - void sendData(const SockAddr& to, const char* data, size_t len, - u32 seqno, u64 datId); - void sendData(u64 id, const char* data, size_t len, + void sendData(const SockAddr& to, const Bytes& data, u32 seqno, u64 datId); + void sendData(u64 id, const Bytes& data, u32 seqno, u64 datId); void sendIHave(const SockAddr& to, u64 datId, u32 seqno); void sendIHave(u64 id, u64 datId, u32 seqno);