/*************************************************************************** * By Théophile Bastian, 2017 * M1 Network course project at ENS Cachan, Juliusz Chroboczek. * License: WTFPL v2 **************************************************************************/ #include "protocol.h" #include using namespace std; const size_t MAX_MTU = (1 << 16) + 42; Protocol::Protocol(const SockAddr& listenAddr, u64 selfId) : sock(socket(PF_INET6, SOCK_DGRAM, 0)), listenAddr(listenAddr), selfId(selfId), terminating(false) { int reuseVal = 1; if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseVal, sizeof(reuseVal))) { perror("Could not set SO_REUSEADDR for socket"); throw NwError(); } if(bind(sock, (struct sockaddr*)(&listenAddr), sizeof(listenAddr)) != 0) { perror("Cannot bind socket"); throw NwError(); } // Set socket reception timeout struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 100000; if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { perror("Cannot set socket timeout"); throw NwError(); } startPollNetwork(); } Protocol::~Protocol() { terminating = true; pollThread.join(); } void Protocol::addIdAddr(const SockAddr& addr, u64 id) { addrMap.insert({id, addr}); } bool Protocol::readyRead() const { return !availPackets.empty(); } Bytes Protocol::getPacket(SockAddr* from) { if(!readyRead()) return Bytes(); AvailPacket pck; { lock_guard(this->availPacketsMutex); pck = availPackets.front(); availPackets.pop(); } if(from != NULL) *from = pck.from; return pck.data; } void Protocol::sendBody(const Bytes& body, const u64& id) { fprintf(stderr, "[INFO] sending packet [%d] to %lX\n", body.size() > 0 ? body[0] : -1, id); if(aggregatedTLVs.find(id) != aggregatedTLVs.end()) { // TLVs are waiting if(aggregatedTLVs[id].size() + body.size() + 12 > csts::DFT_MAX_MTU) { // Can't wait sendNow(id); } } aggregatedTLVs[id] << body; } void Protocol::sendEmpty(u64 to) { sendBody(Bytes(), to); } void Protocol::sendIHU(u64 id) { Bytes pck; pck << csts::TLV_IHU << (u8) 8 << id; sendBody(pck, id); } void Protocol::sendNReq(u64 to) { Bytes pck; pck << csts::TLV_NR << (u8) 0; sendBody(pck, to); } void Protocol::sendNeighbours(u64 to, const std::vector& neigh) { Bytes pck; u8 len = neigh.size() * (8+16+2); pck << csts::TLV_NEIGH << len; for(const Neighbour& nei : neigh) { u8 addr[16]; memcpy(addr, nei.addr.sin6_addr.s6_addr, 16); pck << nei.id; for(int b=0; b < 16; b++) pck << addr[b]; pck << (u16)nei.addr.sin6_port; } sendBody(pck, to); } void Protocol::sendData(u64 to, const Bytes& data, u32 seqno, u64 datId) { Bytes pck; if(data.size() + 12 >= 1<<8) throw DataTooLongError(); pck << csts::TLV_DATA << (u8) (12 + data.size()) << seqno << datId << data; sendBody(pck, to); } void Protocol::sendIHave(u64 to, u64 datId, u32 seqno) { Bytes pck; pck << csts::TLV_IHAVE << (u8) 12 << seqno << datId; sendBody(pck, to); } void Protocol::sendAllNow() { for(const auto& it : aggregatedTLVs) sendNow(it.first, false); aggregatedTLVs.clear(); } void Protocol::startPollNetwork() { pollThread = std::thread([this] { this->pollNetwork(); }); } void Protocol::pollNetwork() { u8 buffer[MAX_MTU]; while(!terminating) { struct sockaddr fromAddr; memset(&fromAddr, 0, sizeof(fromAddr)); socklen_t fromAddrLen=sizeof(fromAddr); ssize_t readDat = recvfrom(sock, buffer, MAX_MTU, 0, &fromAddr, &fromAddrLen); if(readDat < 0) { if(errno == 11) // Socket timeout - expected behaviour continue; perror("[WARNING] Bad packet"); continue; } if(readDat == 0) { fprintf(stderr, "[WARNING] Empty packet.\n"); continue; } Bytes data(buffer, readDat); u8 magic, version; data >> magic >> version; if(magic != csts::MAGIC) { fprintf(stderr, "[WARNING] Bad magic byte %u\n", magic); continue; } if(version != csts::VERSION) { fprintf(stderr, "[WARNING] Bad version %d\n", version); continue; } u16 bodyLen; data >> bodyLen; if(data.size() < bodyLen + 8u) { fprintf(stderr, "[WARNING] Body too short (%lu < %d)\n", data.size(), bodyLen+8u); continue; } else if(data.size() != bodyLen + 8u) { fprintf(stderr, "[WARNING] Body too long\n"); } SockAddr convFromAddr; if(fromAddr.sa_family == AF_INET) convFromAddr = addrOfV4(*((struct sockaddr_in*)&fromAddr)); else if(fromAddr.sa_family == AF_INET6) convFromAddr = *(SockAddr*)&fromAddr; else { fprintf(stderr, "[ERROR] Unknown address family %d.\n", fromAddr.sa_family); continue; } AvailPacket pck; pck.from = convFromAddr; pck.data = data; { lock_guard(this->availPacketsMutex); availPackets.push(pck); } } } const SockAddr& Protocol::addrOfId(u64 id) { try { return addrMap.at(id); } catch(std::out_of_range& e) { throw UnknownId(id); } } SockAddr Protocol::addrOfV4(const sockaddr_in& addrv4) { SockAddr out; memset(&out, 0, sizeof(SockAddr)); out.sin6_family = AF_INET6; out.sin6_port = addrv4.sin_port; char addr6[17]; inet_pton(AF_INET6, "::ffff:0:0", addr6); memcpy(addr6 + 12, &addrv4.sin_addr.s_addr, 4); return out; } void Protocol::sendNow(u64 id, bool erase) { if(aggregatedTLVs.find(id) == aggregatedTLVs.end()) return; // Nothing to send. Bytes aggregated = aggregatedTLVs[id]; if(erase) aggregatedTLVs.erase(id); Bytes pck; pck << csts::MAGIC << csts::VERSION << (u16)aggregated.size() << selfId << aggregated; char buffer[MAX_MTU]; pck.writeToBuffer(buffer, MAX_MTU); SockAddr destAddr = addrOfId(id); ssize_t rc = sendto(sock, buffer, pck.size(), 0, (struct sockaddr*)&destAddr, sizeof(destAddr)); if(rc != (ssize_t)pck.size()) { fprintf(stderr, "[WARNING] Whole packet not sent\n"); } }