/***************************************************************************
 * By Théophile Bastian, 2017
 * M1 Network course project at ENS Cachan, Juliusz Chroboczek.
 * License: WTFPL v2
 **************************************************************************/

#include "dataStore.h"

// NOTE(review): the original include targets were lost; these three cover
// every standard name used below (std::min, printf/fprintf, time). Confirm
// against the project's original file.
#include <algorithm>
#include <cstdio>
#include <ctime>

using namespace std;

/// Builds a store bound to the given protocol object (kept as a raw,
/// non-owning pointer; nothing in this file dereferences it).
DataStore::DataStore(Protocol* proto) : proto(proto) {}

DataStore::~DataStore() {
}

/// Processes every queued time event whose deadline has passed.
///
/// Events are taken from `timeEvents.top()` until the queue is empty or the
/// top event lies in the future. NOTE(review): this assumes the queue is
/// ordered so that top() is the soonest event -- confirm against the
/// comparison defined for TimeEvent.
void DataStore::update() {
    while(!timeEvents.empty()) {
        TimeEvent evt = timeEvents.top();
        if(evt.time > time(nullptr)) // We're done for now.
            break;
        timeEvents.pop();

        switch(evt.type) {
            case EV_REPUBLISH:
                handleRepublish(evt.id);
                break;
            case EV_EXPIRES:
                handleExpire(evt.id, evt.seqno);
                break;
        }
    }
}

/// Stores (or replaces) the data published under `id`.
///
/// @param pck    payload to store
/// @param seqno  sequence number of this version
/// @param id     identifier of the data
/// @param mine   whether this node is the publisher of the data
///
/// The call is ignored when a version with an equal or greater sequence
/// number is already stored. Otherwise the data is recorded, a republish
/// (own data) or expiry (foreign data) event is scheduled, and the id is
/// marked as needing to be flooded.
void DataStore::addData(Bytes pck, u32 seqno, u64 id, bool mine) {
    auto known = curSeqno.find(id);
    if(known != curSeqno.end() && known->second >= seqno)
        return; // Nothing newer than what we already hold.

    // Read the clock once so the scheduled event and the reception time
    // are derived from the same instant.
    const time_t now = time(nullptr);

    data[id] = Data(pck, mine);
    curSeqno[id] = seqno;
    if(mine) {
        timeEvents.push(TimeEvent(
                    now + csts::TIME_REPUBLISH_DATA,
                    seqno, id, EV_REPUBLISH));
    }
    else {
        // If it is not renewed, it will expire.
        timeEvents.push(TimeEvent(
                    now + csts::TIMEOUT_DATA,
                    seqno, id, EV_EXPIRES));
    }
    recvTime[id] = now;

    // The cast makes the argument match %llX whatever u64 is a typedef of.
    fprintf(stderr, "[INFO] Storing data %llX (%u)\n",
            static_cast<unsigned long long>(id), seqno);

    handleFlood(id);
}

/// Returns the size of the payload stored under `id`, or -1 if the id is
/// unknown.
ssize_t DataStore::dataSize(u64 id) {
    auto found = data.find(id);
    if(found == data.end())
        return -1;
    return found->second.data.size();
}

/// Copies the payload stored under `id` into `buffer`.
///
/// @param id      identifier of the data to read
/// @param buffer  destination buffer
/// @param size    capacity passed on to Bytes::writeToBuffer
/// @return min(size, stored payload size), or -1 if the id is unknown.
ssize_t DataStore::readData(u64 id, char* buffer, size_t size) {
    auto found = data.find(id);
    if(found == data.end())
        return -1;

    auto& stored = found->second.data;
    const ssize_t storedSize = stored.size();
    const ssize_t len = std::min(static_cast<ssize_t>(size), storedSize);
    stored.writeToBuffer(buffer, size);
    return len;
}

/// Appends the id of every stored data to `out` (existing content of `out`
/// is kept).
// NOTE(review): the element type was lost in the original text; u64 is the
// key type used for `data` throughout this file -- confirm against the
// declaration in dataStore.h.
void DataStore::ids(std::vector<u64>& out) {
    for(auto& dat : data)
        out.push_back(dat.first);
}

/// Marks `id` as no longer waiting to be flooded.
void DataStore::setFlooded(u64 id) {
    toFlood_.erase(id);
}

/// Prints every stored data on stdout: its id, current sequence number and
/// either its text content (text TLV) or its numeric type.
void DataStore::dump() {
    for(auto& entry : data) {
        printf(">> DATA %llX (%u) ",
                static_cast<unsigned long long>(entry.first),
                curSeqno[entry.first]);

        // Work on a copy so that decoding does not alter the stored bytes.
        Bytes dat = entry.second.data;
        if(dat.size() < 2) {
            printf("INVALID\n");
            continue;
        }

        // Presumably operator>> consumes the two header bytes, leaving only
        // the value in `dat` -- confirm against Bytes.
        u8 type, len;
        dat >> type >> len;
        if(type == csts::TLV_DATA_TEXT) {
            char val[1024];
            dat.writeToBuffer(val, 1023);
            // Terminate after the text, at most at the last buffer slot.
            val[std::min(static_cast<int>(dat.size()), 1023)] = '\0';
            printf("'%s'\n", val);
        }
        else
            printf("type=%d\n", type);
    }
}

/// Handles an expiry event scheduled for version `seqno` of data `id`:
/// the data is dropped unless a newer version has been stored since.
void DataStore::handleExpire(u64 id, u32 seqno) {
    // Use find() so that an unknown id is not inserted just to be compared.
    auto known = curSeqno.find(id);
    if(known != curSeqno.end() && seqno < known->second)
        return; // Was updated in time
    cleanData(id);
}

/// Handles a republish event for `datId`: if the data is still stored and
/// is ours, its sequence number is set to the current time, the next
/// republish event is scheduled and the data is marked for flooding.
void DataStore::handleRepublish(u64 datId) {
    auto found = data.find(datId);
    if(found == data.end() || !found->second.isMine) {
        return;
    }

    const time_t now = time(nullptr);
    auto& seqno = curSeqno[datId];
    seqno = now;
    timeEvents.push(TimeEvent(
                now + csts::TIME_REPUBLISH_DATA,
                seqno, datId, EV_REPUBLISH));

    handleFlood(datId);
}

/// Marks `id` as waiting to be flooded.
void DataStore::handleFlood(u64 id) {
    toFlood_.insert(id);
}

/// Forgets everything recorded about `id` (sequence number, reception time
/// and payload).
void DataStore::cleanData(u64 id) {
    curSeqno.erase(id);
    recvTime.erase(id);
    data.erase(id); // Deletes the Bytes as well
}