Compare commits

...

2 commits

Author SHA1 Message Date
Théophile Bastian a31d9e29eb Dump data on RETURN ; FIX sending
Packet sending was broken: active iterators could be invalidated.
2016-11-27 12:16:56 +01:00
Théophile Bastian 32b56c6c91 Aggregate packets before sending. 2016-11-27 09:49:28 +01:00
9 changed files with 137 additions and 16 deletions

View file

@ -33,3 +33,6 @@ données).
Le programme produit des logs verbeux mais humainement lisibles sur sa sortie
d'erreur (stderr).
Le programme affiche son état actuel (voisins + infos sur eux, données + infos
sur elles) lors d'un appui sur RETURN.

View file

@ -83,6 +83,27 @@ void DataStore::setFlooded(u64 id) {
toFlood_.erase(id);
}
void DataStore::dump() {
for(auto it=data.begin(); it != data.end(); ++it) {
printf(">> DATA %lX (%u) ", it->first, curSeqno[it->first]);
Bytes dat = it->second.data;
if(dat.size() < 2) {
printf("INVALID\n");
continue;
}
u8 type, len;
dat >> type >> len;
if(type == csts::TLV_DATA_TEXT) {
char val[1024];
dat.writeToBuffer(val, 1023);
val[min((int)dat.size(), 1023)] = '\0';
printf("'%s'\n", val);
}
else
printf("type=%d\n", type);
}
}
void DataStore::handleExpire(u64 id, u32 seqno) {
if(seqno < curSeqno[id])
return; // Was updated in time

View file

@ -55,6 +55,9 @@ class DataStore {
void setFlooded(u64 id);
/** Marks a data as flooded */
void dump();
/** Dumps everything on STDIN */
private: //meth
void handleExpire(u64 id, u32 seqno);
void handleRepublish(u64 datId);

View file

@ -20,6 +20,26 @@
bool terminate=false;
char readStdin() {
fd_set fdSet;
FD_ZERO(&fdSet);
FD_SET(STDIN_FILENO, &fdSet);
struct timeval tv = {1, 0};
if(select(STDIN_FILENO+1, &fdSet, NULL, NULL, &tv) < 0) {
perror("[WARNING] Bad select");
return 0;
}
else {
if(FD_ISSET(STDIN_FILENO, &fdSet)) {
char c = getchar();
fflush(stdin);
return c;
}
return 0;
}
}
int main(int argc, char** argv) {
bool hasConfig = false;
char* configFilePath = nullptr;
@ -83,7 +103,13 @@ int main(int argc, char** argv) {
dataStore.update();
sleep(1);
proto.sendAllNow();
// sleep(1);
char c = readStdin();
if(c != 0) {
dataStore.dump();
neighboursManager.dump();
}
}
return 0;

View file

@ -167,6 +167,36 @@ void Neighbours::gotIHave(u64 from, u64 datId, u32 seqno) {
}
}
void Neighbours::dump() {
for(auto nei : potentialNei)
dumpNei(nei, NEI_POTENTIAL);
for(auto nei : unidirNei)
dumpNei(nei, NEI_UNIDIR);
for(auto nei : symNei)
dumpNei(nei, NEI_SYM);
}
void Neighbours::dumpNei(const Neighbour& nei, NeiType type) {
printf(">> NEIGHBOUR ");
switch(type) {
case NEI_POTENTIAL:
printf("potential ");
break;
case NEI_UNIDIR:
printf("unidir ");
break;
case NEI_SYM:
printf("sym ");
break;
default:
break;
}
printf("%lX lastPck=%ld lastIHU=%ld stateMatch=%d\n",
nei.id, time(NULL) - lastRecv[nei.id],
time(NULL) - lastIHU[nei.id],
type == neiType[nei.id]);
}
void Neighbours::changeNeiType(u64 id, NeiType nType) {
NeiType cType = neiType[id];
if(cType == nType)

View file

@ -52,12 +52,17 @@ class Neighbours {
* this data from the peer `from`.
*/
void dump();
/** Dumps everything to STDIN. */
private: //meth
class WrongNeiType : public std::exception {};
enum NeiType {
NEI_UNDEF, NEI_POTENTIAL, NEI_UNIDIR, NEI_SYM
};
void dumpNei(const Neighbour& nei, NeiType type);
std::list<Neighbour>* listOfType(NeiType typ);
void changeNeiType(u64 id, NeiType nType);
void updateSendPackets(const Neighbour& nei);

View file

@ -38,6 +38,7 @@ const int TIME_RESEND_FLOOD = 3; // s
const int SYM_COUNT_BEFORE_PEEK = 5;
const int NB_NEIGH_PER_NR = 5;
const int DFT_MAX_MTU = 1460; // bytes
const int FLOOD_RETRIES = 3;

View file

@ -65,22 +65,18 @@ Bytes Protocol::getPacket(SockAddr* from) {
return pck.data;
}
void Protocol::sendBody(const Bytes& body, const SockAddr& to) {
Bytes pck;
pck << csts::MAGIC << csts::VERSION << (u16)body.size() << selfId
<< body;
char buffer[MAX_MTU];
pck.writeToBuffer(buffer, MAX_MTU);
ssize_t rc =
sendto(sock, buffer, pck.size(), 0, (struct sockaddr*)&to, sizeof(to));
if(rc != (ssize_t)pck.size()) {
fprintf(stderr, "[WARNING] Whole packet not sent\n");
}
}
void Protocol::sendBody(const Bytes& body, const u64& id) {
fprintf(stderr, "[INFO] sending packet [%d] to %lX\n",
body.size() > 0 ? body[0] : -1, id);
return sendBody(body, addrOfId(id));
if(aggregatedTLVs.find(id) != aggregatedTLVs.end()) { // TLVs are waiting
if(aggregatedTLVs[id].size() + body.size() + 12 > csts::DFT_MAX_MTU) {
// Can't wait
sendNow(id);
}
}
aggregatedTLVs[id] << body;
}
void Protocol::sendEmpty(u64 to) {
@ -132,6 +128,12 @@ void Protocol::sendIHave(u64 to, u64 datId, u32 seqno) {
sendBody(pck, to);
}
void Protocol::sendAllNow() {
for(const auto& it : aggregatedTLVs)
sendNow(it.first, false);
aggregatedTLVs.clear();
}
void Protocol::startPollNetwork() {
pollThread = std::thread([this] { this->pollNetwork(); });
}
@ -216,3 +218,23 @@ SockAddr Protocol::addrOfV4(const sockaddr_in& addrv4) {
return out;
}
void Protocol::sendNow(u64 id, bool erase) {
if(aggregatedTLVs.find(id) == aggregatedTLVs.end())
return; // Nothing to send.
Bytes aggregated = aggregatedTLVs[id];
if(erase)
aggregatedTLVs.erase(id);
Bytes pck;
pck << csts::MAGIC << csts::VERSION << (u16)aggregated.size() << selfId
<< aggregated;
char buffer[MAX_MTU];
pck.writeToBuffer(buffer, MAX_MTU);
SockAddr destAddr = addrOfId(id);
ssize_t rc = sendto(sock, buffer, pck.size(), 0,
(struct sockaddr*)&destAddr, sizeof(destAddr));
if(rc != (ssize_t)pck.size()) {
fprintf(stderr, "[WARNING] Whole packet not sent\n");
}
}

View file

@ -40,9 +40,10 @@ class Protocol {
Bytes getPacket(SockAddr* from);
/** Appends the body of a full packet to `out`. */
void sendBody(const Bytes& body, const SockAddr& to);
void sendBody(const Bytes& body, const u64& id);
/** Sends the given `body` (wrapped in headers) */
/** Sends the given `body` (wrapped in headers)
* Actually, this puts the `body` in an aggregated TLVs bytes and
* waits up to 500ms before sending. */
void sendEmpty(u64 to);
@ -64,12 +65,19 @@ class Protocol {
u64 getSelfId() const { return selfId; }
/** Returns the ID of this node. */
void sendAllNow();
/** Notifies the object that it should send every waiting packet now.
*/
private: //meth
void startPollNetwork();
void pollNetwork();
const SockAddr& addrOfId(u64 id);
SockAddr addrOfV4(const sockaddr_in& addrv4);
void sendNow(u64 id, bool erase=true);
/** Sends the aggregated TLVs right now. */
private:
struct AvailPacket {
SockAddr from;
@ -85,5 +93,7 @@ class Protocol {
std::mutex availPacketsMutex;
std::queue<AvailPacket> availPackets;
std::unordered_map<u64, SockAddr> addrMap;
std::unordered_map<u64, Bytes> aggregatedTLVs;
};