|
1 #include "CS.h" |
|
2 |
|
3 CSP csP; |
|
4 |
|
// Signal handler that deliberately does nothing; it only exists so that a
// signal can be delivered to a handler instead of triggering the default action.
void sighandle(int sig) {
    (void)sig;  // signal number is not needed
}
|
6 void abend(DebugP deP) { |
|
7 int pid; |
|
8 //LOG(0, "ABORT, netstat:"); |
|
9 //if(!(pid = fork())) { |
|
10 // execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL); |
|
11 // exit(EXIT_SUCCESS); |
|
12 //} |
|
13 waitpid(pid, NULL, 0); |
|
14 LOG(0, "ABORT, backtrace:"); |
|
15 deP->back_trace(); |
|
16 kill(0, SIGTERM); |
|
17 exit(EXIT_FAILURE); |
|
18 } |
|
// Renders the IPv4 address and port stored in a socket address as
// "a.b.c.d port".
//
// ai_addr: socket address; the first two bytes of sa_data are read as the
//          port (network byte order), the following four as address octets.
// Returns a malloc'ed string that the caller has to free().
char *gpa(struct sockaddr *ai_addr) {
    const char *raw = ai_addr->sa_data;
    const unsigned short netPort = *(const unsigned short*) raw;
    const char *octet = raw + 2;
    char *text = (char*) malloc(64);
    sprintf(text, "%hhu.%hhu.%hhu.%hhu %hu", octet[0], octet[1], octet[2], octet[3], ntohs(netPort));
    return text;
}
|
26 void gai(int level, struct addrinfo *ai, DebugP deP) { // logs assigned IP4 addresses from addrinfo chain |
|
27 struct addrinfo *sa = ai; |
|
28 if (deP->debug >= level) do { |
|
29 char *s = gpa(sa->ai_addr); |
|
30 strcpy(deP->s, s); free(s); |
|
31 deP->deb(level); |
|
32 } while ((sa = sa->ai_next)); |
|
33 fflush(stderr); |
|
34 } |
|
35 void ssl_err(char *s, DebugP deP) { |
|
36 long e = ERR_get_error(); |
|
37 while(e) { |
|
38 LOG(0, "%s: %s", s, ERR_error_string(e, NULL)); |
|
39 e = ERR_get_error(); |
|
40 } |
|
41 } |
|
// Reads an integer from the environment.
//
// a: name of the environment variable.
// Returns atoi() of the variable's value, or 0 when it is not set.
static int getArg(const char *a) {
    const char *value = getenv(a);
    if (value == NULL) return 0;
    return atoi(value);
}
|
44 HeaderS::HeaderS() { |
|
45 this->ttl = 0; } |
|
46 HeaderS::HeaderS(int ttl) { |
|
47 this->ttl = ttl; } |
|
48 int HeaderS::len() { |
|
49 return sizeof(HeaderS); } |
|
50 PayloadS::PayloadS() { |
|
51 strcpy(&text, "EMPTY"); } |
|
52 PayloadS::PayloadS(const char *text) { |
|
53 ts = time(NULL); |
|
54 strcpy(&(this->text), text); } |
|
55 int PayloadS::check(PayloadP p) { |
|
56 return (int)(strcmp(&text, &p->text) == 0); } |
|
57 char *PayloadS::deliver() { |
|
58 return &text; } |
|
59 string PayloadS::digest() { |
|
60 string payl_s = string(&text); |
|
61 if(payl_s.size() < 24) return payl_s; |
|
62 else return payl_s.substr(0, 8) + string("--------") + payl_s.substr(payl_s.size() - 8, 8); } |
|
63 void PayloadS::sabotage() { |
|
64 text = '?'; } |
|
65 int PayloadS::len() { |
|
66 return sizeof(PayloadS) + strlen(&text); } |
|
67 int ContainerS::len() { |
|
68 return hdr.len() + payl.len(); } |
|
69 DataC::DataC() {} |
|
70 DataC::DataC(DebugP callerDeP) { |
|
71 deP = new DebugC(); |
|
72 DEBID("%s DATA", callerDeP->debid); |
|
73 dataLen = sizeof(HeaderS) + sizeof(PayloadS) + csP->text.size(); |
|
74 contP = static_cast<ContainerP>(operator new(dataLen)); |
|
75 new(&contP->hdr) HeaderS(); |
|
76 new(&contP->payl) PayloadS(); |
|
77 LOG(5, "Empty Data instance established"); |
|
78 } |
|
79 void DataC::load(int ttl, const char *text) { |
|
80 new(&contP->hdr) HeaderS(ttl); |
|
81 new(&contP->payl) PayloadS(text); |
|
82 LOG(5, "Data instance loaded with payload"); |
|
83 return; |
|
84 } |
|
85 string DataC::unld() { |
|
86 return string(&(contP->payl.text)); } |
|
87 string DataC::digest() { |
|
88 return contP->payl.digest(); } |
|
89 int DataC::dttl() { |
|
90 return --contP->hdr.ttl; } |
|
91 int DataC::ttl() { |
|
92 return contP->hdr.ttl; } |
|
93 bool DataC::dataOk() { |
|
94 return csP->text == string(&(contP->payl.text)); } |
|
95 int DataC::ts() { |
|
96 return contP->hdr.ts; } |
|
97 int DataC::remPort(int remPort) { |
|
98 return (contP->hdr.remPort = remPort); } |
|
99 int DataC::remPort() { |
|
100 return contP->hdr.remPort; } |
|
101 NodeC::NodeC(ConstellationP co, int port) { |
|
102 deP = new DebugC(); |
|
103 DEBID("%sSSL %s node %d", co->ssl ? "" : "non", co->topo==mash ? "MASH" : "RING", port); |
|
104 LOG(4, "intializing ..."); |
|
105 data = DataC(deP); |
|
106 topo = co->topo; |
|
107 locPort = port; |
|
108 first = co->first; |
|
109 nodes = co->nodes; |
|
110 last = first + nodes - 1; |
|
111 cliSides = new SocketS[nodes]; |
|
112 srvSides = new SocketS[nodes]; |
|
113 for(int k=0; k<nodes; k++) cliSides[k].sc = srvSides[k].sc = -1; |
|
114 kicker = (locPort == first); |
|
115 forwP = co->forwP; |
|
116 closing = 0; |
|
117 ssl = co->ssl; |
|
118 ssc = 0; |
|
119 if(ssl) { |
|
120 char s[128]; |
|
121 SSL_load_error_strings(); |
|
122 SSL_library_init(); |
|
123 LOG(4, "setting SSL contex..."); |
|
124 if(!(ctxP = SSL_CTX_new(SSLv23_method()))) SSLERR("new SSL CTX"); |
|
125 SSL_CTX_set_mode(ctxP, SSL_MODE_AUTO_RETRY); |
|
126 SSL_CTX_set_verify(ctxP, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); // SSL will claim partner certificate |
|
127 sprintf(s, "%s/keys/%d.key", csP->cePath.c_str(), locPort); |
|
128 LOG(5, "SSL private key used: %s", s); |
|
129 if(SSL_CTX_use_PrivateKey_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file"); |
|
130 sprintf(s, "%s/certs/%d.pem", csP->cePath.c_str(), locPort); |
|
131 LOG(5, "SSL certificate used: %s", s); |
|
132 if(SSL_CTX_use_certificate_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file"); |
|
133 LOG(5, "SSL: CApath: %s", csP->caPath.c_str()); |
|
134 if(SSL_CTX_load_verify_locations(ctxP, NULL, csP->caPath.c_str()) != 1) SSLERR("hh's thrusted certs path"); |
|
135 } |
|
136 LOG(5, "initalized"); |
|
137 } |
|
138 int NodeC::run() { |
|
139 LOG(5, "binding, kicker=%d", kicker); |
|
140 bindN(); |
|
141 if(kicker) { |
|
142 data.load(csP->ttl, csP->text.c_str()); |
|
143 int sci, next; |
|
144 next = next_node(); sci = next - first; |
|
145 LOG(2, "ready to initial send %s, len=%d to node %d", data.digest().c_str(), data.dataLen, next); |
|
146 conn(sci, next); |
|
147 putN(sci); |
|
148 } |
|
149 mainLoop(); |
|
150 LOG(5, "closing ssc"); |
|
151 close(ssc); |
|
152 if(closing) closingThread.join(); // wait for closing thread |
|
153 struct sigaction sigact; |
|
154 sigfillset(&sigact.sa_mask); |
|
155 sigact.sa_handler = sighandle; |
|
156 if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction"); |
|
157 if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait"); |
|
158 if(--csP->shP->act == 0) { |
|
159 if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post"); |
|
160 kill(0, SIGUSR2); |
|
161 } |
|
162 else { |
|
163 if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); |
|
164 pause(); |
|
165 } |
|
166 int exitRc = EXIT_SUCCESS; |
|
167 if(kicker && !data.dataOk()) { |
|
168 SOFTERR("input and output differ"); |
|
169 exitRc = EXIT_FAILURE; |
|
170 } |
|
171 LOG(2, "ended"); |
|
172 exit(exitRc); |
|
173 } |
|
174 void NodeC::mainLoop() { |
|
175 fd_set rs; |
|
176 int nfds; |
|
177 struct timeval t = {1, 0}; |
|
178 nfds = 0; FD_ZERO(&rs); |
|
179 if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; } |
|
180 while(nfds) { |
|
181 t.tv_sec = csP->selTO; |
|
182 int rc; |
|
183 rc = select(nfds, &rs, NULL, NULL, &t); |
|
184 if(rc < 0 && errno != EINTR) SYSERR("select"); |
|
185 if(rc > 0) { |
|
186 if(FD_ISSET(ssc, &rs)) { |
|
187 int i; |
|
188 for(i=0; srvSides[i].sc > -1 && i < nodes; i++); // find unused slot for accept |
|
189 if(i == nodes) HARDERR("can't accept, all slots in use"); |
|
190 LOG(5, "slot for accept=%d", i); |
|
191 acc(i); |
|
192 forward(i); |
|
193 } |
|
194 else |
|
195 for(int i = 0; i < nodes; i++) { |
|
196 if(srvSides[i].sc > -1 && FD_ISSET(srvSides[i].sc, &rs)) forward(i); } |
|
197 } |
|
198 FD_ZERO(&rs); nfds = 0; t.tv_sec = 1; |
|
199 for(int i = 0; i < nodes; i++) { |
|
200 int sc = srvSides[i].sc; |
|
201 if(sc > -1) { FD_SET(sc, &rs); if(sc >= nfds) nfds = sc + 1; } |
|
202 } |
|
203 if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; } |
|
204 } |
|
205 } |
|
206 void NodeC::forward(int sci) { |
|
207 int next, scn; |
|
208 if(getN(sci)) { |
|
209 LOG(5, "received data from %u", data.remPort()); |
|
210 if(kicker) { |
|
211 LOG(3, "received from node %u: %s, ttl=%d", data.remPort(), data.digest().c_str(), data.ttl()); |
|
212 if(data.dttl() <= 0) { |
|
213 LOG(1, "received after passing all %s: %s", topo==mash ? "mashes" : "rings", data.digest().c_str()); |
|
214 closeClients(); |
|
215 *(forwP) = 0; |
|
216 LOG(4, "leaving forward closing"); |
|
217 return; |
|
218 } |
|
219 } |
|
220 next = next_node(); scn = next - first; |
|
221 LOG(5, "forwarding len=%d to %d --->", data.dataLen, next); |
|
222 if(cliSides[scn].sc < 0) conn(scn, next); |
|
223 if(*(forwP)) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); } |
|
224 putN(scn); |
|
225 LOG(5, "forwarded to %u", next); |
|
226 } |
|
227 else { |
|
228 closeClients(); |
|
229 closeSocket(sci, server); |
|
230 } |
|
231 return; |
|
232 } |
|
// Creates the listening socket of the node: resolves a passive IPv4 stream
// address for locPort, creates the socket in ssc, sets SO_REUSEADDR, binds
// and listens (backlog 1).
void NodeC::bindN() {
    LOG(4, "binding...");
    // hints live on the stack; the result list gets its own pointer so that
    // neither is leaked
    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = AI_PASSIVE;
    char s[64];
    snprintf(s, sizeof(s), "%d", locPort);
    struct addrinfo *sa = NULL;
    int e;
    if((e = getaddrinfo(NULL, s, &hints, &sa)) != 0) HARDERR(gai_strerror(e));
    GAI(4, sa);
    if((ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc");
    int opt = 1;
    if(setsockopt(ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options");
    if(bind(ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind");
    if(listen(ssc, 1) < 0) SYSERR("listen");
    freeaddrinfo(sa);
    LOG(2, "bound to %d", locPort);
}
|
// Connects client-side slot i to the node listening on remPort at localhost.
//
// i:       index of the client-side slot to fill.
// remPort: port of the remote node.
//
// A refused connection is retried up to csP->connThreshold attempts with
// csP->connTO microseconds in between; when every attempt is refused the
// process exits with EXIT_FAILURE. On success the shared connection counter
// is incremented and, with SSL enabled, the SSL handshake is performed.
void NodeC::conn(int i, int remPort) {
    // NOTE(review): this local debug object shadows the member and is never
    // released - presumably intentional to get a per-connection debug id.
    DebugP deP = new DebugC();
    DEBID("%s to %u", this->deP->debid, remPort);
    cliSides[i].remPort = remPort;
    int e;
    LOG(4, "connecting to %u...", remPort);
    int retry = csP->connThreshold;

    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = 0;
    char port[16];  // large enough for any int
    snprintf(port, sizeof(port), "%d", remPort);
    struct addrinfo *ai = NULL;
    if((e = getaddrinfo("localhost", port, &hints, &ai)) != 0) HARDERR(gai_strerror(e));
    GAI(3, ai);
    if((cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc");

    // An explicit flag is used: testing the retry counter afterwards would
    // misreport a success on the last permitted attempt as a failure.
    bool connected = false;
    while(!connected && retry-- > 0) {
        if(connect(cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) {
            if(errno != ECONNREFUSED) SYSERR("connect");
            usleep(csP->connTO);
        }
        else connected = true;
    }
    freeaddrinfo(ai);
    if(!connected) {
        LOG(0, "connection refused threshold %d reached", csP->connThreshold);
        exit(EXIT_FAILURE);
    }

    if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
    csP->shP->conns++;
    if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");

    struct sockaddr_storage peer;
    socklen_t l = sizeof(peer);
    if(getpeername(cliSides[i].sc, reinterpret_cast<struct sockaddr*>(&peer), &l) < 0) SYSERR("getpeername");
    char *peerStr = gpa(reinterpret_cast<struct sockaddr*>(&peer));
    LOG(4, "peer: %s on sc=%d", peerStr, cliSides[i].sc);
    free(peerStr);  // gpa() hands over ownership of the string

    if(ssl) {
        ERR_clear_error();
        if(!(cliSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL");
        if(!SSL_set_fd(cliSides[i].sslP, cliSides[i].sc)) SSLERR("client SSL set fd");
        if((e = SSL_connect(cliSides[i].sslP)) < 1) {
            switch(SSL_get_error(cliSides[i].sslP, e)) {
                case SSL_ERROR_SYSCALL:
                    ssl_err((char*)"SSL connect", deP);
                    if(e == 0) LOG(0, "SSL connect: EOF on socket");
                    else LOG(0, "SSL connect: %s (%d)", strerror(errno), errno);
                    abend(deP);
                    break;
                default:
                    SSLERR("SSL connect");
                    break;
            }
        }
    }
    LOG(2, "connected via sc=%d after %d retries", cliSides[i].sc, csP->connThreshold - (retry + 1));
}
|
306 void NodeC::acc(int i) { |
|
307 LOG(4, "accepting..."); |
|
308 if((srvSides[i].sc = accept(ssc, NULL, NULL)) < 0) SYSERR("accept"); |
|
309 socklen_t l = sizeof(struct sockaddr); |
|
310 struct sockaddr *sa = (sockaddr*)malloc(l); |
|
311 if(getpeername(srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); |
|
312 LOG(4, "peer: %s on sc=%d", gpa(sa), srvSides[i].sc); free(sa); |
|
313 if(ssl) { |
|
314 int e; |
|
315 ERR_clear_error(); |
|
316 if(!(srvSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL"); |
|
317 if(!SSL_set_fd(srvSides[i].sslP, srvSides[i].sc)) SSLERR("server SSL set fd"); |
|
318 if((e = SSL_accept(srvSides[i].sslP)) < 1) { |
|
319 switch(SSL_get_error(srvSides[i].sslP, e)) { |
|
320 case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break; |
|
321 default: SSLERR("SSL accept"); break; |
|
322 } |
|
323 } |
|
324 } |
|
325 LOG(2, "accepted"); |
|
326 } |
|
327 void NodeC::closeSocket(int i, nodeside side) { |
|
328 SocketP sc; |
|
329 if(side) sc = srvSides; else sc = cliSides; |
|
330 LOG(5, "closing sc=%d...", sc[i].sc); |
|
331 if(ssl) { |
|
332 int e; |
|
333 if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)"); |
|
334 if(!e) { |
|
335 LOG(5, "SSL shutdown rc=0"); |
|
336 if((e = SSL_shutdown(sc[i].sslP)) < 0) { |
|
337 switch(SSL_get_error(sc[i].sslP, e)) { |
|
338 case SSL_ERROR_SYSCALL: { |
|
339 long e; |
|
340 if(!(e = ERR_get_error()) && errno) SYSERR("SSL shutdown (2)"); |
|
341 break; |
|
342 } |
|
343 default: |
|
344 SSLERR("SSL shutdown (2)"); |
|
345 break; |
|
346 } |
|
347 } |
|
348 } |
|
349 } |
|
350 close(sc[i].sc); |
|
351 LOG(4, "closed sc=%d", sc[i].sc); |
|
352 sc[i].sc = -1; |
|
353 } |
|
354 void closeCliTh(void *p) { |
|
355 NodeP nP = (NodeP)p; |
|
356 char *callerid = nP->deP->debid; |
|
357 DebugP deP = new DebugC(); |
|
358 DEBID("%s CLOSE clients thread", callerid); |
|
359 LOG(5, "start..."); |
|
360 for (int i = 0; i < nP->nodes; i++) if(nP->cliSides[i].sc > -1) nP->closeSocket(i, client); |
|
361 LOG(4, "all clients closed"); |
|
362 } |
|
363 void NodeC::closeClients() { |
|
364 if(!closing) try { closingThread = thread(closeCliTh, this); } catch(exception e) { |
|
365 cout << "closing tread: " << e.what() << '\n'; } |
|
366 closing = 1; |
|
367 } |
|
368 int NodeC::readN(int i) { |
|
369 LOG(5, "to read len=%d from sc=%d...", data.dataLen, srvSides[i].sc); |
|
370 int n, rest = data.dataLen; |
|
371 char *buf = (char*)data.contP; |
|
372 while(rest > 0) { |
|
373 if(ssl) { if((n = SSL_read(srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); } |
|
374 else { if((n = read(srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); } |
|
375 if(n == 0) { |
|
376 LOG(4, "read EOF"); |
|
377 return 0; |
|
378 } |
|
379 else { |
|
380 buf += n; rest -= n; |
|
381 if(rest > 0) LOG(5, "partly read %d bytes, buf=%p", n, buf); |
|
382 } |
|
383 } |
|
384 if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); |
|
385 csP->shP->msgs++; |
|
386 if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); |
|
387 LOG(5, "read %d from %u", data.dataLen, data.remPort()); |
|
388 return (data.dataLen); |
|
389 } |
|
390 int NodeC::writeN(int i) { |
|
391 LOG(5, "to write len=%d to sc=%d...", data.dataLen, cliSides[i].sc); |
|
392 int n, rest = data.dataLen; |
|
393 char *buf = (char*)data.contP; |
|
394 while(rest > 0) { |
|
395 if(ssl) { if((n = SSL_write(cliSides[i].sslP, data.contP, data.dataLen)) < 0) SSLERR("socket write"); } |
|
396 else { if((n = write(cliSides[i].sc, data.contP, data.dataLen)) < 0) SYSERR("socket write"); } |
|
397 buf += n; rest -= n; |
|
398 if(rest > 0) LOG(5, "partly written %d bytes", n); |
|
399 } |
|
400 LOG(5, "written %d", data.dataLen); |
|
401 return data.dataLen; |
|
402 } |
|
403 int NodeC::getN(int i) { return readN(i) > 0; } |
|
404 int NodeC::putN(int i) { |
|
405 data.remPort(locPort); |
|
406 return writeN(i) > 0; } |
|
407 int NodeC::next_node() { |
|
408 int next; |
|
409 if(topo == ring) { |
|
410 next = locPort + 1; |
|
411 if(next > last) next = first; |
|
412 } |
|
413 else while((next = first + nodes * ((float)random() / RAND_MAX)) == locPort); |
|
414 return next; |
|
415 } |
|
416 ConstellationC::ConstellationC() { |
|
417 deP = NULL; |
|
418 forwP = NULL; |
|
419 topo = ring; |
|
420 ssl = first = nodes = 0; |
|
421 }; |
|
422 ConstellationC::ConstellationC(topology topo, int ssl) { |
|
423 deP = new DebugC(); |
|
424 DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING"); |
|
425 this->topo = topo; |
|
426 if(topo == ring) { first=csP->rp0; nodes=csP->rn; } |
|
427 if(topo == mash) { first=csP->mp0; nodes=csP->mn; } |
|
428 this->ssl = ssl; |
|
429 first += ssl*500; |
|
430 forwP = NULL; |
|
431 } |
|
432 int ConstellationC::run() { |
|
433 int stat = 0, pid = 0, exitRc = EXIT_SUCCESS; |
|
434 if(nodes == 0) exit(0); |
|
435 if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); } |
|
436 pid_t *pids = new pid_t[nodes]; |
|
437 LOG(1, "%d nodes starting...", nodes); |
|
438 if((forwP = (int*)mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap"); |
|
439 *forwP = 1; |
|
440 for(int port = first; port < first + nodes; port++) { |
|
441 if(!(pid = fork())) (new NodeC(this, port))->run(); |
|
442 else { pids[port-first] = pid; LOG(4, "node %u established in process %u", port, pid); } |
|
443 } |
|
444 LOG(2, "all nodes established"); |
|
445 while((pid = wait(&stat)) > 0) |
|
446 if(WIFEXITED(stat)) { |
|
447 LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat)); |
|
448 if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE; |
|
449 } |
|
450 else exitRc = EXIT_FAILURE; |
|
451 LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "with ERROR"); |
|
452 exit(exitRc); |
|
453 } |
|
454 CSS::CSS(DebugP deP, char *prgnP) { |
|
455 ShareA(shP); |
|
456 shP->msgs = 0; |
|
457 shP->conns = 0; |
|
458 if(sem_init(&shP->counterSem, 1, 1) < 0) SYSERR("sem_init"); |
|
459 text = "bla bla"; |
|
460 ttl = 3; |
|
461 rp0 = 11000; |
|
462 mp0 = 12000; |
|
463 rn = 0; |
|
464 mn = 0; |
|
465 issl = 0; |
|
466 const string sslPathSuffP = "../CS"; |
|
467 caPath = "/home/local/etc/ssl/certs/"; |
|
468 connThreshold = 77; // connection retries threshold |
|
469 connTO = 0.01 * 1000*1000; // connection sleep time in usecs |
|
470 selTO = 1; // selection timeout in secs |
|
471 |
|
472 if(getArg("DEB") >= 0) DebugC::debug = getArg("DEB"); |
|
473 if(getenv("T") != NULL) { text = getenv("T"); } |
|
474 if(getenv("CEP") != NULL) cePath = getenv("CEP"); |
|
475 else { cePath = dirname(prgnP); cePath += "/"; cePath += sslPathSuffP; } |
|
476 if(getenv("CAP") != NULL) caPath = getenv("CAP"); |
|
477 if(getArg("TTL") > 0) ttl = getArg("TTL"); |
|
478 if(getArg("RP0") > 0) rp0 = getArg("RP0"); |
|
479 if(getArg("MP0") > 0) mp0 = getArg("MP0"); |
|
480 if(getArg("N") >= 0) { mn = getArg("N"); rn = mn; } |
|
481 if(getArg("SSL") >= 0) issl = getArg("SSL"); |
|
482 if(getArg("RN") >= 0) rn = getArg("RN"); |
|
483 if(getArg("MN") >= 0) mn = getArg("MN"); |
|
484 shP->act = rn + mn; // initialize active node processes counter |
|
485 if(issl > 1) shP->act += shP->act; |
|
486 pacing = 0; |
|
487 if(getenv("P") != NULL) { |
|
488 double d = atof(getenv("P")); |
|
489 pace.tv_sec=(time_t)trunc(d); |
|
490 pace.tv_nsec=(d-pace.tv_sec)*1000*1000*1000; |
|
491 if(pace.tv_sec > 0 || pace.tv_nsec > 0) pacing = 1; |
|
492 } |
|
493 if(getArg("RS") >= 0) srandom(getArg("RS")); |
|
494 } |
|
495 int main(int argc, char *argv[]) { |
|
496 DebugC::debug_init(argv[0]); |
|
497 DebugP deP = new DebugC(); |
|
498 DEBID("client/server demo"); |
|
499 csP = new CSS(deP, argv[0]); |
|
500 LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\ |
|
501 argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, DebugC::debug); |
|
502 if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->cePath.c_str(), csP->caPath.c_str()); |
|
503 if(csP->issl < 2) { |
|
504 if(!fork()) (new ConstellationC(ring, csP->issl))->run(); |
|
505 if(!fork()) (new ConstellationC(mash, csP->issl))->run(); |
|
506 } else for(int ssl = 0; ssl < csP->issl; ssl++) { |
|
507 if(!fork()) (new ConstellationC(ring, ssl))->run(); |
|
508 if(!fork()) (new ConstellationC(mash, ssl))->run(); |
|
509 } |
|
510 int stat, exitRc = EXIT_SUCCESS; |
|
511 while(wait(&stat) > 0) |
|
512 if(WIFEXITED(stat)) { |
|
513 LOG(5, "constellation ended with exit(%d)", WEXITSTATUS(stat)); |
|
514 if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE; |
|
515 } |
|
516 else exitRc = EXIT_FAILURE; |
|
517 LOG(1, "%s end, forwards=%d, connections=%d", exitRc == EXIT_SUCCESS ? "NORMAL" : "BAD", csP->shP->msgs, csP->shP->conns); |
|
518 exit(exitRc); |
|
519 } |