diff -r 000000000000 -r 5c129dd80d4f CSc/Node.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSc/Node.c Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,336 @@ +#include "CS.h" + +NodeP thisP; +DebugP deP; +DataP dataP; + +void sighandle(int sig) { return; } + +void bindN() { + LOG(4, "binding..."); + struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo)); + memset(sa, 0, sizeof(struct addrinfo)); + sa->ai_family = AF_INET; + sa->ai_socktype = SOCK_STREAM; + sa->ai_protocol = 0; + sa->ai_flags = AI_PASSIVE; + char s[64]; + sprintf(s, "%d", thisP->locPort); + int e; + if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e)); + GAI(4, sa); + if((thisP->ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc"); + int opt = 1; + if(setsockopt(thisP->ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options"); + if(bind(thisP->ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind"); + if(listen(thisP->ssc, 1) < 0) SYSERR("listen"); + LOG(2, "bound to %d", thisP->locPort); +} +void conn(int i, int remPort) { + DebugT debug, *deP = &debug; + DEBID("%s to %u", thisP->debug.id, remPort); + thisP->cliSides[i].remPort = remPort; + int e; + LOG(4, "connecting to %u...", remPort); + int retry = csP->connThreshold; + struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo)); + memset(ai, 0, sizeof(struct addrinfo)); + ai->ai_family = AF_INET; + ai->ai_socktype = SOCK_STREAM; + ai->ai_protocol = 0; + ai->ai_flags = 0; + char port[6]; + sprintf(port, "%d", remPort); + if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e)); + GAI(3, ai); + if((thisP->cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc"); + while(retry--) { + if(connect(thisP->cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) { + if(errno != ECONNREFUSED) SYSERR("connect"); + usleep(csP->connTO); + } + else break; + } + if(retry < 1) { + LOG(0, "connection refused threshold %d reached", csP->connThreshold); + exit(EXIT_FAILURE); + } + if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); + csP->shP->conns++; + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + socklen_t l = sizeof(struct sockaddr); + struct sockaddr *sa = malloc(l); + if(getpeername(thisP->cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); + LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->cliSides[i].sc); free(sa); + + if(thisP->ssl) { + ERR_clear_error(); + if(!(thisP->cliSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL"); + if(!SSL_set_fd(thisP->cliSides[i].sslP, thisP->cliSides[i].sc)) SSLERR("client SSL set fd"); + if((e = SSL_connect(thisP->cliSides[i].sslP)) < 1) { + switch(SSL_get_error(thisP->cliSides[i].sslP, e)) { + case SSL_ERROR_SYSCALL: SYSERR("SSL connect"); break; + default: SSLERR("SSL connect"); break; + } + } + } + LOG(2, "connected via sc=%d after %d retries", thisP->cliSides[i].sc, csP->connThreshold - (retry + 1)); +} +void acc(int i) { + LOG(4, "accepting..."); + if((thisP->srvSides[i].sc = accept(thisP->ssc, NULL, NULL)) < 0) SYSERR("accept"); + socklen_t l = sizeof(struct sockaddr); + struct sockaddr *sa = malloc(l); + if(getpeername(thisP->srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); + LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->srvSides[i].sc); free(sa); + if(thisP->ssl) { + int e; + ERR_clear_error(); + if(!(thisP->srvSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL"); + if(!SSL_set_fd(thisP->srvSides[i].sslP, thisP->srvSides[i].sc)) SSLERR("server SSL set fd"); + if((e = SSL_accept(thisP->srvSides[i].sslP)) < 1) { + switch(SSL_get_error(thisP->srvSides[i].sslP, e)) { + case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break; + default: SSLERR("SSL accept"); break; + } + } + } + LOG(2, "accepted"); +} +void closeN(int i, nodeside side) { + SocketP sc; + if(side) sc = thisP->srvSides; else sc = thisP->cliSides; + LOG(5, "closing sc=%d...", sc[i].sc); + if(thisP->ssl) { + int e; + if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)"); + if(!e) { + LOG(5, "SSL shutdown rc=0"); + if((e = SSL_shutdown(sc[i].sslP)) < 0) { + switch(SSL_get_error(sc[i].sslP, e)) { + case SSL_ERROR_SYSCALL: + if(!(e = ERR_get_error())) { + if(errno) SYSERR("SSL shutdown (2)"); + break; + } + break; + default: SSLERR("SSL shutdown (2)"); break; + } + } + } + } + close(sc[i].sc); + LOG(4, "closed sc=%d", sc[i].sc); + sc[i].sc = -1; +} +void *close_clients() { + DebugT debug, *deP = &debug; + DEBID("%s CLOSE clients", thisP->debug.id); + LOG(5, "start..."); + for (int i = 0; i < thisP->nodes; i++) if(thisP->cliSides[i].sc > -1) closeN(i, client); + LOG(4, "all clients closed"); + pthread_exit(NULL); +} +void close_node() { + if(!thisP->closing) { + thisP->closingThread = 0; + if(pthread_create(&thisP->closingThread, NULL, &close_clients, NULL) != 0) SYSERR("create thread"); + thisP->closing = 1; + } +} +int readN(int i) { + LOG(5, "ready to read len=%d from sc=%d...", dataP->dataLen, thisP->srvSides[i].sc); + int n, rest = dataP->dataLen; + void *buf = dataP->contP; + while(rest > 0) { + if(thisP->ssl) { if((n = SSL_read(thisP->srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); } + else { if((n = read(thisP->srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); } + if(n == 0) { + LOG(4, "read EOF"); + return 0; + } + else { + buf += n; rest -= n; + if(rest > 0) LOG(5, "partly read %d bytes", n); + } + } + if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); + csP->shP->msgs++; + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + LOG(5, "read %d from %u", dataP->dataLen, dataP->contP->hdr.listPort); + return dataP->dataLen; +} +int writeN(int i) { + DebugT debug, *deP = &debug; + DEBID("%s to %u", thisP->debug.id, thisP->cliSides[i].remPort); + LOG(5, "ready to write len=%d to sc=%d...", dataP->dataLen, thisP->cliSides[i].sc); + int n, rest = dataP->dataLen; + void *buf = dataP->contP; + while(rest > 0) { + if(thisP->ssl) { if((n = SSL_write(thisP->cliSides[i].sslP, buf, rest)) < 0) SSLERR("socket write"); } + else { if((n = write(thisP->cliSides[i].sc, buf, rest)) < 0) SYSERR("socket write"); } + buf += n; rest -= n; + if(rest > 0) LOG(5, "partly written %d bytes", n); + } + LOG(5, "written %d", dataP->dataLen); + return dataP->dataLen; +} +int getN(int i) { + return readN(i) > 0; +} +int putN(int i) { + dataP->contP->hdr.listPort = thisP->locPort; + return writeN(i) > 0; +} +int next_node() { + int next; + if(thisP->topo == ring) { + next = thisP->locPort + 1; + if(next > thisP->last) next = thisP->first; + } + else while((next = thisP->first + thisP->nodes * ((float)random() / RAND_MAX)) == thisP->locPort); + return next; +} +void forward(int sci) { + int next, scn; + char digest[24]; + if(getN(sci)) { + LOG(5, "received data from %u", remPortData(dataP)); + if(thisP->kicker) { + LOG(4, "received from node %u: %s, ttl=%d", + remPortData(dataP), digest24Data(dataP, digest), ttlData(dataP)); +// if(ttlData(dataP) == 2) sabotageData(dataP); +// if(ttlData(dataP) == 2) errno=0, SYSERR("signal test"); + if(dttlData(dataP) <= 0) { + LOG(1, "received after passing all %s: %s", thisP->topo==mash ? "mashes" : "rings",digest24Data(dataP, digest)); + close_node(); + *(thisP->forw) = 0; + LOG(4, "leaving forward closing"); + return; + } + } + next = next_node(); scn = next - thisP->first; + LOG(5, "forwarding to %d, len=%d, ttl=%d --->", next, dataP->dataLen, ttlData(dataP)); + if(thisP->cliSides[scn].sc < 0) conn(scn, next); + if(*(thisP->forw) && csP->pacing) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); } + putN(scn); + LOG(5, "forwarded to %u", next); + } + else { + close_node(); + closeN(sci, server); + } + return; +} +void main_loop() { + sigset_t pacing; + sigemptyset(&pacing); + sigaddset(&pacing, PACING); + union { // simple select mask debug + fd_set rs; + uint mask; + } u; + int nfds; + FD_ZERO(&(u.rs)); nfds = 0; + if(*(thisP->forw)) { + FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; } + while(nfds) { + struct timeval t = {csP->selTO, 0}; + LOG(5, "selecting, mask=%08x", u.mask); + int rc; + rc = select(nfds, &(u.rs), NULL, NULL, &t); + if(rc < 0 && errno != EINTR) SYSERR("select"); + if(rc > 0) { + LOG(5, "return from select, mask=%08x", u.mask); + if(FD_ISSET(thisP->ssc, &(u.rs))) { // ssc posted: accept & forward + int i; + for(i=0; thisP->srvSides[i].sc > -1 && i < thisP->nodes; i++); // find unused slot for accept + if(i == thisP->nodes) HARDERR("can't accept, all slots in use"); + LOG(5, "slot for accept=%d", i); + acc(i); + forward(i); + } + else // check which connected socket is posted + for(int i = 0; i < thisP->nodes; i++) + if(thisP->srvSides[i].sc > -1 && FD_ISSET(thisP->srvSides[i].sc, &(u.rs))) forward(i); + } + FD_ZERO(&(u.rs)); nfds = 0; + if(*(thisP->forw)) { FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; } + for(int i = 0; i < thisP->nodes; i++) { // mask all connected client side sockets for select + int sc = thisP->srvSides[i].sc; + if(sc > -1) { FD_SET(sc, &(u.rs)); if(sc >= nfds) nfds = sc + 1; } + } + } +} +void Node(topology topo, int *forw, int port, int first, int n, int ssl) { + NodeT this; + thisP = &this; + deP = &(thisP->debug); + DEBID("%sSSL %s node %d", ssl ? "" : "non", topo==mash ? "MASH" : "RING", port); + LOG(4, "initializing..."); + dataP = &thisP->data; + Data(dataP, deP); + thisP->topo = topo; + thisP->locPort = port; + thisP->first = first; + thisP->last = first + n - 1; + thisP->nodes = n; + thisP->cliSides = malloc(n*sizeof(SocketT)); + thisP->srvSides = malloc(n*sizeof(SocketT)); + for(int k=0; kcliSides[k].sc = thisP->srvSides[k].sc = -1; + thisP->kicker = (port == first); + thisP->forw = forw; + thisP->nodeIdx = port - first; + thisP->closing = 0; + thisP->ssl = ssl; + if(thisP->ssl) { + char s[64]; + SSL_load_error_strings(); + SSL_library_init(); + LOG(4, "setting SSL contex..."); + if(!(thisP->ctx = SSL_CTX_new(TLS_method()))) SSLERR("new SSL CTX"); + SSL_CTX_set_mode(thisP->ctx, SSL_MODE_AUTO_RETRY); + SSL_CTX_set_verify(thisP->ctx, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); + sprintf(s, "%s/keys/%d.key", csP->ceP, thisP->locPort); + if(SSL_CTX_use_PrivateKey_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file"); + sprintf(s, "%s/certs/%d.pem", csP->ceP, thisP->locPort); + LOG(5, "SSL private key used: %s", s); + if(SSL_CTX_use_certificate_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file"); + LOG(5, "SSL certificate used: %s", s); + if(SSL_CTX_load_verify_locations(thisP->ctx, NULL, csP->caP) != 1) SSLERR("hh's thrusted certs path"); + } + LOG(5, "initalized"); + + bindN(thisP); + if(thisP->kicker) { + loadData(dataP, csP->ttl, csP->text); + int sci, next; + next = next_node(); sci = next - thisP->first; + char digest[24]; + LOG(1, "KICKER: ready to initial send %s, len=%d to node %d", digest24Data(dataP, digest), dataP->dataLen, next); + conn(sci, next); + putN(sci); + } + + main_loop(); + + LOG(5, "closing ssc"); + close(thisP->ssc); + if(thisP->closing) { // wait for closing thread + if(pthread_join(thisP->closingThread, NULL) != 0) SYSERR("join closing thread"); } + struct sigaction sigact; + sigfillset(&sigact.sa_mask); + sigact.sa_handler=sighandle; + if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction"); + if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait"); + int active = --csP->shP->act; + if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post"); + if(active > 0) pause(); + else kill(0, SIGUSR2); + int exitRc = EXIT_SUCCESS; + if(thisP->kicker && !chkData(dataP)) { + SOFTERR("INPUT AND OUTPUT DIFFER"); + exitRc = EXIT_FAILURE; } + LOG(2, "ended"); + exit(exitRc); +}