Compare commits

...

2 commits

Author SHA1 Message Date
Théophile Bastian a31d9e29eb Dump data on RETURN ; FIX sending
Packet sending was broken: active iterators could be invalidated.
2016-11-27 12:16:56 +01:00
Théophile Bastian 32b56c6c91 Aggregate packets before sending. 2016-11-27 09:49:28 +01:00
9 changed files with 137 additions and 16 deletions

View file

@ -33,3 +33,6 @@ données).
Le programme produit des logs verbeux mais humainement lisibles sur sa sortie Le programme produit des logs verbeux mais humainement lisibles sur sa sortie
d'erreur (stderr). d'erreur (stderr).
Le programme affiche son état actuel (voisins + infos sur eux, données + infos
sur elles) lors d'un appui sur RETURN.

View file

@ -83,6 +83,27 @@ void DataStore::setFlooded(u64 id) {
toFlood_.erase(id); toFlood_.erase(id);
} }
void DataStore::dump() {
for(auto it=data.begin(); it != data.end(); ++it) {
printf(">> DATA %lX (%u) ", it->first, curSeqno[it->first]);
Bytes dat = it->second.data;
if(dat.size() < 2) {
printf("INVALID\n");
continue;
}
u8 type, len;
dat >> type >> len;
if(type == csts::TLV_DATA_TEXT) {
char val[1024];
dat.writeToBuffer(val, 1023);
val[min((int)dat.size(), 1023)] = '\0';
printf("'%s'\n", val);
}
else
printf("type=%d\n", type);
}
}
void DataStore::handleExpire(u64 id, u32 seqno) { void DataStore::handleExpire(u64 id, u32 seqno) {
if(seqno < curSeqno[id]) if(seqno < curSeqno[id])
return; // Was updated in time return; // Was updated in time

View file

@ -55,6 +55,9 @@ class DataStore {
void setFlooded(u64 id); void setFlooded(u64 id);
/** Marks a data as flooded */ /** Marks a data as flooded */
void dump();
/** Dumps everything on STDIN */
private: //meth private: //meth
void handleExpire(u64 id, u32 seqno); void handleExpire(u64 id, u32 seqno);
void handleRepublish(u64 datId); void handleRepublish(u64 datId);

View file

@ -20,6 +20,26 @@
bool terminate=false; bool terminate=false;
char readStdin() {
fd_set fdSet;
FD_ZERO(&fdSet);
FD_SET(STDIN_FILENO, &fdSet);
struct timeval tv = {1, 0};
if(select(STDIN_FILENO+1, &fdSet, NULL, NULL, &tv) < 0) {
perror("[WARNING] Bad select");
return 0;
}
else {
if(FD_ISSET(STDIN_FILENO, &fdSet)) {
char c = getchar();
fflush(stdin);
return c;
}
return 0;
}
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
bool hasConfig = false; bool hasConfig = false;
char* configFilePath = nullptr; char* configFilePath = nullptr;
@ -83,7 +103,13 @@ int main(int argc, char** argv) {
dataStore.update(); dataStore.update();
sleep(1); proto.sendAllNow();
// sleep(1);
char c = readStdin();
if(c != 0) {
dataStore.dump();
neighboursManager.dump();
}
} }
return 0; return 0;

View file

@ -167,6 +167,36 @@ void Neighbours::gotIHave(u64 from, u64 datId, u32 seqno) {
} }
} }
void Neighbours::dump() {
for(auto nei : potentialNei)
dumpNei(nei, NEI_POTENTIAL);
for(auto nei : unidirNei)
dumpNei(nei, NEI_UNIDIR);
for(auto nei : symNei)
dumpNei(nei, NEI_SYM);
}
void Neighbours::dumpNei(const Neighbour& nei, NeiType type) {
printf(">> NEIGHBOUR ");
switch(type) {
case NEI_POTENTIAL:
printf("potential ");
break;
case NEI_UNIDIR:
printf("unidir ");
break;
case NEI_SYM:
printf("sym ");
break;
default:
break;
}
printf("%lX lastPck=%ld lastIHU=%ld stateMatch=%d\n",
nei.id, time(NULL) - lastRecv[nei.id],
time(NULL) - lastIHU[nei.id],
type == neiType[nei.id]);
}
void Neighbours::changeNeiType(u64 id, NeiType nType) { void Neighbours::changeNeiType(u64 id, NeiType nType) {
NeiType cType = neiType[id]; NeiType cType = neiType[id];
if(cType == nType) if(cType == nType)

View file

@ -52,12 +52,17 @@ class Neighbours {
* this data from the peer `from`. * this data from the peer `from`.
*/ */
void dump();
/** Dumps everything to STDIN. */
private: //meth private: //meth
class WrongNeiType : public std::exception {}; class WrongNeiType : public std::exception {};
enum NeiType { enum NeiType {
NEI_UNDEF, NEI_POTENTIAL, NEI_UNIDIR, NEI_SYM NEI_UNDEF, NEI_POTENTIAL, NEI_UNIDIR, NEI_SYM
}; };
void dumpNei(const Neighbour& nei, NeiType type);
std::list<Neighbour>* listOfType(NeiType typ); std::list<Neighbour>* listOfType(NeiType typ);
void changeNeiType(u64 id, NeiType nType); void changeNeiType(u64 id, NeiType nType);
void updateSendPackets(const Neighbour& nei); void updateSendPackets(const Neighbour& nei);

View file

@ -38,6 +38,7 @@ const int TIME_RESEND_FLOOD = 3; // s
const int SYM_COUNT_BEFORE_PEEK = 5; const int SYM_COUNT_BEFORE_PEEK = 5;
const int NB_NEIGH_PER_NR = 5; const int NB_NEIGH_PER_NR = 5;
const int DFT_MAX_MTU = 1460; // bytes
const int FLOOD_RETRIES = 3; const int FLOOD_RETRIES = 3;

View file

@ -65,22 +65,18 @@ Bytes Protocol::getPacket(SockAddr* from) {
return pck.data; return pck.data;
} }
void Protocol::sendBody(const Bytes& body, const SockAddr& to) {
Bytes pck;
pck << csts::MAGIC << csts::VERSION << (u16)body.size() << selfId
<< body;
char buffer[MAX_MTU];
pck.writeToBuffer(buffer, MAX_MTU);
ssize_t rc =
sendto(sock, buffer, pck.size(), 0, (struct sockaddr*)&to, sizeof(to));
if(rc != (ssize_t)pck.size()) {
fprintf(stderr, "[WARNING] Whole packet not sent\n");
}
}
void Protocol::sendBody(const Bytes& body, const u64& id) { void Protocol::sendBody(const Bytes& body, const u64& id) {
fprintf(stderr, "[INFO] sending packet [%d] to %lX\n", fprintf(stderr, "[INFO] sending packet [%d] to %lX\n",
body.size() > 0 ? body[0] : -1, id); body.size() > 0 ? body[0] : -1, id);
return sendBody(body, addrOfId(id));
if(aggregatedTLVs.find(id) != aggregatedTLVs.end()) { // TLVs are waiting
if(aggregatedTLVs[id].size() + body.size() + 12 > csts::DFT_MAX_MTU) {
// Can't wait
sendNow(id);
}
}
aggregatedTLVs[id] << body;
} }
void Protocol::sendEmpty(u64 to) { void Protocol::sendEmpty(u64 to) {
@ -132,6 +128,12 @@ void Protocol::sendIHave(u64 to, u64 datId, u32 seqno) {
sendBody(pck, to); sendBody(pck, to);
} }
void Protocol::sendAllNow() {
for(const auto& it : aggregatedTLVs)
sendNow(it.first, false);
aggregatedTLVs.clear();
}
void Protocol::startPollNetwork() { void Protocol::startPollNetwork() {
pollThread = std::thread([this] { this->pollNetwork(); }); pollThread = std::thread([this] { this->pollNetwork(); });
} }
@ -216,3 +218,23 @@ SockAddr Protocol::addrOfV4(const sockaddr_in& addrv4) {
return out; return out;
} }
void Protocol::sendNow(u64 id, bool erase) {
if(aggregatedTLVs.find(id) == aggregatedTLVs.end())
return; // Nothing to send.
Bytes aggregated = aggregatedTLVs[id];
if(erase)
aggregatedTLVs.erase(id);
Bytes pck;
pck << csts::MAGIC << csts::VERSION << (u16)aggregated.size() << selfId
<< aggregated;
char buffer[MAX_MTU];
pck.writeToBuffer(buffer, MAX_MTU);
SockAddr destAddr = addrOfId(id);
ssize_t rc = sendto(sock, buffer, pck.size(), 0,
(struct sockaddr*)&destAddr, sizeof(destAddr));
if(rc != (ssize_t)pck.size()) {
fprintf(stderr, "[WARNING] Whole packet not sent\n");
}
}

View file

@ -40,9 +40,10 @@ class Protocol {
Bytes getPacket(SockAddr* from); Bytes getPacket(SockAddr* from);
/** Appends the body of a full packet to `out`. */ /** Appends the body of a full packet to `out`. */
void sendBody(const Bytes& body, const SockAddr& to);
void sendBody(const Bytes& body, const u64& id); void sendBody(const Bytes& body, const u64& id);
/** Sends the given `body` (wrapped in headers) */ /** Sends the given `body` (wrapped in headers)
* Actually, this puts the `body` in an aggregated TLVs bytes and
* waits up to 500ms before sending. */
void sendEmpty(u64 to); void sendEmpty(u64 to);
@ -64,12 +65,19 @@ class Protocol {
u64 getSelfId() const { return selfId; } u64 getSelfId() const { return selfId; }
/** Returns the ID of this node. */ /** Returns the ID of this node. */
void sendAllNow();
/** Notifies the object that it should send every waiting packet now.
*/
private: //meth private: //meth
void startPollNetwork(); void startPollNetwork();
void pollNetwork(); void pollNetwork();
const SockAddr& addrOfId(u64 id); const SockAddr& addrOfId(u64 id);
SockAddr addrOfV4(const sockaddr_in& addrv4); SockAddr addrOfV4(const sockaddr_in& addrv4);
void sendNow(u64 id, bool erase=true);
/** Sends the aggregated TLVs right now. */
private: private:
struct AvailPacket { struct AvailPacket {
SockAddr from; SockAddr from;
@ -85,5 +93,7 @@ class Protocol {
std::mutex availPacketsMutex; std::mutex availPacketsMutex;
std::queue<AvailPacket> availPackets; std::queue<AvailPacket> availPackets;
std::unordered_map<u64, SockAddr> addrMap; std::unordered_map<u64, SockAddr> addrMap;
std::unordered_map<u64, Bytes> aggregatedTLVs;
}; };