CScpp/CS.cpp
author hh
Thu, 21 Nov 2019 14:55:10 +0100
changeset 0 5c129dd80d4f
permissions -rw-r--r--
--
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
hh
parents:
diff changeset
     1
#include "CS.h"
hh
parents:
diff changeset
     2
hh
parents:
diff changeset
     3
CSP csP;
hh
parents:
diff changeset
     4
hh
parents:
diff changeset
     5
void sighandle(int sig) { return; }
hh
parents:
diff changeset
     6
void abend(DebugP deP) {
hh
parents:
diff changeset
     7
   int pid;
hh
parents:
diff changeset
     8
   //LOG(0, "ABORT, netstat:");
hh
parents:
diff changeset
     9
   //if(!(pid = fork())) {
hh
parents:
diff changeset
    10
   //   execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL);
hh
parents:
diff changeset
    11
   //   exit(EXIT_SUCCESS);
hh
parents:
diff changeset
    12
   //}
hh
parents:
diff changeset
    13
   waitpid(pid, NULL, 0);
hh
parents:
diff changeset
    14
   LOG(0, "ABORT, backtrace:");
hh
parents:
diff changeset
    15
   deP->back_trace();
hh
parents:
diff changeset
    16
   kill(0, SIGTERM);
hh
parents:
diff changeset
    17
   exit(EXIT_FAILURE);
hh
parents:
diff changeset
    18
}
hh
parents:
diff changeset
    19
char *gpa(struct sockaddr *ai_addr) {							// returns string with IP4 address & port assigned to the socket
hh
parents:
diff changeset
    20
   char *s = (char*) malloc(64);
hh
parents:
diff changeset
    21
   unsigned short port = *(unsigned short*) ai_addr->sa_data;
hh
parents:
diff changeset
    22
   char *a = ai_addr->sa_data + 2;
hh
parents:
diff changeset
    23
   sprintf(s, "%hhu.%hhu.%hhu.%hhu %hu", a[0], a[1], a[2], a[3], ntohs(port));
hh
parents:
diff changeset
    24
   return s;
hh
parents:
diff changeset
    25
}
hh
parents:
diff changeset
    26
void gai(int level, struct addrinfo *ai, DebugP deP) {			// logs assigned IP4 addresses from addrinfo chain
hh
parents:
diff changeset
    27
   struct addrinfo *sa = ai;
hh
parents:
diff changeset
    28
   if (deP->debug >= level) do {
hh
parents:
diff changeset
    29
      char *s = gpa(sa->ai_addr);
hh
parents:
diff changeset
    30
      strcpy(deP->s, s); free(s);
hh
parents:
diff changeset
    31
      deP->deb(level);
hh
parents:
diff changeset
    32
   } while ((sa = sa->ai_next));
hh
parents:
diff changeset
    33
   fflush(stderr);
hh
parents:
diff changeset
    34
}
hh
parents:
diff changeset
    35
void ssl_err(char *s, DebugP deP) {
hh
parents:
diff changeset
    36
   long e = ERR_get_error();
hh
parents:
diff changeset
    37
   while(e) {
hh
parents:
diff changeset
    38
      LOG(0, "%s: %s", s, ERR_error_string(e, NULL));
hh
parents:
diff changeset
    39
      e = ERR_get_error();
hh
parents:
diff changeset
    40
   }
hh
parents:
diff changeset
    41
}
hh
parents:
diff changeset
    42
static int getArg(const char *a) {
hh
parents:
diff changeset
    43
	return (getenv(a) != NULL) ? atoi(getenv(a)) : 0; }
hh
parents:
diff changeset
    44
HeaderS::HeaderS() {
hh
parents:
diff changeset
    45
	this->ttl = 0; }
hh
parents:
diff changeset
    46
HeaderS::HeaderS(int ttl) {
hh
parents:
diff changeset
    47
	this->ttl = ttl; }
hh
parents:
diff changeset
    48
int HeaderS::len() {
hh
parents:
diff changeset
    49
	return sizeof(HeaderS); }
hh
parents:
diff changeset
    50
PayloadS::PayloadS() {
hh
parents:
diff changeset
    51
	strcpy(&text, "EMPTY"); }
hh
parents:
diff changeset
    52
PayloadS::PayloadS(const char *text) {
hh
parents:
diff changeset
    53
	ts = time(NULL);
hh
parents:
diff changeset
    54
	strcpy(&(this->text), text); }
hh
parents:
diff changeset
    55
int PayloadS::check(PayloadP p) {
hh
parents:
diff changeset
    56
   return (int)(strcmp(&text, &p->text) == 0); }
hh
parents:
diff changeset
    57
char *PayloadS::deliver() {
hh
parents:
diff changeset
    58
   return &text; }
hh
parents:
diff changeset
    59
string PayloadS::digest() {
hh
parents:
diff changeset
    60
	string payl_s = string(&text);
hh
parents:
diff changeset
    61
	if(payl_s.size() < 24) return payl_s;
hh
parents:
diff changeset
    62
	else return payl_s.substr(0, 8) + string("--------") + payl_s.substr(payl_s.size() - 8, 8); }
hh
parents:
diff changeset
    63
void PayloadS::sabotage() {
hh
parents:
diff changeset
    64
	text = '?'; }
hh
parents:
diff changeset
    65
int PayloadS::len() {
hh
parents:
diff changeset
    66
	return sizeof(PayloadS) + strlen(&text); }
hh
parents:
diff changeset
    67
int ContainerS::len() {
hh
parents:
diff changeset
    68
	return hdr.len() + payl.len(); }
hh
parents:
diff changeset
    69
DataC::DataC() {}
hh
parents:
diff changeset
    70
DataC::DataC(DebugP callerDeP) {
hh
parents:
diff changeset
    71
   deP = new DebugC();
hh
parents:
diff changeset
    72
   DEBID("%s DATA", callerDeP->debid);
hh
parents:
diff changeset
    73
   dataLen = sizeof(HeaderS) + sizeof(PayloadS) + csP->text.size();
hh
parents:
diff changeset
    74
   contP = static_cast<ContainerP>(operator new(dataLen));
hh
parents:
diff changeset
    75
   new(&contP->hdr) HeaderS();
hh
parents:
diff changeset
    76
   new(&contP->payl) PayloadS();
hh
parents:
diff changeset
    77
   LOG(5, "Empty Data instance established");
hh
parents:
diff changeset
    78
}
hh
parents:
diff changeset
    79
void DataC::load(int ttl, const char *text) {
hh
parents:
diff changeset
    80
   new(&contP->hdr) HeaderS(ttl);
hh
parents:
diff changeset
    81
   new(&contP->payl) PayloadS(text);
hh
parents:
diff changeset
    82
   LOG(5, "Data instance loaded with payload");
hh
parents:
diff changeset
    83
   return;
hh
parents:
diff changeset
    84
}
hh
parents:
diff changeset
    85
string DataC::unld() {
hh
parents:
diff changeset
    86
	return string(&(contP->payl.text)); }
hh
parents:
diff changeset
    87
string DataC::digest() {
hh
parents:
diff changeset
    88
	return contP->payl.digest(); }
hh
parents:
diff changeset
    89
int DataC::dttl() {
hh
parents:
diff changeset
    90
   return --contP->hdr.ttl; }
hh
parents:
diff changeset
    91
int DataC::ttl() {
hh
parents:
diff changeset
    92
	return contP->hdr.ttl; }
hh
parents:
diff changeset
    93
bool DataC::dataOk() {
hh
parents:
diff changeset
    94
   return csP->text == string(&(contP->payl.text)); }
hh
parents:
diff changeset
    95
int DataC::ts() {
hh
parents:
diff changeset
    96
   return contP->hdr.ts; }
hh
parents:
diff changeset
    97
int DataC::remPort(int remPort) {
hh
parents:
diff changeset
    98
   return (contP->hdr.remPort = remPort); }
hh
parents:
diff changeset
    99
int DataC::remPort() {
hh
parents:
diff changeset
   100
   return contP->hdr.remPort; }
hh
parents:
diff changeset
   101
NodeC::NodeC(ConstellationP co, int port) {
hh
parents:
diff changeset
   102
   deP = new DebugC();
hh
parents:
diff changeset
   103
   DEBID("%sSSL %s node %d", co->ssl ? "" : "non", co->topo==mash ? "MASH" : "RING", port);
hh
parents:
diff changeset
   104
   LOG(4, "intializing ...");
hh
parents:
diff changeset
   105
   data = DataC(deP);
hh
parents:
diff changeset
   106
   topo = co->topo;
hh
parents:
diff changeset
   107
   locPort = port;
hh
parents:
diff changeset
   108
   first = co->first;
hh
parents:
diff changeset
   109
   nodes = co->nodes;
hh
parents:
diff changeset
   110
   last = first + nodes - 1;
hh
parents:
diff changeset
   111
   cliSides = new SocketS[nodes];
hh
parents:
diff changeset
   112
   srvSides = new SocketS[nodes];
hh
parents:
diff changeset
   113
   for(int k=0; k<nodes; k++) 	cliSides[k].sc = srvSides[k].sc = -1;
hh
parents:
diff changeset
   114
   kicker = (locPort == first);
hh
parents:
diff changeset
   115
   forwP = co->forwP;
hh
parents:
diff changeset
   116
   closing = 0;
hh
parents:
diff changeset
   117
   ssl = co->ssl;
hh
parents:
diff changeset
   118
   ssc = 0;
hh
parents:
diff changeset
   119
   if(ssl) {
hh
parents:
diff changeset
   120
      char s[128];
hh
parents:
diff changeset
   121
      SSL_load_error_strings();
hh
parents:
diff changeset
   122
      SSL_library_init();
hh
parents:
diff changeset
   123
      LOG(4, "setting SSL contex...");
hh
parents:
diff changeset
   124
      if(!(ctxP = SSL_CTX_new(SSLv23_method()))) SSLERR("new SSL CTX");
hh
parents:
diff changeset
   125
      SSL_CTX_set_mode(ctxP, SSL_MODE_AUTO_RETRY);
hh
parents:
diff changeset
   126
      SSL_CTX_set_verify(ctxP, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);	// SSL will claim partner certificate
hh
parents:
diff changeset
   127
      sprintf(s, "%s/keys/%d.key", csP->cePath.c_str(), locPort);
hh
parents:
diff changeset
   128
      LOG(5, "SSL private key used: %s", s);
hh
parents:
diff changeset
   129
      if(SSL_CTX_use_PrivateKey_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file");
hh
parents:
diff changeset
   130
      sprintf(s, "%s/certs/%d.pem", csP->cePath.c_str(), locPort);
hh
parents:
diff changeset
   131
      LOG(5, "SSL certificate used: %s", s);
hh
parents:
diff changeset
   132
      if(SSL_CTX_use_certificate_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file");
hh
parents:
diff changeset
   133
      LOG(5, "SSL: CApath: %s", csP->caPath.c_str());
hh
parents:
diff changeset
   134
      if(SSL_CTX_load_verify_locations(ctxP, NULL, csP->caPath.c_str()) != 1) SSLERR("hh's thrusted certs path");
hh
parents:
diff changeset
   135
   }
hh
parents:
diff changeset
   136
   LOG(5, "initalized");
hh
parents:
diff changeset
   137
}
hh
parents:
diff changeset
   138
int NodeC::run() {
hh
parents:
diff changeset
   139
   LOG(5, "binding, kicker=%d", kicker);
hh
parents:
diff changeset
   140
   bindN();
hh
parents:
diff changeset
   141
   if(kicker) {
hh
parents:
diff changeset
   142
      data.load(csP->ttl, csP->text.c_str());
hh
parents:
diff changeset
   143
      int sci, next;
hh
parents:
diff changeset
   144
      next = next_node(); sci = next - first;
hh
parents:
diff changeset
   145
      LOG(2, "ready to initial send %s, len=%d to node %d",	data.digest().c_str(), data.dataLen, next);
hh
parents:
diff changeset
   146
      conn(sci, next);
hh
parents:
diff changeset
   147
      putN(sci);
hh
parents:
diff changeset
   148
   }
hh
parents:
diff changeset
   149
   mainLoop();
hh
parents:
diff changeset
   150
   LOG(5, "closing ssc");
hh
parents:
diff changeset
   151
   close(ssc);
hh
parents:
diff changeset
   152
   if(closing) closingThread.join();		// wait for closing thread
hh
parents:
diff changeset
   153
	struct sigaction sigact;
hh
parents:
diff changeset
   154
	sigfillset(&sigact.sa_mask);
hh
parents:
diff changeset
   155
	sigact.sa_handler = sighandle;
hh
parents:
diff changeset
   156
	if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction");
hh
parents:
diff changeset
   157
	if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait");
hh
parents:
diff changeset
   158
	if(--csP->shP->act == 0) {
hh
parents:
diff changeset
   159
		if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post");
hh
parents:
diff changeset
   160
		kill(0, SIGUSR2);
hh
parents:
diff changeset
   161
	}
hh
parents:
diff changeset
   162
	else {
hh
parents:
diff changeset
   163
		if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
hh
parents:
diff changeset
   164
		pause();
hh
parents:
diff changeset
   165
	}
hh
parents:
diff changeset
   166
   int exitRc = EXIT_SUCCESS;
hh
parents:
diff changeset
   167
   if(kicker && !data.dataOk()) {
hh
parents:
diff changeset
   168
   	SOFTERR("input and output differ");
hh
parents:
diff changeset
   169
   	exitRc = EXIT_FAILURE;
hh
parents:
diff changeset
   170
   }
hh
parents:
diff changeset
   171
   LOG(2, "ended");
hh
parents:
diff changeset
   172
   exit(exitRc);
hh
parents:
diff changeset
   173
}
hh
parents:
diff changeset
   174
void NodeC::mainLoop() {
hh
parents:
diff changeset
   175
   fd_set rs;
hh
parents:
diff changeset
   176
   int nfds;
hh
parents:
diff changeset
   177
   struct timeval t = {1, 0};
hh
parents:
diff changeset
   178
   nfds = 0; FD_ZERO(&rs);
hh
parents:
diff changeset
   179
   if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; }
hh
parents:
diff changeset
   180
   while(nfds) {
hh
parents:
diff changeset
   181
   	t.tv_sec = csP->selTO;
hh
parents:
diff changeset
   182
		int rc;
hh
parents:
diff changeset
   183
		rc = select(nfds, &rs, NULL, NULL, &t);
hh
parents:
diff changeset
   184
		if(rc < 0 && errno != EINTR) SYSERR("select");
hh
parents:
diff changeset
   185
		if(rc > 0) {
hh
parents:
diff changeset
   186
			if(FD_ISSET(ssc, &rs)) {
hh
parents:
diff changeset
   187
				int i;
hh
parents:
diff changeset
   188
				for(i=0; srvSides[i].sc > -1 && i < nodes; i++);						// find unused slot for accept
hh
parents:
diff changeset
   189
				if(i == nodes) HARDERR("can't accept, all slots in use");
hh
parents:
diff changeset
   190
				LOG(5, "slot for accept=%d", i);
hh
parents:
diff changeset
   191
				acc(i);
hh
parents:
diff changeset
   192
				forward(i);
hh
parents:
diff changeset
   193
			}
hh
parents:
diff changeset
   194
			else
hh
parents:
diff changeset
   195
				for(int i = 0; i < nodes; i++) {
hh
parents:
diff changeset
   196
					if(srvSides[i].sc > -1 && FD_ISSET(srvSides[i].sc, &rs)) forward(i); }
hh
parents:
diff changeset
   197
      }
hh
parents:
diff changeset
   198
      FD_ZERO(&rs); nfds = 0; t.tv_sec = 1;
hh
parents:
diff changeset
   199
      for(int i = 0; i < nodes; i++) {
hh
parents:
diff changeset
   200
      	int sc = srvSides[i].sc;
hh
parents:
diff changeset
   201
      	if(sc > -1) { FD_SET(sc, &rs); if(sc >= nfds) nfds = sc + 1; }
hh
parents:
diff changeset
   202
      }
hh
parents:
diff changeset
   203
      if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; }
hh
parents:
diff changeset
   204
   }
hh
parents:
diff changeset
   205
}
hh
parents:
diff changeset
   206
void NodeC::forward(int sci) {
hh
parents:
diff changeset
   207
   int next, scn;
hh
parents:
diff changeset
   208
   if(getN(sci)) {
hh
parents:
diff changeset
   209
   	LOG(5, "received data from %u", data.remPort());
hh
parents:
diff changeset
   210
      if(kicker) {
hh
parents:
diff changeset
   211
      	LOG(3, "received from node %u: %s, ttl=%d", data.remPort(), data.digest().c_str(), data.ttl());
hh
parents:
diff changeset
   212
      	if(data.dttl() <= 0) {
hh
parents:
diff changeset
   213
      		LOG(1, "received after passing all %s: %s", topo==mash ? "mashes" : "rings", data.digest().c_str());
hh
parents:
diff changeset
   214
      		closeClients();
hh
parents:
diff changeset
   215
      		*(forwP) = 0;
hh
parents:
diff changeset
   216
      		LOG(4, "leaving forward closing");
hh
parents:
diff changeset
   217
      		return;
hh
parents:
diff changeset
   218
      	}
hh
parents:
diff changeset
   219
      }
hh
parents:
diff changeset
   220
      next = next_node(); scn = next - first;
hh
parents:
diff changeset
   221
      LOG(5, "forwarding len=%d to %d --->", data.dataLen, next);
hh
parents:
diff changeset
   222
      if(cliSides[scn].sc < 0) conn(scn, next);
hh
parents:
diff changeset
   223
		if(*(forwP)) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); }
hh
parents:
diff changeset
   224
      putN(scn);
hh
parents:
diff changeset
   225
      LOG(5, "forwarded to %u", next);
hh
parents:
diff changeset
   226
   }
hh
parents:
diff changeset
   227
   else {
hh
parents:
diff changeset
   228
      closeClients();
hh
parents:
diff changeset
   229
      closeSocket(sci, server);
hh
parents:
diff changeset
   230
   }
hh
parents:
diff changeset
   231
   return;
hh
parents:
diff changeset
   232
}
hh
parents:
diff changeset
   233
void NodeC::bindN() {
hh
parents:
diff changeset
   234
   LOG(4, "binding...");
hh
parents:
diff changeset
   235
   struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo));
hh
parents:
diff changeset
   236
   memset(sa, 0, sizeof(struct addrinfo));
hh
parents:
diff changeset
   237
   sa->ai_family = AF_INET;
hh
parents:
diff changeset
   238
   sa->ai_socktype = SOCK_STREAM;
hh
parents:
diff changeset
   239
   sa->ai_protocol = 0;
hh
parents:
diff changeset
   240
   sa->ai_flags = AI_PASSIVE;
hh
parents:
diff changeset
   241
   char s[64];
hh
parents:
diff changeset
   242
   sprintf(s, "%d", locPort);
hh
parents:
diff changeset
   243
   int e;
hh
parents:
diff changeset
   244
   if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e));
hh
parents:
diff changeset
   245
   GAI(4, sa);
hh
parents:
diff changeset
   246
   if((ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc");
hh
parents:
diff changeset
   247
   int opt = 1;
hh
parents:
diff changeset
   248
   if(setsockopt(ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options");
hh
parents:
diff changeset
   249
   if(bind(ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind");
hh
parents:
diff changeset
   250
   if(listen(ssc, 1) < 0) SYSERR("listen");
hh
parents:
diff changeset
   251
   LOG(2, "bound to %d", locPort);
hh
parents:
diff changeset
   252
}
hh
parents:
diff changeset
   253
void NodeC::conn(int i, int remPort) {
hh
parents:
diff changeset
   254
   DebugP deP = new DebugC();
hh
parents:
diff changeset
   255
   DEBID("%s to %u", this->deP->debid, remPort);
hh
parents:
diff changeset
   256
   cliSides[i].remPort = remPort;
hh
parents:
diff changeset
   257
   int e;
hh
parents:
diff changeset
   258
   LOG(4, "connecting to %u...", remPort);
hh
parents:
diff changeset
   259
   int retry = csP->connThreshold;
hh
parents:
diff changeset
   260
   struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo));
hh
parents:
diff changeset
   261
   memset(ai, 0, sizeof(struct addrinfo));
hh
parents:
diff changeset
   262
   ai->ai_family = AF_INET;
hh
parents:
diff changeset
   263
   ai->ai_socktype = SOCK_STREAM;
hh
parents:
diff changeset
   264
   ai->ai_protocol = 0;
hh
parents:
diff changeset
   265
   ai->ai_flags = 0;
hh
parents:
diff changeset
   266
   char port[6];
hh
parents:
diff changeset
   267
   sprintf(port, "%d", remPort);
hh
parents:
diff changeset
   268
   if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e));
hh
parents:
diff changeset
   269
   GAI(3, ai);
hh
parents:
diff changeset
   270
   if((cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc");
hh
parents:
diff changeset
   271
   while(retry--) {
hh
parents:
diff changeset
   272
      if(connect(cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) {
hh
parents:
diff changeset
   273
      	if(errno != ECONNREFUSED) SYSERR("connect");
hh
parents:
diff changeset
   274
      	usleep(csP->connTO); }
hh
parents:
diff changeset
   275
      else break; }
hh
parents:
diff changeset
   276
   if(retry < 1) {
hh
parents:
diff changeset
   277
      LOG(0, "connection refused threshold %d reached", csP->connThreshold);
hh
parents:
diff changeset
   278
      exit(EXIT_FAILURE); }
hh
parents:
diff changeset
   279
   if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
hh
parents:
diff changeset
   280
   csP->shP->conns++;
hh
parents:
diff changeset
   281
   if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
hh
parents:
diff changeset
   282
   socklen_t l = sizeof(struct sockaddr);
hh
parents:
diff changeset
   283
   struct sockaddr *sa = (sockaddr*)malloc(l);
hh
parents:
diff changeset
   284
   if(getpeername(cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
hh
parents:
diff changeset
   285
   LOG(4, "peer: %s on sc=%d", gpa(sa), cliSides[i].sc); free(sa);
hh
parents:
diff changeset
   286
   if(ssl) {
hh
parents:
diff changeset
   287
      ERR_clear_error();
hh
parents:
diff changeset
   288
      if(!(cliSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL");
hh
parents:
diff changeset
   289
      if(!SSL_set_fd(cliSides[i].sslP, cliSides[i].sc)) SSLERR("client SSL set fd");
hh
parents:
diff changeset
   290
      if((e = SSL_connect(cliSides[i].sslP)) < 1) {
hh
parents:
diff changeset
   291
      	switch(SSL_get_error(cliSides[i].sslP, e)) {
hh
parents:
diff changeset
   292
      	case SSL_ERROR_SYSCALL:
hh
parents:
diff changeset
   293
      		ssl_err((char*)"SSL connect", deP);
hh
parents:
diff changeset
   294
      		if(e == 0) LOG(0, "SSL connect: EOF on socket");
hh
parents:
diff changeset
   295
      		else LOG(0, "SSL connect: %s (%d)", strerror(errno), errno);
hh
parents:
diff changeset
   296
      		abend(deP);
hh
parents:
diff changeset
   297
      		break;
hh
parents:
diff changeset
   298
			default:
hh
parents:
diff changeset
   299
				SSLERR("SSL connect");
hh
parents:
diff changeset
   300
				break;
hh
parents:
diff changeset
   301
      	}
hh
parents:
diff changeset
   302
      }
hh
parents:
diff changeset
   303
   }
hh
parents:
diff changeset
   304
   LOG(2, "connected via sc=%d after %d retries", cliSides[i].sc, csP->connThreshold - (retry + 1));
hh
parents:
diff changeset
   305
}
hh
parents:
diff changeset
   306
void NodeC::acc(int i) {
hh
parents:
diff changeset
   307
   LOG(4, "accepting...");
hh
parents:
diff changeset
   308
   if((srvSides[i].sc = accept(ssc, NULL, NULL)) < 0) SYSERR("accept");
hh
parents:
diff changeset
   309
   socklen_t l = sizeof(struct sockaddr);
hh
parents:
diff changeset
   310
   struct sockaddr *sa = (sockaddr*)malloc(l);
hh
parents:
diff changeset
   311
   if(getpeername(srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
hh
parents:
diff changeset
   312
   LOG(4, "peer: %s on sc=%d", gpa(sa), srvSides[i].sc); free(sa);
hh
parents:
diff changeset
   313
   if(ssl) {
hh
parents:
diff changeset
   314
      int e;
hh
parents:
diff changeset
   315
      ERR_clear_error();
hh
parents:
diff changeset
   316
      if(!(srvSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL");
hh
parents:
diff changeset
   317
      if(!SSL_set_fd(srvSides[i].sslP, srvSides[i].sc)) SSLERR("server SSL set fd");
hh
parents:
diff changeset
   318
      if((e = SSL_accept(srvSides[i].sslP)) < 1) {
hh
parents:
diff changeset
   319
      	switch(SSL_get_error(srvSides[i].sslP, e)) {
hh
parents:
diff changeset
   320
      	case SSL_ERROR_SYSCALL:	SYSERR("SSL accept"); break;
hh
parents:
diff changeset
   321
      	default: 					SSLERR("SSL accept"); break;
hh
parents:
diff changeset
   322
      	}
hh
parents:
diff changeset
   323
      }
hh
parents:
diff changeset
   324
   }
hh
parents:
diff changeset
   325
   LOG(2, "accepted");
hh
parents:
diff changeset
   326
}
hh
parents:
diff changeset
   327
void NodeC::closeSocket(int i, nodeside side) {
hh
parents:
diff changeset
   328
   SocketP sc;
hh
parents:
diff changeset
   329
   if(side) sc = srvSides; else sc = cliSides;
hh
parents:
diff changeset
   330
   LOG(5, "closing sc=%d...", sc[i].sc);
hh
parents:
diff changeset
   331
   if(ssl) {
hh
parents:
diff changeset
   332
   	int e;
hh
parents:
diff changeset
   333
   	if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)");
hh
parents:
diff changeset
   334
   	if(!e) {
hh
parents:
diff changeset
   335
   		LOG(5, "SSL shutdown rc=0");
hh
parents:
diff changeset
   336
   		if((e = SSL_shutdown(sc[i].sslP)) < 0) {
hh
parents:
diff changeset
   337
   			switch(SSL_get_error(sc[i].sslP, e)) {
hh
parents:
diff changeset
   338
   			case SSL_ERROR_SYSCALL: {
hh
parents:
diff changeset
   339
   				long e;
hh
parents:
diff changeset
   340
   				if(!(e = ERR_get_error()) && errno) SYSERR("SSL shutdown (2)");
hh
parents:
diff changeset
   341
   				break;
hh
parents:
diff changeset
   342
   			}
hh
parents:
diff changeset
   343
   			default:
hh
parents:
diff changeset
   344
   				SSLERR("SSL shutdown (2)");
hh
parents:
diff changeset
   345
   				break;
hh
parents:
diff changeset
   346
   			}
hh
parents:
diff changeset
   347
   		}
hh
parents:
diff changeset
   348
   	}
hh
parents:
diff changeset
   349
   }
hh
parents:
diff changeset
   350
   close(sc[i].sc);
hh
parents:
diff changeset
   351
   LOG(4, "closed sc=%d", sc[i].sc);
hh
parents:
diff changeset
   352
   sc[i].sc = -1;
hh
parents:
diff changeset
   353
}
hh
parents:
diff changeset
   354
void closeCliTh(void *p) {
hh
parents:
diff changeset
   355
   NodeP nP = (NodeP)p;
hh
parents:
diff changeset
   356
   char *callerid = nP->deP->debid;
hh
parents:
diff changeset
   357
   DebugP deP = new DebugC();
hh
parents:
diff changeset
   358
   DEBID("%s CLOSE clients thread", callerid);
hh
parents:
diff changeset
   359
   LOG(5, "start...");
hh
parents:
diff changeset
   360
   for (int i = 0; i < nP->nodes; i++) if(nP->cliSides[i].sc > -1) nP->closeSocket(i, client);
hh
parents:
diff changeset
   361
   LOG(4, "all clients closed");
hh
parents:
diff changeset
   362
}
hh
parents:
diff changeset
   363
void NodeC::closeClients() {
hh
parents:
diff changeset
   364
   if(!closing) try { closingThread = thread(closeCliTh, this); } catch(exception e) {
hh
parents:
diff changeset
   365
   		cout << "closing tread: " << e.what() << '\n'; }
hh
parents:
diff changeset
   366
   closing = 1;
hh
parents:
diff changeset
   367
}
hh
parents:
diff changeset
   368
int NodeC::readN(int i) {
hh
parents:
diff changeset
   369
   LOG(5, "to read len=%d from sc=%d...", data.dataLen, srvSides[i].sc);
hh
parents:
diff changeset
   370
   int n, rest = data.dataLen;
hh
parents:
diff changeset
   371
   char *buf = (char*)data.contP;
hh
parents:
diff changeset
   372
   while(rest > 0) {
hh
parents:
diff changeset
   373
   	if(ssl) { if((n = SSL_read(srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); }
hh
parents:
diff changeset
   374
   	else { if((n = read(srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); }
hh
parents:
diff changeset
   375
   	if(n == 0) {
hh
parents:
diff changeset
   376
   		LOG(4, "read EOF");
hh
parents:
diff changeset
   377
   		return 0;
hh
parents:
diff changeset
   378
   	}
hh
parents:
diff changeset
   379
   	else {
hh
parents:
diff changeset
   380
   		buf += n; rest -= n;
hh
parents:
diff changeset
   381
   		if(rest > 0) LOG(5, "partly read %d bytes, buf=%p", n, buf);
hh
parents:
diff changeset
   382
   	}
hh
parents:
diff changeset
   383
   }
hh
parents:
diff changeset
   384
	if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
hh
parents:
diff changeset
   385
	csP->shP->msgs++;
hh
parents:
diff changeset
   386
	if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
hh
parents:
diff changeset
   387
	LOG(5, "read %d from %u", data.dataLen, data.remPort());
hh
parents:
diff changeset
   388
	return (data.dataLen);
hh
parents:
diff changeset
   389
}
hh
parents:
diff changeset
   390
int NodeC::writeN(int i) {
hh
parents:
diff changeset
   391
   LOG(5, "to write len=%d to sc=%d...", data.dataLen, cliSides[i].sc);
hh
parents:
diff changeset
   392
   int n, rest = data.dataLen;
hh
parents:
diff changeset
   393
   char *buf = (char*)data.contP;
hh
parents:
diff changeset
   394
   while(rest > 0) {
hh
parents:
diff changeset
   395
		if(ssl) { if((n = SSL_write(cliSides[i].sslP, data.contP, data.dataLen)) < 0) SSLERR("socket write"); }
hh
parents:
diff changeset
   396
		else { if((n = write(cliSides[i].sc, data.contP, data.dataLen)) < 0) SYSERR("socket write"); }
hh
parents:
diff changeset
   397
		buf += n; rest -= n;
hh
parents:
diff changeset
   398
		if(rest > 0) LOG(5, "partly written %d bytes", n);
hh
parents:
diff changeset
   399
	}
hh
parents:
diff changeset
   400
   LOG(5, "written %d", data.dataLen);
hh
parents:
diff changeset
   401
   return data.dataLen;
hh
parents:
diff changeset
   402
}
hh
parents:
diff changeset
   403
int NodeC::getN(int i) { return readN(i) > 0; }
hh
parents:
diff changeset
   404
int NodeC::putN(int i) {
hh
parents:
diff changeset
   405
	data.remPort(locPort);
hh
parents:
diff changeset
   406
	return writeN(i) > 0; }
hh
parents:
diff changeset
   407
int NodeC::next_node() {
hh
parents:
diff changeset
   408
   int next;
hh
parents:
diff changeset
   409
   if(topo == ring) {
hh
parents:
diff changeset
   410
      next = locPort + 1;
hh
parents:
diff changeset
   411
      if(next > last) next = first;
hh
parents:
diff changeset
   412
   }
hh
parents:
diff changeset
   413
   else while((next = first + nodes * ((float)random() / RAND_MAX)) == locPort);
hh
parents:
diff changeset
   414
   return next;
hh
parents:
diff changeset
   415
}
hh
parents:
diff changeset
   416
ConstellationC::ConstellationC() {
hh
parents:
diff changeset
   417
   deP = NULL;
hh
parents:
diff changeset
   418
   forwP = NULL;
hh
parents:
diff changeset
   419
   topo = ring;
hh
parents:
diff changeset
   420
   ssl = first = nodes = 0;
hh
parents:
diff changeset
   421
};
hh
parents:
diff changeset
   422
ConstellationC::ConstellationC(topology topo, int ssl) {
hh
parents:
diff changeset
   423
   deP = new DebugC();
hh
parents:
diff changeset
   424
   DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING");
hh
parents:
diff changeset
   425
   this->topo = topo;
hh
parents:
diff changeset
   426
   if(topo == ring) { first=csP->rp0; nodes=csP->rn; }
hh
parents:
diff changeset
   427
   if(topo == mash) { first=csP->mp0; nodes=csP->mn; }
hh
parents:
diff changeset
   428
   this->ssl = ssl;
hh
parents:
diff changeset
   429
   first += ssl*500;
hh
parents:
diff changeset
   430
   forwP = NULL;
hh
parents:
diff changeset
   431
}
hh
parents:
diff changeset
   432
int ConstellationC::run() {
hh
parents:
diff changeset
   433
	int stat = 0, pid = 0, exitRc = EXIT_SUCCESS;
hh
parents:
diff changeset
   434
   if(nodes == 0) exit(0);
hh
parents:
diff changeset
   435
   if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); }
hh
parents:
diff changeset
   436
	pid_t *pids = new pid_t[nodes];
hh
parents:
diff changeset
   437
   LOG(1, "%d nodes starting...", nodes);
hh
parents:
diff changeset
   438
   if((forwP = (int*)mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap");
hh
parents:
diff changeset
   439
   *forwP = 1;
hh
parents:
diff changeset
   440
   for(int port = first; port < first + nodes; port++) {
hh
parents:
diff changeset
   441
      if(!(pid = fork())) (new NodeC(this, port))->run();
hh
parents:
diff changeset
   442
      else { pids[port-first] = pid; LOG(4, "node %u established in process %u", port, pid); }
hh
parents:
diff changeset
   443
   }
hh
parents:
diff changeset
   444
   LOG(2, "all nodes established");
hh
parents:
diff changeset
   445
   while((pid = wait(&stat)) > 0)
hh
parents:
diff changeset
   446
     if(WIFEXITED(stat)) {
hh
parents:
diff changeset
   447
   	  LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat));
hh
parents:
diff changeset
   448
   	  if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE;
hh
parents:
diff changeset
   449
     }
hh
parents:
diff changeset
   450
     else exitRc = EXIT_FAILURE;
hh
parents:
diff changeset
   451
   LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "with ERROR");
hh
parents:
diff changeset
   452
   exit(exitRc);
hh
parents:
diff changeset
   453
}
hh
parents:
diff changeset
   454
CSS::CSS(DebugP deP, char *prgnP) {
hh
parents:
diff changeset
   455
   ShareA(shP);
hh
parents:
diff changeset
   456
   shP->msgs = 0;
hh
parents:
diff changeset
   457
   shP->conns = 0;
hh
parents:
diff changeset
   458
   if(sem_init(&shP->counterSem, 1, 1) < 0) SYSERR("sem_init");
hh
parents:
diff changeset
   459
   text = "bla bla";
hh
parents:
diff changeset
   460
   ttl = 3;
hh
parents:
diff changeset
   461
   rp0 = 11000;
hh
parents:
diff changeset
   462
   mp0 = 12000;
hh
parents:
diff changeset
   463
   rn = 0;
hh
parents:
diff changeset
   464
   mn = 0;
hh
parents:
diff changeset
   465
   issl = 0;
hh
parents:
diff changeset
   466
   const string sslPathSuffP = "../CS";
hh
parents:
diff changeset
   467
   caPath = "/home/local/etc/ssl/certs/";
hh
parents:
diff changeset
   468
   connThreshold = 77;			// connection retries threshold
hh
parents:
diff changeset
   469
   connTO = 0.01 * 1000*1000;	// connection sleep time in usecs
hh
parents:
diff changeset
   470
   selTO = 1;						// selection timeout in secs
hh
parents:
diff changeset
   471
hh
parents:
diff changeset
   472
   if(getArg("DEB") >= 0) DebugC::debug = getArg("DEB");
hh
parents:
diff changeset
   473
   if(getenv("T") != NULL) { text = getenv("T"); }
hh
parents:
diff changeset
   474
   if(getenv("CEP") != NULL) cePath = getenv("CEP");
hh
parents:
diff changeset
   475
   else { cePath = dirname(prgnP); cePath += "/"; cePath += sslPathSuffP; }
hh
parents:
diff changeset
   476
   if(getenv("CAP") != NULL)  caPath = getenv("CAP");
hh
parents:
diff changeset
   477
   if(getArg("TTL") > 0) ttl = getArg("TTL");
hh
parents:
diff changeset
   478
   if(getArg("RP0") > 0) rp0 = getArg("RP0");
hh
parents:
diff changeset
   479
   if(getArg("MP0") > 0) mp0 = getArg("MP0");
hh
parents:
diff changeset
   480
   if(getArg("N") >= 0) { mn = getArg("N"); rn = mn; }
hh
parents:
diff changeset
   481
   if(getArg("SSL") >= 0) issl = getArg("SSL");
hh
parents:
diff changeset
   482
   if(getArg("RN") >= 0) rn = getArg("RN");
hh
parents:
diff changeset
   483
   if(getArg("MN") >= 0) mn = getArg("MN");
hh
parents:
diff changeset
   484
	shP->act = rn + mn;		// initialize active node processes counter
hh
parents:
diff changeset
   485
	if(issl > 1) shP->act += shP->act;
hh
parents:
diff changeset
   486
	pacing = 0;
hh
parents:
diff changeset
   487
   if(getenv("P") != NULL) {
hh
parents:
diff changeset
   488
	  	double d = atof(getenv("P"));
hh
parents:
diff changeset
   489
	  	pace.tv_sec=(time_t)trunc(d);
hh
parents:
diff changeset
   490
	  	pace.tv_nsec=(d-pace.tv_sec)*1000*1000*1000;
hh
parents:
diff changeset
   491
	  	if(pace.tv_sec > 0 || pace.tv_nsec > 0) pacing = 1;
hh
parents:
diff changeset
   492
   }
hh
parents:
diff changeset
   493
   if(getArg("RS") >= 0) srandom(getArg("RS"));
hh
parents:
diff changeset
   494
}
hh
parents:
diff changeset
   495
int main(int argc, char *argv[]) {
hh
parents:
diff changeset
   496
   DebugC::debug_init(argv[0]);
hh
parents:
diff changeset
   497
   DebugP deP = new DebugC();
hh
parents:
diff changeset
   498
   DEBID("client/server demo");
hh
parents:
diff changeset
   499
   csP = new CSS(deP, argv[0]);
hh
parents:
diff changeset
   500
   LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\
hh
parents:
diff changeset
   501
   			argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, DebugC::debug);
hh
parents:
diff changeset
   502
   if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->cePath.c_str(), csP->caPath.c_str());
hh
parents:
diff changeset
   503
   if(csP->issl < 2) {
hh
parents:
diff changeset
   504
      if(!fork()) (new ConstellationC(ring, csP->issl))->run();
hh
parents:
diff changeset
   505
      if(!fork()) (new ConstellationC(mash, csP->issl))->run();
hh
parents:
diff changeset
   506
   } else for(int  ssl = 0; ssl < csP->issl; ssl++) {
hh
parents:
diff changeset
   507
      if(!fork()) (new ConstellationC(ring, ssl))->run();
hh
parents:
diff changeset
   508
      if(!fork()) (new ConstellationC(mash, ssl))->run();
hh
parents:
diff changeset
   509
   }
hh
parents:
diff changeset
   510
   int stat, exitRc = EXIT_SUCCESS;
hh
parents:
diff changeset
   511
   while(wait(&stat) > 0)
hh
parents:
diff changeset
   512
   	if(WIFEXITED(stat)) {
hh
parents:
diff changeset
   513
   		LOG(5, "constellation ended with exit(%d)", WEXITSTATUS(stat));
hh
parents:
diff changeset
   514
   		if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE;
hh
parents:
diff changeset
   515
   	}
hh
parents:
diff changeset
   516
      else exitRc = EXIT_FAILURE;
hh
parents:
diff changeset
   517
   LOG(1, "%s end, forwards=%d, connections=%d", exitRc == EXIT_SUCCESS ? "NORMAL" : "BAD", csP->shP->msgs, csP->shP->conns);
hh
parents:
diff changeset
   518
   exit(exitRc);
hh
parents:
diff changeset
   519
}