Hello I am running into a issue where select.select is hitting file limits with the number of sockets we're running over. To fix this, a move from select.select to select.epoll seems like the solution to over come the OS limit (well not the OS, it seems to be with python?). Anything over 1024 causes it to crash and give an error ValueError: filedescriptor out of range in select()
Either way, I am trying to rewrite the following code to support epoll instead of select.select and see if that fixes any of the issues we're noticing with a large number of traffic flowing in.
Here is what it looks like now:
Either way, I am trying to rewrite the following code to support epoll instead of select.select and see if that fixes any of the issues we're noticing with a large number of traffic flowing in.
Here is what it looks like now:
class TCPServer: request_queue_size = 20 def __init__(self, address): self.address = Address.wrap(address) self.__is_shut_down = threading.Event() self.__shutdown_request = False self.socket = socket.socket(self.address.family, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.address()) self.address = Address.wrap(self.socket.getsockname()) self.socket.listen(self.request_queue_size) self.handler_counter = Counter() def connection_thread(self, connection, client_address): with self.handler_counter: client_address = Address(client_address) try: self.handle_client_connection(connection, client_address) except: self.handle_error(connection, client_address) finally: close_socket(connection) def serve_forever(self, poll_interval=0.1): self.__is_shut_down.clear() try: while not self.__shutdown_request: try: r, w_, e_ = select.select( [self.socket], [], [], poll_interval) except select.error as ex: # pragma: no cover if ex[0] == EINTR: continue else: raise if self.socket in r: connection, client_address = self.socket.accept() t = basethread.BaseThread( "TCPConnectionHandler (%s: %s:%s -> %s:%s)" % ( self.__class__.__name__, client_address[0], client_address[1], self.address.host, self.address.port ), target=self.connection_thread, args=(connection, client_address), ) t.setDaemon(1) try: t.start() except threading.ThreadError: self.handle_error(connection, Address(client_address)) connection.close() finally: self.__shutdown_request = False self.__is_shut_down.set() def shutdown(self): self.__shutdown_request = True self.__is_shut_down.wait() self.socket.close() self.handle_shutdown() def handle_error(self, connection_, client_address, fp=sys.stderr): """ Called when handle_client_connection raises an exception. """ # If a thread has persisted after interpreter exit, the module might be # none. if traceback: exc = str(traceback.format_exc()) print(u'-' * 40, file=fp) print( u"Error in processing of request from %s" % repr(client_address), file=fp) print(exc, file=fp) print(u'-' * 40, file=fp) def handle_client_connection(self, conn, client_address): # pragma: no cover """ Called after client connection. """ raise NotImplementedError def handle_shutdown(self): """ Called after server shutdown. """ def wait_for_silence(self, timeout=5): start = time.time() while 1: if time.time() - start >= timeout: raise exceptions.Timeout( "%s service threads still alive" % self.handler_counter.count ) if self.handler_counter.count == 0: returnHere is what I have so far - but obs. not the end result I am looking for:
class TCPServer: request_queue_size = 20 def __init__(self, address): self.address = Address.wrap(address) self.__is_shut_down = threading.Event() self.__shutdown_request = False self.socket = socket.socket(self.address.family, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.address()) self.address = Address.wrap(self.socket.getsockname()) self.socket.listen(self.request_queue_size) self.handler_counter = Counter() self.epoll = select.epoll() self.epoll.register(self.socket.fileno(), select.EPOLLIN) def connection_thread(self, connection, client_address): with self.handler_counter: client_address = Address(client_address) try: self.handle_client_connection(connection, client_address) except: self.handle_error(connection, client_address) finally: close_socket(connection) def serve_forever(self, poll_interval=0.1): self.__is_shut_down.clear() try: connections = {}; requests = {}; responses = {} while not self.__shutdown_request: events = self.epoll.poll(1) for fileno, event in events: if fileno == self.socket.fileno(): connection, address = self.socket.accept() connection.setblocking(0) self.epoll.register(connection.fileno(), select.EPOLLIN) connections[connection.fileno()] = connection elif event & select.EPOLLIN: requests[fileno] += connections[fileno].recv(1024) if EOL1 in requests[fileno] or EOL2 in requests[fileno]: epoll.modify(fileno, select.EPOLLOUT) connections[fileno].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1) elif event & select.EPOLLOUT: byteswritten = connections[fileno].send(responses[fileno]) responses[fileno] = responses[fileno][byteswritten:] if len(responses[fileno]) == 0: connections[fileno].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0) epoll.modify(fileno, 0) connections[fileno].shutdown(socket.SHUT_RDWR) elif event & select.EPOLLHUP: epoll.unregister(fileno) connections[fileno].close() del connections[fileno] finally: self.__shutdown_request = False self.__is_shut_down.set() def shutdown(self): self.__shutdown_request = True self.__is_shut_down.wait() self.epoll.unregister(self.socket.fileno()) self.socket.close() self.handle_shutdown() def handle_error(self, connection_, client_address, fp=sys.stderr): """ Called when handle_client_connection raises an exception. """ # If a thread has persisted after interpreter exit, the module might be # none. if traceback: exc = str(traceback.format_exc()) print(u'-' * 40, file=fp) print( u"Error in processing of request from %s" % repr(client_address), file=fp) print(exc, file=fp) print(u'-' * 40, file=fp) def handle_client_connection(self, conn, client_address): # pragma: no cover """ Called after client connection. """ raise NotImplementedError def handle_shutdown(self): """ Called after server shutdown. """ def wait_for_silence(self, timeout=5): start = time.time() while 1: if time.time() - start >= timeout: raise exceptions.Timeout( "%s service threads still alive" % self.handler_counter.count ) if self.handler_counter.count == 0: return