From c0f1c55e03a2e2c87219d94678959a739d495754 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophile=20Bastian?= Date: Sat, 26 Nov 2016 16:19:38 +0100 Subject: [PATCH] Classes for data strage, flooding --- dataStore.cpp | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++ dataStore.h | 98 ++++++++++++++++++++++++++++++++++++++++++++ flooder.cpp | 54 +++++++++++++++++++++++++ flooder.h | 49 ++++++++++++++++++++++ 4 files changed, 311 insertions(+) create mode 100644 dataStore.cpp create mode 100644 dataStore.h create mode 100644 flooder.cpp create mode 100644 flooder.h diff --git a/dataStore.cpp b/dataStore.cpp new file mode 100644 index 0000000..a801788 --- /dev/null +++ b/dataStore.cpp @@ -0,0 +1,110 @@ +/*************************************************************************** + * By Théophile Bastian, 2017 + * M1 Network course project at ENS Cachan, Juliusz Chroboczek. + * License: WTFPL v2 + **************************************************************************/ + +#include "dataStore.h" +#include +#include +using namespace std; + +DataStore::DataStore(Protocol* proto) : proto(proto) +{} + +DataStore::~DataStore() { +} + +void DataStore::update() { + while(!timeEvents.empty()) { + const TimeEvent& evt = timeEvents.top(); + if(evt.time > time(NULL)) // We're done for now. + break; + + timeEvents.pop(); + switch(evt.type) { + case EV_REPUBLISH: + handleRepublish(evt.id); + break; + + case EV_EXPIRES: + handleExpire(evt.id, evt.seqno); + break; + } + } +} + +void DataStore::addData(Bytes pck, u32 seqno, u64 id, bool mine) { + if(curSeqno.find(id) != curSeqno.end() && curSeqno[id] >= seqno) + return; + + data[id] = Data(pck, mine); + curSeqno[id] = seqno; + + if(mine) { + timeEvents.push(TimeEvent( + time(NULL) + csts::TIME_REPUBLISH_DATA, + seqno, id, EV_REPUBLISH)); + + } + else { + timeEvents.push(TimeEvent( + time(NULL) + csts::TIMEOUT_DATA, + seqno, id, EV_EXPIRES)); + // If it is not renewed, it will expire. + } + + recvTime[id] = time(NULL); + fprintf(stderr, "[INFO] Storing data %lX (%u)\n", id, seqno); + + handleFlood(id); +} + +ssize_t DataStore::dataSize(u64 id) { + if(data.find(id) == data.end()) + return -1; + return data[id].data.size(); +} + +ssize_t DataStore::readData(u64 id, char* buffer, size_t size) { + if(data.find(id) == data.end()) + return -1; + ssize_t len = min((ssize_t)size, dataSize(id)); + data[id].data.writeToBuffer(buffer, size); + return len; +} + +void DataStore::ids(std::vector& out) { + for(auto& dat : data) + out.push_back(dat.first); +} + +void DataStore::setFlooded(u64 id) { + toFlood_.erase(id); +} + +void DataStore::handleExpire(u64 id, u32 seqno) { + if(seqno < curSeqno[id]) + return; // Was updated in time + + cleanData(id); +} + +void DataStore::handleRepublish(u64 id) { + if(data.find(id) == data.end() || !data[id].isMine) + return; + + curSeqno[id] = time(NULL); + handleFlood(id); +} + +void DataStore::handleFlood(u64 id) { + toFlood_.insert(id); +} + +void DataStore::cleanData(u64 id) { + curSeqno.erase(id); + recvTime.erase(id); + data.erase(id); // Deletes the Bytes as well +} + diff --git a/dataStore.h b/dataStore.h new file mode 100644 index 0000000..8c31f4f --- /dev/null +++ b/dataStore.h @@ -0,0 +1,98 @@ +/*************************************************************************** + * By Théophile Bastian, 2017 + * M1 Network course project at ENS Cachan, Juliusz Chroboczek. + * License: WTFPL v2 + **************************************************************************/ + +#pragma once +#include +#include +#include +#include "data.h" +#include "nw_constants.h" +#include "protocol.h" +#include "Bytes.h" + +class DataStore { + public: + DataStore(Protocol* proto); + ~DataStore(); + + void update(); + /** Performs bookkeeping actions. Sends packets. This function should + * be called often enough. + */ + + void addData(Bytes pck, u32 seqno, u64 id, bool mine=false); + /** Adds a data in the data storage. If `mine` is `true`, the current + * node considers itself responsible for the data and will republish + * it when needed. + */ + + ssize_t dataSize(u64 id); + /** Gets the size of the data with id `id`. Returns -1 if there is no + * such data stored. + */ + + ssize_t readData(u64 id, char* buffer, size_t size); + /** Fills `buffer` with the data of the given `id`. + * If the data does not exist, returs -1; else, returns the length of + * data actually retrieved. + */ + + const Bytes& operator[](u64 id) { + return data[id].data; + } + + u32 getSeqno(u64 id) const { return curSeqno.at(id); } + + void ids(std::vector& out); + /** Fills `out` with the IDs of the stored data. */ + + const std::set toFlood() const { return toFlood_; } + /** Returns a list of data IDs that should be flooded. */ + + void setFlooded(u64 id); + /** Marks a data as flooded */ + + private: //meth + void handleExpire(u64 id, u32 seqno); + void handleRepublish(u64 id); + void handleFlood(u64 id); + + void cleanData(u64 id); + + private: + enum EvType { + EV_REPUBLISH, EV_EXPIRES + }; + struct TimeEvent { + TimeEvent(time_t time, u32 seqno, u64 id, EvType type) : + time(time), seqno(seqno), id(id), type(type) {} + time_t time; + u32 seqno; + u64 id; + EvType type; + + bool operator<(const TimeEvent& e) const { + return time > e.time; // Max-priority queue + } + }; + + struct Data { + Data() : data(), isMine(false) {} + Data(Bytes b) : data(b), isMine(false) {} + Data(Bytes b, bool mine) : data(b), isMine(mine) {} + Bytes data; + bool isMine; + }; + + std::priority_queue timeEvents; + Protocol* proto; + std::unordered_map data; + std::unordered_map curSeqno; + std::unordered_map recvTime; + + std::set toFlood_; +}; + diff --git a/flooder.cpp b/flooder.cpp new file mode 100644 index 0000000..6d54477 --- /dev/null +++ b/flooder.cpp @@ -0,0 +1,54 @@ +/*************************************************************************** + * By Théophile Bastian, 2017 + * M1 Network course project at ENS Cachan, Juliusz Chroboczek. + * License: WTFPL v2 + **************************************************************************/ + +#include "flooder.h" + +Flooder::Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto, + const std::list& peers) : + data(data), datId(datId), seqno(seqno), proto(proto) +{ + fprintf(stderr, "[DBG] Created flooder for %lX (%u)\n", datId, seqno); + for(auto it=peers.begin(); it != peers.end(); ++it) { + triesCount[it->id] = 0; + toFlood.push(FloodEvt(time(NULL), it->id)); + } + update(); +} + +void Flooder::update() { + while(!toFlood.empty()) { + const FloodEvt& evt = toFlood.top(); + if(evt.time > time(NULL)) + break; + toFlood.pop(); + + if(triesCount[evt.id] > csts::FLOOD_RETRIES) { + fprintf(stderr, "[WARNING] Could not flood to %lX: no IHave.\n", + evt.id); + continue; + } + else if(triesCount[evt.id] < 0) { + fprintf(stderr, "[DBG] Got your IHave, %lX!\n", evt.id); + continue; // IHave received + } + + sendTo(evt.id); + triesCount[evt.id]++; + toFlood.push(FloodEvt(time(NULL) + csts::TIME_RESEND_FLOOD, evt.id)); + } +} + +void Flooder::gotIHave(u64 id, u32 withSeqno) { + if(seqno > withSeqno) + return; + + triesCount[id] = -1; +} + +void Flooder::sendTo(u64 id) { + proto->sendData(id, data, seqno, datId); +} + diff --git a/flooder.h b/flooder.h new file mode 100644 index 0000000..ffeca8e --- /dev/null +++ b/flooder.h @@ -0,0 +1,49 @@ +/*************************************************************************** + * By Théophile Bastian, 2017 + * M1 Network course project at ENS Cachan, Juliusz Chroboczek. + * License: WTFPL v2 + **************************************************************************/ + +#pragma once + +#include +#include +#include +#include +#include "data.h" +#include "nw_constants.h" +#include "protocol.h" + +class Flooder { + public: + Flooder(const Bytes& data, u64 datId, u32 seqno, Protocol* proto, + const std::list& peers); + + void update(); + /** Call often enough (ie ~1s) */ + + void gotIHave(u64 id, u32 withSeqno); + /** Acknoledges the reception of the resource by peer `id`. */ + + bool done() const { return toFlood.empty(); } + + private: //meth + void sendTo(u64 id); + + private: + struct FloodEvt { + FloodEvt(time_t time, u64 id) : time(time), id(id) {} + time_t time; + u64 id; + bool operator<(const FloodEvt& e) const { + return time > e.time; // Max priority queue. + } + }; + std::unordered_map triesCount; + std::priority_queue toFlood; + Bytes data; + u64 datId; + u32 seqno; + Protocol* proto; +}; +