在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 問答/Python  網(wǎng)絡(luò)安全/ celery處理10w級別的更新記錄的任務(wù),是創(chuàng)建10w個任務(wù),還是創(chuàng)建一個任務(wù)

celery處理10w級別的更新記錄的任務(wù),是創(chuàng)建10w個任務(wù),還是創(chuàng)建一個任務(wù)掃表循環(huán)處理?各有何優(yōu)勢?

數(shù)據(jù)庫有10w個記錄,半年后可能會增加到20w,但最終應(yīng)該不會超過100w

服務(wù)器配置:
python3.6 celery+rabbitMQ
云主機 ubuntu 16.04 1G 1核
數(shù)據(jù)庫 postgresql 10, 有100個連接數(shù)的限制

表結(jié)構(gòu)如下:

1111.png

last_update字段是上次請求的時間(每條記錄我們需要至少1小時內(nèi)更新一次,允許有10分鐘的誤差)
uuid 字段決定發(fā)起請求時傳給對方api的參數(shù)

每個記錄的last_update 可能不一樣,是根據(jù)這個記錄的添加時間而定的,以后每次更新記錄,這個字段就發(fā)生變化

我們目前程序的思路是:
在celery中創(chuàng)建了一個任務(wù)A,這個任務(wù)每隔1小時工作一次,
查詢出 更新時間在1小時之前的 的所有記錄,
然后用for循環(huán) 對查詢出的記錄拼接url,把拼接的Url發(fā)送給異步任務(wù)B

任務(wù)B的目的很簡單,根據(jù)得到的url,去請求數(shù)據(jù),寫入數(shù)據(jù)庫,并更新last_update 字段

這種方式,只要創(chuàng)建2個celery任務(wù)即可,但是總感覺這樣不太健壯
網(wǎng)上說celery可以支撐百萬級別的任務(wù),我就在考慮 要不要每個記錄,創(chuàng)建一個celery任務(wù)?

斗膽發(fā)帖求助各位前輩,我這種情況,用哪種思路比較好? 大家有什么改進方案嗎?

非常感謝

回答
編輯回答
久舊酒

樓主現(xiàn)在都實現(xiàn),就已經(jīng)是每個記錄對應(yīng)一個 task實例了。
首先我們先做兩個定義:

  1. task,就是你定義的celery方法,比如:
@celery.task
def celery_task():
  pass
  1. task實例,就是實際將要運行的任務(wù)
task_instance = celery_task.delay()

任務(wù)一,查詢;任務(wù)二,遍歷及更新。
所以樓主本身的設(shè)計就是:
兩個task,百萬級(數(shù)據(jù)足夠多的話)task實例(即已經(jīng)為每個符合條件的數(shù)據(jù)創(chuàng)建了一個任務(wù)了)。

由于評論里不是很方便回答樓主的問題,就在這里做出評論里問題的回答了。
方案一:
增加celery的消費者,及將worker數(shù)量加大。
不建議,因為不可控因素較多,還可能達(dá)不到預(yù)期效果。
方案二(個人建議,可根據(jù)情況修改):
自行增加判斷標(biāo)志位。
不知道樓主是怎樣使用celery的,就假設(shè)通過redis完成的發(fā)布訂閱任務(wù)操作了。

# 以定時任務(wù)的方式啟動,沒小時執(zhí)行一次
@celery.task
def query_from_db():
  results = db.query
  for result in results:
    if redis.get(result.id):
      continue
    # 設(shè)置一個超時時間
    # update在一小時內(nèi)成功,下次執(zhí)行query_from_db任務(wù)時,仍會創(chuàng)建新的更新任務(wù)
    # update失敗,超時后,redis刪除result.id相應(yīng)記錄,即超時后會創(chuàng)建新的更新任務(wù)
    redis.set(result.id, 'something', two_hours)
    update_result.delay(result.id)
    
@celery.task
def update_result(result_id):
  result = db.query.get(result_id)
  rv = requests.get(.....)
  result.update(rv.json())
  redis.delete(result_id)
2018年4月11日 02:39