CSc/Node.c
changeset 0 5c129dd80d4f
equal deleted inserted replaced
-1:000000000000 0:5c129dd80d4f
       
     1 #include "CS.h"
       
     2 
       
/* State of the node running in this process; Node() points it at its local NodeT. */
     3 NodeP thisP;
       
/* Current debug context; Node() points it at thisP->debug. conn(), writeN() and
 * close_clients() shadow it with a local of the same name (presumably so the
 * logging macros pick up a per-call id -- confirm against CS.h). */
     4 DebugP deP;
       
/* Message buffer shared by readN()/writeN(); Node() points it at thisP->data. */
     5 DataP dataP;
       
     6 
       
     7 void sighandle(int sig) { return; }
       
     8 
       
     9 void bindN() {
       
    10 	LOG(4, "binding...");
       
    11 	struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo));
       
    12 	memset(sa, 0, sizeof(struct addrinfo));
       
    13 	sa->ai_family = AF_INET;
       
    14 	sa->ai_socktype = SOCK_STREAM;
       
    15 	sa->ai_protocol = 0;
       
    16 	sa->ai_flags = AI_PASSIVE;
       
    17 	char s[64];
       
    18 	sprintf(s, "%d", thisP->locPort);
       
    19 	int e;
       
    20 	if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e));
       
    21 	GAI(4, sa);
       
    22 	if((thisP->ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc");
       
    23    	int opt = 1;
       
    24    	if(setsockopt(thisP->ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options");
       
    25 	if(bind(thisP->ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind");
       
    26 	if(listen(thisP->ssc, 1) < 0) SYSERR("listen");
       
    27 	LOG(2, "bound to %d", thisP->locPort);
       
    28 }
       
    29 void conn(int i, int remPort) {
       
    30 	DebugT debug, *deP = &debug;
       
    31 	DEBID("%s to %u", thisP->debug.id, remPort);
       
    32 	thisP->cliSides[i].remPort = remPort;
       
    33 	int e;
       
    34 	LOG(4, "connecting to %u...", remPort);
       
    35 	int retry = csP->connThreshold;
       
    36 	struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo));
       
    37 	memset(ai, 0, sizeof(struct addrinfo));
       
    38 	ai->ai_family = AF_INET;
       
    39 	ai->ai_socktype = SOCK_STREAM;
       
    40 	ai->ai_protocol = 0;
       
    41 	ai->ai_flags = 0;
       
    42 	char port[6];
       
    43 	sprintf(port, "%d", remPort);
       
    44 	if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e));
       
    45 	GAI(3, ai);
       
    46 	if((thisP->cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc");
       
    47 	while(retry--) {
       
    48 		if(connect(thisP->cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) {
       
    49 			if(errno != ECONNREFUSED) SYSERR("connect");
       
    50 			usleep(csP->connTO);
       
    51 		}
       
    52 		else break;
       
    53 	}
       
    54 	if(retry < 1) {
       
    55 		LOG(0, "connection refused threshold %d reached", csP->connThreshold);
       
    56 		exit(EXIT_FAILURE);
       
    57 	}
       
    58 	if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
       
    59 	csP->shP->conns++;
       
    60 	if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
       
    61 	socklen_t l = sizeof(struct sockaddr);
       
    62 	struct sockaddr *sa = malloc(l);
       
    63 	if(getpeername(thisP->cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
       
    64 	LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->cliSides[i].sc); free(sa);
       
    65 
       
    66 	if(thisP->ssl) {
       
    67 		ERR_clear_error();
       
    68 		if(!(thisP->cliSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL");
       
    69 		if(!SSL_set_fd(thisP->cliSides[i].sslP, thisP->cliSides[i].sc)) SSLERR("client SSL set fd");
       
    70 		if((e = SSL_connect(thisP->cliSides[i].sslP)) < 1) {
       
    71 			switch(SSL_get_error(thisP->cliSides[i].sslP, e)) {
       
    72 				case SSL_ERROR_SYSCALL: SYSERR("SSL connect"); break;
       
    73 				default:	SSLERR("SSL connect"); break;
       
    74 			}
       
    75 		}
       
    76 	}
       
    77 	LOG(2, "connected via sc=%d after %d retries", thisP->cliSides[i].sc, csP->connThreshold - (retry + 1));
       
    78 }
       
    79 void acc(int i) {
       
    80 	LOG(4, "accepting...");
       
    81 	if((thisP->srvSides[i].sc = accept(thisP->ssc, NULL, NULL)) < 0) SYSERR("accept");
       
    82 	socklen_t l = sizeof(struct sockaddr);
       
    83 	struct sockaddr *sa = malloc(l);
       
    84 	if(getpeername(thisP->srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
       
    85 	LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->srvSides[i].sc); free(sa);
       
    86 	if(thisP->ssl) {
       
    87 		int e;
       
    88 		ERR_clear_error();
       
    89 		if(!(thisP->srvSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL");
       
    90 		if(!SSL_set_fd(thisP->srvSides[i].sslP, thisP->srvSides[i].sc)) SSLERR("server SSL set fd");
       
    91 		if((e = SSL_accept(thisP->srvSides[i].sslP)) < 1) {
       
    92 			switch(SSL_get_error(thisP->srvSides[i].sslP, e)) {
       
    93 				case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break;
       
    94 				default:	SSLERR("SSL accept"); break;
       
    95 			}
       
    96 		}
       
    97 	}
       
    98 	LOG(2, "accepted");
       
    99 }
       
   100 void closeN(int i, nodeside side) {
       
   101 	SocketP sc;
       
   102 	if(side) sc = thisP->srvSides; else sc = thisP->cliSides;
       
   103 	LOG(5, "closing sc=%d...", sc[i].sc);
       
   104 	if(thisP->ssl) {
       
   105 		int e;
       
   106 		if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)");
       
   107 		if(!e) {
       
   108 			LOG(5, "SSL shutdown rc=0");
       
   109 			if((e = SSL_shutdown(sc[i].sslP)) < 0) {
       
   110 				switch(SSL_get_error(sc[i].sslP, e)) {
       
   111 					case SSL_ERROR_SYSCALL:
       
   112 						if(!(e = ERR_get_error())) {
       
   113 							if(errno) SYSERR("SSL shutdown (2)");
       
   114 							break;
       
   115 						}
       
   116 						break;
       
   117 					default:	SSLERR("SSL shutdown (2)"); break;
       
   118 				}
       
   119 			}
       
   120 		}
       
   121 	}
       
   122 	close(sc[i].sc);
       
   123 	LOG(4, "closed sc=%d", sc[i].sc);
       
   124 	sc[i].sc = -1;
       
   125 }
       
   126 void *close_clients() {
       
   127 	DebugT debug, *deP = &debug;
       
   128 	DEBID("%s CLOSE clients", thisP->debug.id);
       
   129 	LOG(5, "start...");
       
   130 	for (int i = 0; i < thisP->nodes; i++) if(thisP->cliSides[i].sc > -1) closeN(i, client);
       
   131 	LOG(4, "all clients closed");
       
   132 	pthread_exit(NULL);
       
   133 }
       
   134 void close_node() {
       
   135 	if(!thisP->closing) {
       
   136 		thisP->closingThread = 0;
       
   137 		if(pthread_create(&thisP->closingThread, NULL, &close_clients, NULL) != 0) SYSERR("create thread");
       
   138 	thisP->closing = 1;
       
   139 	}
       
   140 }
       
   141 int readN(int i) {
       
   142 	LOG(5, "ready to read len=%d from sc=%d...", dataP->dataLen, thisP->srvSides[i].sc);
       
   143 	int n, rest = dataP->dataLen;
       
   144 	void *buf = dataP->contP;
       
   145 	while(rest > 0) {
       
   146 		if(thisP->ssl) { if((n = SSL_read(thisP->srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); }
       
   147 		else { if((n = read(thisP->srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); }
       
   148 		if(n == 0) {
       
   149 			LOG(4, "read EOF");
       
   150 			return 0;
       
   151 		}
       
   152 		else {
       
   153 			buf += n; rest -= n;
       
   154 			if(rest > 0) LOG(5, "partly read %d bytes", n);
       
   155 		}
       
   156 	}
       
   157 	if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
       
   158 	csP->shP->msgs++;
       
   159 	if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
       
   160 	LOG(5, "read %d from %u", dataP->dataLen, dataP->contP->hdr.listPort);
       
   161 	return dataP->dataLen;
       
   162 }
       
   163 int writeN(int i) {
       
   164 	DebugT debug, *deP = &debug;
       
   165 	DEBID("%s to %u", thisP->debug.id, thisP->cliSides[i].remPort);
       
   166 	LOG(5, "ready to write len=%d to sc=%d...", dataP->dataLen, thisP->cliSides[i].sc);
       
   167 	int n, rest = dataP->dataLen;
       
   168 	void *buf = dataP->contP;
       
   169 	while(rest > 0) {
       
   170 		if(thisP->ssl) { if((n = SSL_write(thisP->cliSides[i].sslP, buf, rest)) < 0) SSLERR("socket write"); }
       
   171 		else { if((n = write(thisP->cliSides[i].sc, buf, rest)) < 0) SYSERR("socket write"); }
       
   172 		buf += n; rest -= n;
       
   173 		if(rest > 0) LOG(5, "partly written %d bytes", n);
       
   174 	}
       
   175 	LOG(5, "written %d", dataP->dataLen);
       
   176 	return dataP->dataLen;
       
   177 }
       
   178 int getN(int i) {
       
   179 	return readN(i) > 0;
       
   180 }
       
   181 int putN(int i) {
       
   182 	dataP->contP->hdr.listPort = thisP->locPort;
       
   183 	return writeN(i) > 0;
       
   184 }
       
   185 int next_node() {
       
   186 	int next;
       
   187 	if(thisP->topo == ring) {
       
   188 		next = thisP->locPort + 1;
       
   189 		if(next > thisP->last) next = thisP->first;
       
   190 	}
       
   191 	else while((next = thisP->first + thisP->nodes * ((float)random() / RAND_MAX)) == thisP->locPort);
       
   192 	return next;
       
   193 }
       
   194 void forward(int sci) {
       
   195 	int next, scn;
       
   196 	char digest[24];
       
   197 	if(getN(sci)) {
       
   198    	LOG(5, "received data from %u", remPortData(dataP));
       
   199 		if(thisP->kicker) {
       
   200 			LOG(4, "received from node %u: %s, ttl=%d",
       
   201 						remPortData(dataP), digest24Data(dataP, digest), ttlData(dataP));
       
   202 //			if(ttlData(dataP) == 2) sabotageData(dataP);
       
   203 //			if(ttlData(dataP) == 2) errno=0, SYSERR("signal test");
       
   204 			if(dttlData(dataP) <= 0) {
       
   205       		LOG(1, "received after passing all %s: %s", thisP->topo==mash ? "mashes" : "rings",digest24Data(dataP, digest));
       
   206 				close_node();
       
   207 				*(thisP->forw) = 0;
       
   208 				LOG(4, "leaving forward closing");
       
   209 				return;
       
   210 			}
       
   211 		}
       
   212 		next = next_node(); scn = next - thisP->first;
       
   213 		LOG(5, "forwarding to %d, len=%d, ttl=%d --->", next, dataP->dataLen, ttlData(dataP));
       
   214 		if(thisP->cliSides[scn].sc < 0) conn(scn, next);
       
   215 		if(*(thisP->forw) && csP->pacing) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); }
       
   216 		putN(scn);
       
   217       LOG(5, "forwarded to %u", next);
       
   218 	}
       
   219 	else {
       
   220 		close_node();
       
   221 		closeN(sci, server);
       
   222 	}
       
   223 	return;
       
   224 }
       
   225 void main_loop() {
       
   226 	sigset_t pacing;
       
   227 	sigemptyset(&pacing);
       
   228    sigaddset(&pacing, PACING);
       
   229 	union {		// simple select mask debug
       
   230 	   fd_set rs;
       
   231 	   uint mask;
       
   232 	} u;
       
   233 	int nfds;
       
   234 	FD_ZERO(&(u.rs)); nfds = 0;
       
   235 	if(*(thisP->forw)) {
       
   236 		FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; }
       
   237 	while(nfds) {
       
   238 		struct timeval t = {csP->selTO, 0};
       
   239 		LOG(5, "selecting, mask=%08x", u.mask);
       
   240 		int rc;
       
   241 		rc = select(nfds, &(u.rs), NULL, NULL, &t);
       
   242 		if(rc < 0 && errno != EINTR) SYSERR("select");
       
   243 		if(rc > 0) {
       
   244 			LOG(5, "return from select, mask=%08x", u.mask);
       
   245 			if(FD_ISSET(thisP->ssc, &(u.rs))) {												// ssc posted: accept & forward
       
   246 				int i;
       
   247 				for(i=0; thisP->srvSides[i].sc > -1 && i < thisP->nodes; i++);		// find unused slot for accept
       
   248 				if(i == thisP->nodes) HARDERR("can't accept, all slots in use");
       
   249 				LOG(5, "slot for accept=%d", i);
       
   250 				acc(i);
       
   251 				forward(i);
       
   252 			}
       
   253 			else																						// check which connected socket is posted
       
   254 				for(int i = 0; i < thisP->nodes; i++)
       
   255 					if(thisP->srvSides[i].sc > -1 && FD_ISSET(thisP->srvSides[i].sc, &(u.rs))) forward(i);
       
   256 		}
       
   257 		FD_ZERO(&(u.rs)); nfds = 0;
       
   258 		if(*(thisP->forw)) { FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; }
       
   259 		for(int i = 0; i < thisP->nodes; i++) {											// mask all connected client side sockets for select
       
   260 			int sc = thisP->srvSides[i].sc;
       
   261 			if(sc > -1) { FD_SET(sc, &(u.rs)); if(sc >= nfds) nfds = sc + 1; }
       
   262 		}
       
   263 	}
       
   264 }
       
   265 void Node(topology topo, int *forw, int port, int first, int n, int ssl) {
       
   266 	NodeT this;
       
   267 	thisP = &this;
       
   268 	deP = &(thisP->debug);
       
   269 	DEBID("%sSSL %s node %d", ssl ? "" : "non", topo==mash ? "MASH" : "RING", port);
       
   270 	LOG(4, "initializing...");
       
   271 	dataP = &thisP->data;
       
   272 	Data(dataP, deP);
       
   273 	thisP->topo = topo;
       
   274 	thisP->locPort = port;
       
   275 	thisP->first = first;
       
   276 	thisP->last = first + n - 1;
       
   277 	thisP->nodes = n;
       
   278 	thisP->cliSides = malloc(n*sizeof(SocketT));
       
   279 	thisP->srvSides = malloc(n*sizeof(SocketT));
       
   280 	for(int k=0; k<n; k++) 	thisP->cliSides[k].sc = thisP->srvSides[k].sc = -1;
       
   281 	thisP->kicker = (port == first);
       
   282 	thisP->forw = forw;
       
   283 	thisP->nodeIdx = port - first;
       
   284 	thisP->closing = 0;
       
   285 	thisP->ssl = ssl;
       
   286 	if(thisP->ssl) {
       
   287 		char s[64];
       
   288 		SSL_load_error_strings();
       
   289 		SSL_library_init();
       
   290 		LOG(4, "setting SSL contex...");
       
   291 		if(!(thisP->ctx = SSL_CTX_new(TLS_method()))) SSLERR("new SSL CTX");
       
   292 		SSL_CTX_set_mode(thisP->ctx, SSL_MODE_AUTO_RETRY);
       
   293 		SSL_CTX_set_verify(thisP->ctx, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
       
   294 		sprintf(s, "%s/keys/%d.key", csP->ceP, thisP->locPort);
       
   295 		if(SSL_CTX_use_PrivateKey_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file");
       
   296 		sprintf(s, "%s/certs/%d.pem", csP->ceP, thisP->locPort);
       
   297 		LOG(5, "SSL private key used: %s", s);
       
   298 		if(SSL_CTX_use_certificate_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file");
       
   299 		LOG(5, "SSL certificate used: %s", s);
       
   300 		if(SSL_CTX_load_verify_locations(thisP->ctx, NULL, csP->caP) != 1)	SSLERR("hh's thrusted certs path");
       
   301 	}
       
   302 	LOG(5, "initalized");
       
   303 
       
   304 	bindN(thisP);
       
   305 	if(thisP->kicker) {
       
   306 		loadData(dataP, csP->ttl, csP->text);
       
   307 		int sci, next;
       
   308 		next = next_node(); sci = next - thisP->first;
       
   309 		char digest[24];
       
   310 		LOG(1, "KICKER: ready to initial send %s, len=%d to node %d", digest24Data(dataP, digest), dataP->dataLen, next);
       
   311 		conn(sci, next);
       
   312 		putN(sci);
       
   313 	}
       
   314 
       
   315 	main_loop();
       
   316 
       
   317 	LOG(5, "closing ssc");
       
   318 	close(thisP->ssc);
       
   319 	if(thisP->closing) {		// wait for closing thread
       
   320 		if(pthread_join(thisP->closingThread, NULL) != 0) SYSERR("join closing thread"); }
       
   321 	struct sigaction sigact;
       
   322 	sigfillset(&sigact.sa_mask);
       
   323 	sigact.sa_handler=sighandle;
       
   324 	if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction");
       
   325 	if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait");
       
   326 	int active = --csP->shP->act;
       
   327 	if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post");
       
   328 	if(active > 0)	pause();
       
   329 	else kill(0, SIGUSR2);
       
   330 	int exitRc = EXIT_SUCCESS;
       
   331 	if(thisP->kicker && !chkData(dataP)) {
       
   332 		SOFTERR("INPUT AND OUTPUT DIFFER");
       
   333 		exitRc = EXIT_FAILURE; }
       
   334 	LOG(2, "ended");
       
   335 	exit(exitRc);
       
   336 }