在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 問答/Python  網(wǎng)絡(luò)安全/ 如何在Celery任務(wù)中“優(yōu)雅”地管理數(shù)據(jù)庫session?

如何在Celery任務(wù)中“優(yōu)雅”地管理數(shù)據(jù)庫session?

我有一些操作數(shù)據(jù)庫的celery任務(wù),需要在代碼開頭新建session,在代碼結(jié)尾關(guān)閉session。

當(dāng)前的方式是:

@app.task
def celery_task():
    session = DB_Session()
    try:
        foo
    except:
        bar
    finally:
        session.close()

但問題是,這樣對錯誤處理很不友好。而且一整個任務(wù)里都用try...except...finally感覺很難看。


最近看Celery的官方文檔,想到可以新建一個自定義的MyTask,并在Task.after_return()中關(guān)閉session,然后所有celery任務(wù)定義的時候都base=MyTask。如下:

class MyTask(celery.task):
    def after_return(self, *foo):
        self.session.close()

@app.task(bind=True, base=MyTask)
def celery_task(self):
    foo

我的問題是,如何才能在一個task創(chuàng)建的時候執(zhí)行新建session的操作?我在celery的文檔中沒有找到一個方法是在task創(chuàng)建的時候執(zhí)行的。

回答
編輯回答
雨萌萌

官方文檔或源碼中有沒有相關(guān)實現(xiàn)不是很清楚,但是就算沒有實現(xiàn)自己實現(xiàn)也是可以的。
相關(guān)實現(xiàn)大概率會通過decorator來做,所以自己寫一個decorator就可以了

import functools

import flask

@bp.route('status', methods=['GET'])
def status():
  celery_task.delay()
  return flask.jsonify({'status': 'ok'})

def my_task(fn):
  @functools.wraps(fn)
  def func_wrapper(*args, **kwargs):
    try:
      session = DB_Session()
      res = fn('session', *args, **kwargs)
    except:
      pass
    finally:
      session.close()
      return res
  return func_wrapper


@app.task
@my_task
def celery_task(session):
  pass
2018年5月2日 19:32