CSp/CS.py
changeset 0 5c129dd80d4f
equal deleted inserted replaced
-1:000000000000 0:5c129dd80d4f
       
     1 #!/usr/bin/python3
       
     2 # coding=utf8
       
     3 
       
     4 # ring nonssl: cca 8000  rings/sec*3nodes
       
     5 # ring    ssl: cca 2600  rings/sec*3nodes
       
     6 # mash nonssl: cca 2000 mashes/sec*3nodes
       
     7 # mash    ssl: cca  300 mashes/sec*3nodes
       
     8 # u mashe je čas na přenosy úměrný počtu uzlů,
       
     9 #    kdežto čas na connect/close je úměrný počtu spojů tj. kvadrátu počtu uzlů
       
    10 
       
    11 import time
       
    12 import socket
       
    13 import errno
       
    14 import os
       
    15 import signal
       
    16 import sys
       
    17 import pickle
       
    18 import ssl
       
    19 import select
       
    20 import random
       
    21 import multiprocessing
       
    22 import threading
       
    23 
       
    24 
       
    25 class Debug():
       
    26     def __init__(self, debid):
       
    27         self.debid = debid
       
    28     def log(self, level, *msg):
       
    29         if level <= maxDebLev:
       
    30             log_lock.acquire()
       
    31             print("{:10.6f} {}:".format(time.time()-t0, self.debid), *msg, file=sys.stderr)
       
    32             sys.stderr.flush()
       
    33             log_lock.release()
       
    34     def abend(self, s):
       
    35         self.log(0, s)
       
    36         os.kill(0, signal.SIGTERM)
       
    37 
       
    38         
       
    39 class Node(Debug):
       
    40     def __init__(self, debid, forwarding, topo, port, p0, pn, issl):
       
    41         Debug.__init__(self, "{} node {}".format(debid, port))
       
    42         self.topo = topo
       
    43         self.locPort = port
       
    44         self.p0 = p0
       
    45         self.pn = pn
       
    46         self.ssc = None
       
    47         self.cli_side = {}
       
    48         self.srv_side = {}
       
    49         self.kicker = (self.locPort == self.pn)
       
    50         self.forwarding = forwarding                # in forwarding kicker task indicates when TTL reached 0
       
    51         self.closing = False
       
    52         self.payload = None 
       
    53         self.issl = issl
       
    54         if issl:                                                                                                                                           
       
    55             self.log(4, "setting SSL context...")  
       
    56             self.sslCert = cePath + "/certs/{}.pem".format(port)
       
    57             self.sslKey = cePath + "/keys/{}.key".format(port)                                                                                
       
    58             try:                                                                                                                                                
       
    59                 self.ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)                                                                                                  
       
    60                 self.ctx.verify_mode = ssl.CERT_REQUIRED
       
    61                 self.log(5, "cert={}, key={}".format(self.sslCert, self.sslKey))                                                                                                        
       
    62                 self.ctx.load_cert_chain(self.sslCert, self.sslKey)
       
    63                 self.ctx.load_verify_locations(None, caPath)                                                                              
       
    64             except ssl.SSLError as e: self.abend("SSL context: {}".format(str(e)))                                                                                                                            
       
    65     def run(self):        
       
    66         self.bind()
       
    67         self.payload = Data(self.debid, self.locPort)
       
    68         if self.kicker:
       
    69             nxt = self.next_node()
       
    70             self.log(1, "kicker ready to send '{}' to {}".format(self.payload.digest(), nxt))                                                                                                                                                                                                                                                                                       
       
    71             if not nxt in self.cli_side: self.conn(nxt)
       
    72             self.payload.put(self.cli_side[nxt])
       
    73         if self.forwarding[0].value: wait_list = (self.ssc,)
       
    74         while wait_list:
       
    75             self.log(4, "select...")
       
    76             self.log(5, "waitlist:", *(sc.fileno() for sc in wait_list))
       
    77             ready_list = select.select(wait_list, (), (), sel_TO)
       
    78             self.log(5, "readylist:", *(sc.fileno() for sc in ready_list[0]))
       
    79             for sc in ready_list[0]:
       
    80                 self.log(5, "sc {} ready...".format(sc.fileno()))
       
    81                 if sc == self.ssc: sc = self.acc()                    
       
    82                 self.forward(sc)
       
    83             wait_list = ()
       
    84             self.log(4, "forwarding={}".format(self.forwarding[0].value)) 
       
    85             if self.forwarding[0].value: wait_list += (self.ssc,)            # when off, no new connection will come on ssc 
       
    86             else: self.close_cli()
       
    87             for sc in self.srv_side.values(): wait_list += (sc,)
       
    88         self.close_srv()
       
    89         signal.signal(signal.SIGUSR2, sighand)
       
    90         last = 0
       
    91         ctr_lock.acquire()
       
    92         active.value -= 1
       
    93         if active.value == 0: last = 1        
       
    94         ctr_lock.release()
       
    95         if last: os.kill(0, signal.SIGUSR2)
       
    96         else: signal.pause()
       
    97         self.log(2, "ended")
       
    98         os._exit(0)
       
    99     def forward(self, sc):
       
   100         if not self.payload.get(sc):
       
   101             self.log(5, "delete srv_side[{}]".format(sc.fileno()))
       
   102             del self.srv_side[sc]
       
   103             self.log(4, "closing {}...".format(sc.fileno()))
       
   104             sc.close()
       
   105         else:
       
   106             self.log(5, "received data from {}".format(self.payload.rport))
       
   107             ctr_lock.acquire(); 
       
   108             forwards.value += 1; 
       
   109             ctr_lock.release()
       
   110             if self.kicker:
       
   111                 self.payload.dttl()
       
   112                 self.log(3, "received from {}: {}, ttl={}".format(self.topo, self.payload.digest(), self.payload.ttl))                                       
       
   113                 if self.payload.ttl <= 0:                                                                                                                   
       
   114                     self.log(1, "received after passing all {}: {}".\
       
   115                              format("mashes" if self.topo == "mash" else "rings", self.payload.digest()))
       
   116                     self.forwarding[0].value = 0                                                                                                                    
       
   117                     return                                                                                                                     
       
   118             nxt = self.next_node()
       
   119             self.log(5, "forwarding to {}...".format(nxt))
       
   120             if pacing: time.sleep(pace)                                                                                                                                                                                                                                                                                       
       
   121             if not nxt in self.cli_side: self.conn(nxt)
       
   122             self.payload.put(self.cli_side[nxt])
       
   123             self.log(5, "forwarded to {}".format(nxt))
       
   124     def next_node(self):
       
   125         if self.topo == "ring":                                                                                                                                 
       
   126             if self.kicker: nxt = self.p0                                                                                                                      
       
   127             else: nxt = self.locPort + 1                                                                                                                       
       
   128         else:                                                                                                                                                   
       
   129             nxt = self.locPort                                                                                                                                 
       
   130             while nxt == self.locPort:                                                                                                                        
       
   131                 nxt = random.randint(self.p0, self.pn)                                                                                                         
       
   132         return nxt        
       
   133     def bind(self):
       
   134             self.log(4, "binding...")
       
   135             try:
       
   136                 self.ssc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   137                 self.ssc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
       
   138             except Exception as e: 
       
   139                 self.abend("ssc alloc: {}".format(e.strerror))
       
   140             if self.issl:
       
   141                 self.log(4, "ssc SSL wrap")
       
   142                 try: self.ssc = self.ctx.wrap_socket(self.ssc)
       
   143                 except ssl.SSLError as e: self.abend("ssc SSL wrap: {}".format(str(e)))
       
   144             try:
       
   145                 self.ssc.bind(("127.0.0.1", self.locPort))
       
   146                 self.ssc.listen(1)
       
   147             except Exception as e: self.abend("bind: {}".format(e.strerror))
       
   148             self.log(2, "bound")
       
   149     def conn(self, remPort):
       
   150         self.log(4, "connecting to {}...".format(remPort))
       
   151         try: sc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   152         except Exception as e: self.abend("socket alloc: {}".format(e.strerror))
       
   153         if self.issl:
       
   154             self.log(4, "sc SSL wrap")
       
   155             try: sc = self.ctx.wrap_socket(sc)
       
   156             except ssl.SSLError as e: self.abend("sc SSL wrap: {}".format(str(e)))
       
   157         retry = connThreshold
       
   158         connected = False
       
   159         while not connected and retry > 0:
       
   160             try:
       
   161                 sc.connect(("127.0.0.1", remPort))
       
   162                 connected = True
       
   163             except Exception as e:
       
   164                 if e.errno == errno.ECONNREFUSED:
       
   165                     retry = retry - 1
       
   166                     time.sleep(conn_TO)
       
   167                 else: self.abend("connect: {}".format(str(e)))
       
   168         if retry == 0: self.abend("connection refused, threshold {} reached".format(connThreshold))
       
   169         ctr_lock.acquire();
       
   170         connects.value += 1;
       
   171         ctr_lock.release()
       
   172         try: self.cli_side[remPort] = sc.makefile("wb")
       
   173         except Exception as e: self.abend("client side makefile: {}".format(str(e)))
       
   174         self.log(2, "connected to {} after {} retries".format(remPort, connThreshold - retry))
       
   175     def acc(self):
       
   176         self.log(4, "accepting...")
       
   177         try:
       
   178             ac = self.ssc.accept()
       
   179             sc = ac[0]
       
   180         except Exception as e: self.abend("accept: {}".format(str(e)))
       
   181         try: sc = sc.makefile("rb")
       
   182         except Exception as e: self.abend("srv side makefile: {}".format(str(e)))
       
   183         self.srv_side[sc] = sc
       
   184         self.log(2, "accepted on sc={}, addr={}".format(sc.fileno(), ac[1]))
       
   185         return sc
       
   186     def close_srv(self):
       
   187         self.log(5, "closing ssc...")
       
   188         if self.ssc: self.ssc.close()
       
   189     def close_cli(self):
       
   190         def do_close():
       
   191             self.log(5, "closing clients...")
       
   192             scs = self.cli_side.values()
       
   193             self.cli_side.clear()
       
   194             for sc in scs: sc.close()
       
   195             self.log(4, "all clients closed")
       
   196         if not self.closing:
       
   197             threading.Thread(target = do_close, name = "client {} close".format(self.locPort)).start()
       
   198             self.closing = True
       
   199 
       
   200 
       
   201 class Data(Debug):
       
   202     def __init__(self, debid, port):
       
   203         self.debid = debid + " payload"
       
   204         self.clear()
       
   205         self.ttl = ittl
       
   206         self.lport = port
       
   207         self.rport = 0
       
   208         self.text = ""
       
   209     def clear(self):
       
   210         (self.ttl, self.rport, self.text) = 3 * (None,)
       
   211     def put(self, sc):
       
   212         self.log(5, "sending via {}...".format(sc.fileno()))
       
   213         if self.ttl == None: self.ttl = ittl
       
   214         if self.text == None: self.text = itext
       
   215         try:
       
   216             pickle.dump((self.ttl, self.lport, self.text), sc)
       
   217             sc.flush()
       
   218         except Exception as e: self.abend("send: {}".format(str(e)))
       
   219     def get(self, sc):
       
   220         self.log(5, "reading from {}...".format(sc.fileno()))
       
   221         self.clear()
       
   222         try:
       
   223             (self.ttl, self.rport, self.text) = pickle.load(sc)
       
   224             return True
       
   225         except Exception as e:
       
   226             if isinstance(e, EOFError): return False
       
   227             else: self.abend("receive: {}".format(str(e)))
       
   228     def dttl(self):
       
   229         self.ttl -= 1
       
   230         return self.ttl
       
   231     def digest(self):
       
   232         return self.text if len(self.text) < 24 else self.text[0:8]+"--------"+self.text[-8:]
       
   233     def toString(self):
       
   234         return "ttl={}, from port={}, text={}".\
       
   235             format(str(self.ttl), str(self.rport), self.digest())
       
   236 
       
   237 class Constellation(Debug):
       
   238     def __init__(self, issl, topo, p0, n):
       
   239         Debug.__init__(self, "{}SSL {}".format("" if issl else "non", topo.upper()))
       
   240     def run(self, issl, topo, p0, n):
       
   241         signal.signal(signal.SIGUSR2, signal.SIG_IGN)
       
   242         forwarding = [multiprocessing.Value('i', 1, lock=False)]                    # list is passed by reference
       
   243         if n == 1:
       
   244             self.log(0, "one-node configuration is not implemented")
       
   245         else:
       
   246             self.log(1, "{} nodes starting...".format(n))
       
   247             p0 += 500 if issl else 0
       
   248             pn = p0 + n - 1
       
   249             for port in range(p0, p0 + n):
       
   250                 pid = os.fork()
       
   251                 if not pid: Node(self.debid, forwarding, topo, port, p0, pn, issl).run()
       
   252                 else: self.log(4, "node {} started in process {}".format(port, pid))
       
   253         self.log(2, "all nodes established")
       
   254         while 1:
       
   255             try:
       
   256                 p = os.wait()
       
   257                 if p[1] & 255:
       
   258                     self.log(4, "pid {} killed by {}".format(p[0], p[1] & 255))
       
   259                 else:
       
   260                     self.log(4, "pid {} returned  {}".format(p[0], p[1] >> 8))
       
   261             except: break
       
   262         os._exit(0)
       
   263 
       
   264 def ga(key, default):
       
   265     return os.environ[key] if key in os.environ else default
       
   266 def gi(key, default):
       
   267     return int(ga(key, default))
       
   268 def sighand(signal, frame):
       
   269     pass
       
   270 
       
   271 debug = Debug("client/server demo")
       
   272 log_lock = multiprocessing.Lock()
       
   273 ctr_lock = multiprocessing.Lock()
       
   274 forwards = multiprocessing.Value('i', 0)
       
   275 connects = multiprocessing.Value('i', 0)
       
   276 active = multiprocessing.Value('i', 0)
       
   277 t0 = time.time()
       
   278 maxDebLev = gi('DEB', 0)
       
   279 mn = rn = gi('N', 0)
       
   280 rp0 = gi('RP0', 11000)
       
   281 rn = gi('RN', 3)
       
   282 mp0 = gi('MP0', 12000)
       
   283 mn = gi('MN', 3)
       
   284 itext = ga('T', "bla bla")
       
   285 ittl = gi('TTL', 3)
       
   286 pace = float(os.environ["P"]) if "P" in os.environ else 0
       
   287 pacing = 1 if pace > 0 else 0
       
   288 random.seed(gi('RS', 0))
       
   289 connThreshold = 77
       
   290 conn_TO = 0.01
       
   291 sel_TO = 1
       
   292 issl = gi('SSL', 0)
       
   293 caPath = "/home/local/etc/ssl/certs/"
       
   294 sslPathSuff = "/../CS/"
       
   295 cePath = os.environ["CEP"] if "CEP" in os.environ else os.path.dirname(sys.argv[0]) + sslPathSuff
       
   296 active.value = mn + rn
       
   297 if issl > 1: active.value *= 2
       
   298 signal.signal(signal.SIGUSR2, signal.SIG_IGN)
       
   299 debug.log(1, "pgm={}, ttl={}, pace={}, seed={}, SSL mask={}, debug={}".format(sys.argv[0], ittl, pace, gi('RS', 0), issl, maxDebLev))
       
   300 if issl > 0: debug.log(3, "ssl path: {}, CA path: {}".format(cePath, caPath))
       
   301           
       
   302 if issl < 2: issl = (issl,)
       
   303 else: issl = (0, 1)
       
   304 if ittl > 0:
       
   305     for ss in issl:
       
   306         topo = ("mash", "ring")
       
   307         p0 = (mp0, rp0)
       
   308         n = (mn, rn)
       
   309         for p in zip(topo, p0, n): 
       
   310             if not os.fork(): 
       
   311                 Constellation(ss, *p).run(ss, *p)
       
   312     while 1:
       
   313         try: os.wait()
       
   314         except: break
       
   315 debug.log(1, "final balance: forwards={}, connections={}".format(forwards.value, connects.value))