主頁 > 後端開發 > celery筆記三之task和task的呼叫

celery筆記三之task和task的呼叫

2023-06-14 08:27:13 後端開發

本文首發于公眾號:Hunter后端
原文鏈接:celery筆記三之task和task的呼叫

這一篇筆記介紹 task 和 task 的呼叫,

以下是本篇筆記目錄:

  1. 基礎的 task 定義方式
  2. 日志處理
  3. 任務重試
  4. 忽略任務運行結果
  5. task 的呼叫

1、基礎的 task 定義方式

前面兩篇筆記中介紹了最簡單的定義方式,使用 @app.task 作為裝飾器:

@app.task
def add(x, y):
    return x + y

如果是在 Django 系統中使用 celery,需要定義一個延時任務或者周期定時任務,可以使用 @shared_task 來修飾

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

在 Django 系統中使用 celery 的方式會在接下來的幾篇筆記中介紹道,

多個裝飾器

如果是 celery 的任務和其他裝飾器一起聯用,記得將 celery 的裝飾器放在最后使用,也就是串列的最前面:

@app.task
@decorator1
@decorator2
def add(x, y):
    return x + y

task名稱

每個 task 都有一個唯一的名稱用來標識這個 task,如果我們在定義的時候不指定,系統會為我們默認一個名稱,這些名稱會在 celery 的 worker 啟動的時候被系統掃描然后輸出一個串列展示,

還是上一篇筆記中我們定義的兩個 task,我們給其中一個指定 name:

#tasks1.py
from .celery import app


@app.task(name="tasks1.add")
def add(x, y):
    return x + y

可以觀察在 celery 的 worker 啟動的時候,會有一個輸出:

[tasks]
  . proj.tasks2.mul
  . tasks1.add

可以看到這個地方,系統就會使用我們定義的 name 了,

2、日志處理

我們可以在啟動 worker 的時候指定日志的輸出,定義格式如下:

celery -A proj worker -l INFO --logfile=/Users/hunter/python/celery_log/celery.log

在 task 中的定義可以使用 celery 中方法:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

也可以直接使用 logging 模塊:

import logging

logger1 = logging.getLogger(__name__)

直接在 task 中輸出:

@app.task(name="tasks1.add")
def add(x, y):
    logger.info("this is from logger")
    return x + y

然后在 worker 啟動時指定的日志檔案就會有我們列印出的日志內容:

[2022-07-24 16:28:33,210: INFO/ForkPoolWorker-7] tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a]: this is from logger
[2022-07-24 16:28:33,224: INFO/ForkPoolWorker-7] Task tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a] succeeded in 0.016244667931459844s: 3

3、任務重試

對于一個 task,我們可以對其設定 retry 引數來指定其在任務執行失敗后會重試幾次,以及隔多長時間重試,

比如對于下面的 div() 函式,我們來輸入除數為 0 的情況查看重試的功能,

當然,這里我們是故意輸入引數錯誤,在實際的專案中可能會是其他的原因造成任務失敗,比如資料庫連接失敗等

任務重試的引數也都在 @app.task() 中定義:

# tasks1.py

@app.task(autoretry_for=(Exception, ),  default_retry_delay=10, retry_kwargs={'max_retries': 5})
def div(x, y):
    return x / y

在這里,autoretry_for 表示的是某種報錯情況下重試,我們定義的 Exception 表示任何錯誤都重試,

如果只是想在某種特定的 exception 情況下重試,將那種 exception 的值替換 Exception 即可,

default_retry_delay 表示重試間隔時長,默認值是 3 * 60s,即三分鐘,是以秒為單位,這里我們設定的是 10s,

retry_kwargs 是一個 dict,其中有一個 max_retries 引數,表示的是最大重試次數,我們定為 5

然后可以嘗試呼叫這個延時任務:

from proj.tasks1 import div
div.delay(1, 0)

然后可以看到在日志檔案會有如下輸出:

[2022-07-24 16:59:35,653: INFO/ForkPoolWorker-7] Task proj.tasks1.div[1f65c410-1b2a-4127-9d83-a84b1ad9dd2c] retry: Retry in 10s: ZeroDivisionError('division by zero',)

且每隔 10s 執行一次,一共執行 5 次,5次之后還是不成功則會報錯,

retry_backoff 和 retry_backoff_max

還有一個 retry_backoff 和 retry_backoff_max 引數,這兩個引數是用于這種情況:如果你的 task 依賴另一個 service 服務,比如會呼叫其他系統的 API,然后這兩個引數可以用于避免請求過多的占用服務,

retry_backoff 引數可以設定成一個 布爾型資料,為 True 的話,自動重試的時間間隔會成倍的增長

第一次重試是 1 s后
第二次是 2s 后
第三次是 4s 后
第四次是 8s 后
...

如果 retry_backoff 引數是一個數字,比如是 3,那么后續的間隔時間則是 3 的倍數增長

第一次重試 3s 后
第二次是 6s 后
第三次是 12s 后
第四次是 24s 后

retry_backoff_max 是重試的最大的間隔時間,比如重試次數設定的很大,retry_backoff 的間隔時間重復達到了這個值之后就不再增大了,

這個值默認是 600s,也就是 10分鐘,

我們看一下下面這個例子:

# tasks1.py

@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_kwargs={'max_retries': 8})
def div(x, y):
    return x / y

關于重試的機制,理論上應該是按照我們前面列出來的重試時間間隔進行重試,但是如果我們這樣直接運行 div.delay(),得出的間隔時間是不定的,是在 0 到 最大值之間得出的一個隨機值,

這樣產生的原因是因為還有一個 retry_jitter 引數,這個引數默認是 True,所以時間間隔會是一個隨機值,

如果需要任務延時的間隔值是按照 retry_backoff 和 retry_backoff_max 兩個設定值來運行,那么則需要將 retry_jitter 值設為 False,

# tasks1.py

@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_jitter=False, retry_kwargs={'max_retries': 8})
def div(x, y):
    return x / y

然后運行 div 的延時任務,就可以看到延時任務按照規律的間隔時間重試了,以下是日志:

[2022-07-24 19:00:38,588: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 2s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:40,662: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:40,664: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 4s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:44,744: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:44,746: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 8s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:52,870: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:52,872: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 16s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:09,338: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:09,340: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 32s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:41,843: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:41,845: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:02:21,923: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:02:21,925: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:03:02,001: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:03:02,003: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)

因為我們設定的重試間隔時間最大為 40s,所以這個地方延時間隔時間到了 40 之后,就不再往上繼續增長了,

4、忽略任務運行結果

有時候延時任務的結果我們并不想保存,但是我們配置了 result_backend 引數,這個時候我們有三種方式不保存運行結果,

1.ignore_result=True 不保存任務運行的結果

@app.task(ignore_result=True)
def add(x, y):
    return x + y

2.app.conf 配置

也可以通過 app.conf 的配置來禁用結果的保存:

app.conf.update(
    task_ignore_result=True
)

3.執行單個任務的時候禁用

from proj.tasks1 import add
add.apply_async((1, 2), ignore_result=True)

apply_async() 函式的作用相當于是帶引數的 delay(),或者 delay() 是簡化版的 apply_async(),這個我們下面會介紹,

5、task 的呼叫

前面簡單兩個簡單的呼叫方法,一個是 apply_async(),一個是 delay(),

簡單來說就是 delay() 是不帶引數執行的 apply_async(),

以下用 add() 函式為例介紹一下他們的用法:

delay()

純粹的延時任務,只能如下操作:

add.delay(1, 2)

apply_async()

帶引數的用法,add() 函式的引數用 () 包起來:

add.apply_async((1, 2))

也可以帶其他引數,比如上面介紹的不保存運行結果:

add.apply_async((1, 2), ignore_result=True)

這個函式還可以指定延時的時間:

countdown引數

現在開始 10s 后開始運行:

add.apply_async((1, 2), countdown=10)

eta引數

也可以用 eta 引數來指定 10s 后運行:

from datetime import datetime, timedelta

now = datetime.now()
add.apply_async((1, 2), eta=now + timedelta(seconds=10))

expires引數

這個是用來設定過期的引數:

add.apply_async((1, 2), countdown=60, expires=120)

上面的引數表示,距現在60秒后開始執行,兩分鐘后過期

如果想獲取更多后端相關文章,可掃碼關注閱讀:
image

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/555074.html

標籤:其他

上一篇:現代C++學習指南-型別系統

下一篇:返回列表

標籤雲
其他(160907) Python(38226) JavaScript(25493) Java(18235) C(15237) 區塊鏈(8270) C#(7972) AI(7469) 爪哇(7425) MySQL(7248) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5875) 数组(5741) R(5409) Linux(5347) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4591) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2435) ASP.NET(2404) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) .NET技术(1984) 功能(1967) HtmlCss(1964) Web開發(1951) C++(1939) python-3.x(1918) 弹簧靴(1913) xml(1889) PostgreSQL(1881) .NETCore(1863) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • celery筆記三之task和task的呼叫

    > 本文首發于公眾號:Hunter后端 > 原文鏈接:[celery筆記三之task和task的呼叫](https://mp.weixin.qq.com/s/AIobDZVDWV3r_XauvmkVKA) 這一篇筆記介紹 task 和 task 的呼叫。 以下是本篇筆記目錄: 1. 基礎的 task ......

    uj5u.com 2023-06-14 08:27:13 more
  • 現代C++學習指南-型別系統

    > 在前一篇,我們提供了一個方向性的指南,但是學什么,怎么學卻沒有詳細展開。本篇將在前文的基礎上,著重介紹下怎樣學習C++的型別系統。 ### 寫在前面 在進入型別系統之前,我們應該先達成一項共識——盡可能使用C++的現代語法。眾所周知,出于兼容性的考慮,C++中很多語法都是合法的。但是隨著新版本的 ......

    uj5u.com 2023-06-14 08:16:26 more
  • 【解決一個小問題】golang 的 `-race`選項導致 unsafe代碼 panic

    **作者:張富春(ahfuzhang),轉載時請注明作者和參考鏈接,謝謝!** * [cnblogs博客](https://www.cnblogs.com/ahfuzhang/) * [zhihu](https://www.zhihu.com/people/ahfuzhang/posts) * [G ......

    uj5u.com 2023-06-14 08:15:18 more
  • 混沌演練狀態下,如何降低應用的 MTTR(平均恢復時間)

    如何在混沌演練的場景中降低應用的MTTR,必須需要根據監控定位,然后人工進行反饋進行處理嗎?是否可以自動化,是否有方案可以降低混沌演練程序中的影響?以此達到快速止血,進一步提高系統的穩定性。本篇文章將根據一些思考和實踐來解答以上問題。 ......

    uj5u.com 2023-06-14 08:15:05 more
  • 爬取豆瓣Top250圖書資料

    #爬取豆瓣Top250圖書資料 專案的實作步驟 1.專案結構 2.獲取網頁資料 3.提取網頁中的關鍵資訊 4.保存資料 **1.專案結構** ![image](https://img2023.cnblogs.com/blog/3047082/202306/3047082-20230613170853 ......

    uj5u.com 2023-06-14 08:14:56 more
  • [ARM匯編]計算機原理與數制基礎—1.1.2 二進制與十進制數制轉換

    在計算機中,我們通常使用二進制數制來表示資料,因為計算機的基本電平只有兩種狀態:高電平(通常表示為 1)和低電平(通常表示為 0)。而在我們的日常生活中,我們習慣使用十進制數制。為了方便理解,我們需要掌握二進制與十進制之間的轉換方法。 #### 二進制轉十進制 將二進制數轉換為十進制數時,我們需要將 ......

    uj5u.com 2023-06-14 08:14:48 more
  • Go 語言之 sqlx 庫使用

    # Go 語言之 sqlx 庫使用 ## 一、sqlx 庫安裝與連接 ### sqlx 介紹 sqlx is a library which provides a set of extensions on go's standard `database/sql` library. The sqlx ......

    uj5u.com 2023-06-14 08:14:39 more
  • 【python基礎】復雜資料型別-字典(嵌套)

    有時候,需要將一系列字典存盤在串列中,或將串列作為值存盤在字典中,這稱為**嵌套**。我們可以在串列中嵌套字典、在字典中嵌套串列、在字典中嵌套字典。 # 1.串列嵌套字典 我們可以把一個人的資訊放在字典中,但是多個人的資訊我們無法放在同一個字典中,所以就需要字典串列。 其語法格式: [字典1,字典2 ......

    uj5u.com 2023-06-14 08:09:16 more
  • 搭建springbootweb環境

    #搭建springboot環境(idea環境) 實作步驟: 1.基礎環境配置 2.maven配置 3.撰寫第一個程式helloworld(可能有兩個小問題) 4.運行(jar包運行,命令列運行) 一.基礎環境配置 進入idea,點擊file->new->project,在彈出的頁面上,選擇sprin ......

    uj5u.com 2023-06-14 07:58:44 more
  • 每天一道面試題:Spring的Bean生命周期

    Spring的Bean生命周期包括以下步驟: 1、實體化(Instantiation):當Spring容器接收到創建Bean的請求時,它會先實體化Bean物件。這個程序可以通過建構式、工廠方法或者反序列化等方式完成; 2、屬性賦值(Populate Properties):在實體化Bean物件后, ......

    uj5u.com 2023-06-14 07:53:30 more