Classes for data strage, flooding

This commit is contained in:
Théophile Bastian 2016-11-26 16:19:38 +01:00
parent 143cff86f5
commit c0f1c55e03
4 changed files with 311 additions and 0 deletions

110
dataStore.cpp Normal file
View file

@ -0,0 +1,110 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#include "dataStore.h"
#include <cstring>
#include <algorithm>
using namespace std;
DataStore::DataStore(Protocol* proto) : proto(proto)
{}
DataStore::~DataStore() {
}
void DataStore::update() {
while(!timeEvents.empty()) {
const TimeEvent& evt = timeEvents.top();
if(evt.time > time(NULL)) // We're done for now.
break;
timeEvents.pop();
switch(evt.type) {
case EV_REPUBLISH:
handleRepublish(evt.id);
break;
case EV_EXPIRES:
handleExpire(evt.id, evt.seqno);
break;
}
}
}
void DataStore::addData(Bytes pck, u32 seqno, u64 id, bool mine) {
if(curSeqno.find(id) != curSeqno.end() && curSeqno[id] >= seqno)
return;
data[id] = Data(pck, mine);
curSeqno[id] = seqno;
if(mine) {
timeEvents.push(TimeEvent(
time(NULL) + csts::TIME_REPUBLISH_DATA,
seqno, id, EV_REPUBLISH));
}
else {
timeEvents.push(TimeEvent(
time(NULL) + csts::TIMEOUT_DATA,
seqno, id, EV_EXPIRES));
// If it is not renewed, it will expire.
}
recvTime[id] = time(NULL);
fprintf(stderr, "[INFO] Storing data %lX (%u)\n", id, seqno);
handleFlood(id);
}
ssize_t DataStore::dataSize(u64 id) {
if(data.find(id) == data.end())
return -1;
return data[id].data.size();
}
ssize_t DataStore::readData(u64 id, char* buffer, size_t size) {
if(data.find(id) == data.end())
return -1;
ssize_t len = min((ssize_t)size, dataSize(id));
data[id].data.writeToBuffer(buffer, size);
return len;
}
void DataStore::ids(std::vector<u64>& out) {
for(auto& dat : data)
out.push_back(dat.first);
}
void DataStore::setFlooded(u64 id) {
toFlood_.erase(id);
}
void DataStore::handleExpire(u64 id, u32 seqno) {
if(seqno < curSeqno[id])
return; // Was updated in time
cleanData(id);
}
void DataStore::handleRepublish(u64 id) {
if(data.find(id) == data.end() || !data[id].isMine)
return;
curSeqno[id] = time(NULL);
handleFlood(id);
}
void DataStore::handleFlood(u64 id) {
toFlood_.insert(id);
}
void DataStore::cleanData(u64 id) {
curSeqno.erase(id);
recvTime.erase(id);
data.erase(id); // Deletes the Bytes as well
}

98
dataStore.h Normal file
View file

@ -0,0 +1,98 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#pragma once
#include <queue>
#include <unordered_map>
#include <set>
#include "data.h"
#include "nw_constants.h"
#include "protocol.h"
#include "Bytes.h"
class DataStore {
public:
DataStore(Protocol* proto);
~DataStore();
void update();
/** Performs bookkeeping actions. Sends packets. This function should
* be called often enough.
*/
void addData(Bytes pck, u32 seqno, u64 id, bool mine=false);
/** Adds a data in the data storage. If `mine` is `true`, the current
* node considers itself responsible for the data and will republish
* it when needed.
*/
ssize_t dataSize(u64 id);
/** Gets the size of the data with id `id`. Returns -1 if there is no
* such data stored.
*/
ssize_t readData(u64 id, char* buffer, size_t size);
/** Fills `buffer` with the data of the given `id`.
* If the data does not exist, returs -1; else, returns the length of
* data actually retrieved.
*/
const Bytes& operator[](u64 id) {
return data[id].data;
}
u32 getSeqno(u64 id) const { return curSeqno.at(id); }
void ids(std::vector<u64>& out);
/** Fills `out` with the IDs of the stored data. */
const std::set<u64> toFlood() const { return toFlood_; }
/** Returns a list of data IDs that should be flooded. */
void setFlooded(u64 id);
/** Marks a data as flooded */
private: //meth
void handleExpire(u64 id, u32 seqno);
void handleRepublish(u64 id);
void handleFlood(u64 id);
void cleanData(u64 id);
private:
enum EvType {
EV_REPUBLISH, EV_EXPIRES
};
struct TimeEvent {
TimeEvent(time_t time, u32 seqno, u64 id, EvType type) :
time(time), seqno(seqno), id(id), type(type) {}
time_t time;
u32 seqno;
u64 id;
EvType type;
bool operator<(const TimeEvent& e) const {
return time > e.time; // Max-priority queue
}
};
struct Data {
Data() : data(), isMine(false) {}
Data(Bytes b) : data(b), isMine(false) {}
Data(Bytes b, bool mine) : data(b), isMine(mine) {}
Bytes data;
bool isMine;
};
std::priority_queue<TimeEvent> timeEvents;
Protocol* proto;
std::unordered_map<u64, Data> data;
std::unordered_map<u64, u32> curSeqno;
std::unordered_map<u64, time_t> recvTime;
std::set<u64> toFlood_;
};

54
flooder.cpp Normal file
View file

@ -0,0 +1,54 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#include "flooder.h"
Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
const std::list<Neighbour>& peers) :
data(data), datId(datId), seqno(seqno), proto(proto)
{
fprintf(stderr, "[DBG] Created flooder for %lX (%u)\n", datId, seqno);
for(auto it=peers.begin(); it != peers.end(); ++it) {
triesCount[it->id] = 0;
toFlood.push(FloodEvt(time(NULL), it->id));
}
update();
}
void Flooder::update() {
while(!toFlood.empty()) {
const FloodEvt& evt = toFlood.top();
if(evt.time > time(NULL))
break;
toFlood.pop();
if(triesCount[evt.id] > csts::FLOOD_RETRIES) {
fprintf(stderr, "[WARNING] Could not flood to %lX: no IHave.\n",
evt.id);
continue;
}
else if(triesCount[evt.id] < 0) {
fprintf(stderr, "[DBG] Got your IHave, %lX!\n", evt.id);
continue; // IHave received
}
sendTo(evt.id);
triesCount[evt.id]++;
toFlood.push(FloodEvt(time(NULL) + csts::TIME_RESEND_FLOOD, evt.id));
}
}
void Flooder::gotIHave(u64 id, u32 withSeqno) {
if(seqno > withSeqno)
return;
triesCount[id] = -1;
}
void Flooder::sendTo(u64 id) {
proto->sendData(id, data, seqno, datId);
}

49
flooder.h Normal file
View file

@ -0,0 +1,49 @@
/***************************************************************************
* By Théophile Bastian, 2017
* M1 Network course project at ENS Cachan, Juliusz Chroboczek.
* License: WTFPL v2 <http://www.wtfpl.net/>
**************************************************************************/
#pragma once
#include <ctime>
#include <queue>
#include <list>
#include <unordered_map>
#include "data.h"
#include "nw_constants.h"
#include "protocol.h"
class Flooder {
public:
Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto,
const std::list<Neighbour>& peers);
void update();
/** Call often enough (ie ~1s) */
void gotIHave(u64 id, u32 withSeqno);
/** Acknoledges the reception of the resource by peer `id`. */
bool done() const { return toFlood.empty(); }
private: //meth
void sendTo(u64 id);
private:
struct FloodEvt {
FloodEvt(time_t time, u64 id) : time(time), id(id) {}
time_t time;
u64 id;
bool operator<(const FloodEvt& e) const {
return time > e.time; // Max priority queue.
}
};
std::unordered_map<u64, int> triesCount;
std::priority_queue<FloodEvt> toFlood;
Bytes data;
u64 datId;
u32 seqno;
Protocol* proto;
};