class Application(object):
#创建服务端socket,并绑定IP和端口并添加相应设置,注:未开始通过while监听accept,等待客户端连接 def listen(self, port, address="", **kwargs):
from tornado.httpserver import HTTPServerserver = HTTPServer(self, **kwargs)server.listen(port, address)
详细代码:
class HTTPServer(object):
def__init__(self, request_callback, no_keep_alive=False, io_loop=None,xheaders=False, ssl_options=None):
#Application对象
self.request_callback = request_callback
#是否长连接
self.no_keep_alive = no_keep_alive
#IO循环
self.io_loop = io_loop
self.xheaders = xheaders
#Http和Http
self.ssl_options = ssl_options
self._socket = None
self._started = False
def listen(self, port, address=""):
self.bind(port, address)
self.start(1)
def bind(self, port, address=None, family=socket.AF_UNSPEC):
assertnot self._socket
#创建服务端socket对象,IPV4和TCP连接
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
flags = fcntl.fcntl(self._socket.fileno(), fcntl.F_GETFD)
flags |= fcntl.FD_CLOEXEC
fcntl.fcntl(self._socket.fileno(), fcntl.F_SETFD, flags)
#配置socket对象
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket.setblocking(0)
#绑定IP和端口 self._socket.bind((address, port))
#最大阻塞数量
self._socket.listen(128)
def start(self, num_processes=1):
assertnot self._started
self._started = True
if num_processes is None or num_processes <= 0:
num_processes = _cpu_count()
if num_processes > 1 and ioloop.IOLoop.initialized():
logging.error("Cannot run in multiple processes: IOLoop instance ""has already been initialized. You cannot call ""IOLoop.instance() before calling start()")
num_processes = 1
#如果进程数大于1if num_processes > 1:
logging.info("Pre-forking %d server processes", num_processes)
for i in range(num_processes):
if os.fork() == 0:
import random
from binascii import hexlify
try:
# If available, use the same method as# random.py
seed = long(hexlify(os.urandom(16)), 16)
except NotImplementedError:
# Include the pid to avoid initializing two# processes to the same value
seed(int(time.time() * 1000) ^ os.getpid())
random.seed(seed)
self.io_loop = ioloop.IOLoop.instance()
self.io_loop.add_handler(
self._socket.fileno(), self._handle_events,
ioloop.IOLoop.READ)
return
os.waitpid(-1, 0)
#进程数等于1,默认else:
ifnot self.io_loop:
#设置成员变量self.io_loop为IOLoop的实例,注:IOLoop使用methodclass完成了一个单例模式
self.io_loop = ioloop.IOLoop.instance()
#执行IOLoop的add_handler方法,将socket句柄、self._handle_events方法和IOLoop.READ当参数传入 self.io_loop.add_handler(self._socket.fileno(),
self._handle_events,
ioloop.IOLoop.READ)
def _handle_events(self, fd, events):
while True:
try:
#====important=====#
connection, address = self._socket.accept()
except socket.error, e:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
returnraiseif self.ssl_options isnot None:
assert ssl, "Python 2.6+ and OpenSSL required for SSL"try:
#====important=====#
connection = ssl.wrap_socket(connection,server_side=True,do_handshake_on_connect=False,**self.ssl_options)
except ssl.SSLError, err:
if err.args[0] == ssl.SSL_ERROR_EOF:
return connection.close()
else:
raiseexcept socket.error, err:
if err.args[0] == errno.ECONNABORTED:
return connection.close()
else:
raisetry:
if self.ssl_options isnot None:
stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
else:
stream = iostream.IOStream(connection, io_loop=self.io_loop)
#====important=====# HTTPConnection(stream, address, self.request_callback,self.no_keep_alive, self.xheaders)
except:
logging.error("Error in connection callback", exc_info=True)
HTTPServer
class IOLoop(object):
# Constants from the epoll module
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)
# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
def__init__(self, impl=None):
self._impl = impl or _poll()
if hasattr(self._impl, 'fileno'):
self._set_close_exec(self._impl.fileno())
self._handlers = {}
self._events = {}
self._callbacks = []
self._timeouts = []
self._running = False
self._stopped = False
self._blocking_signal_threshold = None
# Create a pipe that we send bogus data to when we want to wake# the I/O loop when it is idleif os.name != 'nt':
r, w = os.pipe()
self._set_nonblocking(r)
self._set_nonblocking(w)
self._set_close_exec(r)
self._set_close_exec(w)
self._waker_reader = os.fdopen(r, "rb", 0)
self._waker_writer = os.fdopen(w, "wb", 0)
else:
self._waker_reader = self._waker_writer = win32_support.Pipe()
r = self._waker_writer.reader_fd
self.add_handler(r, self._read_waker, self.READ)
@classmethod
def instance(cls):
ifnot hasattr(cls, "_instance"):
cls._instance = cls()
return cls._instance
def add_handler(self, fd, handler, events):
"""Registers the given handler to receive the given events for fd."""
self._handlers[fd] = stack_context.wrap(handler)
self._impl.register(fd, events | self.ERROR)
IOLoop
def wrap(fn):
'''Returns a callable object that will resore the current StackContext
when executed.
Use this whenever saving a callback to be executed later in a
different execution context (either in a different thread or
asynchronously in the same thread).
'''if fn is None:
return None
# functools.wraps doesn't appear to work on functools.partial objects#@functools.wraps(fn)def wrapped(callback, contexts, *args, **kwargs):
# If we're moving down the stack, _state.contexts is a prefix# of contexts. For each element of contexts not in that prefix,# create a new StackContext object.# If we're moving up the stack (or to an entirely different stack),# _state.contexts will have elements not in contexts. Use# NullContext to clear the state and then recreate from contexts.if (len(_state.contexts) > len(contexts) or
any(a[1] isnot b[1]
for a, b in itertools.izip(_state.contexts, contexts))):
# contexts have been removed or changed, so start over
new_contexts = ([NullContext()] +
[cls(arg) for (cls,arg) in contexts])
else:
new_contexts = [cls(arg)
for (cls, arg) in contexts[len(_state.contexts):]]
if len(new_contexts) > 1:
with contextlib.nested(*new_contexts):
callback(*args, **kwargs)
elif new_contexts:
with new_contexts[0]:
callback(*args, **kwargs)
else:
callback(*args, **kwargs)
if getattr(fn, 'stack_context_wrapped', False):
return fn
contexts = _state.contexts
result = functools.partial(wrapped, fn, contexts)
result.stack_context_wrapped = True
return result
class IOLoop(object):
def start(self):
"""Starts the I/O loop.
The loop will run until one of the I/O handlers calls stop(), which
will make the loop stop after the current event iteration completes.
"""if self._stopped:
self._stopped = False
return
self._running = True
while True:
# Never use an infinite timeout here - it can stall epoll
poll_timeout = 0.2
# Prevent IO event starvation by delaying new callbacks# to the next iteration of the event loop.
callbacks = self._callbacks
self._callbacks = []
for callback in callbacks:
self._run_callback(callback)
if self._callbacks:
poll_timeout = 0.0
if self._timeouts:
now = time.time()
while self._timeouts and self._timeouts[0].deadline <= now:
timeout = self._timeouts.pop(0)
self._run_callback(timeout.callback)
if self._timeouts:
milliseconds = self._timeouts[0].deadline - now
poll_timeout = min(milliseconds, poll_timeout)
ifnot self._running:
breakif self._blocking_signal_threshold isnot None:
# clear alarm so it doesn't fire while poll is waiting for# events. signal.setitimer(signal.ITIMER_REAL, 0, 0)
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception, e:
# Depending on python version and IOLoop implementation,# different exception types may be thrown and there are# two ways EINTR might be signaled:# * e.errno == errno.EINTR# * e.args is like (errno.EINTR, 'Interrupted system call')if (getattr(e, 'errno', None) == errno.EINTR or
(isinstance(getattr(e, 'args', None), tuple) and
len(e.args) == 2 and e.args[0] == errno.EINTR)):
continueelse:
raiseif self._blocking_signal_threshold isnot None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold, 0)
# Pop one fd at a time from the set of pending fds and run# its handler. Since that handler may perform actions on# other file descriptors, there may be reentrant calls to# this IOLoop that update self._events self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
self._handlers[fd](fd, events)
except (KeyboardInterrupt, SystemExit):
raiseexcept (OSError, IOError), e:
if e.args[0] == errno.EPIPE:
# Happens when the client closes the connectionpasselse:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)
except:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)
# reset the stopped flag so another start/stop pair can be issued
self._stopped = False
if self._blocking_signal_threshold isnot None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)