diff --git a/configFile.cpp b/configFile.cpp index fc98087..fdb5171 100644 --- a/configFile.cpp +++ b/configFile.cpp @@ -11,11 +11,7 @@ using namespace std; ConfigFile::ConfigFile() { - selfId=0; - for(int i=0; i < 8; i++) { - selfId <<= 8; - selfId += rand() % 0xFF; - } + selfId = randId(); } bool ConfigFile::read(const char* path) { @@ -44,7 +40,6 @@ bool ConfigFile::read(const char* path) { addr.sin6_port = htons(port); if(inet_pton(AF_INET6, addrStr.c_str(), &(addr.sin6_addr)) != 1) { - //TODO proper log fprintf(stderr, "Could not convert '%s' to IPv6 address\n", addrStr.c_str()); continue; @@ -52,10 +47,27 @@ bool ConfigFile::read(const char* path) { bootstrapNodes.push_back(Neighbour(id, addr)); } + else if(attr == "data") { + u64 id; + handle >> hex >> id >> dec; + if(id == 0) + id = randId(); + + string dataStr; + getline(handle, dataStr); + + size_t nonWSPos = 0; + for(; nonWSPos < dataStr.size(); nonWSPos++) + if(dataStr[nonWSPos] != ' ' && dataStr[nonWSPos] != '\t') + break; + if(nonWSPos == dataStr.size()) + continue; + + data.push_back(make_pair(id, dataStr.substr(nonWSPos))); + } else if(attr.empty()) continue; else { - //TODO proper log fprintf(stderr, "Unknown configuration item: '%s'\n", attr.c_str()); continue; @@ -83,8 +95,22 @@ bool ConfigFile::write(const char* path) { << addr << ' ' << ntohs(nei.addr.sin6_port) << '\n'; } + for(auto dat : data) { + handle << "data " << hex << dat.first << dec << " " + << dat.second << '\n'; + } + handle.close(); return true; } +u64 ConfigFile::randId() const { + u64 out=0; + for(int i=0; i < 8; i++) { + out <<= 8; + out += rand() % 0xFF; + } + return out; +} + diff --git a/configFile.h b/configFile.h index de09ef3..f7c8237 100644 --- a/configFile.h +++ b/configFile.h @@ -26,8 +26,16 @@ class ConfigFile { }; /** Bootstrap nodes. No setter: wouldn't be useful. */ + const std::vector >& getData() const { + return data; + } + /** Data to publish. */ + private: + u64 randId() const; + u64 selfId; std::vector bootstrapNodes; + std::vector > data; }; diff --git a/dataStore.cpp b/dataStore.cpp index a801788..053484d 100644 --- a/dataStore.cpp +++ b/dataStore.cpp @@ -45,7 +45,6 @@ void DataStore::addData(Bytes pck, u32 seqno, u64 id, bool mine) { timeEvents.push(TimeEvent( time(NULL) + csts::TIME_REPUBLISH_DATA, seqno, id, EV_REPUBLISH)); - } else { timeEvents.push(TimeEvent( diff --git a/flooder.cpp b/flooder.cpp index 6d54477..e9072b6 100644 --- a/flooder.cpp +++ b/flooder.cpp @@ -10,7 +10,6 @@ Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto, const std::list& peers) : data(data), datId(datId), seqno(seqno), proto(proto) { - fprintf(stderr, "[DBG] Created flooder for %lX (%u)\n", datId, seqno); for(auto it=peers.begin(); it != peers.end(); ++it) { triesCount[it->id] = 0; toFlood.push(FloodEvt(time(NULL), it->id)); @@ -18,11 +17,23 @@ Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto, update(); } +Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto, + u64 peer) : + data(data), datId(datId), seqno(seqno), proto(proto) +{ + addPeer(peer); + update(); +} + +Flooder::~Flooder() { +} + void Flooder::update() { while(!toFlood.empty()) { const FloodEvt& evt = toFlood.top(); - if(evt.time > time(NULL)) + if(evt.time > time(NULL)) { break; + } toFlood.pop(); if(triesCount[evt.id] > csts::FLOOD_RETRIES) { @@ -31,7 +42,6 @@ void Flooder::update() { continue; } else if(triesCount[evt.id] < 0) { - fprintf(stderr, "[DBG] Got your IHave, %lX!\n", evt.id); continue; // IHave received } @@ -48,6 +58,11 @@ void Flooder::gotIHave(u64 id, u32 withSeqno) { triesCount[id] = -1; } +void Flooder::addPeer(u64 peer) { + triesCount[peer] = 0; + toFlood.push(FloodEvt(time(NULL), peer)); +} + void Flooder::sendTo(u64 id) { proto->sendData(id, data, seqno, datId); } diff --git a/flooder.h b/flooder.h index ffeca8e..b947214 100644 --- a/flooder.h +++ b/flooder.h @@ -18,6 +18,9 @@ class Flooder { public: Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto, const std::list& peers); + Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto, + u64 peer); + ~Flooder(); void update(); /** Call often enough (ie ~1s) */ @@ -25,6 +28,9 @@ class Flooder { void gotIHave(u64 id, u32 withSeqno); /** Acknoledges the reception of the resource by peer `id`. */ + void addPeer(u64 peer); + /** Notifies that a new peer must be flooded. */ + bool done() const { return toFlood.empty(); } private: //meth diff --git a/main.cpp b/main.cpp index 597bf17..df6ec88 100644 --- a/main.cpp +++ b/main.cpp @@ -16,6 +16,9 @@ #include #include #include +#include + +bool terminate=false; int main(int argc, char** argv) { bool hasConfig = false; @@ -27,6 +30,8 @@ int main(int argc, char** argv) { srand(time(NULL)+42); + signal(SIGINT, [](int) { terminate = true; }); + ConfigFile cfg; if(hasConfig) { if(!cfg.read(configFilePath) || !cfg.write(configFilePath)) { @@ -40,24 +45,33 @@ int main(int argc, char** argv) { addr.sin6_family = AF_INET6; addr.sin6_port = htons(csts::DEFAULT_PORT); - printf("%lX\n", cfg.getSelfId()); + fprintf(stderr, "[INFO] Starting with ID %lX\n", cfg.getSelfId()); Protocol proto(addr, cfg.getSelfId()); DataStore dataStore(&proto); + for(auto dat : cfg.getData()) { + Bytes pck; + pck << csts::TLV_DATA_TEXT << (u8)dat.second.size(); + for(u8 chr : dat.second) + pck << chr; + dataStore.addData(pck, time(NULL), dat.first, true); + fprintf(stderr, "[INFO] Adding data `%s`\n", dat.second.c_str()); + } + Neighbours neighboursManager(&proto, &dataStore); for(const Neighbour& nei : cfg.getBootstrapNodes()) { char addr[54]; inet_ntop(AF_INET6, &nei.addr.sin6_addr, addr, 54); - printf("Neigh: %lX [%s]:%hu\n", nei.id, addr, - ntohs(nei.addr.sin6_port)); + fprintf(stderr, "[INFO] Bootstrap neighbour: %lX [%s]:%hu\n", + nei.id, addr, ntohs(nei.addr.sin6_port)); neighboursManager.addPotentialNei(nei); } PacketParser pckParser(&neighboursManager, &proto, &dataStore); - while(true) { + while(!terminate) { neighboursManager.fullUpdate(); while(proto.readyRead()) { diff --git a/neighbours.cpp b/neighbours.cpp index 311f4a4..8693360 100644 --- a/neighbours.cpp +++ b/neighbours.cpp @@ -15,6 +15,12 @@ Neighbours::Neighbours(Protocol* proto, DataStore* dataStore) : proto(proto), dataStore(dataStore), lastPeerPeek(0), lastSentNR(0) {} +Neighbours::~Neighbours() { + for(auto fl : dataFlooder) { + delete fl.second; + } +} + void Neighbours::fullCheck() { for(auto nei : neiType) { switch(nei.second) { @@ -66,17 +72,19 @@ void Neighbours::fullUpdate() { for(auto flooder=dataFlooder.begin(); flooder != dataFlooder.end(); ) { - if(flooder->second.done()) + if(flooder->second->done()) { + delete flooder->second; flooder = dataFlooder.erase(flooder); + } else { - flooder->second.update(); + flooder->second->update(); ++flooder; } } for(u64 datId : dataStore->toFlood()) { dataFlooder.insert({datId, - Flooder( + new Flooder( (*dataStore)[datId], datId, dataStore->getSeqno(datId), proto, symNei)}); dataStore->setFlooded(datId); @@ -154,7 +162,7 @@ list* Neighbours::listOfType(NeiType typ) { void Neighbours::gotIHave(u64 from, u64 datId, u32 seqno) { if(dataFlooder.find(datId) != dataFlooder.end()) { - dataFlooder.at(datId).gotIHave(from, seqno); + dataFlooder.at(datId)->gotIHave(from, seqno); } } @@ -177,6 +185,11 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) { } if(!wasSpliced) { fprintf(stderr, "[ERROR] Node %lX wasn't found (type change)\n", id); + return; + } + + if(nType == NEI_SYM) { + floodTo(id); } } @@ -211,3 +224,19 @@ list::iterator Neighbours::randPeer(list* list) { return it; } +void Neighbours::floodTo(u64 peer) { + vector datIds; + dataStore->ids(datIds); + for(u64 datId : datIds) { + auto curFlooder = dataFlooder.find(datId); + if(curFlooder != dataFlooder.end() && !curFlooder->second->done()) { + curFlooder->second->addPeer(peer); + } + else { + dataFlooder.insert({datId, new Flooder( + (*dataStore)[datId], datId, + dataStore->getSeqno(datId), proto, peer)}); + } + } +} + diff --git a/neighbours.h b/neighbours.h index 2efc540..391733d 100644 --- a/neighbours.h +++ b/neighbours.h @@ -18,6 +18,7 @@ class Neighbours { public: Neighbours(Protocol* proto, DataStore* dataStore); + ~Neighbours(); void fullCheck(); /** Cleans the peers lists by removing the expired entries. */ @@ -65,6 +66,8 @@ class Neighbours { bool sendIHU(u64 id); std::list::iterator randPeer(std::list* list); + void floodTo(u64 peer); + private: Protocol* proto; DataStore* dataStore; @@ -72,7 +75,7 @@ class Neighbours { std::unordered_map lastRecv, lastIHU; std::unordered_map lastPckSent, lastIHUSent; std::unordered_map neiType; - std::unordered_map dataFlooder; + std::unordered_map dataFlooder; time_t lastPeerPeek, lastSentNR; }; diff --git a/packetParser.cpp b/packetParser.cpp index cdd9c90..c5ec304 100644 --- a/packetParser.cpp +++ b/packetParser.cpp @@ -26,7 +26,7 @@ void PacketParser::readTLV(Bytes& pck, u64 nei, const SockAddr& addr) { u8 type, len; pck >> type >> len; - printf("[INFO] Analyzing %d\n", type); + fprintf(stderr, "[INFO] Parsing %d from %lX.\n", type, nei); if(pck.size() < len) { fprintf(stderr, "[WARNING] Advertised TLV does not fit in the packet" diff --git a/protocol.cpp b/protocol.cpp index 392b67c..801cee2 100644 --- a/protocol.cpp +++ b/protocol.cpp @@ -27,7 +27,6 @@ Protocol::Protocol(const SockAddr& listenAddr, u64 selfId) : } // Set socket reception timeout - /* struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 100000; @@ -35,7 +34,6 @@ Protocol::Protocol(const SockAddr& listenAddr, u64 selfId) : perror("Cannot set socket timeout"); throw NwError(); } - */ startPollNetwork(); } @@ -71,18 +69,17 @@ void Protocol::sendBody(const Bytes& body, const SockAddr& to) { Bytes pck; pck << csts::MAGIC << csts::VERSION << (u16)body.size() << selfId << body; - //TODO check size < MTU char buffer[MAX_MTU]; pck.writeToBuffer(buffer, MAX_MTU); ssize_t rc = sendto(sock, buffer, pck.size(), 0, (struct sockaddr*)&to, sizeof(to)); if(rc != (ssize_t)pck.size()) { - //TODO log warning. + fprintf(stderr, "[WARNING] Whole packet not sent\n"); } - fprintf(stderr, "[INFO] sending packet [%d] to port %hu\n", - body.size() > 0 ? body[0] : -1, ntohs(to.sin6_port)); } void Protocol::sendBody(const Bytes& body, const u64& id) { + fprintf(stderr, "[INFO] sending packet [%d] to %lX\n", + body.size() > 0 ? body[0] : -1, id); return sendBody(body, addrOfId(id)); } @@ -169,10 +166,12 @@ void Protocol::pollNetwork() { ssize_t readDat = recvfrom(sock, buffer, MAX_MTU, 0, &fromAddr, &fromAddrLen); if(readDat < 0) { + if(errno == 11) // Socket timeout - expected behaviour + continue; perror("[WARNING] Bad packet"); continue; } - if(readDat <= 0) { + if(readDat == 0) { fprintf(stderr, "[WARNING] Empty packet.\n"); continue; } @@ -209,8 +208,6 @@ void Protocol::pollNetwork() { continue; } - fprintf(stderr, "[INFO] Received packet [%d]\n", - data.size() > 8 ? data[8] : -1); AvailPacket pck; pck.from = convFromAddr; pck.data = data;