Compare commits

...

2 commits

15 changed files with 515 additions and 68 deletions

View file

@ -53,12 +53,12 @@ Bytes& Bytes::operator<<(u8 v) {
return *this; return *this;
} }
Bytes& Bytes::operator<<(u16 v) { Bytes& Bytes::operator<<(u16 v) {
insertData<u16>(htons(v)); insertData<u16>(v);
// insertData<u16>(htons(v)); // insertData<u16>(htons(v));
return *this; return *this;
} }
Bytes& Bytes::operator<<(u32 v) { Bytes& Bytes::operator<<(u32 v) {
insertData<u32>(htonl(v)); insertData<u32>(v);
// insertData<u32>(htonl(v)); // insertData<u32>(htonl(v));
return *this; return *this;
} }
@ -103,6 +103,12 @@ Bytes& Bytes::operator>>(char& v) {
return *this; return *this;
} }
Bytes Bytes::extract(size_t len) {
Bytes out = sub(0, len); // Handles out of range
firstIndex += len;
return out;
}
void Bytes::operator=(const Bytes& oth) { void Bytes::operator=(const Bytes& oth) {
firstIndex=0; firstIndex=0;
data.clear(); data.clear();
@ -112,7 +118,7 @@ void Bytes::operator=(const Bytes& oth) {
} }
Bytes Bytes::sub(size_t beg, size_t len) const { Bytes Bytes::sub(size_t beg, size_t len) const {
if(beg+len >= size()) if(beg+len > size())
throw OutOfRange(); throw OutOfRange();
Bytes out; Bytes out;
for(size_t byte=0; byte < len; byte++) for(size_t byte=0; byte < len; byte++)

View file

@ -55,6 +55,11 @@ class Bytes {
/// Extracts the given data type from the vector. Returns *this to /// Extracts the given data type from the vector. Returns *this to
/// allow chaining. /// allow chaining.
Bytes extract(size_t len);
/** Extracts the first `len` bytes and returns them. Throws
* `OutOfRange` if `len` > `size()`.
*/
void operator=(const Bytes& oth); void operator=(const Bytes& oth);
/// Copies the given Bytes object into its own data. /// Copies the given Bytes object into its own data.

View file

@ -1,8 +1,9 @@
CXX=g++ CXX=g++
CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O2 CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O0 -g
CXXLIBS=-lpthread CXXLIBS=-lpthread
OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o \
dataStore.o flooder.o
TARGET = jeanhubert TARGET = jeanhubert
all: $(TARGET) all: $(TARGET)

110
dataStore.cpp Normal file
View file

@ -0,0 +1,110 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#include "dataStore.h"
#include <cstring>
#include <algorithm>
using namespace std;
DataStore::DataStore(Protocol* proto) : proto(proto)
{}
DataStore::~DataStore() {
}
void DataStore::update() {
while(!timeEvents.empty()) {
const TimeEvent& evt = timeEvents.top();
if(evt.time > time(NULL)) // We're done for now.
break;
timeEvents.pop();
switch(evt.type) {
case EV_REPUBLISH:
handleRepublish(evt.id);
break;
case EV_EXPIRES:
handleExpire(evt.id, evt.seqno);
break;
}
}
}
void DataStore::addData(Bytes pck, u32 seqno, u64 id, bool mine) {
if(curSeqno.find(id) != curSeqno.end() && curSeqno[id] >= seqno)
return;
data[id] = Data(pck, mine);
curSeqno[id] = seqno;
if(mine) {
timeEvents.push(TimeEvent(
time(NULL) + csts::TIME_REPUBLISH_DATA,
seqno, id, EV_REPUBLISH));
}
else {
timeEvents.push(TimeEvent(
time(NULL) + csts::TIMEOUT_DATA,
seqno, id, EV_EXPIRES));
// If it is not renewed, it will expire.
}
recvTime[id] = time(NULL);
fprintf(stderr, "[INFO] Storing data %lX (%u)\n", id, seqno);
handleFlood(id);
}
ssize_t DataStore::dataSize(u64 id) {
if(data.find(id) == data.end())
return -1;
return data[id].data.size();
}
ssize_t DataStore::readData(u64 id, char* buffer, size_t size) {
if(data.find(id) == data.end())
return -1;
ssize_t len = min((ssize_t)size, dataSize(id));
data[id].data.writeToBuffer(buffer, size);
return len;
}
void DataStore::ids(std::vector<u64>& out) {
for(auto& dat : data)
out.push_back(dat.first);
}
void DataStore::setFlooded(u64 id) {
toFlood_.erase(id);
}
void DataStore::handleExpire(u64 id, u32 seqno) {
if(seqno < curSeqno[id])
return; // Was updated in time
cleanData(id);
}
void DataStore::handleRepublish(u64 id) {
if(data.find(id) == data.end() || !data[id].isMine)
return;
curSeqno[id] = time(NULL);
handleFlood(id);
}
void DataStore::handleFlood(u64 id) {
toFlood_.insert(id);
}
void DataStore::cleanData(u64 id) {
curSeqno.erase(id);
recvTime.erase(id);
data.erase(id); // Deletes the Bytes as well
}

98
dataStore.h Normal file
View file

@ -0,0 +1,98 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#pragma once
#include <queue>
#include <unordered_map>
#include <set>
#include "data.h"
#include "nw_constants.h"
#include "protocol.h"
#include "Bytes.h"
class DataStore {
public:
DataStore(Protocol* proto);
~DataStore();
void update();
/** Performs bookkeeping actions. Sends packets. This function should
* be called often enough.
*/
void addData(Bytes pck, u32 seqno, u64 id, bool mine=false);
/** Adds a data in the data storage. If `mine` is `true`, the current
* node considers itself responsible for the data and will republish
* it when needed.
*/
ssize_t dataSize(u64 id);
/** Gets the size of the data with id `id`. Returns -1 if there is no
* such data stored.
*/
ssize_t readData(u64 id, char* buffer, size_t size);
/** Fills `buffer` with the data of the given `id`.
* If the data does not exist, returs -1; else, returns the length of
* data actually retrieved.
*/
const Bytes& operator[](u64 id) {
return data[id].data;
}
u32 getSeqno(u64 id) const { return curSeqno.at(id); }
void ids(std::vector<u64>& out);
/** Fills `out` with the IDs of the stored data. */
const std::set<u64> toFlood() const { return toFlood_; }
/** Returns a list of data IDs that should be flooded. */
void setFlooded(u64 id);
/** Marks a data as flooded */
private: //meth
void handleExpire(u64 id, u32 seqno);
void handleRepublish(u64 id);
void handleFlood(u64 id);
void cleanData(u64 id);
private:
enum EvType {
EV_REPUBLISH, EV_EXPIRES
};
struct TimeEvent {
TimeEvent(time_t time, u32 seqno, u64 id, EvType type) :
time(time), seqno(seqno), id(id), type(type) {}
time_t time;
u32 seqno;
u64 id;
EvType type;
bool operator<(const TimeEvent& e) const {
return time > e.time; // Max-priority queue
}
};
struct Data {
Data() : data(), isMine(false) {}
Data(Bytes b) : data(b), isMine(false) {}
Data(Bytes b, bool mine) : data(b), isMine(mine) {}
Bytes data;
bool isMine;
};
std::priority_queue<TimeEvent> timeEvents;
Protocol* proto;
std::unordered_map<u64, Data> data;
std::unordered_map<u64, u32> curSeqno;
std::unordered_map<u64, time_t> recvTime;
std::set<u64> toFlood_;
};

54
flooder.cpp Normal file
View file

@ -0,0 +1,54 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#include "flooder.h"
Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
const std::list<Neighbour>& peers) :
data(data), datId(datId), seqno(seqno), proto(proto)
{
fprintf(stderr, "[DBG] Created flooder for %lX (%u)\n", datId, seqno);
for(auto it=peers.begin(); it != peers.end(); ++it) {
triesCount[it->id] = 0;
toFlood.push(FloodEvt(time(NULL), it->id));
}
update();
}
void Flooder::update() {
while(!toFlood.empty()) {
const FloodEvt& evt = toFlood.top();
if(evt.time > time(NULL))
break;
toFlood.pop();
if(triesCount[evt.id] > csts::FLOOD_RETRIES) {
fprintf(stderr, "[WARNING] Could not flood to %lX: no IHave.\n",
evt.id);
continue;
}
else if(triesCount[evt.id] < 0) {
fprintf(stderr, "[DBG] Got your IHave, %lX!\n", evt.id);
continue; // IHave received
}
sendTo(evt.id);
triesCount[evt.id]++;
toFlood.push(FloodEvt(time(NULL) + csts::TIME_RESEND_FLOOD, evt.id));
}
}
void Flooder::gotIHave(u64 id, u32 withSeqno) {
if(seqno > withSeqno)
return;
triesCount[id] = -1;
}
void Flooder::sendTo(u64 id) {
proto->sendData(id, data, seqno, datId);
}

49
flooder.h Normal file
View file

@ -0,0 +1,49 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#pragma once
#include <ctime>
#include <queue>
#include <list>
#include <unordered_map>
#include "data.h"
#include "nw_constants.h"
#include "protocol.h"
class Flooder {
public:
Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
const std::list<Neighbour>& peers);
void update();
/** Call often enough (ie ~1s) */
void gotIHave(u64 id, u32 withSeqno);
/** Acknoledges the reception of the resource by peer `id`. */
bool done() const { return toFlood.empty(); }
private: //meth
void sendTo(u64 id);
private:
struct FloodEvt {
FloodEvt(time_t time, u64 id) : time(time), id(id) {}
time_t time;
u64 id;
bool operator<(const FloodEvt& e) const {
return time > e.time; // Max priority queue.
}
};
std::unordered_map<u64, int> triesCount;
std::priority_queue<FloodEvt> toFlood;
Bytes data;
u64 datId;
u32 seqno;
Protocol* proto;
};

View file

@ -10,6 +10,7 @@
#include "neighbours.h" #include "neighbours.h"
#include "packetParser.h" #include "packetParser.h"
#include "configFile.h" #include "configFile.h"
#include "dataStore.h"
#include <cstring> #include <cstring>
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
@ -43,7 +44,9 @@ int main(int argc, char** argv) {
Protocol proto(addr, cfg.getSelfId()); Protocol proto(addr, cfg.getSelfId());
Neighbours neighboursManager(&proto); DataStore dataStore(&proto);
Neighbours neighboursManager(&proto, &dataStore);
for(const Neighbour& nei : cfg.getBootstrapNodes()) { for(const Neighbour& nei : cfg.getBootstrapNodes()) {
char addr[54]; char addr[54];
inet_ntop(AF_INET6, &nei.addr.sin6_addr, addr, 54); inet_ntop(AF_INET6, &nei.addr.sin6_addr, addr, 54);
@ -52,7 +55,7 @@ int main(int argc, char** argv) {
neighboursManager.addPotentialNei(nei); neighboursManager.addPotentialNei(nei);
} }
PacketParser pckParser(&neighboursManager, &proto); PacketParser pckParser(&neighboursManager, &proto, &dataStore);
while(true) { while(true) {
neighboursManager.fullUpdate(); neighboursManager.fullUpdate();
@ -60,10 +63,12 @@ int main(int argc, char** argv) {
while(proto.readyRead()) { while(proto.readyRead()) {
SockAddr fromAddr; SockAddr fromAddr;
Bytes pck = proto.getPacket(&fromAddr); Bytes pck = proto.getPacket(&fromAddr);
pckParser.parse(pck); pckParser.parse(pck, fromAddr);
} }
sleep(2); dataStore.update();
sleep(1);
} }
return 0; return 0;

View file

@ -11,8 +11,8 @@
using namespace std; using namespace std;
Neighbours::Neighbours(Protocol* proto) : Neighbours::Neighbours(Protocol* proto, DataStore* dataStore) :
proto(proto), lastPeerPeek(0) proto(proto), dataStore(dataStore), lastPeerPeek(0), lastSentNR(0)
{} {}
void Neighbours::fullCheck() { void Neighbours::fullCheck() {
@ -46,17 +46,46 @@ void Neighbours::fullUpdate() {
if(potentialNei.size() > 0 if(potentialNei.size() > 0
&& symNei.size() < csts::SYM_COUNT_BEFORE_PEEK && symNei.size() < csts::SYM_COUNT_BEFORE_PEEK
&& time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK) { && time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK)
int nPeerId = rand() % potentialNei.size(); {
auto it = potentialNei.begin(); auto it = randPeer(&potentialNei);
for(int at=0; at < nPeerId; ++at, ++it);
proto->sendEmpty(it->id); proto->sendEmpty(it->id);
lastPckSent[it->id] = time(NULL); lastPckSent[it->id] = time(NULL);
lastPeerPeek = time(NULL); lastPeerPeek = time(NULL);
} }
if(symNei.size() > 0 && potentialNei.size() < 5
&& time(NULL) - lastSentNR >= csts::TIME_SEND_NR)
{
auto it = randPeer(&symNei);
proto->sendNReq(it->id);
lastPckSent[it->id] = time(NULL);
lastSentNR = time(NULL);
}
for(auto flooder=dataFlooder.begin(); flooder != dataFlooder.end(); )
{
if(flooder->second.done())
flooder = dataFlooder.erase(flooder);
else {
flooder->second.update();
++flooder;
}
}
for(u64 datId : dataStore->toFlood()) {
dataFlooder.insert({datId,
Flooder(
(*dataStore)[datId], datId, dataStore->getSeqno(datId),
proto, symNei)});
dataStore->setFlooded(datId);
}
} }
void Neighbours::addPotentialNei(const Neighbour& nei) { void Neighbours::addPotentialNei(const Neighbour& nei) {
if(neiType.find(nei.id) != neiType.end())
return; // We already know him
potentialNei.push_back(nei); potentialNei.push_back(nei);
lastRecv.insert({nei.id, 0}); lastRecv.insert({nei.id, 0});
lastIHU.insert({nei.id, 0}); lastIHU.insert({nei.id, 0});
@ -64,7 +93,10 @@ void Neighbours::addPotentialNei(const Neighbour& nei) {
proto->addIdAddr(nei.addr, nei.id); proto->addIdAddr(nei.addr, nei.id);
} }
void Neighbours::receivedFrom(u64 id) { void Neighbours::receivedFrom(u64 id, const SockAddr& addr) {
if(neiType.find(id) == neiType.end())
addPotentialNei(Neighbour(id, addr));
NeiType typ = neiType[id]; NeiType typ = neiType[id];
lastRecv[id] = time(NULL); lastRecv[id] = time(NULL);
@ -72,7 +104,13 @@ void Neighbours::receivedFrom(u64 id) {
changeNeiType(id, NEI_UNIDIR); changeNeiType(id, NEI_UNIDIR);
} }
void Neighbours::hadIHU(u64 id) { void Neighbours::hadIHU(u64 id, const SockAddr& addr) {
if(neiType.find(id) == neiType.end()) {
fprintf(stderr, "[WARNING] %lX heard us — yet we did not send "
"anything?\n", id);
addPotentialNei(Neighbour(id, addr));
}
NeiType typ = neiType[id]; NeiType typ = neiType[id];
lastRecv[id] = time(NULL); lastRecv[id] = time(NULL);
lastIHU[id] = time(NULL); lastIHU[id] = time(NULL);
@ -80,6 +118,26 @@ void Neighbours::hadIHU(u64 id) {
changeNeiType(id, NEI_SYM); changeNeiType(id, NEI_SYM);
} }
void Neighbours::getNeighbours(vector<Neighbour>& out, u64 except, int count) {
vector<list<Neighbour>::iterator > permutation;
for(auto it = symNei.begin(); it != symNei.end(); ++it)
permutation.push_back(it);
for(size_t i=0; i < permutation.size(); i++) {
int swapWith = rand() % permutation.size();
auto tmp = permutation[i];
permutation[i] = permutation[swapWith];
permutation[swapWith] = tmp;
}
for(int cur=0, pos=0; cur < count && (size_t)pos < permutation.size();
pos++) {
if(permutation[pos]->id != except) {
out.push_back(*permutation[pos]);
cur++;
}
}
}
list<Neighbour>* Neighbours::listOfType(NeiType typ) { list<Neighbour>* Neighbours::listOfType(NeiType typ) {
switch(typ) { switch(typ) {
case NEI_POTENTIAL: case NEI_POTENTIAL:
@ -94,8 +152,13 @@ list<Neighbour>* Neighbours::listOfType(NeiType typ) {
throw WrongNeiType(); throw WrongNeiType();
} }
void Neighbours::gotIHave(u64 from, u64 datId, u32 seqno) {
if(dataFlooder.find(datId) != dataFlooder.end()) {
dataFlooder.at(datId).gotIHave(from, seqno);
}
}
void Neighbours::changeNeiType(u64 id, NeiType nType) { void Neighbours::changeNeiType(u64 id, NeiType nType) {
printf("TYPE %lX %d\n", id, neiType[id]);
NeiType cType = neiType[id]; NeiType cType = neiType[id];
if(cType == nType) if(cType == nType)
return; return;
@ -106,11 +169,8 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) {
bool wasSpliced=false; bool wasSpliced=false;
for(auto it=fromList->begin(); it != fromList->end(); ++it) { for(auto it=fromList->begin(); it != fromList->end(); ++it) {
if(it->id == id) { if(it->id == id) {
printf("%ld %ld\n", fromList->size(), toList->size());
toList->push_back(*it); // splice() doesn't work?! toList->push_back(*it); // splice() doesn't work?!
fromList->erase(it); fromList->erase(it);
printf("%ld %ld %d\n", fromList->size(), toList->size(),
toList == &unidirNei);
wasSpliced=true; wasSpliced=true;
break; break;
} }
@ -118,7 +178,6 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) {
if(!wasSpliced) { if(!wasSpliced) {
fprintf(stderr, "[ERROR] Node %lX wasn't found (type change)\n", id); fprintf(stderr, "[ERROR] Node %lX wasn't found (type change)\n", id);
} }
printf("TYPE %lX %d\n", id, neiType[id]);
} }
void Neighbours::updateSendPackets(const Neighbour& nei) { void Neighbours::updateSendPackets(const Neighbour& nei) {
@ -128,7 +187,6 @@ void Neighbours::updateSendPackets(const Neighbour& nei) {
bool Neighbours::sendEmpty(u64 id) { bool Neighbours::sendEmpty(u64 id) {
if(time(NULL) - lastPckSent[id] >= csts::TIME_RESEND_EMPTY) { if(time(NULL) - lastPckSent[id] >= csts::TIME_RESEND_EMPTY) {
printf("[DBG] sending empty packet to %lX.\n", id);
lastPckSent[id] = time(NULL); lastPckSent[id] = time(NULL);
proto->sendEmpty(id); proto->sendEmpty(id);
return true; return true;
@ -138,7 +196,6 @@ bool Neighbours::sendEmpty(u64 id) {
bool Neighbours::sendIHU(u64 id) { bool Neighbours::sendIHU(u64 id) {
if(time(NULL) - lastIHUSent[id] >= csts::TIME_RESEND_IHU) { if(time(NULL) - lastIHUSent[id] >= csts::TIME_RESEND_IHU) {
printf("[DBG] sending IHU packet to %lX.\n", id);
lastIHUSent[id] = time(NULL); lastIHUSent[id] = time(NULL);
lastPckSent[id] = time(NULL); lastPckSent[id] = time(NULL);
proto->sendIHU(id); proto->sendIHU(id);
@ -147,3 +204,10 @@ bool Neighbours::sendIHU(u64 id) {
return false; return false;
} }
list<Neighbour>::iterator Neighbours::randPeer(list<Neighbour>* list) {
int nPeerId = rand() % list->size();
auto it = list->begin();
for(int at=0; at < nPeerId; ++at, ++it);
return it;
}

View file

@ -7,14 +7,17 @@
#pragma once #pragma once
#include <list> #include <list>
#include <set>
#include <ctime> #include <ctime>
#include <unordered_map> #include <unordered_map>
#include "data.h" #include "data.h"
#include "protocol.h" #include "protocol.h"
#include "flooder.h"
#include "dataStore.h"
class Neighbours { class Neighbours {
public: public:
Neighbours(Protocol* proto); Neighbours(Protocol* proto, DataStore* dataStore);
void fullCheck(); void fullCheck();
/** Cleans the peers lists by removing the expired entries. */ /** Cleans the peers lists by removing the expired entries. */
@ -28,16 +31,26 @@ class Neighbours {
void addPotentialNei(const Neighbour& nei); void addPotentialNei(const Neighbour& nei);
/** Adds a `Neighbour` to the list of potential neighbours. */ /** Adds a `Neighbour` to the list of potential neighbours. */
void receivedFrom(u64 id); void receivedFrom(u64 id, const SockAddr& addr);
/** Signals that a packet was received from `id`, performs the /** Signals that a packet was received from `id`, performs the
* appropriate bookkeeping actions. * appropriate bookkeeping actions.
*/ */
void hadIHU(u64 id); void hadIHU(u64 id, const SockAddr& addr);
/** Signals that a IHU was received from `id`, performs the /** Signals that a IHU was received from `id`, performs the
* appropriate bookkeeping actions. * appropriate bookkeeping actions.
*/ */
void getNeighbours(std::vector<Neighbour>& out, u64 except, int count);
/** Fills `out` with at most `count` symetric neighbours, not
* including `except`.
*/
void gotIHave(u64 from, u64 datId, u32 seqno);
/** Notifies the flooders that a `IHave` packet has been received for
* this data from the peer `from`.
*/
private: //meth private: //meth
class WrongNeiType : public std::exception {}; class WrongNeiType : public std::exception {};
enum NeiType { enum NeiType {
@ -50,13 +63,16 @@ class Neighbours {
bool sendEmpty(u64 id); bool sendEmpty(u64 id);
bool sendIHU(u64 id); bool sendIHU(u64 id);
std::list<Neighbour>::iterator randPeer(std::list<Neighbour>* list);
private: private:
Protocol* proto; Protocol* proto;
DataStore* dataStore;
std::list<Neighbour> potentialNei, unidirNei, symNei; std::list<Neighbour> potentialNei, unidirNei, symNei;
std::unordered_map<u64, time_t> lastRecv, lastIHU; std::unordered_map<u64, time_t> lastRecv, lastIHU;
std::unordered_map<u64, time_t> lastPckSent, lastIHUSent; std::unordered_map<u64, time_t> lastPckSent, lastIHUSent;
std::unordered_map<u64, NeiType> neiType; std::unordered_map<u64, NeiType> neiType;
time_t lastPeerPeek; std::unordered_map<u64, Flooder> dataFlooder;
time_t lastPeerPeek, lastSentNR;
}; };

View file

@ -28,11 +28,18 @@ const u8 TLV_DATA_JPG = 34;
const int TIMEOUT_UNIDIR = 100; // s const int TIMEOUT_UNIDIR = 100; // s
const int TIMEOUT_SYM_RECV = 150; // s const int TIMEOUT_SYM_RECV = 150; // s
const int TIMEOUT_SYM_IHU = 300; // s const int TIMEOUT_SYM_IHU = 300; // s
const int TIMEOUT_DATA = 35*60; // s
const int TIME_RESEND_IHU = 90; // s const int TIME_RESEND_IHU = 90; // s
const int TIME_RESEND_EMPTY = 30; // s const int TIME_RESEND_EMPTY = 30; // s
const int TIME_REPUBLISH_DATA = 25*60; // s
const int TIME_PEER_PEEK = 30; // s const int TIME_PEER_PEEK = 30; // s
const int TIME_SEND_NR = 60; // s
const int TIME_RESEND_FLOOD = 3; // s
const int SYM_COUNT_BEFORE_PEEK = 5; const int SYM_COUNT_BEFORE_PEEK = 5;
const int NB_NEIGH_PER_NR = 5;
const int FLOOD_RETRIES = 3;
const u16 DEFAULT_PORT = 1192; const u16 DEFAULT_PORT = 1192;

View file

@ -7,24 +7,35 @@
#include "packetParser.h" #include "packetParser.h"
#include <cstring> #include <cstring>
PacketParser::PacketParser(Neighbours* nei, Protocol* proto) : PacketParser::PacketParser(Neighbours* nei, Protocol* proto,
neighbours(nei), protocol(proto) DataStore* dataStore) :
neighbours(nei), protocol(proto), dataStore(dataStore)
{} {}
void PacketParser::parse(Bytes pck) { void PacketParser::parse(Bytes pck, const SockAddr& addr) {
u64 peerId; u64 peerId;
pck >> peerId; pck >> peerId;
neighbours->receivedFrom(peerId); neighbours->receivedFrom(peerId, addr);
while(pck.size() > 0) { while(pck.size() > 0) {
readTLV(pck, peerId); readTLV(pck, peerId, addr);
} }
} }
void PacketParser::readTLV(Bytes& pck, u64 nei) { void PacketParser::readTLV(Bytes& pck, u64 nei, const SockAddr& addr) {
u8 type, len; u8 type, len;
pck >> type >> len; pck >> type >> len;
printf("[INFO] Analyzing %d\n", type);
if(pck.size() < len) {
fprintf(stderr, "[WARNING] Advertised TLV does not fit in the packet"
".\n");
pck.extract(pck.size());
return;
}
Bytes subpacket = pck.extract(len);
switch(type) { switch(type) {
case csts::TLV_PAD1: case csts::TLV_PAD1:
case csts::TLV_PADN: case csts::TLV_PADN:
@ -32,34 +43,34 @@ void PacketParser::readTLV(Bytes& pck, u64 nei) {
case csts::TLV_IHU: { case csts::TLV_IHU: {
u64 ihuId; u64 ihuId;
pck >> ihuId; subpacket >> ihuId;
if(ihuId != protocol->getSelfId()) if(ihuId != protocol->getSelfId())
break; break;
neighbours->hadIHU(nei); neighbours->hadIHU(nei, addr);
break; break;
} }
case csts::TLV_NR: case csts::TLV_NR:
//TODO handleNR(nei);
break; break;
case csts::TLV_NEIGH: case csts::TLV_NEIGH:
receiveNeigh(pck, len); receiveNeigh(subpacket);
break; break;
case csts::TLV_DATA: case csts::TLV_DATA:
receiveData(pck, len, nei); receiveData(subpacket, nei);
break; break;
case csts::TLV_IHAVE: case csts::TLV_IHAVE:
//TODO receiveIHave(subpacket, nei);
break; break;
} }
} }
void PacketParser::receiveNeigh(Bytes& pck, u8 length) { void PacketParser::receiveNeigh(Bytes& pck) {
while(length >= 8+16+2) { /* enough to read one peer */ while(pck.size() >= 8+16+2) { /* enough to read one peer */
u64 id; u64 id;
u16 port; u16 port;
SockAddr addr; SockAddr addr;
@ -72,17 +83,40 @@ void PacketParser::receiveNeigh(Bytes& pck, u8 length) {
pck >> port; pck >> port;
addr.sin6_port = htons(port); addr.sin6_port = htons(port);
if(id == protocol->getSelfId())
continue;
fprintf(stderr, "[INFO] Adding neighbour %lX\n", id);
Neighbour nei(id, addr); Neighbour nei(id, addr);
neighbours->addPotentialNei(nei); neighbours->addPotentialNei(nei);
} }
} }
void PacketParser::receiveData(Bytes& pck, u8 length, u64 from) { void PacketParser::receiveData(Bytes& pck, u64 from) {
u32 seqNo; u32 seqNo;
u64 datId; u64 datId;
pck >> seqNo >> datId; pck >> seqNo >> datId;
neighbours->gotIHave(from, datId, seqNo);
protocol->sendIHave(from, datId, seqNo); protocol->sendIHave(from, datId, seqNo);
//TODO dataStore->addData(pck, seqNo, datId, false);
length++; }
void PacketParser::receiveIHave(Bytes& pck, u64 from) {
u32 seqno;
u64 datId;
if(pck.size() < 8+4) {
fprintf(stderr, "[WARNING] IHave too short from %lX\n", from);
return;
}
pck >> seqno >> datId;
neighbours->gotIHave(from, datId, seqno);
}
void PacketParser::handleNR(u64 from) {
std::vector<Neighbour> neigh;
neighbours->getNeighbours(neigh, from, csts::NB_NEIGH_PER_NR);
protocol->sendNeighbours(from, neigh);
} }

View file

@ -8,20 +8,24 @@
#include "neighbours.h" #include "neighbours.h"
#include "protocol.h" #include "protocol.h"
#include "dataStore.h"
class PacketParser { class PacketParser {
public: public:
PacketParser(Neighbours* nei, Protocol* proto); PacketParser(Neighbours* nei, Protocol* proto, DataStore* dataStore);
void parse(Bytes pck); void parse(Bytes pck, const SockAddr& addr);
private: //meth private: //meth
void readTLV(Bytes& pck, u64 nei); void readTLV(Bytes& pck, u64 nei, const SockAddr& addr);
void receiveNeigh(Bytes& pck, u8 length); void receiveNeigh(Bytes& pck);
void receiveData(Bytes& pck, u8 length, u64 from); void receiveData(Bytes& pck, u64 from);
void receiveIHave(Bytes& pck, u64 from);
void handleNR(u64 from);
private: private:
Neighbours* neighbours; Neighbours* neighbours;
Protocol* protocol; Protocol* protocol;
DataStore* dataStore;
}; };

View file

@ -79,7 +79,8 @@ void Protocol::sendBody(const Bytes& body, const SockAddr& to) {
if(rc != (ssize_t)pck.size()) { if(rc != (ssize_t)pck.size()) {
//TODO log warning. //TODO log warning.
} }
fprintf(stderr, "[INFO] sending packet to port %hu\n", ntohs(to.sin6_port)); fprintf(stderr, "[INFO] sending packet [%d] to port %hu\n",
body.size() > 0 ? body[0] : -1, ntohs(to.sin6_port));
} }
void Protocol::sendBody(const Bytes& body, const u64& id) { void Protocol::sendBody(const Bytes& body, const u64& id) {
return sendBody(body, addrOfId(id)); return sendBody(body, addrOfId(id));
@ -130,20 +131,20 @@ void Protocol::sendNeighbours(u64 id, const std::vector<Neighbour>& neigh) {
sendNeighbours(addrOfId(id), neigh); sendNeighbours(addrOfId(id), neigh);
} }
void Protocol::sendData(const SockAddr& to, const char* data, size_t len, void Protocol::sendData(const SockAddr& to, const Bytes& data,
u32 seqno, u64 datId) u32 seqno, u64 datId)
{ {
Bytes pck; Bytes pck;
if(len + 12 >= 1<<8) if(data.size() + 12 >= 1<<8)
throw DataTooLongError(); throw DataTooLongError();
pck << csts::TLV_DATA << (u8) (12 + len) << seqno << datId pck << csts::TLV_DATA << (u8) (12 + data.size()) << seqno << datId
<< Bytes(data, len); << data;
sendBody(pck, to); sendBody(pck, to);
} }
void Protocol::sendData(u64 id, const char* data, size_t len, u32 seqno, void Protocol::sendData(u64 id, const Bytes& data, u32 seqno,
u64 datId) u64 datId)
{ {
sendData(addrOfId(id), data, len, seqno, datId); sendData(addrOfId(id), data, seqno, datId);
} }
void Protocol::sendIHave(const SockAddr& to, u64 datId, u32 seqno) { void Protocol::sendIHave(const SockAddr& to, u64 datId, u32 seqno) {
@ -205,17 +206,11 @@ void Protocol::pollNetwork() {
else { else {
fprintf(stderr, "[ERROR] Unknown address family %d.\n", fprintf(stderr, "[ERROR] Unknown address family %d.\n",
fromAddr.sa_family); fromAddr.sa_family);
fprintf(stderr, "\t%hu\n", ((SockAddr*)&fromAddr)->sin6_port);
while(data.size() > 0) {
u8 c;
data >> c;
fprintf(stderr, "%02X|", c);
}
fprintf(stderr, "\n");
continue; continue;
} }
puts("Received packet"); fprintf(stderr, "[INFO] Received packet [%d]\n",
data.size() > 8 ? data[8] : -1);
AvailPacket pck; AvailPacket pck;
pck.from = convFromAddr; pck.from = convFromAddr;
pck.data = data; pck.data = data;

View file

@ -61,10 +61,9 @@ class Protocol {
const std::vector<Neighbour>& neigh); const std::vector<Neighbour>& neigh);
/** Sends a neighbours list packet */ /** Sends a neighbours list packet */
void sendData(const SockAddr& to, const char* data, size_t len, void sendData(const SockAddr& to, const Bytes& data,
u32 seqno, u64 datId);
void sendData(u64 id, const char* data, size_t len,
u32 seqno, u64 datId); u32 seqno, u64 datId);
void sendData(u64 id, const Bytes& data, u32 seqno, u64 datId);
void sendIHave(const SockAddr& to, u64 datId, u32 seqno); void sendIHave(const SockAddr& to, u64 datId, u32 seqno);
void sendIHave(u64 id, u64 datId, u32 seqno); void sendIHave(u64 id, u64 datId, u32 seqno);