Now flooding data — a bit too much

This commit is contained in:
Théophile Bastian 2016-11-26 16:20:20 +01:00
parent c0f1c55e03
commit 98924f4ca6
11 changed files with 204 additions and 68 deletions

View file

@ -53,12 +53,12 @@ Bytes& Bytes::operator<<(u8 v) {
return *this;
}
Bytes& Bytes::operator<<(u16 v) {
insertData<u16>(htons(v));
insertData<u16>(v);
// insertData<u16>(htons(v));
return *this;
}
Bytes& Bytes::operator<<(u32 v) {
insertData<u32>(htonl(v));
insertData<u32>(v);
// insertData<u32>(htonl(v));
return *this;
}
@ -103,6 +103,12 @@ Bytes& Bytes::operator>>(char& v) {
return *this;
}
Bytes Bytes::extract(size_t len) {
Bytes out = sub(0, len); // Handles out of range
firstIndex += len;
return out;
}
void Bytes::operator=(const Bytes& oth) {
firstIndex=0;
data.clear();
@ -112,7 +118,7 @@ void Bytes::operator=(const Bytes& oth) {
}
Bytes Bytes::sub(size_t beg, size_t len) const {
if(beg+len >= size())
if(beg+len > size())
throw OutOfRange();
Bytes out;
for(size_t byte=0; byte < len; byte++)

View file

@ -55,6 +55,11 @@ class Bytes {
/// Extracts the given data type from the vector. Returns *this to
/// allow chaining.
Bytes extract(size_t len);
/** Extracts the first `len` bytes and returns them. Throws
* `OutOfRange` if `len` > `size()`.
*/
void operator=(const Bytes& oth);
/// Copies the given Bytes object into its own data.

View file

@ -1,8 +1,9 @@
CXX=g++
CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O2
CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O0 -g
CXXLIBS=-lpthread
OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o
OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o \
dataStore.o flooder.o
TARGET = jeanhubert
all: $(TARGET)

View file

@ -10,6 +10,7 @@
#include "neighbours.h"
#include "packetParser.h"
#include "configFile.h"
#include "dataStore.h"
#include <cstring>
#include <cstdio>
#include <cstdlib>
@ -43,7 +44,9 @@ int main(int argc, char** argv) {
Protocol proto(addr, cfg.getSelfId());
Neighbours neighboursManager(&proto);
DataStore dataStore(&proto);
Neighbours neighboursManager(&proto, &dataStore);
for(const Neighbour& nei : cfg.getBootstrapNodes()) {
char addr[54];
inet_ntop(AF_INET6, &nei.addr.sin6_addr, addr, 54);
@ -52,7 +55,7 @@ int main(int argc, char** argv) {
neighboursManager.addPotentialNei(nei);
}
PacketParser pckParser(&neighboursManager, &proto);
PacketParser pckParser(&neighboursManager, &proto, &dataStore);
while(true) {
neighboursManager.fullUpdate();
@ -60,10 +63,12 @@ int main(int argc, char** argv) {
while(proto.readyRead()) {
SockAddr fromAddr;
Bytes pck = proto.getPacket(&fromAddr);
pckParser.parse(pck);
pckParser.parse(pck, fromAddr);
}
sleep(2);
dataStore.update();
sleep(1);
}
return 0;

View file

@ -11,8 +11,8 @@
using namespace std;
Neighbours::Neighbours(Protocol* proto) :
proto(proto), lastPeerPeek(0)
Neighbours::Neighbours(Protocol* proto, DataStore* dataStore) :
proto(proto), dataStore(dataStore), lastPeerPeek(0), lastSentNR(0)
{}
void Neighbours::fullCheck() {
@ -46,17 +46,46 @@ void Neighbours::fullUpdate() {
if(potentialNei.size() > 0
&& symNei.size() < csts::SYM_COUNT_BEFORE_PEEK
&& time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK) {
int nPeerId = rand() % potentialNei.size();
auto it = potentialNei.begin();
for(int at=0; at < nPeerId; ++at, ++it);
&& time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK)
{
auto it = randPeer(&potentialNei);
proto->sendEmpty(it->id);
lastPckSent[it->id] = time(NULL);
lastPeerPeek = time(NULL);
}
if(symNei.size() > 0 && potentialNei.size() < 5
&& time(NULL) - lastSentNR >= csts::TIME_SEND_NR)
{
auto it = randPeer(&symNei);
proto->sendNReq(it->id);
lastPckSent[it->id] = time(NULL);
lastSentNR = time(NULL);
}
for(auto flooder=dataFlooder.begin(); flooder != dataFlooder.end(); )
{
if(flooder->second.done())
flooder = dataFlooder.erase(flooder);
else {
flooder->second.update();
++flooder;
}
}
for(u64 datId : dataStore->toFlood()) {
dataFlooder.insert({datId,
Flooder(
(*dataStore)[datId], datId, dataStore->getSeqno(datId),
proto, symNei)});
dataStore->setFlooded(datId);
}
}
void Neighbours::addPotentialNei(const Neighbour& nei) {
if(neiType.find(nei.id) != neiType.end())
return; // We already know him
potentialNei.push_back(nei);
lastRecv.insert({nei.id, 0});
lastIHU.insert({nei.id, 0});
@ -64,7 +93,10 @@ void Neighbours::addPotentialNei(const Neighbour& nei) {
proto->addIdAddr(nei.addr, nei.id);
}
void Neighbours::receivedFrom(u64 id) {
void Neighbours::receivedFrom(u64 id, const SockAddr& addr) {
if(neiType.find(id) == neiType.end())
addPotentialNei(Neighbour(id, addr));
NeiType typ = neiType[id];
lastRecv[id] = time(NULL);
@ -72,7 +104,13 @@ void Neighbours::receivedFrom(u64 id) {
changeNeiType(id, NEI_UNIDIR);
}
void Neighbours::hadIHU(u64 id) {
void Neighbours::hadIHU(u64 id, const SockAddr& addr) {
if(neiType.find(id) == neiType.end()) {
fprintf(stderr, "[WARNING] %lX heard us — yet we did not send "
"anything?\n", id);
addPotentialNei(Neighbour(id, addr));
}
NeiType typ = neiType[id];
lastRecv[id] = time(NULL);
lastIHU[id] = time(NULL);
@ -80,6 +118,26 @@ void Neighbours::hadIHU(u64 id) {
changeNeiType(id, NEI_SYM);
}
void Neighbours::getNeighbours(vector<Neighbour>& out, u64 except, int count) {
vector<list<Neighbour>::iterator > permutation;
for(auto it = symNei.begin(); it != symNei.end(); ++it)
permutation.push_back(it);
for(size_t i=0; i < permutation.size(); i++) {
int swapWith = rand() % permutation.size();
auto tmp = permutation[i];
permutation[i] = permutation[swapWith];
permutation[swapWith] = tmp;
}
for(int cur=0, pos=0; cur < count && (size_t)pos < permutation.size();
pos++) {
if(permutation[pos]->id != except) {
out.push_back(*permutation[pos]);
cur++;
}
}
}
list<Neighbour>* Neighbours::listOfType(NeiType typ) {
switch(typ) {
case NEI_POTENTIAL:
@ -94,8 +152,13 @@ list<Neighbour>* Neighbours::listOfType(NeiType typ) {
throw WrongNeiType();
}
void Neighbours::gotIHave(u64 from, u64 datId, u32 seqno) {
if(dataFlooder.find(datId) != dataFlooder.end()) {
dataFlooder.at(datId).gotIHave(from, seqno);
}
}
void Neighbours::changeNeiType(u64 id, NeiType nType) {
printf("TYPE %lX %d\n", id, neiType[id]);
NeiType cType = neiType[id];
if(cType == nType)
return;
@ -106,11 +169,8 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) {
bool wasSpliced=false;
for(auto it=fromList->begin(); it != fromList->end(); ++it) {
if(it->id == id) {
printf("%ld %ld\n", fromList->size(), toList->size());
toList->push_back(*it); // splice() doesn't work?!
fromList->erase(it);
printf("%ld %ld %d\n", fromList->size(), toList->size(),
toList == &unidirNei);
wasSpliced=true;
break;
}
@ -118,7 +178,6 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) {
if(!wasSpliced) {
fprintf(stderr, "[ERROR] Node %lX wasn't found (type change)\n", id);
}
printf("TYPE %lX %d\n", id, neiType[id]);
}
void Neighbours::updateSendPackets(const Neighbour& nei) {
@ -128,7 +187,6 @@ void Neighbours::updateSendPackets(const Neighbour& nei) {
bool Neighbours::sendEmpty(u64 id) {
if(time(NULL) - lastPckSent[id] >= csts::TIME_RESEND_EMPTY) {
printf("[DBG] sending empty packet to %lX.\n", id);
lastPckSent[id] = time(NULL);
proto->sendEmpty(id);
return true;
@ -138,7 +196,6 @@ bool Neighbours::sendEmpty(u64 id) {
bool Neighbours::sendIHU(u64 id) {
if(time(NULL) - lastIHUSent[id] >= csts::TIME_RESEND_IHU) {
printf("[DBG] sending IHU packet to %lX.\n", id);
lastIHUSent[id] = time(NULL);
lastPckSent[id] = time(NULL);
proto->sendIHU(id);
@ -147,3 +204,10 @@ bool Neighbours::sendIHU(u64 id) {
return false;
}
list<Neighbour>::iterator Neighbours::randPeer(list<Neighbour>* list) {
int nPeerId = rand() % list->size();
auto it = list->begin();
for(int at=0; at < nPeerId; ++at, ++it);
return it;
}

View file

@ -7,14 +7,17 @@
#pragma once
#include <list>
#include <set>
#include <ctime>
#include <unordered_map>
#include "data.h"
#include "protocol.h"
#include "flooder.h"
#include "dataStore.h"
class Neighbours {
public:
Neighbours(Protocol* proto);
Neighbours(Protocol* proto, DataStore* dataStore);
void fullCheck();
/** Cleans the peers lists by removing the expired entries. */
@ -28,16 +31,26 @@ class Neighbours {
void addPotentialNei(const Neighbour& nei);
/** Adds a `Neighbour` to the list of potential neighbours. */
void receivedFrom(u64 id);
void receivedFrom(u64 id, const SockAddr& addr);
/** Signals that a packet was received from `id`, performs the
* appropriate bookkeeping actions.
*/
void hadIHU(u64 id);
void hadIHU(u64 id, const SockAddr& addr);
/** Signals that a IHU was received from `id`, performs the
* appropriate bookkeeping actions.
*/
void getNeighbours(std::vector<Neighbour>& out, u64 except, int count);
/** Fills `out` with at most `count` symetric neighbours, not
* including `except`.
*/
void gotIHave(u64 from, u64 datId, u32 seqno);
/** Notifies the flooders that a `IHave` packet has been received for
* this data from the peer `from`.
*/
private: //meth
class WrongNeiType : public std::exception {};
enum NeiType {
@ -50,13 +63,16 @@ class Neighbours {
bool sendEmpty(u64 id);
bool sendIHU(u64 id);
std::list<Neighbour>::iterator randPeer(std::list<Neighbour>* list);
private:
Protocol* proto;
DataStore* dataStore;
std::list<Neighbour> potentialNei, unidirNei, symNei;
std::unordered_map<u64, time_t> lastRecv, lastIHU;
std::unordered_map<u64, time_t> lastPckSent, lastIHUSent;
std::unordered_map<u64, NeiType> neiType;
time_t lastPeerPeek;
std::unordered_map<u64, Flooder> dataFlooder;
time_t lastPeerPeek, lastSentNR;
};

View file

@ -28,11 +28,18 @@ const u8 TLV_DATA_JPG = 34;
const int TIMEOUT_UNIDIR = 100; // s
const int TIMEOUT_SYM_RECV = 150; // s
const int TIMEOUT_SYM_IHU = 300; // s
const int TIMEOUT_DATA = 35*60; // s
const int TIME_RESEND_IHU = 90; // s
const int TIME_RESEND_EMPTY = 30; // s
const int TIME_REPUBLISH_DATA = 25*60; // s
const int TIME_PEER_PEEK = 30; // s
const int TIME_SEND_NR = 60; // s
const int TIME_RESEND_FLOOD = 3; // s
const int SYM_COUNT_BEFORE_PEEK = 5;
const int NB_NEIGH_PER_NR = 5;
const int FLOOD_RETRIES = 3;
const u16 DEFAULT_PORT = 1192;

View file

@ -7,24 +7,35 @@
#include "packetParser.h"
#include <cstring>
PacketParser::PacketParser(Neighbours* nei, Protocol* proto) :
neighbours(nei), protocol(proto)
PacketParser::PacketParser(Neighbours* nei, Protocol* proto,
DataStore* dataStore) :
neighbours(nei), protocol(proto), dataStore(dataStore)
{}
void PacketParser::parse(Bytes pck) {
void PacketParser::parse(Bytes pck, const SockAddr& addr) {
u64 peerId;
pck >> peerId;
neighbours->receivedFrom(peerId);
neighbours->receivedFrom(peerId, addr);
while(pck.size() > 0) {
readTLV(pck, peerId);
readTLV(pck, peerId, addr);
}
}
void PacketParser::readTLV(Bytes& pck, u64 nei) {
void PacketParser::readTLV(Bytes& pck, u64 nei, const SockAddr& addr) {
u8 type, len;
pck >> type >> len;
printf("[INFO] Analyzing %d\n", type);
if(pck.size() < len) {
fprintf(stderr, "[WARNING] Advertised TLV does not fit in the packet"
".\n");
pck.extract(pck.size());
return;
}
Bytes subpacket = pck.extract(len);
switch(type) {
case csts::TLV_PAD1:
case csts::TLV_PADN:
@ -32,34 +43,34 @@ void PacketParser::readTLV(Bytes& pck, u64 nei) {
case csts::TLV_IHU: {
u64 ihuId;
pck >> ihuId;
subpacket >> ihuId;
if(ihuId != protocol->getSelfId())
break;
neighbours->hadIHU(nei);
neighbours->hadIHU(nei, addr);
break;
}
case csts::TLV_NR:
//TODO
handleNR(nei);
break;
case csts::TLV_NEIGH:
receiveNeigh(pck, len);
receiveNeigh(subpacket);
break;
case csts::TLV_DATA:
receiveData(pck, len, nei);
receiveData(subpacket, nei);
break;
case csts::TLV_IHAVE:
//TODO
receiveIHave(subpacket, nei);
break;
}
}
void PacketParser::receiveNeigh(Bytes& pck, u8 length) {
while(length >= 8+16+2) { /* enough to read one peer */
void PacketParser::receiveNeigh(Bytes& pck) {
while(pck.size() >= 8+16+2) { /* enough to read one peer */
u64 id;
u16 port;
SockAddr addr;
@ -72,17 +83,40 @@ void PacketParser::receiveNeigh(Bytes& pck, u8 length) {
pck >> port;
addr.sin6_port = htons(port);
if(id == protocol->getSelfId())
continue;
fprintf(stderr, "[INFO] Adding neighbour %lX\n", id);
Neighbour nei(id, addr);
neighbours->addPotentialNei(nei);
}
}
void PacketParser::receiveData(Bytes& pck, u8 length, u64 from) {
void PacketParser::receiveData(Bytes& pck, u64 from) {
u32 seqNo;
u64 datId;
pck >> seqNo >> datId;
neighbours->gotIHave(from, datId, seqNo);
protocol->sendIHave(from, datId, seqNo);
//TODO
length++;
dataStore->addData(pck, seqNo, datId, false);
}
void PacketParser::receiveIHave(Bytes& pck, u64 from) {
u32 seqno;
u64 datId;
if(pck.size() < 8+4) {
fprintf(stderr, "[WARNING] IHave too short from %lX\n", from);
return;
}
pck >> seqno >> datId;
neighbours->gotIHave(from, datId, seqno);
}
void PacketParser::handleNR(u64 from) {
std::vector<Neighbour> neigh;
neighbours->getNeighbours(neigh, from, csts::NB_NEIGH_PER_NR);
protocol->sendNeighbours(from, neigh);
}

View file

@ -8,20 +8,24 @@
#include "neighbours.h"
#include "protocol.h"
#include "dataStore.h"
class PacketParser {
public:
PacketParser(Neighbours* nei, Protocol* proto);
PacketParser(Neighbours* nei, Protocol* proto, DataStore* dataStore);
void parse(Bytes pck);
void parse(Bytes pck, const SockAddr& addr);
private: //meth
void readTLV(Bytes& pck, u64 nei);
void receiveNeigh(Bytes& pck, u8 length);
void receiveData(Bytes& pck, u8 length, u64 from);
void readTLV(Bytes& pck, u64 nei, const SockAddr& addr);
void receiveNeigh(Bytes& pck);
void receiveData(Bytes& pck, u64 from);
void receiveIHave(Bytes& pck, u64 from);
void handleNR(u64 from);
private:
Neighbours* neighbours;
Protocol* protocol;
DataStore* dataStore;
};

View file

@ -79,7 +79,8 @@ void Protocol::sendBody(const Bytes& body, const SockAddr& to) {
if(rc != (ssize_t)pck.size()) {
//TODO log warning.
}
fprintf(stderr, "[INFO] sending packet to port %hu\n", ntohs(to.sin6_port));
fprintf(stderr, "[INFO] sending packet [%d] to port %hu\n",
body.size() > 0 ? body[0] : -1, ntohs(to.sin6_port));
}
void Protocol::sendBody(const Bytes& body, const u64& id) {
return sendBody(body, addrOfId(id));
@ -130,20 +131,20 @@ void Protocol::sendNeighbours(u64 id, const std::vector<Neighbour>& neigh) {
sendNeighbours(addrOfId(id), neigh);
}
void Protocol::sendData(const SockAddr& to, const char* data, size_t len,
void Protocol::sendData(const SockAddr& to, const Bytes& data,
u32 seqno, u64 datId)
{
Bytes pck;
if(len + 12 >= 1<<8)
if(data.size() + 12 >= 1<<8)
throw DataTooLongError();
pck << csts::TLV_DATA << (u8) (12 + len) << seqno << datId
<< Bytes(data, len);
pck << csts::TLV_DATA << (u8) (12 + data.size()) << seqno << datId
<< data;
sendBody(pck, to);
}
void Protocol::sendData(u64 id, const char* data, size_t len, u32 seqno,
void Protocol::sendData(u64 id, const Bytes& data, u32 seqno,
u64 datId)
{
sendData(addrOfId(id), data, len, seqno, datId);
sendData(addrOfId(id), data, seqno, datId);
}
void Protocol::sendIHave(const SockAddr& to, u64 datId, u32 seqno) {
@ -205,17 +206,11 @@ void Protocol::pollNetwork() {
else {
fprintf(stderr, "[ERROR] Unknown address family %d.\n",
fromAddr.sa_family);
fprintf(stderr, "\t%hu\n", ((SockAddr*)&fromAddr)->sin6_port);
while(data.size() > 0) {
u8 c;
data >> c;
fprintf(stderr, "%02X|", c);
}
fprintf(stderr, "\n");
continue;
}
puts("Received packet");
fprintf(stderr, "[INFO] Received packet [%d]\n",
data.size() > 8 ? data[8] : -1);
AvailPacket pck;
pck.from = convFromAddr;
pck.data = data;

View file

@ -61,10 +61,9 @@ class Protocol {
const std::vector<Neighbour>& neigh);
/** Sends a neighbours list packet */
void sendData(const SockAddr& to, const char* data, size_t len,
u32 seqno, u64 datId);
void sendData(u64 id, const char* data, size_t len,
void sendData(const SockAddr& to, const Bytes& data,
u32 seqno, u64 datId);
void sendData(u64 id, const Bytes& data, u32 seqno, u64 datId);
void sendIHave(const SockAddr& to, u64 datId, u32 seqno);
void sendIHave(u64 id, u64 datId, u32 seqno);