Browse Source

Sends, floods and receives data

datastore-rework
Théophile Bastian 6 years ago
parent
commit
86c91f7488
  1. 40
      configFile.cpp
  2. 8
      configFile.h
  3. 1
      dataStore.cpp
  4. 21
      flooder.cpp
  5. 6
      flooder.h
  6. 22
      main.cpp
  7. 37
      neighbours.cpp
  8. 5
      neighbours.h
  9. 2
      packetParser.cpp
  10. 15
      protocol.cpp

40
configFile.cpp

@ -11,11 +11,7 @@ @@ -11,11 +11,7 @@
using namespace std;
ConfigFile::ConfigFile() {
selfId=0;
for(int i=0; i < 8; i++) {
selfId <<= 8;
selfId += rand() % 0xFF;
}
selfId = randId();
}
bool ConfigFile::read(const char* path) {
@ -44,7 +40,6 @@ bool ConfigFile::read(const char* path) { @@ -44,7 +40,6 @@ bool ConfigFile::read(const char* path) {
addr.sin6_port = htons(port);
if(inet_pton(AF_INET6, addrStr.c_str(), &(addr.sin6_addr)) != 1) {
//TODO proper log
fprintf(stderr, "Could not convert '%s' to IPv6 address\n",
addrStr.c_str());
continue;
@ -52,10 +47,27 @@ bool ConfigFile::read(const char* path) { @@ -52,10 +47,27 @@ bool ConfigFile::read(const char* path) {
bootstrapNodes.push_back(Neighbour(id, addr));
}
else if(attr == "data") {
u64 id;
handle >> hex >> id >> dec;
if(id == 0)
id = randId();
string dataStr;
getline(handle, dataStr);
size_t nonWSPos = 0;
for(; nonWSPos < dataStr.size(); nonWSPos++)
if(dataStr[nonWSPos] != ' ' && dataStr[nonWSPos] != '\t')
break;
if(nonWSPos == dataStr.size())
continue;
data.push_back(make_pair(id, dataStr.substr(nonWSPos)));
}
else if(attr.empty())
continue;
else {
//TODO proper log
fprintf(stderr, "Unknown configuration item: '%s'\n",
attr.c_str());
continue;
@ -83,8 +95,22 @@ bool ConfigFile::write(const char* path) { @@ -83,8 +95,22 @@ bool ConfigFile::write(const char* path) {
<< addr << ' ' << ntohs(nei.addr.sin6_port) << '\n';
}
for(auto dat : data) {
handle << "data " << hex << dat.first << dec << " "
<< dat.second << '\n';
}
handle.close();
return true;
}
u64 ConfigFile::randId() const {
u64 out=0;
for(int i=0; i < 8; i++) {
out <<= 8;
out += rand() % 0xFF;
}
return out;
}

8
configFile.h

@ -26,8 +26,16 @@ class ConfigFile { @@ -26,8 +26,16 @@ class ConfigFile {
};
/** Bootstrap nodes. No setter: wouldn't be useful. */
const std::vector<std::pair<u64, std::string> >& getData() const {
return data;
}
/** Data to publish. */
private:
u64 randId() const;
u64 selfId;
std::vector<Neighbour> bootstrapNodes;
std::vector<std::pair<u64, std::string> > data;
};

1
dataStore.cpp

@ -45,7 +45,6 @@ void DataStore::addData(Bytes pck, u32 seqno, u64 id, bool mine) { @@ -45,7 +45,6 @@ void DataStore::addData(Bytes pck, u32 seqno, u64 id, bool mine) {
timeEvents.push(TimeEvent(
time(NULL) + csts::TIME_REPUBLISH_DATA,
seqno, id, EV_REPUBLISH));
}
else {
timeEvents.push(TimeEvent(

21
flooder.cpp

@ -10,7 +10,6 @@ Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto, @@ -10,7 +10,6 @@ Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
const std::list<Neighbour>& peers) :
data(data), datId(datId), seqno(seqno), proto(proto)
{
fprintf(stderr, "[DBG] Created flooder for %lX (%u)\n", datId, seqno);
for(auto it=peers.begin(); it != peers.end(); ++it) {
triesCount[it->id] = 0;
toFlood.push(FloodEvt(time(NULL), it->id));
@ -18,11 +17,23 @@ Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto, @@ -18,11 +17,23 @@ Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
update();
}
Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
u64 peer) :
data(data), datId(datId), seqno(seqno), proto(proto)
{
addPeer(peer);
update();
}
Flooder::~Flooder() {
}
void Flooder::update() {
while(!toFlood.empty()) {
const FloodEvt& evt = toFlood.top();
if(evt.time > time(NULL))
if(evt.time > time(NULL)) {
break;
}
toFlood.pop();
if(triesCount[evt.id] > csts::FLOOD_RETRIES) {
@ -31,7 +42,6 @@ void Flooder::update() { @@ -31,7 +42,6 @@ void Flooder::update() {
continue;
}
else if(triesCount[evt.id] < 0) {
fprintf(stderr, "[DBG] Got your IHave, %lX!\n", evt.id);
continue; // IHave received
}
@ -48,6 +58,11 @@ void Flooder::gotIHave(u64 id, u32 withSeqno) { @@ -48,6 +58,11 @@ void Flooder::gotIHave(u64 id, u32 withSeqno) {
triesCount[id] = -1;
}
void Flooder::addPeer(u64 peer) {
triesCount[peer] = 0;
toFlood.push(FloodEvt(time(NULL), peer));
}
void Flooder::sendTo(u64 id) {
proto->sendData(id, data, seqno, datId);
}

6
flooder.h

@ -18,6 +18,9 @@ class Flooder { @@ -18,6 +18,9 @@ class Flooder {
public:
Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
const std::list<Neighbour>& peers);
Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
u64 peer);
~Flooder();
void update();
/** Call often enough (ie ~1s) */
@ -25,6 +28,9 @@ class Flooder { @@ -25,6 +28,9 @@ class Flooder {
void gotIHave(u64 id, u32 withSeqno);
/** Acknoledges the reception of the resource by peer `id`. */
void addPeer(u64 peer);
/** Notifies that a new peer must be flooded. */
bool done() const { return toFlood.empty(); }
private: //meth

22
main.cpp

@ -16,6 +16,9 @@ @@ -16,6 +16,9 @@
#include <cstdlib>
#include <ctime>
#include <unistd.h>
#include <signal.h>
bool terminate=false;
int main(int argc, char** argv) {
bool hasConfig = false;
@ -27,6 +30,8 @@ int main(int argc, char** argv) { @@ -27,6 +30,8 @@ int main(int argc, char** argv) {
srand(time(NULL)+42);
signal(SIGINT, [](int) { terminate = true; });
ConfigFile cfg;
if(hasConfig) {
if(!cfg.read(configFilePath) || !cfg.write(configFilePath)) {
@ -40,24 +45,33 @@ int main(int argc, char** argv) { @@ -40,24 +45,33 @@ int main(int argc, char** argv) {
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(csts::DEFAULT_PORT);
printf("%lX\n", cfg.getSelfId());
fprintf(stderr, "[INFO] Starting with ID %lX\n", cfg.getSelfId());
Protocol proto(addr, cfg.getSelfId());
DataStore dataStore(&proto);
for(auto dat : cfg.getData()) {
Bytes pck;
pck << csts::TLV_DATA_TEXT << (u8)dat.second.size();
for(u8 chr : dat.second)
pck << chr;
dataStore.addData(pck, time(NULL), dat.first, true);
fprintf(stderr, "[INFO] Adding data `%s`\n", dat.second.c_str());
}
Neighbours neighboursManager(&proto, &dataStore);
for(const Neighbour& nei : cfg.getBootstrapNodes()) {
char addr[54];
inet_ntop(AF_INET6, &nei.addr.sin6_addr, addr, 54);
printf("Neigh: %lX [%s]:%hu\n", nei.id, addr,
ntohs(nei.addr.sin6_port));
fprintf(stderr, "[INFO] Bootstrap neighbour: %lX [%s]:%hu\n",
nei.id, addr, ntohs(nei.addr.sin6_port));
neighboursManager.addPotentialNei(nei);
}
PacketParser pckParser(&neighboursManager, &proto, &dataStore);
while(true) {
while(!terminate) {
neighboursManager.fullUpdate();
while(proto.readyRead()) {

37
neighbours.cpp

@ -15,6 +15,12 @@ Neighbours::Neighbours(Protocol* proto, DataStore* dataStore) : @@ -15,6 +15,12 @@ Neighbours::Neighbours(Protocol* proto, DataStore* dataStore) :
proto(proto), dataStore(dataStore), lastPeerPeek(0), lastSentNR(0)
{}
Neighbours::~Neighbours() {
for(auto fl : dataFlooder) {
delete fl.second;
}
}
void Neighbours::fullCheck() {
for(auto nei : neiType) {
switch(nei.second) {
@ -66,17 +72,19 @@ void Neighbours::fullUpdate() { @@ -66,17 +72,19 @@ void Neighbours::fullUpdate() {
for(auto flooder=dataFlooder.begin(); flooder != dataFlooder.end(); )
{
if(flooder->second.done())
if(flooder->second->done()) {
delete flooder->second;
flooder = dataFlooder.erase(flooder);
}
else {
flooder->second.update();
flooder->second->update();
++flooder;
}
}
for(u64 datId : dataStore->toFlood()) {
dataFlooder.insert({datId,
Flooder(
new Flooder(
(*dataStore)[datId], datId, dataStore->getSeqno(datId),
proto, symNei)});
dataStore->setFlooded(datId);
@ -154,7 +162,7 @@ list<Neighbour>* Neighbours::listOfType(NeiType typ) { @@ -154,7 +162,7 @@ list<Neighbour>* Neighbours::listOfType(NeiType typ) {
void Neighbours::gotIHave(u64 from, u64 datId, u32 seqno) {
if(dataFlooder.find(datId) != dataFlooder.end()) {
dataFlooder.at(datId).gotIHave(from, seqno);
dataFlooder.at(datId)->gotIHave(from, seqno);
}
}
@ -177,6 +185,11 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) { @@ -177,6 +185,11 @@ void Neighbours::changeNeiType(u64 id, NeiType nType) {
}
if(!wasSpliced) {
fprintf(stderr, "[ERROR] Node %lX wasn't found (type change)\n", id);
return;
}
if(nType == NEI_SYM) {
floodTo(id);
}
}
@ -211,3 +224,19 @@ list<Neighbour>::iterator Neighbours::randPeer(list<Neighbour>* list) { @@ -211,3 +224,19 @@ list<Neighbour>::iterator Neighbours::randPeer(list<Neighbour>* list) {
return it;
}
void Neighbours::floodTo(u64 peer) {
vector<u64> datIds;
dataStore->ids(datIds);
for(u64 datId : datIds) {
auto curFlooder = dataFlooder.find(datId);
if(curFlooder != dataFlooder.end() && !curFlooder->second->done()) {
curFlooder->second->addPeer(peer);
}
else {
dataFlooder.insert({datId, new Flooder(
(*dataStore)[datId], datId,
dataStore->getSeqno(datId), proto, peer)});
}
}
}

5
neighbours.h

@ -18,6 +18,7 @@ @@ -18,6 +18,7 @@
class Neighbours {
public:
Neighbours(Protocol* proto, DataStore* dataStore);
~Neighbours();
void fullCheck();
/** Cleans the peers lists by removing the expired entries. */
@ -65,6 +66,8 @@ class Neighbours { @@ -65,6 +66,8 @@ class Neighbours {
bool sendIHU(u64 id);
std::list<Neighbour>::iterator randPeer(std::list<Neighbour>* list);
void floodTo(u64 peer);
private:
Protocol* proto;
DataStore* dataStore;
@ -72,7 +75,7 @@ class Neighbours { @@ -72,7 +75,7 @@ class Neighbours {
std::unordered_map<u64, time_t> lastRecv, lastIHU;
std::unordered_map<u64, time_t> lastPckSent, lastIHUSent;
std::unordered_map<u64, NeiType> neiType;
std::unordered_map<u64, Flooder> dataFlooder;
std::unordered_map<u64, Flooder*> dataFlooder;
time_t lastPeerPeek, lastSentNR;
};

2
packetParser.cpp

@ -26,7 +26,7 @@ void PacketParser::readTLV(Bytes& pck, u64 nei, const SockAddr& addr) { @@ -26,7 +26,7 @@ void PacketParser::readTLV(Bytes& pck, u64 nei, const SockAddr& addr) {
u8 type, len;
pck >> type >> len;
printf("[INFO] Analyzing %d\n", type);
fprintf(stderr, "[INFO] Parsing %d from %lX.\n", type, nei);
if(pck.size() < len) {
fprintf(stderr, "[WARNING] Advertised TLV does not fit in the packet"

15
protocol.cpp

@ -27,7 +27,6 @@ Protocol::Protocol(const SockAddr& listenAddr, u64 selfId) : @@ -27,7 +27,6 @@ Protocol::Protocol(const SockAddr& listenAddr, u64 selfId) :
}
// Set socket reception timeout
/*
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 100000;
@ -35,7 +34,6 @@ Protocol::Protocol(const SockAddr& listenAddr, u64 selfId) : @@ -35,7 +34,6 @@ Protocol::Protocol(const SockAddr& listenAddr, u64 selfId) :
perror("Cannot set socket timeout");
throw NwError();
}
*/
startPollNetwork();
}
@ -71,18 +69,17 @@ void Protocol::sendBody(const Bytes& body, const SockAddr& to) { @@ -71,18 +69,17 @@ void Protocol::sendBody(const Bytes& body, const SockAddr& to) {
Bytes pck;
pck << csts::MAGIC << csts::VERSION << (u16)body.size() << selfId
<< body;
//TODO check size < MTU
char buffer[MAX_MTU];
pck.writeToBuffer(buffer, MAX_MTU);
ssize_t rc =
sendto(sock, buffer, pck.size(), 0, (struct sockaddr*)&to, sizeof(to));
if(rc != (ssize_t)pck.size()) {
//TODO log warning.
fprintf(stderr, "[WARNING] Whole packet not sent\n");
}
fprintf(stderr, "[INFO] sending packet [%d] to port %hu\n",
body.size() > 0 ? body[0] : -1, ntohs(to.sin6_port));
}
void Protocol::sendBody(const Bytes& body, const u64& id) {
fprintf(stderr, "[INFO] sending packet [%d] to %lX\n",
body.size() > 0 ? body[0] : -1, id);
return sendBody(body, addrOfId(id));
}
@ -169,10 +166,12 @@ void Protocol::pollNetwork() { @@ -169,10 +166,12 @@ void Protocol::pollNetwork() {
ssize_t readDat = recvfrom(sock, buffer, MAX_MTU, 0,
&fromAddr, &fromAddrLen);
if(readDat < 0) {
if(errno == 11) // Socket timeout - expected behaviour
continue;
perror("[WARNING] Bad packet");
continue;
}
if(readDat <= 0) {
if(readDat == 0) {
fprintf(stderr, "[WARNING] Empty packet.\n");
continue;
}
@ -209,8 +208,6 @@ void Protocol::pollNetwork() { @@ -209,8 +208,6 @@ void Protocol::pollNetwork() {
continue;
}
fprintf(stderr, "[INFO] Received packet [%d]\n",
data.size() > 8 ? data[8] : -1);
AvailPacket pck;
pck.from = convFromAddr;
pck.data = data;

Loading…
Cancel
Save