|
1 # coding=utf8 |
|
2 |
|
3 import sys, os, ssl, time, socket, errno, signal |
|
4 from Crypto.Cipher import DES3 |
|
5 from d import D |
|
6 from parms import Parms |
|
7 |
|
8 |
|
9 class Node(): |
|
10 |
|
class AllPortsBusy(Exception):
    """All TCP ports for the longtasks server or for peering are occupied."""
|
13 |
|
# Class-level switches shared by every Node instance.
blocking = True # select mode is not implemented yet
# When True, sockets are wrapped through the shared Node.ctx instead of ssl.wrap_socket().
useSSLContext = False
# Shared SSL context; created on first use by sslContext().
ctx = None
# Loop flag of the UDP broadcast started in send_peerport(); UDPstop() clears it.
UDPbroadcastGO = False
|
18 |
|
def __init__(self, d, chan=Parms.sslchannel, host=Parms.srvhost, port=None, conn=True, tryPort=True, peering=False):
    """Set up the node and either connect to a server or bind a listening socket.

    d       -- parent debug object; its ``debid`` prefixes this node's own D logger
    chan    -- channel number; shifts the base port by ``chan * 10``
    host    -- host to connect to (client) or to bind on (server)
    port    -- port to connect to; None lets conn() fall back to the base port
    conn    -- True: act as client and connect; False: bind a listening socket
    tryPort -- when binding: True probes the port range for a free port,
               False binds the base port and waits until it is released
    peering -- client: learn host/port from the peer's UDP broadcast;
               server (tryPort only): broadcast the bound port via UDP
    """
    use_ssl = Parms.ssl
    # non-SSL nodes use the odd port right above the SSL one
    base = Parms.baseport + (chan * 10) + (0 if use_ssl else 1)
    ident = "netnode {}SSL".format("" if use_ssl else "non")

    self._issl = use_ssl
    self._chan = chan
    self._bindhost = host
    self._baseport = base
    self._minport = base + 1
    self._maxport = base + 9
    self._baseid = ident
    self.d = D("{} {}".format(d.debid, ident))
    self._srv_side = None
    self._UDPpasswd = "heslo"
    self._UDPbroadcast_addr = Parms.broadcast
    self._UDPbroadcast_port = Parms.udpport
    self._UDP_key = "PEER_IP"
    self._UDPbroadcastGO = False
    self.sslContext()

    if conn:
        # TCP connect
        if peering:
            host, port = self.get_peerport()
        self.conn(host=host, port=port)
        return

    # socket bind
    if not tryPort:
        # try to bind and, if necessary, wait for the port to be released
        self.bindwait(self._bindhost)
        return

    # look for a free port
    self.bindtrynext(self._bindhost)
    if peering:
        self.send_peerport()
|
45 |
|
46 |
|
def get(self, size):
    """Read up to *size* bytes from the socket file object.

    Returns the bytes read. If reading (or the trace logging around it)
    raises, the error is reported through ``self.d.abend`` and -1 is returned.
    """
    try:
        if self.d.ll(5):
            self.d.log("get data from scfile...")
        chunk = self._scfile.read(size)
        if self.d.ll(5):
            self.d.log("{} bytes read".format(len(chunk)))
    except Exception as err:
        self.d.abend("read from socket", err)
        return -1
    return chunk
|
56 |
|
57 |
|
def genget(self, size=-1):
    """Yield chunks read from the socket until *size* bytes have been delivered.

    size -- number of bytes to receive. A negative value (the default) keeps
            reading until a read returns no data; 0 yields nothing.

    Each chunk is requested with at most ``Parms.bufSize`` bytes. The generator
    stops early when a read returns no data or when get() signals a failure.
    """
    rest = size
    while rest != 0:
        n = rest if 0 < rest < Parms.bufSize else Parms.bufSize
        data = self.get(n)
        # get() returns -1 (not bytes) after a failed read; calling len() on it
        # would raise TypeError, so treat it like end of stream.
        if not isinstance(data, (bytes, bytearray)) or not data:
            break
        rest -= len(data)
        yield data
|
67 |
|
def getnum(self):
    """Read one 12-byte decimal number field from the socket file.

    Returns the parsed integer, or -2 when the read returned nothing.
    Meaning of the values: -2 = EOD, -1 = directory, 0 and higher = data size.
    """
    field = self._scfile.read(12).decode()
    if field:
        num = int(field)
    else:
        num = -2
    if self.d.ll(5):
        self.d.log("getnum, got {:012d} (-2 means EOD)".format(num))
    return num
|
73 |
|
def getstr(self, decode = True):
    """Read a length-prefixed string from the socket file.

    The length comes from getnum(); a length below 1 yields "".
    Returns the decoded text, or the raw bytes when *decode* is false.
    """
    length = self.getnum()
    if length < 1:
        return ""
    payload = self._scfile.read(int(length))
    if decode:
        return payload.decode()
    return payload
|
81 |
|
def getfn(self):
    """Receive a file name; it is transferred as an ordinary getstr() string."""
    name = self.getstr()
    return name
|
84 |
|
def getcmd(self):
    """Read an 8-byte command field and strip its trailing '_' padding.

    Returns the command name, or "" when reading or decoding fails.
    A socket timeout is logged only at debug level 4; any other error is
    always logged.
    """
    try:
        raw = self._scfile.read(8)
        return raw.decode().rstrip('_')
    except socket.timeout:
        if self.d.ll(4):
            self.d.log("getcmd timeout")
    except Exception as err:
        self.d.log("I/O err: {}".format(err))
    return ""
|
94 |
|
def receive_dir(self, fp, size, timestamp):
    """Create directory *fp* (parents included) and stamp it with *timestamp*.

    *size* is accepted for signature symmetry with receive_file() and is not
    used. *timestamp* is applied as both access and modification time.
    Always returns True.
    """
    os.makedirs(fp, exist_ok=True)
    stamps = (timestamp, timestamp)
    os.utime(fp, stamps)
    return True
|
99 |
|
def receive_file(self, fp, size, timestamp, counter=None):
    """Receive *size* bytes from the socket into file *fp*.

    Data is first written to ``fp + ".dejsem.partX"``. Only when the temporary
    file ends up exactly *size* bytes long is it renamed to *fp* and stamped
    with *timestamp* (access and modification time).

    counter -- optional progress object; ``counter.update(n)`` is called with
               the length of every received chunk.

    Returns True on success. On a size mismatch nothing is returned (None)
    and the temporary file is left in place.
    """
    parent = os.path.dirname(fp)
    if parent:
        os.makedirs(parent, exist_ok=True)
    tempfp = fp + ".dejsem.partX"
    with open(tempfp, mode='w+b') as out:
        for chunk in self.genget(size = size):
            if counter:
                counter.update(len(chunk))
            out.write(chunk)
    # the file is closed (and flushed) here, so its size on disk is final
    if os.path.getsize(tempfp) != size:
        return None
    os.rename(tempfp, fp)
    os.utime(fp, (timestamp, timestamp))
    return True
|
111 |
|
def receive_stream(self, fp, size, counter=None):
    """Receive *size* bytes from the socket into file *fp* (no timestamp handling).

    Data is first written to ``fp + ".<Parms.applName>.partX"`` and renamed to
    *fp* only when the temporary file is exactly *size* bytes long.

    counter -- optional progress object; ``counter.update(n)`` is called with
               the length of every received chunk.

    Returns True on success. On a size mismatch nothing is returned (None)
    and the temporary file is left in place.
    """
    parent = os.path.dirname(fp)
    if parent:
        os.makedirs(parent, exist_ok=True)
    tempfp = fp + ".{}.partX".format(Parms.applName)
    with open(tempfp, mode='w+b') as out:
        for chunk in self.genget(size = size):
            if counter:
                counter.update(len(chunk))
            out.write(chunk)
    # the file is closed (and flushed) here, so its size on disk is final
    if os.path.getsize(tempfp) != size:
        return None
    os.rename(tempfp, fp)
    return True
|
122 |
|
def put(self, data):
    """Write *data* to the socket file and flush it.

    Returns True on success. Any exception from the write, the trace log or
    the flush is reported through ``self.d.abend`` and False is returned.
    """
    if self.d.ll(5):
        self.d.log("PUT: data len={}, sending...".format(len(data)))
    try:
        written = self._scfile.write(data)
        if self.d.ll(5):
            self.d.log("PUT: data len={}, sent".format(written))
        self._scfile.flush()
    except Exception as err:
        self.d.abend("send err", err)
        return False
    else:
        return True
|
133 |
|
def genput(self):
    """Coroutine-style sender: every value passed in via ``send()`` goes to put().

    Prime it with ``next()`` first. An exception raised inside the loop is
    reported through ``self.d.abend`` and re-raised. The socket file is
    flushed when the generator finishes, however it ends (including close()).
    """
    try:
        while True:
            chunk = yield
            self.put(chunk)
    except Exception as err:
        self.d.abend("write to socket", err)
        raise err
    finally:
        self._scfile.flush()
|
144 |
|
def sendEOD(self):
    """Send the end-of-data marker: a number field with the value 0."""
    eod_marker = 0
    self.putnum(eod_marker)
|
147 |
|
def putnum(self, n):
    """Send integer *n* as a zero-padded 12-character decimal field and flush."""
    if self.d.ll(5):
        self.d.log("putnum, num={:012d}".format(n))
    field = "{:012d}".format(n).encode("utf8")
    self._scfile.write(field)
    self._scfile.flush()
|
152 |
|
def putstr(self, fn):
    """Send ``str(fn)`` as a length-prefixed UTF-8 string.

    The byte length goes out first via putnum(), followed by the encoded
    text; the socket file is flushed afterwards.
    """
    encoded = str(fn).encode("utf8")
    self.putnum(len(encoded))
    if self.d.ll(5):
        self.d.log("putstr, string={}".format(fn))
    self._scfile.write(encoded)
    self._scfile.flush()
|
159 |
|
def putcmd(self, act):
    """Send command name *act*, right-padded with '_' to 8 characters, via put()."""
    if self.d.ll(3):
        self.d.log("action: " + act)
    padded = act.ljust(8, '_')
    self.put(padded.encode("utf8"))
|
164 |
|
def sendport(self, port):
    """Send a dynamically allocated port number to the client as a number field."""
    allocated = port
    self.putnum(allocated)
|
168 |
|
def putfileinfo(self, fp, relfp):
    """Send the metadata of path *fp*: relative name, size and timestamp.

    Sent in this order:
      1. *relfp* as a string,
      2. the size in bytes, or -1 when *fp* is not a regular file,
      3. the modification time truncated to whole seconds, or 0 when *fp*
         does not exist.
    """
    if self.d.ll(5):
        self.d.log("putfileinfo fp={}, relfp={}...".format(fp, relfp))
    self.putstr(relfp)

    if os.path.isfile(fp):
        size = os.path.getsize(fp)
    else:
        size = -1
    self.putnum(size)

    if os.path.exists(fp):
        timestamp = int(os.path.getmtime(fp))
    else:
        timestamp = 0
    self.putnum(timestamp)

    if self.d.ll(4):
        self.d.log("fileinfo sent: fn={}, size={}, timestamp={}".format(relfp, size, timestamp))
|
177 |
|
def digest(self):
    """Return a short printable form of ``self.data``.

    Data shorter than 24 items is returned unchanged. Longer data is reduced
    to its first and last 8 items, each decoded, joined by "--------".
    NOTE(review): ``self.data`` is not assigned anywhere in the visible part of
    the class -- confirm where it comes from.
    """
    payload = self.data
    if len(payload) < 24:
        return payload
    head = payload[0:8].decode()
    tail = payload[-8:].decode()
    return head + "--------" + tail
|
180 |
|
def sslContext(self):
    """Create the shared ``Node.ctx`` SSL context the first time it is needed.

    Does nothing unless this node uses SSL, ``Node.useSSLContext`` is enabled
    and no context exists yet. The context requires a peer certificate, loads
    the certificate chain from ``Parms.sslCert`` and passes ``Parms.sslCAPath``
    as the CA directory. An ``ssl.SSLError`` is reported through
    ``self.d.abendHard``.
    """
    if not (self._issl and Node.useSSLContext and not Node.ctx):
        return
    if self.d.ll(4):
        self.d.log(
            "setting SSL context: certfile={}, capath={}...".format(Parms.sslCert, Parms.sslCAPath))
    try:
        # Node.ctx is published before it is configured, so it stays set
        # even if one of the following steps fails
        Node.ctx = context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)  # PROTOCOL_SSLv23
        context.verify_mode = ssl.CERT_REQUIRED  # CERT_REQUIRED | CERT_OPTIONAL | CERT_NONE
        context.load_cert_chain(Parms.sslCert)
        context.load_verify_locations(None, Parms.sslCAPath)
    except ssl.SSLError as err:
        self.d.abendHard("SSL context", err)
|
194 |
|
def getssc(self):
    """Allocate a new server-side TCP socket, SSL-wrapped when the node uses SSL.

    The socket is created with SO_REUSEADDR set. For SSL nodes it is wrapped
    in server mode, through the shared ``Node.ctx`` or through
    ``ssl.wrap_socket`` depending on ``Node.useSSLContext``. Allocation errors
    and ``ssl.SSLError`` are reported through ``self.d.abendHard``.

    Returns the (possibly wrapped) socket.
    """
    try:
        ssc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ssc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except Exception as err:
        self.d.abendHard("ssc alloc", err)

    if not self._issl:
        return ssc

    try:
        if Node.useSSLContext:
            ssc = Node.ctx.wrap_socket(ssc, server_side=True)
        else:
            ssc = ssl.wrap_socket(
                ssc,
                certfile=Parms.sslCert,
                ca_certs=Parms.sslCAPath,
                server_side=True,
                cert_reqs=ssl.CERT_REQUIRED,
                ssl_version=ssl.PROTOCOL_TLSv1)
    except ssl.SSLError as err:
        self.d.abendHard("ssc SSL wrap", err)
    return ssc
|
216 |
|
def bindwait(self, host):
    """Bind a listening socket to ``host:<base port>``, waiting while the port is busy.

    While the address is in use the bind is retried every 10 seconds without
    limit; the "already in use" message is logged on the first attempt and
    again each time the internal counter wraps (every 78 attempts). Any other
    bind error is reported through ``self.d.abendHard``.

    On success the socket is put into listening mode (backlog 1) and stored in
    ``self._ssc``, with the port in ``self.port``.

    Returns the listening socket.
    """
    port = self._baseport
    self.d.log("binding to {}:{}".format(host, port))
    ssc = self.getssc()
    tries = 0
    while True:
        try:
            ssc.bind((host, port))
            break
        except Exception as e:
            # Compare the error number, not the message text: strerror depends
            # on platform and locale, and non-OSError exceptions have neither.
            if getattr(e, "errno", None) == errno.EADDRINUSE:
                if not tries:
                    self.d.log("Address {}:{} already in use, waiting 10 secs...".format(host, port))
                tries = tries + 1 if tries < 77 else 0
                time.sleep(10)
                continue
            self.d.abendHard("bind", e)
    ssc.listen(1)
    if self.d.ll(2):
        self.d.log("bound to {}:{}".format(host, port))
    self._ssc = ssc
    self.port = port
    return ssc
|
243 |
|
def bindtrynext(self, host):
    """Bind a listening socket to the first free port in the node's port range.

    Ports ``self._minport`` .. ``self._maxport`` are tried in order with a
    fresh socket from getssc() each time. A port that is already in use is
    skipped; any other error is reported through ``self.d.abend`` and the next
    port is tried. A socket whose bind/listen failed is closed before moving on.

    On success the listening socket (backlog 1) is stored in ``self._ssc``
    and the port in ``self.port``.

    Returns the tuple ``(socket, port)``.
    Raises Node.AllPortsBusy when no port in the range could be bound.
    """
    for port in range(self._minport, self._maxport + 1):
        if self.d.ll(4):
            self.d.log("trying to bind to {}:{}...".format(host, port))
        ssc = None
        try:
            ssc = self.getssc()
            ssc.bind((host, port))
            ssc.listen(1)
            break
        except Exception as e:
            if ssc is not None:
                # do not leak the descriptor of a socket that could not be bound
                try:
                    ssc.close()
                except OSError:
                    pass  # best-effort cleanup; the bind error is what matters
            # Compare the error number, not the message text: strerror depends
            # on platform and locale, and non-OSError exceptions have neither.
            if getattr(e, "errno", None) == errno.EADDRINUSE:
                if port < self._maxport:
                    continue
                raise Node.AllPortsBusy
            self.d.abend("bind", e)
    else:
        # the range was exhausted without a successful bind; never report an
        # unbound socket as bound
        raise Node.AllPortsBusy
    if self.d.ll(2):
        self.d.log("bound to {}:{}".format(host, port))
    self._ssc = ssc
    self.port = port
    return (ssc, port)
|
262 |
|
def send_peerport(self):
    """Broadcast this node's host:port pair over UDP so that a peer can find it.

    The datagram is three concatenated fields -- the key ``self._UDP_key`` and
    the bind host, each preceded by its length as a 12-digit number, followed
    by the port as a 12-digit number -- encrypted with DES3 in ECB mode.

    The sending itself happens in a forked child process; the parent only
    remembers the child's PID in ``self._UDPbroadcastPID`` and returns.
    """
    ipport = "{:012d}{}{:012d}{}{:012d}".format(len(self._UDP_key), self._UDP_key, len(self._bindhost), self._bindhost, self.port)

    # 24-byte key derived by repeating the UDP password
    c = DES3.new(self.rawKey(self._UDPpasswd, 24), DES3.MODE_ECB)
    data = ipport.encode()
    # pad with spaces up to a multiple of 8 bytes (a full extra 8 when already aligned)
    enc = c.encrypt(data + b' ' * (8 - len(data) % 8))
    if self.d.ll(4): self.d.log("len=%d, enc=[%s]" % (len(enc), enc.hex()))
    s = socket.socket(type=socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    a = (self._UDPbroadcast_addr, self._UDPbroadcast_port)
    if self.d.ll(4): self.d.log("start udp sending to {}:{}: {}".format(self._UDPbroadcast_addr, Parms.udpport, data.decode()))

    pid = os.fork()
    if pid: self._UDPbroadcastPID = pid  # parent: keep the PID for UDPsignalHUP()
    else:
        # child: send one datagram per second until SIGHUP clears the flag
        # (handler UDPstop) or 777 datagrams have been sent, then exit
        self.UDPbroadcastGO = True
        signal.signal(signal.SIGHUP, self.UDPstop)
        signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGHUP})
        retries = 777
        while retries > 0 and self.UDPbroadcastGO:
            s.sendto(enc, a)
            time.sleep(1)
            retries -= 1
        sys.exit(0)
|
288 |
|
def get_peerport(self):
    """Wait for a peer's UDP broadcast and return the advertised ``(host, port)``.

    Listens on ``self._UDPbroadcast_port`` on all interfaces and blocks until a
    datagram with the expected layout arrives: the key ``self._UDP_key`` and
    the host, each preceded by its length as a 12-digit number, followed by
    the port number. Datagrams that cannot be decrypted, decoded or parsed, or
    that carry a different key, are ignored. The UDP socket is closed before
    returning or when an error propagates.

    Returns the tuple ``(host, port)`` with *port* as int.
    """
    c = DES3.new(self.rawKey(self._UDPpasswd, 24), DES3.MODE_ECB)
    s = socket.socket(type=socket.SOCK_DGRAM)
    try:
        a = ('', self._UDPbroadcast_port)
        if self.d.ll(4):
            self.d.log("binding to udp-port {}:{}".format(a[0], a[1]))
        s.bind(a)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        while True:
            dataBytes, _sender = s.recvfrom(512)
            # NOTE(review): send_peerport() always encrypts, but decryption here
            # depends on Parms.ssl -- confirm this asymmetry is intended.
            try:  # ignore bad dgrams
                data = c.decrypt(dataBytes).decode().strip() if Parms.ssl else dataBytes.decode()
            except Exception:
                continue
            if self.d.ll(5):
                self.d.log("datalen={}, data={}".format(len(data), data))
            # Anyone on the network can send to this port, so a malformed
            # payload must be skipped rather than abort the wait.
            try:
                strlen = int(data[:12])
                key = data[12:12 + strlen]
                if key != self._UDP_key:
                    continue
                rest = data[12 + strlen:]
                strlen = int(rest[:12])
                host = rest[12:12 + strlen]
                port = int(rest[12 + strlen:])
            except ValueError:
                continue
            if self.d.ll(4):
                self.d.log("peer listening on {}:{}".format(host, port))
            return (host, port)
    finally:
        s.close()
|
312 |
|
def rawKey(self, passwd, keylen):
    """Build a key of exactly *keylen* bytes by repeating the encoded password.

    passwd -- password text; it is encoded with the default (UTF-8) encoding
    keylen -- required key length in bytes; 0 or less yields ``b''``

    Returns the key as bytes.
    Raises ValueError when *passwd* is empty but a non-empty key is requested
    (repeating an empty password can never reach the requested length).
    """
    if keylen <= 0:
        return b''
    unit = passwd.encode()
    if not unit:
        raise ValueError("cannot derive a {}-byte key from an empty password".format(keylen))
    repeats = -(-keylen // len(unit))  # ceiling division
    return (unit * repeats)[:keylen]
|
318 |
|
def UDPstop(self, sign, frame):
    """Signal handler: clear the flag that keeps the UDP broadcast loop running.

    *sign* and *frame* are the standard signal-handler arguments; both are unused.
    """
    keep_going = False
    self.UDPbroadcastGO = keep_going
|
321 |
|
def UDPsignalHUP(self):
    """Stop the UDP broadcast by sending SIGHUP to the broadcasting child process."""
    child_pid = self._UDPbroadcastPID
    os.kill(child_pid, signal.SIGHUP)
|
324 |
|
def acc(self, acc_TO=Parms.peer_accept_timeout):
    """Accept one client connection and tell the client whether it is admitted.

    acc_TO -- timeout set on the listening socket before accepting

    On an SSL node with a channel above 0 the client is admitted only when the
    commonName of its certificate equals the two-digit channel number; in all
    other cases every client is admitted. The verdict is sent to the client as
    the 8 bytes ``ACCEPTED`` or ``REJECTED``.

    Returns True when the client was admitted and the confirmation was sent.
    Returns False when accept failed, the socket file could not be created,
    the confirmation could not be sent, or the client was rejected (the client
    socket is closed in the last three cases).
    KeyboardInterrupt raised while waiting is re-raised.
    """
    if self.d.ll(4):
        self.d.log("accepting on {} ...".format(self.port))
    self._ssc.settimeout(acc_TO)
    try:
        self._sc, (froma, fromp) = self._ssc.accept()
    except KeyboardInterrupt:
        if self.d.ll(4):
            self.d.log("KeyboardInterrupt")
        raise
    except Exception as e:
        self.d.abend("accept", e)
        return False
    if self.d.ll(2):
        self.d.log("conn request on {}SSL port {} from {}:{}"
                   .format("" if self._issl else "non", self.port, froma, fromp))
    if Node.blocking:
        self._sc.settimeout(Parms.blockTimeout)
    else:  # select mode is not implemented yet
        self._srv_side[self._sc] = self._sc
        if self.d.ll(3):
            self.d.log("srv side={}".format(*(sc.fileno() for sc in self._srv_side.values())))

    commonName = "nonSSL"
    certSubject = {}
    if self._chan > 0 and self._issl:
        # 'subject' is a sequence of one-element tuples holding (key, value) pairs
        certSubject.update(i for (i,) in self._sc.getpeercert()['subject'])
        self.d.log("client certificate subject:", certSubject, sev=4)
        commonName = certSubject.get("commonName", commonName)
        accepted = commonName == "{:02d}".format(self._chan)
    else:
        accepted = True
    # Log the resolved name: certSubject is empty for non-SSL / channel 0 and may
    # lack a commonName, so indexing it here would raise KeyError.
    if self.d.ll(2):
        self.d.log("client {} {}".format(commonName, "accepted" if accepted else "rejected"))

    try:
        self._scfile = self._sc.makefile("rwb")
    except Exception as e:
        self.d.abendMsg("socket-makefile", e=e)
        self.close_sc()
        return False

    if accepted:
        try:
            if self.d.ll(4):
                self.d.log("confirming accept")
            self._scfile.write(b"ACCEPTED")
            self._scfile.flush()
            return True
        except Exception as e:
            self.d.abendMsg("send confirm", e=e)
            self.close_sc()
            return False

    try:
        self._scfile.write(b"REJECTED")
        self._scfile.flush()
    except Exception as e:
        # the client is dropped anyway; make sure the socket still gets closed
        self.d.abendMsg("send reject", e=e)
    self.close_sc()
    return False
|
380 |
|
def conn(self, host=Parms.srvhost, port=None):
    """Connect to the server at *host*:*port* and wait for its ``ACCEPTED`` reply.

    host -- server host name or address
    port -- server port; a false value selects the node's base port

    A refused connection is retried up to ``Parms.connThreshold`` times with
    ``Parms.connTimeout`` seconds between attempts. On success the connected
    socket is stored in ``self._sc`` and its read/write file object in
    ``self._scfile``.

    Every failure is reported through ``self.d.abend``; if that call returns,
    the method stops immediately instead of continuing with an unusable socket.
    Returns None in all cases.
    """
    if not port:
        port = self._baseport
    if self.d.ll(4):
        self.d.log("connecting to {}:{}...".format(host, port))
    try:
        self._sc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except Exception as e:
        self.d.abend("socket alloc", e)
        return
    if self._issl:
        if self.d.ll(4):
            self.d.log("sc SSL wrap, homedir={}, certfile={}, ca_certs={}"
                       .format(os.getcwd(), Parms.sslCert, Parms.sslCAPath))
        try:
            if Node.useSSLContext:
                self._sc = Node.ctx.wrap_socket(self._sc)
            else:
                self._sc = ssl.wrap_socket(
                    self._sc,
                    certfile=Parms.sslCert,
                    ca_certs=Parms.sslCAPath,
                    cert_reqs=ssl.CERT_REQUIRED,
                    ssl_version=ssl.PROTOCOL_TLSv1)
        except Exception as e:
            self.d.abend("sc SSL wrap", e)
            return  # never fall back to an unencrypted socket

    retry = Parms.connThreshold
    connected = False
    while not connected and retry > 0:
        try:
            self._sc.connect((host, port))
            connected = True
        except Exception as e:
            # not every exception carries an errno attribute
            if getattr(e, "errno", None) == errno.ECONNREFUSED:
                retry = retry - 1
                time.sleep(Parms.connTimeout)
            else:
                self.d.abend("connect to {}".format(host), e)
                return  # not retryable; looping again would spin forever
    if not connected:
        self.d.abend("connection to {} refused, threshold {} reached".format(host, Parms.connThreshold), None)
        return

    fileno = self._sc.fileno()
    if Node.blocking:
        self._sc.settimeout(Parms.blockTimeout)
    try:
        self._scfile = self._sc.makefile("rwb")
    except Exception as e:
        self.d.abend("connect makefile", e)
        return
    try:
        reply = self._scfile.read(8)
    except Exception as e:
        self.d.abend("read socket", e)
        return
    if reply != b"ACCEPTED":
        self.d.abend("connection not accepted by server", None)
        return
    if self.d.ll(2):
        self.d.log("connected to {}:{} after {} retries, via fd {}"
                   .format(host, port, Parms.connThreshold - retry, fileno))
|
428 |
|
def close_sc(self):
    """Close the connection's file object and then its socket, if they exist.

    The first exception raised while closing is reported through
    ``self.d.abend``; whatever was not closed by then is left as it is.
    """
    if self.d.ll(4):
        self.d.log("closing socket...")
    try:
        for attr in ('_scfile', '_sc'):
            if hasattr(self, attr):
                getattr(self, attr).close()
    except Exception as err:
        self.d.abend("closing socket", err)
|
436 |
|
def close_ssc(self):
    """Close the listening socket ``self._ssc`` if one was created.

    An exception raised while closing is reported through ``self.d.abend``.
    """
    if self.d.ll(4):
        self.d.log("closing SSL socket...")
    if not hasattr(self, "_ssc"):
        return
    try:
        self._ssc.close()
    except Exception as err:
        self.d.abend("closing SSL socket", err)
|
442 |
|
def close(self):
    """Shut the node down: close the connection first, then the listening socket."""
    for shutdown in (self.close_sc, self.close_ssc):
        shutdown()