1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
# -*- coding: utf-8 -*-
import gevent.monkey from gevent.server import StreamServer import gevent.queue import gevent import collections import logging
# Root logger configuration: INFO level, one stream handler, timestamped format.
stream_handler = logging.StreamHandler()
log_format = logging.Formatter(
    '%(asctime)s.%(filename)s[line:%(lineno)d] %(levelname)s %(message)s'
)
stream_handler.setFormatter(log_format)

root_logger = logging.getLogger('')
root_logger.addHandler(stream_handler)
root_logger.setLevel(logging.INFO)
# Monkey patch: apply gevent's socket patch before any client socket is used.
gevent.monkey.patch_socket()
# Client record: the connection's socket, its peer address and its message queue.
Client = collections.namedtuple(
    typename='Client',
    field_names=('sock', 'address', 'queue'),
)
class WriteLoop(gevent.Greenlet):
    """Deliver one client's queued messages to every other registered client.

    Each message taken from ``queue`` is sent to all entries of ``container``
    whose socket is not ``sk``. Peers whose send fails are removed from the
    container.

    Args:
        sk: socket of the client that owns this loop (excluded from delivery).
        address: address of the owning client.
        queue: queue the owning client's received data is put on.
        container: dict shared with the adapter, mapping client number to a
            record exposing ``sock`` and ``address``.
    """

    def __init__(self, sk, address, queue, container):
        super(WriteLoop, self).__init__()
        self.sk = sk
        self.address = address
        self.queue = queue
        self.client_container = container

    def _run(self):
        """Loop forever: take one message from the queue and broadcast it."""
        while True:
            data = self.queue.get()
            failed_numbers = []
            # Iterate over a snapshot: a send can switch to another greenlet
            # that adds or removes clients, and mutating a dict while
            # iterating its live view raises RuntimeError.
            for number, client in list(self.client_container.items()):
                if client.sock != self.sk:
                    try:
                        # sendall keeps writing until the whole message is
                        # out; a plain send may transmit only part of it.
                        client.sock.sendall(data)
                    except Exception as e:
                        logging.error(e)
                        # Report the peer whose send failed, not the sender.
                        logging.error('{} disconnect'.format(client.address))
                        failed_numbers.append(number)

            # Drop every failed peer. Client numbers start at 0, so they must
            # not be tested for truthiness; pop() tolerates an entry that the
            # peer's own handler has already removed.
            for number in failed_numbers:
                removed = self.client_container.pop(number, None)
                if removed is not None:
                    logging.info('remove {}'.format(removed))
def number_generator():
    """Yield the integers 0, 1, 2, ... endlessly, one per ``next()`` call."""
    value = -1
    while True:
        value += 1
        yield value
class TCPAdapter(gevent.Greenlet):
    """Greenlet running a StreamServer that relays data between its clients.

    Every accepted connection is registered in ``client_container`` under a
    fresh number and gets its own ``WriteLoop``; data received from the
    connection is queued for that loop.

    Args:
        address: ``(host, port)`` pair passed to ``StreamServer``.
    """

    def __init__(self, address):
        super(TCPAdapter, self).__init__()
        self.client_container = {}
        self.number_gen = number_generator()
        self.server = StreamServer(address, handle=self._handle)

    def _handle(self, sock, address):
        """Serve one connection until the peer closes it or reading fails.

        Args:
            sock: the accepted client socket.
            address: the peer address of ``sock``.
        """
        logging.info('new connection, {}'.format(address))
        number = next(self.number_gen)  # identifier for this client
        queue = gevent.queue.Queue()  # messages received from this client
        self.client_container[number] = Client(sock, address, queue)
        # Loop that processes this client's queue.
        write_loop = WriteLoop(sock, address, queue, self.client_container)
        write_loop.start()
        try:
            while True:
                data = sock.recv(1024)
                if not data:
                    # An empty read means the peer closed the connection.
                    logging.info('{} disconnect'.format(address))
                    break
                queue.put(data)
        finally:
            # Release this client's resources even when recv raised (e.g. a
            # connection reset); the exception still propagates afterwards.
            write_loop.kill()
            write_loop.join()
            # Another client's write loop may already have removed this
            # entry after a failed send, so do not assume it is present.
            self.client_container.pop(number, None)
            logging.info('{} released'.format(address))

    def _run(self):
        """Run the server's accept loop."""
        self.server.serve_forever()
if __name__ == '__main__':
    # Listen on two ports; each gets its own adapter greenlet.
    listen_addresses = [('0.0.0.0', 9001), ('0.0.0.0', 9002)]
    for listen_address in listen_addresses:
        logging.info('server on {}'.format(listen_address))

    adapters = [TCPAdapter(listen_address) for listen_address in listen_addresses]
    for adapter in adapters:
        adapter.start()

    # Block until both adapters finish.
    gevent.joinall(adapters)
|
近期评论