Compare commits
2 commits
143cff86f5
...
98924f4ca6
Author | SHA1 | Date | |
---|---|---|---|
98924f4ca6 | |||
c0f1c55e03 |
15 changed files with 515 additions and 68 deletions
12
Bytes.cpp
12
Bytes.cpp
|
@ -53,12 +53,12 @@ Bytes& Bytes::operator<<(u8 v) {
|
|||
return *this;
|
||||
}
|
||||
Bytes& Bytes::operator<<(u16 v) {
|
||||
insertData<u16>(htons(v));
|
||||
insertData<u16>(v);
|
||||
// insertData<u16>(htons(v));
|
||||
return *this;
|
||||
}
|
||||
Bytes& Bytes::operator<<(u32 v) {
|
||||
insertData<u32>(htonl(v));
|
||||
insertData<u32>(v);
|
||||
// insertData<u32>(htonl(v));
|
||||
return *this;
|
||||
}
|
||||
|
@ -103,6 +103,12 @@ Bytes& Bytes::operator>>(char& v) {
|
|||
return *this;
|
||||
}
|
||||
|
||||
Bytes Bytes::extract(size_t len) {
|
||||
Bytes out = sub(0, len); // Handles out of range
|
||||
firstIndex += len;
|
||||
return out;
|
||||
}
|
||||
|
||||
void Bytes::operator=(const Bytes& oth) {
|
||||
firstIndex=0;
|
||||
data.clear();
|
||||
|
@ -112,7 +118,7 @@ void Bytes::operator=(const Bytes& oth) {
|
|||
}
|
||||
|
||||
Bytes Bytes::sub(size_t beg, size_t len) const {
|
||||
if(beg+len >= size())
|
||||
if(beg+len > size())
|
||||
throw OutOfRange();
|
||||
Bytes out;
|
||||
for(size_t byte=0; byte < len; byte++)
|
||||
|
|
5
Bytes.h
5
Bytes.h
|
@ -55,6 +55,11 @@ class Bytes {
|
|||
/// Extracts the given data type from the vector. Returns *this to
|
||||
/// allow chaining.
|
||||
|
||||
Bytes extract(size_t len);
|
||||
/** Extracts the first `len` bytes and returns them. Throws
|
||||
* `OutOfRange` if `len` > `size()`.
|
||||
*/
|
||||
|
||||
void operator=(const Bytes& oth);
|
||||
/// Copies the given Bytes object into its own data.
|
||||
|
||||
|
|
5
Makefile
5
Makefile
|
@ -1,8 +1,9 @@
|
|||
CXX=g++
|
||||
CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O2
|
||||
CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O0 -g
|
||||
CXXLIBS=-lpthread
|
||||
|
||||
OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o
|
||||
OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o \
|
||||
dataStore.o flooder.o
|
||||
TARGET = jeanhubert
|
||||
|
||||
all: $(TARGET)
|
||||
|
|
110
dataStore.cpp
Normal file
110
dataStore.cpp
Normal file
|
@ -0,0 +1,110 @@
|
|||
/***************************************************************************
|
||||
* By Théophile Bastian, 2017
|
||||
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
|
||||
* License: WTFPL v2 <http://www.wtfpl.net/>
|
||||
**************************************************************************/
|
||||
|
||||
#include "dataStore.h"
|
||||
#include <cstring>
|
||||
#include <algorithm>
|
||||
using namespace std;
|
||||
|
||||
DataStore::DataStore(Protocol* proto) : proto(proto)
|
||||
{}
|
||||
|
||||
DataStore::~DataStore() {
|
||||
}
|
||||
|
||||
void DataStore::update() {
|
||||
while(!timeEvents.empty()) {
|
||||
const TimeEvent& evt = timeEvents.top();
|
||||
if(evt.time > time(NULL)) // We're done for now.
|
||||
break;
|
||||
|
||||
timeEvents.pop();
|
||||
switch(evt.type) {
|
||||
case EV_REPUBLISH:
|
||||
handleRepublish(evt.id);
|
||||
break;
|
||||
|
||||
case EV_EXPIRES:
|
||||
handleExpire(evt.id, evt.seqno);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void DataStore::addData(Bytes pck, u32 seqno, u64 id, bool mine) {
|
||||
if(curSeqno.find(id) != curSeqno.end() && curSeqno[id] >= seqno)
|
||||
return;
|
||||
|
||||
data[id] = Data(pck, mine);
|
||||
curSeqno[id] = seqno;
|
||||
|
||||
if(mine) {
|
||||
timeEvents.push(TimeEvent(
|
||||
time(NULL) + csts::TIME_REPUBLISH_DATA,
|
||||
seqno, id, EV_REPUBLISH));
|
||||
|
||||
}
|
||||
else {
|
||||
timeEvents.push(TimeEvent(
|
||||
time(NULL) + csts::TIMEOUT_DATA,
|
||||
seqno, id, EV_EXPIRES));
|
||||
// If it is not renewed, it will expire.
|
||||
}
|
||||
|
||||
recvTime[id] = time(NULL);
|
||||
fprintf(stderr, "[INFO] Storing data %lX (%u)\n", id, seqno);
|
||||
|
||||
handleFlood(id);
|
||||
}
|
||||
|
||||
ssize_t DataStore::dataSize(u64 id) {
|
||||
if(data.find(id) == data.end())
|
||||
return -1;
|
||||
return data[id].data.size();
|
||||
}
|
||||
|
||||
ssize_t DataStore::readData(u64 id, char* buffer, size_t size) {
|
||||
if(data.find(id) == data.end())
|
||||
return -1;
|
||||
ssize_t len = min((ssize_t)size, dataSize(id));
|
||||
data[id].data.writeToBuffer(buffer, size);
|
||||
return len;
|
||||
}
|
||||
|
||||
void DataStore::ids(std::vector<u64>& out) {
|
||||
for(auto& dat : data)
|
||||
out.push_back(dat.first);
|
||||
}
|
||||
|
||||
void DataStore::setFlooded(u64 id) {
|
||||
toFlood_.erase(id);
|
||||
}
|
||||
|
||||
void DataStore::handleExpire(u64 id, u32 seqno) {
|
||||
if(seqno < curSeqno[id])
|
||||
return; // Was updated in time
|
||||
|
||||
cleanData(id);
|
||||
}
|
||||
|
||||
void DataStore::handleRepublish(u64 id) {
|
||||
if(data.find(id) == data.end() || !data[id].isMine)
|
||||
return;
|
||||
|
||||
curSeqno[id] = time(NULL);
|
||||
handleFlood(id);
|
||||
}
|
||||
|
||||
void DataStore::handleFlood(u64 id) {
|
||||
toFlood_.insert(id);
|
||||
}
|
||||
|
||||
void DataStore::cleanData(u64 id) {
|
||||
curSeqno.erase(id);
|
||||
recvTime.erase(id);
|
||||
data.erase(id); // Deletes the Bytes as well
|
||||
}
|
||||
|
98
dataStore.h
Normal file
98
dataStore.h
Normal file
|
@ -0,0 +1,98 @@
|
|||
/***************************************************************************
|
||||
* By Théophile Bastian, 2017
|
||||
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
|
||||
* License: WTFPL v2 <http://www.wtfpl.net/>
|
||||
**************************************************************************/
|
||||
|
||||
#pragma once
|
||||
#include <queue>
|
||||
#include <unordered_map>
|
||||
#include <set>
|
||||
#include "data.h"
|
||||
#include "nw_constants.h"
|
||||
#include "protocol.h"
|
||||
#include "Bytes.h"
|
||||
|
||||
class DataStore {
|
||||
public:
|
||||
DataStore(Protocol* proto);
|
||||
~DataStore();
|
||||
|
||||
void update();
|
||||
/** Performs bookkeeping actions. Sends packets. This function should
|
||||
* be called often enough.
|
||||
*/
|
||||
|
||||
void addData(Bytes pck, u32 seqno, u64 id, bool mine=false);
|
||||
/** Adds a data in the data storage. If `mine` is `true`, the current
|
||||
* node considers itself responsible for the data and will republish
|
||||
* it when needed.
|
||||
*/
|
||||
|
||||
ssize_t dataSize(u64 id);
|
||||
/** Gets the size of the data with id `id`. Returns -1 if there is no
|
||||
* such data stored.
|
||||
*/
|
||||
|
||||
ssize_t readData(u64 id, char* buffer, size_t size);
|
||||
/** Fills `buffer` with the data of the given `id`.
|
||||
* If the data does not exist, returs -1; else, returns the length of
|
||||
* data actually retrieved.
|
||||
*/
|
||||
|
||||
const Bytes& operator[](u64 id) {
|
||||
return data[id].data;
|
||||
}
|
||||
|
||||
u32 getSeqno(u64 id) const { return curSeqno.at(id); }
|
||||
|
||||
void ids(std::vector<u64>& out);
|
||||
/** Fills `out` with the IDs of the stored data. */
|
||||
|
||||
const std::set<u64> toFlood() const { return toFlood_; }
|
||||
/** Returns a list of data IDs that should be flooded. */
|
||||
|
||||
void setFlooded(u64 id);
|
||||
/** Marks a data as flooded */
|
||||
|
||||
private: //meth
|
||||
void handleExpire(u64 id, u32 seqno);
|
||||
void handleRepublish(u64 id);
|
||||
void handleFlood(u64 id);
|
||||
|
||||
void cleanData(u64 id);
|
||||
|
||||
private:
|
||||
enum EvType {
|
||||
EV_REPUBLISH, EV_EXPIRES
|
||||
};
|
||||
struct TimeEvent {
|
||||
TimeEvent(time_t time, u32 seqno, u64 id, EvType type) :
|
||||
time(time), seqno(seqno), id(id), type(type) {}
|
||||
time_t time;
|
||||
u32 seqno;
|
||||
u64 id;
|
||||
EvType type;
|
||||
|
||||
bool operator<(const TimeEvent& e) const {
|
||||
return time > e.time; // Max-priority queue
|
||||
}
|
||||
};
|
||||
|
||||
struct Data {
|
||||
Data() : data(), isMine(false) {}
|
||||
Data(Bytes b) : data(b), isMine(false) {}
|
||||
Data(Bytes b, bool mine) : data(b), isMine(mine) {}
|
||||
Bytes data;
|
||||
bool isMine;
|
||||
};
|
||||
|
||||
std::priority_queue<TimeEvent> timeEvents;
|
||||
Protocol* proto;
|
||||
std::unordered_map<u64, Data> data;
|
||||
std::unordered_map<u64, u32> curSeqno;
|
||||
std::unordered_map<u64, time_t> recvTime;
|
||||
|
||||
std::set<u64> toFlood_;
|
||||
};
|
||||
|
54
flooder.cpp
Normal file
54
flooder.cpp
Normal file
|
@ -0,0 +1,54 @@
|
|||
/***************************************************************************
|
||||
* By Théophile Bastian, 2017
|
||||
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
|
||||
* License: WTFPL v2 <http://www.wtfpl.net/>
|
||||
**************************************************************************/
|
||||
|
||||
#include "flooder.h"
|
||||
|
||||
Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
|
||||
const std::list<Neighbour>& peers) :
|
||||
data(data), datId(datId), seqno(seqno), proto(proto)
|
||||
{
|
||||
fprintf(stderr, "[DBG] Created flooder for %lX (%u)\n", datId, seqno);
|
||||
for(auto it=peers.begin(); it != peers.end(); ++it) {
|
||||
triesCount[it->id] = 0;
|
||||
toFlood.push(FloodEvt(time(NULL), it->id));
|
||||
}
|
||||
update();
|
||||
}
|
||||
|
||||
void Flooder::update() {
|
||||
while(!toFlood.empty()) {
|
||||
const FloodEvt& evt = toFlood.top();
|
||||
if(evt.time > time(NULL))
|
||||
break;
|
||||
toFlood.pop();
|
||||
|
||||
if(triesCount[evt.id] > csts::FLOOD_RETRIES) {
|
||||
fprintf(stderr, "[WARNING] Could not flood to %lX: no IHave.\n",
|
||||
evt.id);
|
||||
continue;
|
||||
}
|
||||
else if(triesCount[evt.id] < 0) {
|
||||
fprintf(stderr, "[DBG] Got your IHave, %lX!\n", evt.id);
|
||||
continue; // IHave received
|
||||
}
|
||||
|
||||
sendTo(evt.id);
|
||||
triesCount[evt.id]++;
|
||||
toFlood.push(FloodEvt(time(NULL) + csts::TIME_RESEND_FLOOD, evt.id));
|
||||
}
|
||||
}
|
||||
|
||||
void Flooder::gotIHave(u64 id, u32 withSeqno) {
|
||||
if(seqno > withSeqno)
|
||||
return;
|
||||
|
||||
triesCount[id] = -1;
|
||||
}
|
||||
|
||||
void Flooder::sendTo(u64 id) {
|
||||
proto->sendData(id, data, seqno, datId);
|
||||
}
|
||||
|
49
flooder.h
Normal file
49
flooder.h
Normal file
|
@ -0,0 +1,49 @@
|
|||
/***************************************************************************
|
||||
* By Théophile Bastian, 2017
|
||||
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
|
||||
* License: WTFPL v2 <http://www.wtfpl.net/>
|
||||
**************************************************************************/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <ctime>
|
||||
#include <queue>
|
||||
#include <list>
|
||||
#include <unordered_map>
|
||||
#include "data.h"
|
||||
#include "nw_constants.h"
|
||||
#include "protocol.h"
|
||||
|
||||
class Flooder {
|
||||
public:
|
||||
Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
|
||||
const std::list<Neighbour>& peers);
|
||||
|
||||
void update();
|
||||
/** Call often enough (ie ~1s) */
|
||||
|
||||
void gotIHave(u64 id, u32 withSeqno);
|
||||
/** Acknoledges the reception of the resource by peer `id`. */
|
||||
|
||||
bool done() const { return toFlood.empty(); }
|
||||
|
||||
private: //meth
|
||||
void sendTo(u64 id);
|
||||
|
||||
private:
|
||||
struct FloodEvt {
|
||||
FloodEvt(time_t time, u64 id) : time(time), id(id) {}
|
||||
time_t time;
|
||||
u64 id;
|
||||
bool operator<(const FloodEvt& e) const {
|
||||
return time > e.time; // Max priority queue.
|
||||
}
|
||||
};
|
||||
std::unordered_map<u64, int> triesCount;
|
||||
std::priority_queue<FloodEvt> toFlood;
|
||||
Bytes data;
|
||||
u64 datId;
|
||||
u32 seqno;
|
||||
Protocol* proto;
|
||||
};
|
||||
|
13
main.cpp
13
main.cpp
|
@ -10,6 +10,7 @@
|
|||
#include "neighbours.h"
|
||||
#include "packetParser.h"
|
||||
#include "configFile.h"
|
||||
#include "dataStore.h"
|
||||
#include <cstring>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
|
@ -43,7 +44,9 @@ int main(int argc, char** argv) {
|
|||
|
||||
Protocol proto(addr, cfg.getSelfId());
|
||||
|
||||
Neighbours neighboursManager(&proto);
|
||||
DataStore dataStore(&proto);
|
||||
Neighbours neighboursManager(&proto, &dataStore);
|
||||
|
||||
for(const Neighbour& nei : cfg.getBootstrapNodes()) {
|
||||
char addr[54];
|
||||
inet_ntop(AF_INET6, &nei.addr.sin6_addr, addr, 54);
|
||||
|
@ -52,7 +55,7 @@ int main(int argc, char** argv) {
|
|||
neighboursManager.addPotentialNei(nei);
|
||||
}
|
||||
|
||||
PacketParser pckParser(&neighboursManager, &proto);
|
||||
PacketParser pckParser(&neighboursManager, &proto, &dataStore);
|
||||
|
||||
while(true) {
|
||||
neighboursManager.fullUpdate();
|
||||
|
@ -60,10 +63,12 @@ int main(int argc, char** argv) {
|
|||
while(proto.readyRead()) {
|
||||
SockAddr fromAddr;
|
||||
Bytes pck = proto.getPacket(&fromAddr);
|
||||
pckParser.parse(pck);
|
||||
pckParser.parse(pck, fromAddr);
|
||||
}
|
||||
|
||||
sleep(2);
|
||||
dataStore.update();
|
||||
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -11,8 +11,8 @@
|
|||
|
||||
using namespace std;
|
||||
|
||||
Neighbours::Neighbours(Protocol* proto) :
|
||||
proto(proto), lastPeerPeek(0)
|
||||
Neighbours::Neighbours(Protocol* proto, DataStore* dataStore) :
|
||||
proto(proto), dataStore(dataStore), lastPeerPeek(0), lastSentNR(0)
|
||||
{}
|
||||
|
||||
void Neighbours::fullCheck() {
|
||||
|
@ -46,17 +46,46 @@ void Neighbours::fullUpdate() {
|
|||
|
||||
if(potentialNei.size() > 0
|
||||
&& symNei.size() < csts::SYM_COUNT_BEFORE_PEEK
|
||||
&& time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK) {
|
||||
int nPeerId = rand() % potentialNei.size();
|
||||
auto it = potentialNei.begin();
|
||||
for(int at=0; at < nPeerId; ++at, ++it);
|
||||
&& time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK)
|
||||
{
|
||||
auto it = randPeer(&potentialNei);
|
||||
proto->sendEmpty(it->id);
|
||||
lastPckSent[it->id] = time(NULL);
|
||||
lastPeerPeek = time(NULL);
|
||||
}
|
||||
|
||||
|
||||
if(symNei.size() > 0 && potentialNei.size() < 5
|
||||
&& time(NULL) - lastSentNR >= csts::TIME_SEND_NR)
|
||||
{
|
||||
auto it = randPeer(&symNei);
|
||||
proto->sendNReq(it->id);
|
||||
lastPckSent[it->id] = time(NULL);
|
||||
lastSentNR = time(NULL);
|
||||
}
|
||||
|
||||
for(auto flooder=dataFlooder.begin(); flooder != dataFlooder.end(); )
|
||||
{
|
||||
if(flooder->second.done())
|
||||
flooder = dataFlooder.erase(flooder);
|
||||
else {
|
||||
flooder->second.update();
|
||||
++flooder;
|
||||
}
|
||||
}
|
||||
|
||||
for(u64 datId : dataStore->toFlood()) {
|
||||
dataFlooder.insert({datId,
|
||||
Flooder(
|
||||
(*dataStore)[datId], datId, dataStore->getSeqno(datId),
|
||||
proto, symNei)});
|
||||
dataStore->setFlooded(datId);
|
||||
}
|
||||
}
|
||||
|
||||
void Neighbours::addPotentialNei(const Neighbour& nei) {
|
||||
if(neiType.find(nei.id) != neiType.end())
|
||||
return; // We already know him
|
||||
potentialNei.push_back(nei);
|
||||
lastRecv.insert({nei.id, 0});
|
||||
lastIHU.insert({nei.id, 0});
|
||||
|
@ -64,7 +93,10 @@ void Neighbours::addPotentialNei(const Neighbour& nei) {
|
|||
proto->addIdAddr(nei.addr, nei.id);
|
||||
}
|
||||
|
||||
void Neighbours::receivedFrom(u64 id) {
|
||||
void Neighbours::receivedFrom(u64 id, const SockAddr& addr) {
|
||||
if(neiType.find(id) == neiType.end())
|
||||
addPotentialNei(Neighbour(id, addr));
|
||||
|
||||
NeiType typ = neiType[id];
|
||||
lastRecv[id] = time(NULL);
|
||||
|
||||
|
@ -72,7 +104,13 @@ void Neighbours::receivedFrom(u64 id) {
|
|||
changeNeiType(id, NEI_UNIDIR);
|
||||
}
|
||||
|
||||
void Neighbours::hadIHU(u64 id) {
|
||||
void Neighbours::hadIHU(u64 id, const SockAddr& addr) {
|
||||
if(neiType.find(id) == neiType.end()) {
|
||||
fprintf(stderr, "[WARNING] %lX heard us — yet we did not send "
|
||||
"anything?\n", id);
|
||||
addPotentialNei(Neighbour(id, addr));
|
||||
}
|
||||
|
||||
NeiType typ = neiType[id];
|
||||
lastRecv[id] = time(NULL);
|
||||
lastIHU[id] = time(NULL);
|
||||
|
@ -80,6 +118,26 @@ void Neighbours::hadIHU(u64 id) {
|
|||
changeNeiType(id, NEI_SYM);
|
||||
}
|
||||
|
||||
void Neighbours::getNeighbours(vector<Neighbour>& out, u64 except, int count) {
|
||||
vector<list<Neighbour>::iterator > permutation;
|
||||
for(auto it = symNei.begin(); it != symNei.end(); ++it)
|
||||
permutation.push_back(it);
|
||||
for(size_t i=0; i < permutation.size(); i++) {
|
||||
int swapWith = rand() % permutation.size();
|
||||
auto tmp = permutation[i];
|
||||
permutation[i] = permutation[swapWith];
|
||||
permutation[swapWith] = tmp;
|
||||
}
|
||||
|
||||
for(int cur=0, pos=0; cur < count && (size_t)pos < permutation.size();
|
||||
pos++) {
|
||||
if(permutation[pos]->id != except) {
|
||||
out.push_back(*permutation[pos]);
|
||||
cur++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
list<Neighbour>* Neighbours::listOfType(NeiType typ) {
|
||||
switch(typ) {
|
||||
case NEI_POTENTIAL:
|
||||
|
@ -94,8 +152,13 @@ list<Neighbour>* Neighbours::listOfType(NeiType typ) {
|
|||
throw WrongNeiType();
|
||||
}
|
||||
|
||||
void Neighbours::gotIHave(u64 from, u64 datId, u32 seqno) {
|
||||
if(dataFlooder.find(datId) != dataFlooder.end()) {
|
||||
dataFlooder.at(datId).gotIHave(from, seqno);
|
||||
}
|
||||
}
|
||||
|
||||
void Neighbours::changeNeiType(u64 id, NeiType nType) {
|
||||
printf("TYPE %lX %d\n", id, neiType[id]);
|
||||
NeiType cType = neiType[id];
|
||||
if(cType == nType)
|
||||
return;
|
||||
|
@ -106,11 +169,8 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) {
|
|||
bool wasSpliced=false;
|
||||
for(auto it=fromList->begin(); it != fromList->end(); ++it) {
|
||||
if(it->id == id) {
|
||||
printf("%ld %ld\n", fromList->size(), toList->size());
|
||||
toList->push_back(*it); // splice() doesn't work?!
|
||||
fromList->erase(it);
|
||||
printf("%ld %ld %d\n", fromList->size(), toList->size(),
|
||||
toList == &unidirNei);
|
||||
wasSpliced=true;
|
||||
break;
|
||||
}
|
||||
|
@ -118,7 +178,6 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) {
|
|||
if(!wasSpliced) {
|
||||
fprintf(stderr, "[ERROR] Node %lX wasn't found (type change)\n", id);
|
||||
}
|
||||
printf("TYPE %lX %d\n", id, neiType[id]);
|
||||
}
|
||||
|
||||
void Neighbours::updateSendPackets(const Neighbour& nei) {
|
||||
|
@ -128,7 +187,6 @@ void Neighbours::updateSendPackets(const Neighbour& nei) {
|
|||
|
||||
bool Neighbours::sendEmpty(u64 id) {
|
||||
if(time(NULL) - lastPckSent[id] >= csts::TIME_RESEND_EMPTY) {
|
||||
printf("[DBG] sending empty packet to %lX.\n", id);
|
||||
lastPckSent[id] = time(NULL);
|
||||
proto->sendEmpty(id);
|
||||
return true;
|
||||
|
@ -138,7 +196,6 @@ bool Neighbours::sendEmpty(u64 id) {
|
|||
|
||||
bool Neighbours::sendIHU(u64 id) {
|
||||
if(time(NULL) - lastIHUSent[id] >= csts::TIME_RESEND_IHU) {
|
||||
printf("[DBG] sending IHU packet to %lX.\n", id);
|
||||
lastIHUSent[id] = time(NULL);
|
||||
lastPckSent[id] = time(NULL);
|
||||
proto->sendIHU(id);
|
||||
|
@ -147,3 +204,10 @@ bool Neighbours::sendIHU(u64 id) {
|
|||
return false;
|
||||
}
|
||||
|
||||
list<Neighbour>::iterator Neighbours::randPeer(list<Neighbour>* list) {
|
||||
int nPeerId = rand() % list->size();
|
||||
auto it = list->begin();
|
||||
for(int at=0; at < nPeerId; ++at, ++it);
|
||||
return it;
|
||||
}
|
||||
|
||||
|
|
24
neighbours.h
24
neighbours.h
|
@ -7,14 +7,17 @@
|
|||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <set>
|
||||
#include <ctime>
|
||||
#include <unordered_map>
|
||||
#include "data.h"
|
||||
#include "protocol.h"
|
||||
#include "flooder.h"
|
||||
#include "dataStore.h"
|
||||
|
||||
class Neighbours {
|
||||
public:
|
||||
Neighbours(Protocol* proto);
|
||||
Neighbours(Protocol* proto, DataStore* dataStore);
|
||||
|
||||
void fullCheck();
|
||||
/** Cleans the peers lists by removing the expired entries. */
|
||||
|
@ -28,16 +31,26 @@ class Neighbours {
|
|||
void addPotentialNei(const Neighbour& nei);
|
||||
/** Adds a `Neighbour` to the list of potential neighbours. */
|
||||
|
||||
void receivedFrom(u64 id);
|
||||
void receivedFrom(u64 id, const SockAddr& addr);
|
||||
/** Signals that a packet was received from `id`, performs the
|
||||
* appropriate bookkeeping actions.
|
||||
*/
|
||||
|
||||
void hadIHU(u64 id);
|
||||
void hadIHU(u64 id, const SockAddr& addr);
|
||||
/** Signals that a IHU was received from `id`, performs the
|
||||
* appropriate bookkeeping actions.
|
||||
*/
|
||||
|
||||
void getNeighbours(std::vector<Neighbour>& out, u64 except, int count);
|
||||
/** Fills `out` with at most `count` symetric neighbours, not
|
||||
* including `except`.
|
||||
*/
|
||||
|
||||
void gotIHave(u64 from, u64 datId, u32 seqno);
|
||||
/** Notifies the flooders that a `IHave` packet has been received for
|
||||
* this data from the peer `from`.
|
||||
*/
|
||||
|
||||
private: //meth
|
||||
class WrongNeiType : public std::exception {};
|
||||
enum NeiType {
|
||||
|
@ -50,13 +63,16 @@ class Neighbours {
|
|||
|
||||
bool sendEmpty(u64 id);
|
||||
bool sendIHU(u64 id);
|
||||
std::list<Neighbour>::iterator randPeer(std::list<Neighbour>* list);
|
||||
|
||||
private:
|
||||
Protocol* proto;
|
||||
DataStore* dataStore;
|
||||
std::list<Neighbour> potentialNei, unidirNei, symNei;
|
||||
std::unordered_map<u64, time_t> lastRecv, lastIHU;
|
||||
std::unordered_map<u64, time_t> lastPckSent, lastIHUSent;
|
||||
std::unordered_map<u64, NeiType> neiType;
|
||||
time_t lastPeerPeek;
|
||||
std::unordered_map<u64, Flooder> dataFlooder;
|
||||
time_t lastPeerPeek, lastSentNR;
|
||||
};
|
||||
|
||||
|
|
|
@ -28,11 +28,18 @@ const u8 TLV_DATA_JPG = 34;
|
|||
const int TIMEOUT_UNIDIR = 100; // s
|
||||
const int TIMEOUT_SYM_RECV = 150; // s
|
||||
const int TIMEOUT_SYM_IHU = 300; // s
|
||||
const int TIMEOUT_DATA = 35*60; // s
|
||||
const int TIME_RESEND_IHU = 90; // s
|
||||
const int TIME_RESEND_EMPTY = 30; // s
|
||||
const int TIME_REPUBLISH_DATA = 25*60; // s
|
||||
const int TIME_PEER_PEEK = 30; // s
|
||||
const int TIME_SEND_NR = 60; // s
|
||||
const int TIME_RESEND_FLOOD = 3; // s
|
||||
|
||||
const int SYM_COUNT_BEFORE_PEEK = 5;
|
||||
const int NB_NEIGH_PER_NR = 5;
|
||||
|
||||
const int FLOOD_RETRIES = 3;
|
||||
|
||||
const u16 DEFAULT_PORT = 1192;
|
||||
|
||||
|
|
|
@ -7,24 +7,35 @@
|
|||
#include "packetParser.h"
|
||||
#include <cstring>
|
||||
|
||||
PacketParser::PacketParser(Neighbours* nei, Protocol* proto) :
|
||||
neighbours(nei), protocol(proto)
|
||||
PacketParser::PacketParser(Neighbours* nei, Protocol* proto,
|
||||
DataStore* dataStore) :
|
||||
neighbours(nei), protocol(proto), dataStore(dataStore)
|
||||
{}
|
||||
|
||||
void PacketParser::parse(Bytes pck) {
|
||||
void PacketParser::parse(Bytes pck, const SockAddr& addr) {
|
||||
u64 peerId;
|
||||
pck >> peerId;
|
||||
neighbours->receivedFrom(peerId);
|
||||
neighbours->receivedFrom(peerId, addr);
|
||||
|
||||
while(pck.size() > 0) {
|
||||
readTLV(pck, peerId);
|
||||
readTLV(pck, peerId, addr);
|
||||
}
|
||||
}
|
||||
|
||||
void PacketParser::readTLV(Bytes& pck, u64 nei) {
|
||||
void PacketParser::readTLV(Bytes& pck, u64 nei, const SockAddr& addr) {
|
||||
u8 type, len;
|
||||
pck >> type >> len;
|
||||
|
||||
printf("[INFO] Analyzing %d\n", type);
|
||||
|
||||
if(pck.size() < len) {
|
||||
fprintf(stderr, "[WARNING] Advertised TLV does not fit in the packet"
|
||||
".\n");
|
||||
pck.extract(pck.size());
|
||||
return;
|
||||
}
|
||||
Bytes subpacket = pck.extract(len);
|
||||
|
||||
switch(type) {
|
||||
case csts::TLV_PAD1:
|
||||
case csts::TLV_PADN:
|
||||
|
@ -32,34 +43,34 @@ void PacketParser::readTLV(Bytes& pck, u64 nei) {
|
|||
|
||||
case csts::TLV_IHU: {
|
||||
u64 ihuId;
|
||||
pck >> ihuId;
|
||||
subpacket >> ihuId;
|
||||
if(ihuId != protocol->getSelfId())
|
||||
break;
|
||||
|
||||
neighbours->hadIHU(nei);
|
||||
neighbours->hadIHU(nei, addr);
|
||||
break;
|
||||
}
|
||||
|
||||
case csts::TLV_NR:
|
||||
//TODO
|
||||
handleNR(nei);
|
||||
break;
|
||||
|
||||
case csts::TLV_NEIGH:
|
||||
receiveNeigh(pck, len);
|
||||
receiveNeigh(subpacket);
|
||||
break;
|
||||
|
||||
case csts::TLV_DATA:
|
||||
receiveData(pck, len, nei);
|
||||
receiveData(subpacket, nei);
|
||||
break;
|
||||
|
||||
case csts::TLV_IHAVE:
|
||||
//TODO
|
||||
receiveIHave(subpacket, nei);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void PacketParser::receiveNeigh(Bytes& pck, u8 length) {
|
||||
while(length >= 8+16+2) { /* enough to read one peer */
|
||||
void PacketParser::receiveNeigh(Bytes& pck) {
|
||||
while(pck.size() >= 8+16+2) { /* enough to read one peer */
|
||||
u64 id;
|
||||
u16 port;
|
||||
SockAddr addr;
|
||||
|
@ -72,17 +83,40 @@ void PacketParser::receiveNeigh(Bytes& pck, u8 length) {
|
|||
pck >> port;
|
||||
addr.sin6_port = htons(port);
|
||||
|
||||
if(id == protocol->getSelfId())
|
||||
continue;
|
||||
|
||||
fprintf(stderr, "[INFO] Adding neighbour %lX\n", id);
|
||||
|
||||
Neighbour nei(id, addr);
|
||||
neighbours->addPotentialNei(nei);
|
||||
}
|
||||
}
|
||||
|
||||
void PacketParser::receiveData(Bytes& pck, u8 length, u64 from) {
|
||||
void PacketParser::receiveData(Bytes& pck, u64 from) {
|
||||
u32 seqNo;
|
||||
u64 datId;
|
||||
pck >> seqNo >> datId;
|
||||
neighbours->gotIHave(from, datId, seqNo);
|
||||
protocol->sendIHave(from, datId, seqNo);
|
||||
//TODO
|
||||
length++;
|
||||
dataStore->addData(pck, seqNo, datId, false);
|
||||
}
|
||||
|
||||
void PacketParser::receiveIHave(Bytes& pck, u64 from) {
|
||||
u32 seqno;
|
||||
u64 datId;
|
||||
if(pck.size() < 8+4) {
|
||||
fprintf(stderr, "[WARNING] IHave too short from %lX\n", from);
|
||||
return;
|
||||
}
|
||||
pck >> seqno >> datId;
|
||||
neighbours->gotIHave(from, datId, seqno);
|
||||
}
|
||||
|
||||
void PacketParser::handleNR(u64 from) {
|
||||
std::vector<Neighbour> neigh;
|
||||
neighbours->getNeighbours(neigh, from, csts::NB_NEIGH_PER_NR);
|
||||
|
||||
protocol->sendNeighbours(from, neigh);
|
||||
}
|
||||
|
||||
|
|
|
@ -8,20 +8,24 @@
|
|||
|
||||
#include "neighbours.h"
|
||||
#include "protocol.h"
|
||||
#include "dataStore.h"
|
||||
|
||||
class PacketParser {
|
||||
public:
|
||||
PacketParser(Neighbours* nei, Protocol* proto);
|
||||
PacketParser(Neighbours* nei, Protocol* proto, DataStore* dataStore);
|
||||
|
||||
void parse(Bytes pck);
|
||||
void parse(Bytes pck, const SockAddr& addr);
|
||||
|
||||
private: //meth
|
||||
void readTLV(Bytes& pck, u64 nei);
|
||||
void receiveNeigh(Bytes& pck, u8 length);
|
||||
void receiveData(Bytes& pck, u8 length, u64 from);
|
||||
void readTLV(Bytes& pck, u64 nei, const SockAddr& addr);
|
||||
void receiveNeigh(Bytes& pck);
|
||||
void receiveData(Bytes& pck, u64 from);
|
||||
void receiveIHave(Bytes& pck, u64 from);
|
||||
void handleNR(u64 from);
|
||||
|
||||
private:
|
||||
Neighbours* neighbours;
|
||||
Protocol* protocol;
|
||||
DataStore* dataStore;
|
||||
};
|
||||
|
||||
|
|
25
protocol.cpp
25
protocol.cpp
|
@ -79,7 +79,8 @@ void Protocol::sendBody(const Bytes& body, const SockAddr& to) {
|
|||
if(rc != (ssize_t)pck.size()) {
|
||||
//TODO log warning.
|
||||
}
|
||||
fprintf(stderr, "[INFO] sending packet to port %hu\n", ntohs(to.sin6_port));
|
||||
fprintf(stderr, "[INFO] sending packet [%d] to port %hu\n",
|
||||
body.size() > 0 ? body[0] : -1, ntohs(to.sin6_port));
|
||||
}
|
||||
void Protocol::sendBody(const Bytes& body, const u64& id) {
|
||||
return sendBody(body, addrOfId(id));
|
||||
|
@ -130,20 +131,20 @@ void Protocol::sendNeighbours(u64 id, const std::vector<Neighbour>& neigh) {
|
|||
sendNeighbours(addrOfId(id), neigh);
|
||||
}
|
||||
|
||||
void Protocol::sendData(const SockAddr& to, const char* data, size_t len,
|
||||
void Protocol::sendData(const SockAddr& to, const Bytes& data,
|
||||
u32 seqno, u64 datId)
|
||||
{
|
||||
Bytes pck;
|
||||
if(len + 12 >= 1<<8)
|
||||
if(data.size() + 12 >= 1<<8)
|
||||
throw DataTooLongError();
|
||||
pck << csts::TLV_DATA << (u8) (12 + len) << seqno << datId
|
||||
<< Bytes(data, len);
|
||||
pck << csts::TLV_DATA << (u8) (12 + data.size()) << seqno << datId
|
||||
<< data;
|
||||
sendBody(pck, to);
|
||||
}
|
||||
void Protocol::sendData(u64 id, const char* data, size_t len, u32 seqno,
|
||||
void Protocol::sendData(u64 id, const Bytes& data, u32 seqno,
|
||||
u64 datId)
|
||||
{
|
||||
sendData(addrOfId(id), data, len, seqno, datId);
|
||||
sendData(addrOfId(id), data, seqno, datId);
|
||||
}
|
||||
|
||||
void Protocol::sendIHave(const SockAddr& to, u64 datId, u32 seqno) {
|
||||
|
@ -205,17 +206,11 @@ void Protocol::pollNetwork() {
|
|||
else {
|
||||
fprintf(stderr, "[ERROR] Unknown address family %d.\n",
|
||||
fromAddr.sa_family);
|
||||
fprintf(stderr, "\t%hu\n", ((SockAddr*)&fromAddr)->sin6_port);
|
||||
while(data.size() > 0) {
|
||||
u8 c;
|
||||
data >> c;
|
||||
fprintf(stderr, "%02X|", c);
|
||||
}
|
||||
fprintf(stderr, "\n");
|
||||
continue;
|
||||
}
|
||||
|
||||
puts("Received packet");
|
||||
fprintf(stderr, "[INFO] Received packet [%d]\n",
|
||||
data.size() > 8 ? data[8] : -1);
|
||||
AvailPacket pck;
|
||||
pck.from = convFromAddr;
|
||||
pck.data = data;
|
||||
|
|
|
@ -61,10 +61,9 @@ class Protocol {
|
|||
const std::vector<Neighbour>& neigh);
|
||||
/** Sends a neighbours list packet */
|
||||
|
||||
void sendData(const SockAddr& to, const char* data, size_t len,
|
||||
u32 seqno, u64 datId);
|
||||
void sendData(u64 id, const char* data, size_t len,
|
||||
void sendData(const SockAddr& to, const Bytes& data,
|
||||
u32 seqno, u64 datId);
|
||||
void sendData(u64 id, const Bytes& data, u32 seqno, u64 datId);
|
||||
|
||||
void sendIHave(const SockAddr& to, u64 datId, u32 seqno);
|
||||
void sendIHave(u64 id, u64 datId, u32 seqno);
|
||||
|
|
Loading…
Reference in a new issue