异步服务器和客户端处理来自控制台的输入
我有一个 asyncio TCP 服务器,它从客户端获取消息,在服务器上执行 stuff() 并将文本发回.服务器在正确接收和发送数据的意义上运行良好.问题是我无法从客户端的服务器取回消息,因为我有来自控制台的输入的阻塞例程(基本上 data_received 方法从不执行).只有退出命令工作正常(它关闭循环).如何解决这个问题?这是服务器和客户端代码.它基本上是 EchoClient asyncio 版本,还有一些用于练习的管道代码.
I have an asyncio TCP server that takes messages from a client, runs stuff() on the server and sends text back. The server works well in the sense that it receives and sends data correctly. The problem is that the client never gets the messages the server sends back, because the client is stuck in the blocking routine that reads input from the console (basically, the data_received method is never executed). Only the exit command works fine (it closes the loop). How can I solve this? Below is the server and client code. It is basically the asyncio EchoClient example with some extra plumbing code for an exercise.
# client.py
import abc
import asyncio
import sys
# Menu text printed once by show_menu() before it starts prompting for commands.
MENU = '''
a) do x
b) do y
c) exit
'''
# Module-level event loop, used by main() and closed by ExitCommand.
loop_ = asyncio.get_event_loop()
class XCommand:
    """Menu command that sends the ``X:`` request to the server."""

    def __init__(self, client):
        """Keep the client whose ``send_data_to_tcp`` method ``run`` calls.

        The menu loop builds commands as ``cmd_cls(client)``. This class has
        no base class to supply a constructor, so it defines its own
        one-argument constructor here.
        """
        self.client = client

    def run(self):
        """Send the ``X:`` request through the stored client."""
        # The client encodes the text to bytes before writing it.
        self.client.send_data_to_tcp('X:')
class YCommand(Command):
    """Menu command that asks for extra text and sends it as a ``Y:`` request."""

    def run(self):
        """Prompt on the console, then send ``Y:`` followed by the typed text."""
        # input() blocks the calling thread until the user presses Enter.
        typed = input('Input for Y ### ')
        request = 'Y:' + typed
        self.client.send_data_to_tcp(request)
class ExitCommand(Command):
    """Menu command that notifies the server and terminates the client."""

    def run(self):
        """Send ``EXIT:``, print a farewell, close the shared loop and exit."""
        farewell = 'Goodbye!'
        self.client.send_data_to_tcp('EXIT:')
        print(farewell)
        # NOTE(review): loop.close() raises RuntimeError on a running loop --
        # confirm this is only reached while the loop is not running.
        loop_.close()
        exit()
class CommandFactory:
    """Map a menu key typed by the user to the command class that handles it."""

    # Keys are the letters printed in MENU. The values are the command classes
    # this client defines (XCommand, YCommand, ExitCommand).
    _cmds = {
        'a': XCommand,
        'b': YCommand,
        'c': ExitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the command class registered for ``cmd``, or ``None``.

        ``cmd`` is the menu key exactly as typed; a key that is not
        registered yields ``None`` so the caller can report it.
        """
        return cls._cmds.get(cmd)
def show_menu(client):
    """Print the menu once, then read and run commands forever.

    ``client`` is passed to the constructor of every command that gets run.
    The loop never returns on its own; it only ends if a command exits or
    raises.
    """
    print(MENU)
    while True:
        command = input('Insert Command$: ')
        cmd_cls = CommandFactory.get_cmd(command)
        if cmd_cls:
            cmd_cls(client).run()
        else:
            # The factory found no command class for this key.
            print('Unknown: {}'.format(command))
class Client(asyncio.Protocol):
    """Client-side protocol: prints data from the server, sends text requests."""

    def __init__(self, loop):
        """Remember the event loop; the transport is set once connected."""
        self.transport = None
        self.loop = loop

    def connection_made(self, transport):
        """Store the transport so requests can be written to it later."""
        self.transport = transport

    def data_received(self, data):
        """Decode the received bytes and print their repr, flushing stdout."""
        text = data.decode()
        report = 'Data received from server: \n{!r}'.format(text)
        print(report, flush=True)

    def send_data_to_tcp(self, data):
        """Encode the text ``data`` and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the lost connection and stop the event loop."""
        for line in ('The server closed the connection', 'Stop the event loop'):
            print(line)
        self.loop.stop()
def main():
    """Connect to the server, run the console menu in a thread, run the loop.

    The menu reads from the console with blocking ``input()`` calls, so it is
    handed to the loop's default executor. That leaves the event loop free to
    run in this thread and deliver ``data_received`` callbacks.
    """
    client = Client(loop_)
    connecting = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    loop_.run_until_complete(connecting)

    def _menu_finished(future):
        # Fetch the outcome so asyncio does not report it as never retrieved,
        # then let run_forever() return so the program can end.
        if not future.cancelled():
            future.exception()
        loop_.stop()

    # Pass the callable and its argument separately. Writing
    # show_menu(client) here would run the blocking menu right away in this
    # thread, so run_forever() below would never start.
    menu_future = loop_.run_in_executor(None, show_menu, client)
    menu_future.add_done_callback(_menu_finished)
    try:
        loop_.run_forever()
    finally:
        # close() is idempotent, so this is safe even if the loop was already
        # closed elsewhere.
        loop_.close()


if __name__ == '__main__':
    main()
# server.py
import abc
import asyncio
import sys
from asyncio_exercise.db import DB
class ACommand:
    """Server command that returns the result of ``db.a()`` as text."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Return one ``column: value`` line per item of ``db.a()``.

        ``param1`` and ``param2`` are accepted but not used. A falsy result
        from ``db.a()`` yields a fixed placeholder line instead.
        """
        rows = db.a()
        if rows:
            lines = ['{}: {}'.format(col, val) for col, val in rows.items()]
            return '\n'.join(lines)
        return '>>>>>>>>>>> Empty <<<<<<<<<<<<<'
class BCommand:
    """Server command that forwards both parameters to ``db.b``."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Call ``db.b(param1, param2)`` and return a fixed confirmation."""
        confirmation = 'B Ok!'
        db.b(param1, param2)
        return confirmation
class ExitCommand:
    """Server command that shuts the server process down."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Stop listening, print a farewell and exit the process.

        The arguments are accepted so the signature matches the other
        commands; none of them is used.

        Raises:
            SystemExit: always, with exit status 0.
        """
        # This is called from Server.data_received(), i.e. while the event
        # loop is running. A running loop can neither be closed nor be asked
        # to run_until_complete(), so only stop listening here. The script's
        # ``finally`` block waits for the server to close and closes the loop
        # once SystemExit has unwound run_forever().
        server.close()
        print('Buona giornata!!!')
        sys.exit(0)
class CommandFactory:
    """Parse a ``NAME[:param1[:param2]]`` message into a command and params."""

    _cmds = {
        'X': ACommand,
        'Y': BCommand,
        'EXIT': ExitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Split ``cmd`` on ``:`` and look up the command class.

        Returns a ``(cmd_cls, param1, param2)`` tuple. ``cmd_cls`` is ``None``
        when the name is not registered. Both parameters are ``None`` when the
        message has no ``:``; ``param2`` is filled only when the message has
        exactly three ``:``-separated fields.
        """
        fields = cmd.split(':')
        name = fields[0]
        if len(fields) == 3:
            param1, param2 = fields[1], fields[2]
        elif len(fields) == 1:
            param1 = param2 = None
        else:
            param1, param2 = fields[1], None
        return cls._cmds.get(name), param1, param2
class Server(asyncio.Protocol):
    """Server-side protocol: runs one command per received message."""

    db_filename = '../data/db'
    # Created once, when the class body runs, and shared by every instance.
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode the message, run the matching command, send its reply.

        A message whose command name is not registered is answered with an
        ``Unknown command`` line instead of being run.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            # get_cmd() yields None for an unregistered name; calling .run on
            # it would raise AttributeError inside this callback.
            res = 'Unknown command: {!r}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Server is passed as the protocol factory, so it is instantiated once
    # per accepted client connection.
    coro = loop.create_server(Server, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)

    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        # Keep serving until Ctrl+C interrupts the loop.
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Stop listening, wait until the server is closed, then drop the loop.
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()
更新:解决方案是使用 aioconsole 包及其 ainput 函数.下面是使用 aioconsole 的代码(运行良好).
UPDATE: The solution was to use the aioconsole package and its ainput function. Below is the code using aioconsole (it works very well).
# server.py
import abc
import asyncio
from d_1_networking.esercizio_soluzione.SOversion.dummydb import DummyDB as DB
class Command(metaclass=abc.ABCMeta):
    """Abstract base class of the server-side commands."""

    # abc.abstractclassmethod is deprecated since Python 3.3; stacking
    # classmethod on top of abstractmethod is the documented replacement.
    @classmethod
    @abc.abstractmethod
    def run(cls, db, param1=None, param2=None):
        """Execute the command against ``db`` and return the reply text.

        The signature mirrors the concrete commands, which are class methods
        taking the database plus two optional parameters.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError()
class XCommand(Command):
    """Server command that returns the result of ``db.x()`` as text."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Return one ``column: value`` line per item of ``db.x()``.

        ``param1`` and ``param2`` are accepted but not used. A falsy result
        from ``db.x()`` yields a fixed placeholder line instead.
        """
        rows = db.x()
        if rows:
            lines = ['{}: {}'.format(col, val) for col, val in rows.items()]
            return '\n'.join(lines)
        return '>>>>>>>>>>> Empty response! <<<<<<<<<<<<<'
class YCommand(Command):
    """Server command that passes ``param1`` to ``db.y``."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Call ``db.y(param1)`` and return a confirmation naming ``param1``.

        ``param2`` is accepted but not used.
        """
        db.y(param1)
        confirmation = 'Operation Y OK: {}'.format(param1)
        return confirmation
class QuitCommand(Command):
    """Server command that answers a disconnect request."""

    @classmethod
    def run(cls, rubrica_db, param1=None, param2=None):
        """Return the fixed disconnect reply; no argument is used."""
        reply = 'Disconnected...'
        return reply
class CommandFactory:
    """Parse a ``NAME[:param1[:param2]]`` message into a command and params."""

    _cmds = {
        'X': XCommand,
        'Y': YCommand,
        'DISCONNECT': QuitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Split ``cmd`` on ``:`` and look up the command class.

        Returns a ``(cmd_cls, first, second)`` tuple. ``cmd_cls`` is ``None``
        when the name is not registered. Both values are ``None`` when the
        message has no ``:``; the second is filled only when the message has
        exactly three ``:``-separated fields.
        """
        fields = cmd.split(':')
        name = fields[0]
        if len(fields) == 3:
            first, second = fields[1], fields[2]
        elif len(fields) == 1:
            first = second = None
        else:
            first, second = fields[1], None
        return cls._cmds.get(name), first, second
class Server(asyncio.Protocol):
    """Server-side protocol: runs one command per received message."""

    db_filename = '../data/exercise.db'
    # Created once, when the class body runs, and shared by every instance.
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode the message, run the matching command, send its reply.

        A message whose command name is not registered is answered with an
        ``Unknown command`` line instead of being run.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            # get_cmd() yields None for an unregistered name; calling .run on
            # it would raise AttributeError inside this callback.
            res = 'Unknown command: {!r}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Server is the protocol class defined in this file; it is passed as the
    # factory, so it is instantiated once per accepted client connection.
    coro = loop.create_server(Server, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)

    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        # Keep serving until Ctrl+C interrupts the loop.
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Runs on every exit path, not only after Ctrl+C: stop listening,
        # wait until the server is closed, then close the loop.
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()
#dummydb.py
class DummyDB:
    """Stub database object that returns fixed data."""

    def __init__(self, fn):
        """Store ``fn`` on the instance; nothing is opened or read."""
        self.fn = fn

    def x(self):
        """Return a fixed three-field mapping."""
        return dict(
            field_a='55 tt TTYY 3334 gghyyujh',
            field_b='FF hhhnneeekk',
            field_c='00993342489048222 news',
        )

    def y(self, param):
        """Return ``param`` unchanged."""
        return param
# client.py
import abc
from asyncio import *
from aioconsole import ainput
# Menu text printed by menu(); the letters are the keys CommandFactory looks up.
MENU = '''
---------------------------
A) Command X
B) Command Y (require additional input)
C) Quit program
---------------------------
'''
# Module-level event loop: handed to Client and used to run the connection and main().
loop_ = get_event_loop()
class Command(metaclass=abc.ABCMeta):
    """Abstract base class of the client menu commands."""

    # Subclasses whose run() is a coroutine set this to True; the menu loop
    # checks it to decide whether run() must be awaited.
    asyn = False

    def __init__(self, tcp_client):
        """Keep the client object that run() uses to send requests."""
        self.client = tcp_client

    @abc.abstractmethod
    def run(self):
        """Execute the command; concrete subclasses must override this."""
        raise NotImplementedError()
class ACommand(Command):
    """Menu command A: sends the ``X:`` request to the server."""

    def run(self):
        """Send the ``X:`` request through the stored client."""
        request = 'X:'
        self.client.send_data_to_tcp(request)
class BCommand(Command):
    """Menu command B: asks for extra text and sends it as a ``Y:`` request."""

    # run() is a coroutine, so the menu loop must await it.
    asyn = True

    async def run(self):
        """Await one line of console input, then send ``Y:`` plus that line."""
        typed = await ainput('Insert data for B operation (es. name:43d3HHte3) > ')
        request = 'Y:' + typed
        self.client.send_data_to_tcp(request)
class QuitCommand(Command):
    """Menu command C: notifies the server and terminates the client."""

    def run(self):
        """Send ``DISCONNECT:``, print a farewell, disconnect and exit."""
        farewell = 'Goodbye!!!'
        self.client.send_data_to_tcp('DISCONNECT:')
        print(farewell)
        # Client.disconnect() stops the event loop.
        self.client.disconnect()
        exit()
class CommandFactory:
    """Map a menu key typed by the user to the command class that handles it."""

    _cmds = {
        'A': ACommand,
        'B': BCommand,
        'C': QuitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the command class for ``cmd``, or ``None`` if unregistered.

        Surrounding whitespace is stripped before the lookup; the match is
        case-sensitive.
        """
        key = cmd.strip()
        return cls._cmds.get(key)
class Client(Protocol):
    """Client-side protocol: prints data from the server, sends text requests."""

    def __init__(self, loop):
        """Remember the event loop; the transport is set once connected."""
        self.transport = None
        self.loop = loop

    def disconnect(self):
        """Stop the event loop."""
        self.loop.stop()

    def connection_made(self, transport):
        """Store the transport so requests can be written to it later."""
        self.transport = transport

    def data_received(self, data):
        """Decode the received bytes and print them framed, flushing stdout."""
        text = data.decode()
        framed = 'Data received from server: \n===========\n{}\n===========\n'.format(text)
        print(framed, flush=True)

    def send_data_to_tcp(self, data):
        """Encode the text ``data`` and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the lost connection and stop the event loop."""
        for line in ('The server closed the connection', 'Stop the event loop'):
            print(line)
        self.loop.stop()
def menu():
    """Print the command menu to standard output."""
    print(MENU)
async def main():
    """Show the menu, then read and run commands forever.

    Input is awaited with ``ainput``. Each command is built with the
    module-level ``client``; its ``run()`` is awaited only when the command
    class sets ``asyn``.
    """
    menu()
    while True:
        cmd = await ainput('Insert Command >')
        cmd_cls = CommandFactory.get_cmd(cmd)
        if not cmd_cls:
            print('Unknown: {}'.format(cmd))
            continue
        command = cmd_cls(client)
        if cmd_cls.asyn:
            await command.run()
        else:
            command.run()
if __name__ == '__main__':
    # main() reads this module-level name, so it is set before main() runs.
    client = Client(loop_)
    # The factory always returns the same pre-built client instance.
    coro = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    loop_.run_until_complete(coro)
    # Drive the interactive menu coroutine on the same loop as the connection.
    loop_.run_until_complete(main())
你可以考虑使用 aioconsole.ainput:
from aioconsole import ainput
async def some_coroutine():
line = await ainput(">>> ")
[...]
该项目可在 PyPI 上找到.
The project is available on PyPI.