Compare commits

...

2 commits

15 changed files with 515 additions and 68 deletions

View file

@ -53,12 +53,12 @@ Bytes& Bytes::operator<<(u8 v) {
return *this;
}
Bytes& Bytes::operator<<(u16 v) {
insertData<u16>(htons(v));
insertData<u16>(v);
// insertData<u16>(htons(v));
return *this;
}
Bytes& Bytes::operator<<(u32 v) {
insertData<u32>(htonl(v));
insertData<u32>(v);
// insertData<u32>(htonl(v));
return *this;
}
@ -103,6 +103,12 @@ Bytes& Bytes::operator>>(char& v) {
return *this;
}
Bytes Bytes::extract(size_t len) {
Bytes out = sub(0, len); // Handles out of range
firstIndex += len;
return out;
}
void Bytes::operator=(const Bytes& oth) {
firstIndex=0;
data.clear();
@ -112,7 +118,7 @@ void Bytes::operator=(const Bytes& oth) {
}
Bytes Bytes::sub(size_t beg, size_t len) const {
if(beg+len >= size())
if(beg+len > size())
throw OutOfRange();
Bytes out;
for(size_t byte=0; byte < len; byte++)

View file

@ -55,6 +55,11 @@ class Bytes {
/// Extracts the given data type from the vector. Returns *this to
/// allow chaining.
Bytes extract(size_t len);
/** Extracts the first `len` bytes and returns them. Throws
* `OutOfRange` if `len` > `size()`.
*/
void operator=(const Bytes& oth);
/// Copies the given Bytes object into its own data.

View file

@ -1,8 +1,9 @@
CXX=g++
CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O2
CXXFLAGS=-Wall -Wextra -Werror -pedantic -std=c++14 -O0 -g
CXXLIBS=-lpthread
OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o
OBJS = Bytes.o main.o protocol.o neighbours.o packetParser.o configFile.o \
dataStore.o flooder.o
TARGET = jeanhubert
all: $(TARGET)

110
dataStore.cpp Normal file
View file

@ -0,0 +1,110 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#include "dataStore.h"
#include <cstring>
#include <algorithm>
using namespace std;
DataStore::DataStore(Protocol* proto) : proto(proto)
{}
DataStore::~DataStore() {
}
void DataStore::update() {
while(!timeEvents.empty()) {
const TimeEvent& evt = timeEvents.top();
if(evt.time > time(NULL)) // We're done for now.
break;
timeEvents.pop();
switch(evt.type) {
case EV_REPUBLISH:
handleRepublish(evt.id);
break;
case EV_EXPIRES:
handleExpire(evt.id, evt.seqno);
break;
}
}
}
void DataStore::addData(Bytes pck, u32 seqno, u64 id, bool mine) {
if(curSeqno.find(id) != curSeqno.end() && curSeqno[id] >= seqno)
return;
data[id] = Data(pck, mine);
curSeqno[id] = seqno;
if(mine) {
timeEvents.push(TimeEvent(
time(NULL) + csts::TIME_REPUBLISH_DATA,
seqno, id, EV_REPUBLISH));
}
else {
timeEvents.push(TimeEvent(
time(NULL) + csts::TIMEOUT_DATA,
seqno, id, EV_EXPIRES));
// If it is not renewed, it will expire.
}
recvTime[id] = time(NULL);
fprintf(stderr, "[INFO] Storing data %lX (%u)\n", id, seqno);
handleFlood(id);
}
ssize_t DataStore::dataSize(u64 id) {
if(data.find(id) == data.end())
return -1;
return data[id].data.size();
}
ssize_t DataStore::readData(u64 id, char* buffer, size_t size) {
if(data.find(id) == data.end())
return -1;
ssize_t len = min((ssize_t)size, dataSize(id));
data[id].data.writeToBuffer(buffer, size);
return len;
}
void DataStore::ids(std::vector<u64>& out) {
for(auto& dat : data)
out.push_back(dat.first);
}
void DataStore::setFlooded(u64 id) {
toFlood_.erase(id);
}
void DataStore::handleExpire(u64 id, u32 seqno) {
if(seqno < curSeqno[id])
return; // Was updated in time
cleanData(id);
}
void DataStore::handleRepublish(u64 id) {
if(data.find(id) == data.end() || !data[id].isMine)
return;
curSeqno[id] = time(NULL);
handleFlood(id);
}
void DataStore::handleFlood(u64 id) {
toFlood_.insert(id);
}
void DataStore::cleanData(u64 id) {
curSeqno.erase(id);
recvTime.erase(id);
data.erase(id); // Deletes the Bytes as well
}

98
dataStore.h Normal file
View file

@ -0,0 +1,98 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#pragma once
#include <queue>
#include <unordered_map>
#include <set>
#include "data.h"
#include "nw_constants.h"
#include "protocol.h"
#include "Bytes.h"
class DataStore {
public:
DataStore(Protocol* proto);
~DataStore();
void update();
/** Performs bookkeeping actions. Sends packets. This function should
* be called often enough.
*/
void addData(Bytes pck, u32 seqno, u64 id, bool mine=false);
/** Adds a data in the data storage. If `mine` is `true`, the current
* node considers itself responsible for the data and will republish
* it when needed.
*/
ssize_t dataSize(u64 id);
/** Gets the size of the data with id `id`. Returns -1 if there is no
* such data stored.
*/
ssize_t readData(u64 id, char* buffer, size_t size);
/** Fills `buffer` with the data of the given `id`.
* If the data does not exist, returs -1; else, returns the length of
* data actually retrieved.
*/
const Bytes& operator[](u64 id) {
return data[id].data;
}
u32 getSeqno(u64 id) const { return curSeqno.at(id); }
void ids(std::vector<u64>& out);
/** Fills `out` with the IDs of the stored data. */
const std::set<u64> toFlood() const { return toFlood_; }
/** Returns a list of data IDs that should be flooded. */
void setFlooded(u64 id);
/** Marks a data as flooded */
private: //meth
void handleExpire(u64 id, u32 seqno);
void handleRepublish(u64 id);
void handleFlood(u64 id);
void cleanData(u64 id);
private:
enum EvType {
EV_REPUBLISH, EV_EXPIRES
};
struct TimeEvent {
TimeEvent(time_t time, u32 seqno, u64 id, EvType type) :
time(time), seqno(seqno), id(id), type(type) {}
time_t time;
u32 seqno;
u64 id;
EvType type;
bool operator<(const TimeEvent& e) const {
return time > e.time; // Max-priority queue
}
};
struct Data {
Data() : data(), isMine(false) {}
Data(Bytes b) : data(b), isMine(false) {}
Data(Bytes b, bool mine) : data(b), isMine(mine) {}
Bytes data;
bool isMine;
};
std::priority_queue<TimeEvent> timeEvents;
Protocol* proto;
std::unordered_map<u64, Data> data;
std::unordered_map<u64, u32> curSeqno;
std::unordered_map<u64, time_t> recvTime;
std::set<u64> toFlood_;
};

54
flooder.cpp Normal file
View file

@ -0,0 +1,54 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#include "flooder.h"
Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
const std::list<Neighbour>& peers) :
data(data), datId(datId), seqno(seqno), proto(proto)
{
fprintf(stderr, "[DBG] Created flooder for %lX (%u)\n", datId, seqno);
for(auto it=peers.begin(); it != peers.end(); ++it) {
triesCount[it->id] = 0;
toFlood.push(FloodEvt(time(NULL), it->id));
}
update();
}
void Flooder::update() {
while(!toFlood.empty()) {
const FloodEvt& evt = toFlood.top();
if(evt.time > time(NULL))
break;
toFlood.pop();
if(triesCount[evt.id] > csts::FLOOD_RETRIES) {
fprintf(stderr, "[WARNING] Could not flood to %lX: no IHave.\n",
evt.id);
continue;
}
else if(triesCount[evt.id] < 0) {
fprintf(stderr, "[DBG] Got your IHave, %lX!\n", evt.id);
continue; // IHave received
}
sendTo(evt.id);
triesCount[evt.id]++;
toFlood.push(FloodEvt(time(NULL) + csts::TIME_RESEND_FLOOD, evt.id));
}
}
void Flooder::gotIHave(u64 id, u32 withSeqno) {
if(seqno > withSeqno)
return;
triesCount[id] = -1;
}
void Flooder::sendTo(u64 id) {
proto->sendData(id, data, seqno, datId);
}

49
flooder.h Normal file
View file

@ -0,0 +1,49 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#pragma once
#include <ctime>
#include <queue>
#include <list>
#include <unordered_map>
#include "data.h"
#include "nw_constants.h"
#include "protocol.h"
class Flooder {
public:
Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
const std::list<Neighbour>& peers);
void update();
/** Call often enough (ie ~1s) */
void gotIHave(u64 id, u32 withSeqno);
/** Acknoledges the reception of the resource by peer `id`. */
bool done() const { return toFlood.empty(); }
private: //meth
void sendTo(u64 id);
private:
struct FloodEvt {
FloodEvt(time_t time, u64 id) : time(time), id(id) {}
time_t time;
u64 id;
bool operator<(const FloodEvt& e) const {
return time > e.time; // Max priority queue.
}
};
std::unordered_map<u64, int> triesCount;
std::priority_queue<FloodEvt> toFlood;
Bytes data;
u64 datId;
u32 seqno;
Protocol* proto;
};

View file

@ -10,6 +10,7 @@
#include "neighbours.h"
#include "packetParser.h"
#include "configFile.h"
#include "dataStore.h"
#include <cstring>
#include <cstdio>
#include <cstdlib>
@ -43,7 +44,9 @@ int main(int argc, char** argv) {
Protocol proto(addr, cfg.getSelfId());
Neighbours neighboursManager(&proto);
DataStore dataStore(&proto);
Neighbours neighboursManager(&proto, &dataStore);
for(const Neighbour& nei : cfg.getBootstrapNodes()) {
char addr[54];
inet_ntop(AF_INET6, &nei.addr.sin6_addr, addr, 54);
@ -52,7 +55,7 @@ int main(int argc, char** argv) {
neighboursManager.addPotentialNei(nei);
}
PacketParser pckParser(&neighboursManager, &proto);
PacketParser pckParser(&neighboursManager, &proto, &dataStore);
while(true) {
neighboursManager.fullUpdate();
@ -60,10 +63,12 @@ int main(int argc, char** argv) {
while(proto.readyRead()) {
SockAddr fromAddr;
Bytes pck = proto.getPacket(&fromAddr);
pckParser.parse(pck);
pckParser.parse(pck, fromAddr);
}
sleep(2);
dataStore.update();
sleep(1);
}
return 0;

View file

@ -11,8 +11,8 @@
using namespace std;
Neighbours::Neighbours(Protocol* proto) :
proto(proto), lastPeerPeek(0)
Neighbours::Neighbours(Protocol* proto, DataStore* dataStore) :
proto(proto), dataStore(dataStore), lastPeerPeek(0), lastSentNR(0)
{}
void Neighbours::fullCheck() {
@ -46,17 +46,46 @@ void Neighbours::fullUpdate() {
if(potentialNei.size() > 0
&& symNei.size() < csts::SYM_COUNT_BEFORE_PEEK
&& time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK) {
int nPeerId = rand() % potentialNei.size();
auto it = potentialNei.begin();
for(int at=0; at < nPeerId; ++at, ++it);
&& time(NULL) - lastPeerPeek >= csts::TIME_PEER_PEEK)
{
auto it = randPeer(&potentialNei);
proto->sendEmpty(it->id);
lastPckSent[it->id] = time(NULL);
lastPeerPeek = time(NULL);
}
if(symNei.size() > 0 && potentialNei.size() < 5
&& time(NULL) - lastSentNR >= csts::TIME_SEND_NR)
{
auto it = randPeer(&symNei);
proto->sendNReq(it->id);
lastPckSent[it->id] = time(NULL);
lastSentNR = time(NULL);
}
for(auto flooder=dataFlooder.begin(); flooder != dataFlooder.end(); )
{
if(flooder->second.done())
flooder = dataFlooder.erase(flooder);
else {
flooder->second.update();
++flooder;
}
}
for(u64 datId : dataStore->toFlood()) {
dataFlooder.insert({datId,
Flooder(
(*dataStore)[datId], datId, dataStore->getSeqno(datId),
proto, symNei)});
dataStore->setFlooded(datId);
}
}
void Neighbours::addPotentialNei(const Neighbour& nei) {
if(neiType.find(nei.id) != neiType.end())
return; // We already know him
potentialNei.push_back(nei);
lastRecv.insert({nei.id, 0});
lastIHU.insert({nei.id, 0});
@ -64,7 +93,10 @@ void Neighbours::addPotentialNei(const Neighbour& nei) {
proto->addIdAddr(nei.addr, nei.id);
}
void Neighbours::receivedFrom(u64 id) {
void Neighbours::receivedFrom(u64 id, const SockAddr& addr) {
if(neiType.find(id) == neiType.end())
addPotentialNei(Neighbour(id, addr));
NeiType typ = neiType[id];
lastRecv[id] = time(NULL);
@ -72,7 +104,13 @@ void Neighbours::receivedFrom(u64 id) {
changeNeiType(id, NEI_UNIDIR);
}
void Neighbours::hadIHU(u64 id) {
void Neighbours::hadIHU(u64 id, const SockAddr& addr) {
if(neiType.find(id) == neiType.end()) {
fprintf(stderr, "[WARNING] %lX heard us — yet we did not send "
"anything?\n", id);
addPotentialNei(Neighbour(id, addr));
}
NeiType typ = neiType[id];
lastRecv[id] = time(NULL);
lastIHU[id] = time(NULL);
@ -80,6 +118,26 @@ void Neighbours::hadIHU(u64 id) {
changeNeiType(id, NEI_SYM);
}
void Neighbours::getNeighbours(vector<Neighbour>& out, u64 except, int count) {
vector<list<Neighbour>::iterator > permutation;
for(auto it = symNei.begin(); it != symNei.end(); ++it)
permutation.push_back(it);
for(size_t i=0; i < permutation.size(); i++) {
int swapWith = rand() % permutation.size();
auto tmp = permutation[i];
permutation[i] = permutation[swapWith];
permutation[swapWith] = tmp;
}
for(int cur=0, pos=0; cur < count && (size_t)pos < permutation.size();
pos++) {
if(permutation[pos]->id != except) {
out.push_back(*permutation[pos]);
cur++;
}
}
}
list<Neighbour>* Neighbours::listOfType(NeiType typ) {
switch(typ) {
case NEI_POTENTIAL:
@ -94,8 +152,13 @@ list<Neighbour>* Neighbours::listOfType(NeiType typ) {
throw WrongNeiType();
}
void Neighbours::gotIHave(u64 from, u64 datId, u32 seqno) {
if(dataFlooder.find(datId) != dataFlooder.end()) {
dataFlooder.at(datId).gotIHave(from, seqno);
}
}
void Neighbours::changeNeiType(u64 id, NeiType nType) {
printf("TYPE %lX %d\n", id, neiType[id]);
NeiType cType = neiType[id];
if(cType == nType)
return;
@ -106,11 +169,8 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) {
bool wasSpliced=false;
for(auto it=fromList->begin(); it != fromList->end(); ++it) {
if(it->id == id) {
printf("%ld %ld\n", fromList->size(), toList->size());
toList->push_back(*it); // splice() doesn't work?!
fromList->erase(it);
printf("%ld %ld %d\n", fromList->size(), toList->size(),
toList == &unidirNei);
wasSpliced=true;
break;
}
@ -118,7 +178,6 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) {
if(!wasSpliced) {
fprintf(stderr, "[ERROR] Node %lX wasn't found (type change)\n", id);
}
printf("TYPE %lX %d\n", id, neiType[id]);
}
void Neighbours::updateSendPackets(const Neighbour& nei) {
@ -128,7 +187,6 @@ void Neighbours::updateSendPackets(const Neighbour& nei) {
bool Neighbours::sendEmpty(u64 id) {
if(time(NULL) - lastPckSent[id] >= csts::TIME_RESEND_EMPTY) {
printf("[DBG] sending empty packet to %lX.\n", id);
lastPckSent[id] = time(NULL);
proto->sendEmpty(id);
return true;
@ -138,7 +196,6 @@ bool Neighbours::sendEmpty(u64 id) {
bool Neighbours::sendIHU(u64 id) {
if(time(NULL) - lastIHUSent[id] >= csts::TIME_RESEND_IHU) {
printf("[DBG] sending IHU packet to %lX.\n", id);
lastIHUSent[id] = time(NULL);
lastPckSent[id] = time(NULL);
proto->sendIHU(id);
@ -147,3 +204,10 @@ bool Neighbours::sendIHU(u64 id) {
return false;
}
list<Neighbour>::iterator Neighbours::randPeer(list<Neighbour>* list) {
int nPeerId = rand() % list->size();
auto it = list->begin();
for(int at=0; at < nPeerId; ++at, ++it);
return it;
}

View file

@ -7,14 +7,17 @@
#pragma once
#include <list>
#include <set>
#include <ctime>
#include <unordered_map>
#include "data.h"
#include "protocol.h"
#include "flooder.h"
#include "dataStore.h"
class Neighbours {
public:
Neighbours(Protocol* proto);
Neighbours(Protocol* proto, DataStore* dataStore);
void fullCheck();
/** Cleans the peers lists by removing the expired entries. */
@ -28,16 +31,26 @@ class Neighbours {
void addPotentialNei(const Neighbour& nei);
/** Adds a `Neighbour` to the list of potential neighbours. */
void receivedFrom(u64 id);
void receivedFrom(u64 id, const SockAddr& addr);
/** Signals that a packet was received from `id`, performs the
* appropriate bookkeeping actions.
*/
void hadIHU(u64 id);
void hadIHU(u64 id, const SockAddr& addr);
/** Signals that a IHU was received from `id`, performs the
* appropriate bookkeeping actions.
*/
void getNeighbours(std::vector<Neighbour>& out, u64 except, int count);
/** Fills `out` with at most `count` symetric neighbours, not
* including `except`.
*/
void gotIHave(u64 from, u64 datId, u32 seqno);
/** Notifies the flooders that a `IHave` packet has been received for
* this data from the peer `from`.
*/
private: //meth
class WrongNeiType : public std::exception {};
enum NeiType {
@ -50,13 +63,16 @@ class Neighbours {
bool sendEmpty(u64 id);
bool sendIHU(u64 id);
std::list<Neighbour>::iterator randPeer(std::list<Neighbour>* list);
private:
Protocol* proto;
DataStore* dataStore;
std::list<Neighbour> potentialNei, unidirNei, symNei;
std::unordered_map<u64, time_t> lastRecv, lastIHU;
std::unordered_map<u64, time_t> lastPckSent, lastIHUSent;
std::unordered_map<u64, NeiType> neiType;
time_t lastPeerPeek;
std::unordered_map<u64, Flooder> dataFlooder;
time_t lastPeerPeek, lastSentNR;
};

View file

@ -28,11 +28,18 @@ const u8 TLV_DATA_JPG = 34;
const int TIMEOUT_UNIDIR = 100; // s
const int TIMEOUT_SYM_RECV = 150; // s
const int TIMEOUT_SYM_IHU = 300; // s
const int TIMEOUT_DATA = 35*60; // s
const int TIME_RESEND_IHU = 90; // s
const int TIME_RESEND_EMPTY = 30; // s
const int TIME_REPUBLISH_DATA = 25*60; // s
const int TIME_PEER_PEEK = 30; // s
const int TIME_SEND_NR = 60; // s
const int TIME_RESEND_FLOOD = 3; // s
const int SYM_COUNT_BEFORE_PEEK = 5;
const int NB_NEIGH_PER_NR = 5;
const int FLOOD_RETRIES = 3;
const u16 DEFAULT_PORT = 1192;

View file

@ -7,24 +7,35 @@
#include "packetParser.h"
#include <cstring>
PacketParser::PacketParser(Neighbours* nei, Protocol* proto) :
neighbours(nei), protocol(proto)
PacketParser::PacketParser(Neighbours* nei, Protocol* proto,
DataStore* dataStore) :
neighbours(nei), protocol(proto), dataStore(dataStore)
{}
void PacketParser::parse(Bytes pck) {
void PacketParser::parse(Bytes pck, const SockAddr& addr) {
u64 peerId;
pck >> peerId;
neighbours->receivedFrom(peerId);
neighbours->receivedFrom(peerId, addr);
while(pck.size() > 0) {
readTLV(pck, peerId);
readTLV(pck, peerId, addr);
}
}
void PacketParser::readTLV(Bytes& pck, u64 nei) {
void PacketParser::readTLV(Bytes& pck, u64 nei, const SockAddr& addr) {
u8 type, len;
pck >> type >> len;
printf("[INFO] Analyzing %d\n", type);
if(pck.size() < len) {
fprintf(stderr, "[WARNING] Advertised TLV does not fit in the packet"
".\n");
pck.extract(pck.size());
return;
}
Bytes subpacket = pck.extract(len);
switch(type) {
case csts::TLV_PAD1:
case csts::TLV_PADN:
@ -32,34 +43,34 @@ void PacketParser::readTLV(Bytes& pck, u64 nei) {
case csts::TLV_IHU: {
u64 ihuId;
pck >> ihuId;
subpacket >> ihuId;
if(ihuId != protocol->getSelfId())
break;
neighbours->hadIHU(nei);
neighbours->hadIHU(nei, addr);
break;
}
case csts::TLV_NR:
//TODO
handleNR(nei);
break;
case csts::TLV_NEIGH:
receiveNeigh(pck, len);
receiveNeigh(subpacket);
break;
case csts::TLV_DATA:
receiveData(pck, len, nei);
receiveData(subpacket, nei);
break;
case csts::TLV_IHAVE:
//TODO
receiveIHave(subpacket, nei);
break;
}
}
void PacketParser::receiveNeigh(Bytes& pck, u8 length) {
while(length >= 8+16+2) { /* enough to read one peer */
void PacketParser::receiveNeigh(Bytes& pck) {
while(pck.size() >= 8+16+2) { /* enough to read one peer */
u64 id;
u16 port;
SockAddr addr;
@ -72,17 +83,40 @@ void PacketParser::receiveNeigh(Bytes& pck, u8 length) {
pck >> port;
addr.sin6_port = htons(port);
if(id == protocol->getSelfId())
continue;
fprintf(stderr, "[INFO] Adding neighbour %lX\n", id);
Neighbour nei(id, addr);
neighbours->addPotentialNei(nei);
}
}
void PacketParser::receiveData(Bytes& pck, u8 length, u64 from) {
void PacketParser::receiveData(Bytes& pck, u64 from) {
u32 seqNo;
u64 datId;
pck >> seqNo >> datId;
neighbours->gotIHave(from, datId, seqNo);
protocol->sendIHave(from, datId, seqNo);
//TODO
length++;
dataStore->addData(pck, seqNo, datId, false);
}
void PacketParser::receiveIHave(Bytes& pck, u64 from) {
u32 seqno;
u64 datId;
if(pck.size() < 8+4) {
fprintf(stderr, "[WARNING] IHave too short from %lX\n", from);
return;
}
pck >> seqno >> datId;
neighbours->gotIHave(from, datId, seqno);
}
void PacketParser::handleNR(u64 from) {
std::vector<Neighbour> neigh;
neighbours->getNeighbours(neigh, from, csts::NB_NEIGH_PER_NR);
protocol->sendNeighbours(from, neigh);
}

View file

@ -8,20 +8,24 @@
#include "neighbours.h"
#include "protocol.h"
#include "dataStore.h"
class PacketParser {
public:
PacketParser(Neighbours* nei, Protocol* proto);
PacketParser(Neighbours* nei, Protocol* proto, DataStore* dataStore);
void parse(Bytes pck);
void parse(Bytes pck, const SockAddr& addr);
private: //meth
void readTLV(Bytes& pck, u64 nei);
void receiveNeigh(Bytes& pck, u8 length);
void receiveData(Bytes& pck, u8 length, u64 from);
void readTLV(Bytes& pck, u64 nei, const SockAddr& addr);
void receiveNeigh(Bytes& pck);
void receiveData(Bytes& pck, u64 from);
void receiveIHave(Bytes& pck, u64 from);
void handleNR(u64 from);
private:
Neighbours* neighbours;
Protocol* protocol;
DataStore* dataStore;
};

View file

@ -79,7 +79,8 @@ void Protocol::sendBody(const Bytes& body, const SockAddr& to) {
if(rc != (ssize_t)pck.size()) {
//TODO log warning.
}
fprintf(stderr, "[INFO] sending packet to port %hu\n", ntohs(to.sin6_port));
fprintf(stderr, "[INFO] sending packet [%d] to port %hu\n",
body.size() > 0 ? body[0] : -1, ntohs(to.sin6_port));
}
void Protocol::sendBody(const Bytes& body, const u64& id) {
return sendBody(body, addrOfId(id));
@ -130,20 +131,20 @@ void Protocol::sendNeighbours(u64 id, const std::vector<Neighbour>& neigh) {
sendNeighbours(addrOfId(id), neigh);
}
void Protocol::sendData(const SockAddr& to, const char* data, size_t len,
void Protocol::sendData(const SockAddr& to, const Bytes& data,
u32 seqno, u64 datId)
{
Bytes pck;
if(len + 12 >= 1<<8)
if(data.size() + 12 >= 1<<8)
throw DataTooLongError();
pck << csts::TLV_DATA << (u8) (12 + len) << seqno << datId
<< Bytes(data, len);
pck << csts::TLV_DATA << (u8) (12 + data.size()) << seqno << datId
<< data;
sendBody(pck, to);
}
void Protocol::sendData(u64 id, const char* data, size_t len, u32 seqno,
void Protocol::sendData(u64 id, const Bytes& data, u32 seqno,
u64 datId)
{
sendData(addrOfId(id), data, len, seqno, datId);
sendData(addrOfId(id), data, seqno, datId);
}
void Protocol::sendIHave(const SockAddr& to, u64 datId, u32 seqno) {
@ -205,17 +206,11 @@ void Protocol::pollNetwork() {
else {
fprintf(stderr, "[ERROR] Unknown address family %d.\n",
fromAddr.sa_family);
fprintf(stderr, "\t%hu\n", ((SockAddr*)&fromAddr)->sin6_port);
while(data.size() > 0) {
u8 c;
data >> c;
fprintf(stderr, "%02X|", c);
}
fprintf(stderr, "\n");
continue;
}
puts("Received packet");
fprintf(stderr, "[INFO] Received packet [%d]\n",
data.size() > 8 ? data[8] : -1);
AvailPacket pck;
pck.from = convFromAddr;
pck.data = data;

View file

@ -61,10 +61,9 @@ class Protocol {
const std::vector<Neighbour>& neigh);
/** Sends a neighbours list packet */
void sendData(const SockAddr& to, const char* data, size_t len,
u32 seqno, u64 datId);
void sendData(u64 id, const char* data, size_t len,
void sendData(const SockAddr& to, const Bytes& data,
u32 seqno, u64 datId);
void sendData(u64 id, const Bytes& data, u32 seqno, u64 datId);
void sendIHave(const SockAddr& to, u64 datId, u32 seqno);
void sendIHave(u64 id, u64 datId, u32 seqno);