Running an async function with APScheduler in Python Tornado
I am trying to develop a small app that gathers weather data from an API. I use APScheduler to run the function every x minutes, and the app is built on the Python Tornado framework.
The error I get is:
INFO Job "GetWeather (trigger: interval[0:01:00], next run at: 2015-03-28 11:40:58 CET)" executed successfully
ERROR Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x0335C978>, <tornado.concurrent.Future object at 0x03374430>)
Traceback (most recent call last):
  File "C:\Python34\Lib\site-packages\tornado\ioloop.py", line 568, in _run_callback
    ret = callback()
  File "C:\Python34\Lib\site-packages\tornado\stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
greenlet.error: cannot switch to a different thread
I think the error comes from the coroutine in GetWeather(): if I remove all the async features from it, it works.
I am using Motor to read the needed coordinates from MongoDB, pass them to the API, and store the resulting weather data back in MongoDB.
import os.path, logging
import tornado.web
import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient
from tornado import gen
from tornado.options import define, options
from apscheduler.schedulers.tornado import TornadoScheduler
import motor
client = motor.MotorClient()
db = client['apitest']
console_log = logging.getLogger(__name__)
define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")
class MainRequest(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello")
scheduler = TornadoScheduler()

class ScheduledTasks(object):
    def get(self):
        print("This is the scheduler")

def AddJobs():
    scheduler.add_job(GetWeather, 'interval', minutes=1)

def StartScheduler():
    scheduler.start()

def StopScheduler():
    # APScheduler schedulers expose shutdown(), not stop()
    scheduler.shutdown()
class Weather(tornado.web.RequestHandler):
    def get(self):
        self.write("This is the Weather Robot!")
        GetWeather()
@gen.coroutine
def GetWeather():
    '''
    Getting city weather from forecast.io API
    '''
    console_log.debug('Start: weather robot')
    cursor = FindCities()

    while (yield cursor.fetch_next):
        city = cursor.next_object()
        lat = str(city["lat"])
        lon = str(city["lon"])
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("https://api.forecast.io/forecast/3925d0668cf520768ca855951f1097cd/%s,%s" % (lat, lon))

        if response.error:
            print("Error:", response.error)
            # Store all cities with errors in order to save them in the log file
        else:
            json = tornado.escape.json_decode(response.body)
            temperature = json["currently"]["temperature"]
            summary = json["currently"]["summary"]
            db.cities.update({'_id': city["_id"]}, {'$set': {'temperature': temperature, 'summary': summary}})

    console_log.debug('End: weather robot')
    return
def FindCities():
    '''
    cities = [{
        "_id" : ObjectId("55165d07258058ee8dca2172"),
        "name" : "London",
        "country" : "United Kingdom",
        "lat" : 51.507351,
        "lon" : -0.127758
    },
    {
        "_id" : ObjectId("55165d07258058ee8dca2173"),
        "name" : "Barcelona",
        "country" : "Spain",
        "lat" : 41.385064,
        "lon" : 2.173403
    }]
    '''
    cities = db.cities.find().sort([('_id', -1)])
    return cities
def main():
    logging.basicConfig(level=logging.DEBUG, format='%(levelname)-8s %(message)s')
    app = tornado.web.Application(
        [
            (r'/robots/weather', Weather),
            (r'/', MainRequest)
        ],
        cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
        login_url="/auth/login",
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
        xsrf_cookies=True,
        debug=options.debug,
    )
    app.listen(options.port)
    AddJobs()
    StartScheduler()
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()
Any idea what I am doing wrong? As far as I can see in the APScheduler code, TornadoScheduler() runs in the Tornado IOLoop... (https://bitbucket.org/agronholm/apscheduler/src/a34075b0037dba46735bae67f598ec6133003ef1/apscheduler/schedulers/tornado.py?at=master)
Oh! I forgot to say that the idea is to be able to run the task both ways: via APScheduler and manually.
Thanks a lot!
By default, TornadoScheduler runs scheduled tasks in a thread pool. Your task, however, uses the IOLoop and so expects to run in the IOLoop's thread. To fix this, use the add_callback() method of the Tornado IOLoop to schedule the task to run in the IOLoop's thread as soon as possible.
Like this:
from tornado.ioloop import IOLoop

def your_scheduled_task():
    # Hand the real work over to the IOLoop's thread
    IOLoop.instance().add_callback(your_real_task_function)
Or even better:
scheduler.add_job(IOLoop.instance().add_callback, 'interval', minutes=1, args=[GetWeather])
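The thread hand-off described in this answer can also be illustrated without Tornado or APScheduler installed. What follows is only a minimal sketch using the standard library, under two assumptions that are not part of the original code: asyncio's loop.call_soon_threadsafe() stands in for IOLoop.add_callback(), and a ThreadPoolExecutor stands in for the scheduler's worker pool. The names run_demo, scheduled_job, and real_task are hypothetical.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def run_demo():
    """Return a list recording which thread each step ran on."""
    results = []

    async def main():
        loop = asyncio.get_running_loop()
        loop_thread = threading.get_ident()
        done = asyncio.Event()

        def real_task():
            # Executed by the event loop, so it runs on the loop's thread.
            results.append(("task", threading.get_ident() == loop_thread))
            done.set()

        def scheduled_job():
            # Executed on a pool thread (stand-in for a scheduler worker).
            results.append(("job", threading.get_ident() == loop_thread))
            # Hand the real work back to the loop's thread (stand-in for
            # IOLoop.add_callback in the answer above).
            loop.call_soon_threadsafe(real_task)

        with ThreadPoolExecutor(max_workers=1) as pool:
            pool.submit(scheduled_job)
            await done.wait()

    asyncio.run(main())
    return results


if __name__ == "__main__":
    # Each entry is (step name, ran on the loop's thread?)
    print(run_demo())
```

The job itself runs off the loop's thread, while the work it hands over runs on the loop's thread, which is where code that depends on the loop (such as the coroutine in the question) expects to be.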