From 1527c1c6fea76969aa9b63a338052664e9ee7ef4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophile=20Bastian?= Date: Wed, 23 Nov 2016 23:49:48 +0100 Subject: [PATCH] Proper handling of neighbours updates. Still have to interface it cleanly with the inbound network packets --- .gitignore | 1 + Makefile | 2 +- main.cpp | 39 +++++++++++++-- neighbours.cpp | 126 +++++++++++++++++++++++++++++++++++++++++++++++++ neighbours.h | 59 +++++++++++++++++++++++ nw_constants.h | 11 +++-- protocol.cpp | 30 +++++++++--- protocol.h | 9 +++- 8 files changed, 263 insertions(+), 14 deletions(-) create mode 100644 neighbours.cpp create mode 100644 neighbours.h diff --git a/.gitignore b/.gitignore index e5e1e4a..44138b9 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ *.app projet.pdf +jeanhubert diff --git a/Makefile b/Makefile index 61d6259..33ca1f5 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CXX=g++ CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O2 CXXLIBS=-lpthread -OBJS = Bytes.o main.o protocol.o +OBJS = Bytes.o main.o protocol.o neighbours.o TARGET = jeanhubert all: $(TARGET) diff --git a/main.cpp b/main.cpp index 349603f..3e2aaea 100644 --- a/main.cpp +++ b/main.cpp @@ -7,9 +7,12 @@ #include "data.h" #include "protocol.h" #include "nw_constants.h" +#include "neighbours.h" #include #include #include +#include +#include int main(int /*argc*/, char** /*argv*/) { srand(time(NULL)+42); @@ -17,7 +20,7 @@ int main(int /*argc*/, char** /*argv*/) { SockAddr addr; memset(&addr, 0, sizeof(addr)); addr.sin6_family = AF_INET6; - addr.sin6_port = csts::DEFAULT_PORT; + addr.sin6_port = htons(csts::DEFAULT_PORT); u64 myId=0; for(int i=0; i < 8; i++) { @@ -25,10 +28,12 @@ int main(int /*argc*/, char** /*argv*/) { myId += rand() % (1<<8); } + printf("%lu\n", myId); + SockAddr jch_addr; memset(&jch_addr, 0, sizeof(jch_addr)); jch_addr.sin6_family = AF_INET6; - jch_addr.sin6_port = csts::DEFAULT_PORT; + jch_addr.sin6_port = htons(1212); int rc = inet_pton(AF_INET6, "::FFFF:81.194.27.155", &jch_addr.sin6_addr); if(rc != 1) { if(rc == 0) @@ -37,9 +42,37 @@ int main(int /*argc*/, char** /*argv*/) { perror("Cannot convert JCh address"); exit(1); } + u64 jch_id = 0x43e3a5e0; + jch_id <<= 32; + jch_id += 0x10095a0f; Protocol proto(addr, myId); - proto.sendEmpty(jch_addr); + + Neighbours neighboursManager(&proto); + neighboursManager.addPotentialNei(Neighbour(jch_id, jch_addr)); + +// proto.sendEmpty(loc_addr); +// proto.sendEmpty(jch_addr); + + while(true) { + neighboursManager.fullUpdate(); + sleep(2); + } + + /* + SockAddr loc_addr; + memset(&loc_addr, 0, sizeof(loc_addr)); + loc_addr.sin6_family = AF_INET6; + loc_addr.sin6_port = htons(1212); + rc = inet_pton(AF_INET6, "::FFFF:127.0.0.1", &loc_addr.sin6_addr); + if(rc != 1) { + fprintf(stderr, "Error."); + exit(1); + } + + + while(true); + */ return 0; } diff --git a/neighbours.cpp b/neighbours.cpp new file mode 100644 index 0000000..92cd130 --- /dev/null +++ b/neighbours.cpp @@ -0,0 +1,126 @@ +/*************************************************************************** + * By Théophile Bastian, 2017 + * M1 Network course project at ENS Cachan, Juliusz Chroboczek. + * License: WTFPL v2 + **************************************************************************/ + +#include "neighbours.h" +#include "nw_constants.h" + +#include + +using namespace std; + +Neighbours::Neighbours(Protocol* proto) : + proto(proto), lastPeerPeek(0) +{} + +void Neighbours::fullCheck() { + for(auto nei : neiType) { + switch(nei.second) { + case NEI_SYM: + if(time(NULL) - lastRecv[nei.first] > csts::TIMEOUT_SYM_RECV + || (time(NULL) - lastIHU[nei.first] + > csts::TIMEOUT_SYM_IHU)) + { + changeNeiType(nei.first, NEI_POTENTIAL); + } + break; + case NEI_UNIDIR: + if(time(NULL) - lastRecv[nei.first] > csts::TIMEOUT_UNIDIR) + changeNeiType(nei.first, NEI_POTENTIAL); + break; + default: + break; + } + } +} + +void Neighbours::fullUpdate() { + fullCheck(); + + for(auto nei : unidirNei) + updateSendPackets(nei, proto); + for(auto nei : symNei) + updateSendPackets(nei, proto); + + if(potentialNei.size() > 0 + && symNei.size() < csts::SYM_COUNT_BEFORE_PEEK + && time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK) { + int nPeerId = rand() % potentialNei.size(); + auto it = potentialNei.begin(); + for(int at=0; at < nPeerId; ++at, ++it); + proto->sendEmpty(it->id); + lastPeerPeek = time(NULL); + } +} + +void Neighbours::addPotentialNei(const Neighbour& nei) { + potentialNei.push_back(nei); + lastRecv.insert({nei.id, 0}); + lastIHU.insert({nei.id, 0}); + neiType.insert({nei.id, NEI_POTENTIAL}); + proto->addIdAddr(nei.addr, nei.id); +} + +void Neighbours::receivedFrom(u64 id) { + NeiType typ = neiType[id]; + lastRecv.insert({id, time(NULL)}); + if(typ == NEI_POTENTIAL) + changeNeiType(id, NEI_UNIDIR); +} + +void Neighbours::hadIHU(u64 id) { + NeiType typ = neiType[id]; + lastRecv.insert({id, time(NULL)}); + lastIHU.insert({id, time(NULL)}); + if(typ == NEI_POTENTIAL || typ == NEI_UNIDIR) + changeNeiType(id, NEI_SYM); +} + +list& Neighbours::listOfType(NeiType typ) { + switch(typ) { + case NEI_POTENTIAL: + return potentialNei; + case NEI_UNIDIR: + return unidirNei; + case NEI_SYM: + return symNei; + default: + break; + } + throw WrongNeiType(); +} + +void Neighbours::changeNeiType(u64 id, NeiType nType) { + NeiType cType = neiType[id]; + if(cType == nType) + return; + list& fromList = listOfType(cType), toList = listOfType(nType); + neiType.insert({id, nType}); + + bool wasSpliced=false; + for(auto it=fromList.begin(); it != fromList.end(); ++it) { + if(it->id == id) { + fromList.splice(it, toList); + wasSpliced=true; + break; + } + } + if(!wasSpliced) { + //TODO log error. + } +} + +void Neighbours::updateSendPackets(const Neighbour& nei, Protocol* proto) { + if(time(NULL) - lastIHUSent[nei.id] >= csts::TIME_RESEND_IHU) { + lastIHUSent[nei.id] = time(NULL); + lastPckSent[nei.id] = time(NULL); + proto->sendIHU(nei.id); + } + else if(time(NULL) - lastPckSent[nei.id] >= csts::TIME_RESEND_EMPTY) { + lastPckSent[nei.id] = time(NULL); + proto->sendEmpty(nei.id); + } +} + diff --git a/neighbours.h b/neighbours.h new file mode 100644 index 0000000..1c29bb7 --- /dev/null +++ b/neighbours.h @@ -0,0 +1,59 @@ +/*************************************************************************** + * By Théophile Bastian, 2017 + * M1 Network course project at ENS Cachan, Juliusz Chroboczek. + * License: WTFPL v2 + **************************************************************************/ + +#pragma once + +#include +#include +#include +#include "data.h" +#include "protocol.h" + +class Neighbours { + public: + Neighbours(Protocol* proto); + + void fullCheck(); + /** Cleans the peers lists by removing the expired entries. */ + + void fullUpdate(); + /** Triggers a full update of the peers lists: sends the appropriate + * packets to the peers (IHU, ...) when approaching expiracy, and + * performs a `fullCheck()`. + */ + + void addPotentialNei(const Neighbour& nei); + /** Adds a `Neighbour` to the list of potential neighbours. */ + + void receivedFrom(u64 id); + /** Signals that a packet was received from `id`, performs the + * appropriate bookkeeping actions. + */ + + void hadIHU(u64 id); + /** Signals that a IHU was received from `id`, performs the + * appropriate bookkeeping actions. + */ + + private: //meth + class WrongNeiType : public std::exception {}; + enum NeiType { + NEI_UNDEF, NEI_POTENTIAL, NEI_UNIDIR, NEI_SYM + }; + + std::list& listOfType(NeiType typ); + void changeNeiType(u64 id, NeiType nType); + void updateSendPackets(const Neighbour& nei, Protocol* proto); + + private: + Protocol* proto; + std::list potentialNei, unidirNei, symNei; + std::unordered_map lastRecv, lastIHU; + std::unordered_map lastPckSent, lastIHUSent; + std::unordered_map neiType; + time_t lastPeerPeek; +}; + diff --git a/nw_constants.h b/nw_constants.h index 27bfe01..7d3b5e1 100644 --- a/nw_constants.h +++ b/nw_constants.h @@ -25,9 +25,14 @@ const u8 TLV_DATA_TEXT = 32; const u8 TLV_DATA_PNG = 33; const u8 TLV_DATA_JPG = 34; -const int TIMEOUT_UNIDIR = 100*1000; // ms -const int TIMEOUT_SYM_RECV = 150*1000; // ms -const int TIMEOUT_SYM_IHU = 300*1000; // ms +const int TIMEOUT_UNIDIR = 100; // s +const int TIMEOUT_SYM_RECV = 150; // s +const int TIMEOUT_SYM_IHU = 300; // s +const int TIME_RESEND_IHU = 90; // s +const int TIME_RESEND_EMPTY = 30; // s +const int TIME_PEER_PEEK = 30; // s + +const int SYM_COUNT_BEFORE_PEEK = 5; const u16 DEFAULT_PORT = 1192; diff --git a/protocol.cpp b/protocol.cpp index 0e13887..eecb540 100644 --- a/protocol.cpp +++ b/protocol.cpp @@ -15,6 +15,12 @@ Protocol::Protocol(const SockAddr& listenAddr, u64 selfId) : sock(socket(PF_INET6, SOCK_DGRAM, 0)), listenAddr(listenAddr), selfId(selfId), terminating(false) { + int reuseVal = 1; + if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseVal, sizeof(reuseVal))) + { + perror("Could not set SO_REUSEADDR for socket"); + throw NwError(); + } if(bind(sock, (struct sockaddr*)(&listenAddr), sizeof(listenAddr)) != 0) { perror("Cannot bind socket"); throw NwError(); @@ -71,6 +77,7 @@ void Protocol::sendBody(const Bytes& body, const SockAddr& to) { if(rc != (ssize_t)pck.size()) { //TODO log warning. } + fprintf(stderr, "[INFO] sending packet to port %hu\n", ntohs(to.sin6_port)); } void Protocol::sendBody(const Bytes& body, const u64& id) { return sendBody(body, addrOfId(id)); @@ -121,12 +128,27 @@ void Protocol::sendNeighbours(u64 id, const std::vector& neigh) { sendNeighbours(addrOfId(id), neigh); } +void Protocol::sendData(const SockAddr& to, const char* data, size_t len, + u32 seqno, u64 datId) +{ + Bytes pck; + if(len + 12 >= 1<<8) + throw DataTooLongError(); + pck << csts::TLV_DATA << (u8) (12 + len) << seqno << datId + << Bytes(data, len); + sendBody(pck, to); +} +void Protocol::sendData(u64 id, const char* data, size_t len, u32 seqno, + u64 datId) +{ + sendData(addrOfId(id), data, len, seqno, datId); +} + void Protocol::sendIHave(const SockAddr& to, u64 datId, u32 seqno) { Bytes pck; pck << csts::TLV_IHAVE << (u8) 12 << seqno << datId; sendBody(pck, to); } - void Protocol::sendIHave(u64 id, u64 datId, u32 seqno) { sendIHave(addrOfId(id), datId, seqno); } @@ -144,11 +166,7 @@ void Protocol::pollNetwork() { //TODO is it blocking? ssize_t readDat = recvfrom(sock, buffer, MAX_MTU, 0, fromAddr, &fromAddrLen); - if(readDat < 0) { - perror("Warning: could not read from socket"); - continue; - } - else if(readDat == 0) + if(readDat <= 0) continue; Bytes data(buffer, readDat); u8 magic, version; diff --git a/protocol.h b/protocol.h index c641965..804b3a2 100644 --- a/protocol.h +++ b/protocol.h @@ -19,6 +19,7 @@ class Protocol { public: class NwError : public std::exception {}; class ThreadError : public std::exception {}; + class DataTooLongError : public std::exception {}; class UnknownId : public std::exception { public: UnknownId(u64 id) : _id(id) {} @@ -30,7 +31,8 @@ class Protocol { Protocol(const SockAddr& dest, u64 selfId); ~Protocol(); - void addIdAddr(const sockaddr_in6& addr, u64 id); + void addIdAddr(const SockAddr& addr, u64 id); + /** Maps internally `id` to `addr` for future use. */ bool readyRead() const; /** Returns whether a packet is available. */ @@ -59,6 +61,11 @@ class Protocol { const std::vector& neigh); /** Sends a neighbours list packet */ + void sendData(const SockAddr& to, const char* data, size_t len, + u32 seqno, u64 datId); + void sendData(u64 id, const char* data, size_t len, + u32 seqno, u64 datId); + void sendIHave(const SockAddr& to, u64 datId, u32 seqno); void sendIHave(u64 id, u64 datId, u32 seqno); /** Sends a IHave packet for `datId` */