新聞中心
為了避免由于一些網(wǎng)絡(luò)或等其他不可控因素,而引起的功能性問題。比如在發(fā)送請(qǐng)求時(shí),會(huì)因?yàn)榫W(wǎng)絡(luò)不穩(wěn)定,往往會(huì)有請(qǐng)求超時(shí)的問題。

站在用戶的角度思考問題,與客戶深入溝通,找到臨湘網(wǎng)站設(shè)計(jì)與臨湘網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、主機(jī)域名、網(wǎng)站空間、企業(yè)郵箱。業(yè)務(wù)覆蓋臨湘地區(qū)。
這種情況下,我們通常會(huì)在代碼中加入重試的代碼。重試的代碼本身不難實(shí)現(xiàn),但如何寫得優(yōu)雅、易用,是我們要考慮的問題。
這里要給大家介紹的是一個(gè)第三方庫 - Tenacity (標(biāo)題中的重試機(jī)制并并不準(zhǔn)確,它不是 Python 的內(nèi)置模塊,因此并不能稱之為機(jī)制),它實(shí)現(xiàn)了幾乎我們可以使用到的所有重試場景,比如:
- 在什么情況下才進(jìn)行重試?
- 重試幾次呢?
- 重試多久后結(jié)束?
- 每次重試的間隔多長呢?
- 重試失敗后的回調(diào)?
在使用它之前 ,先要安裝它
- $ pip install tenacity
1. 最基本的重試
無條件重試,重試之間無間隔
- from tenacity import retry
- @retry
- def test_retry():
- print("等待重試,重試無間隔執(zhí)行...")
- raise Exception
- test_retry()
無條件重試,但是在重試之前要等待 2 秒
- from tenacity import retry, wait_fixed
- @retry(wait=wait_fixed(2))
- def test_retry():
- print("等待重試...")
- raise Exception
- test_retry()
2. 設(shè)置停止基本條件
只重試7 次
- from tenacity import retry, stop_after_attempt
- @retry(stop=stop_after_attempt(7))
- def test_retry():
- print("等待重試...")
- raise Exception
- test_retry()
重試 10 秒后不再重試
- from tenacity import retry, stop_after_delay
- @retry(stop=stop_after_delay(10))
- def test_retry():
- print("等待重試...")
- raise Exception
- test_retry()
或者上面兩個(gè)條件滿足一個(gè)就結(jié)束重試
- from tenacity import retry, stop_after_delay, stop_after_attempt
- @retry(stop=(stop_after_delay(10) | stop_after_attempt(7)))
- def test_retry():
- print("等待重試...")
- raise Exception
- test_retry()
3. 設(shè)置何時(shí)進(jìn)行重試
在出現(xiàn)特定錯(cuò)誤/異常(比如請(qǐng)求超時(shí))的情況下,再進(jìn)行重試
- from requests import exceptions
- from tenacity import retry, retry_if_exception_type
- @retry(retry=retry_if_exception_type(exceptions.Timeout))
- def test_retry():
- print("等待重試...")
- raise exceptions.Timeout
- test_retry()
在滿足自定義條件時(shí),再進(jìn)行重試。
如下示例,當(dāng) test_retry 函數(shù)返回值為 False 時(shí),再進(jìn)行重試
- from tenacity import retry, stop_after_attempt, retry_if_result
- def is_false(value):
- return value is False
- @retry(stop=stop_after_attempt(3),
- retry=retry_if_result(is_false))
- def test_retry():
- return False
- test_retry()
4. 重試后錯(cuò)誤重新拋出
當(dāng)出現(xiàn)異常后,tenacity 會(huì)進(jìn)行重試,若重試后還是失敗,默認(rèn)情況下,往上拋出的異常會(huì)變成 RetryError,而不是最根本的原因。
因此可以加一個(gè)參數(shù)(reraise=True),使得當(dāng)重試失敗后,往外拋出的異常還是原來的那個(gè)。
- from tenacity import retry, stop_after_attempt
- @retry(stop=stop_after_attempt(7), reraise=True)
- def test_retry():
- print("等待重試...")
- raise Exception
- test_retry()
5. 設(shè)置回調(diào)函數(shù)
當(dāng)最后一次重試失敗后,可以執(zhí)行一個(gè)回調(diào)函數(shù)
- from tenacity import *
- def return_last_value(retry_state):
- print("執(zhí)行回調(diào)函數(shù)")
- return retry_state.outcome.result() # 表示返回原函數(shù)的返回值
- def is_false(value):
- return value is False
- @retry(stop=stop_after_attempt(3),
- retry_error_callback=return_last_value,
- retry=retry_if_result(is_false))
- def test_retry():
- print("等待重試中...")
- return False
- print(test_retry())
輸出如下
- 等待重試中...
- 等待重試中...
- 等待重試中...
- 執(zhí)行回調(diào)函數(shù)
- False
分享名稱:少有人知的Python"重試機(jī)制"
本文來源:http://fisionsoft.com.cn/article/dpsoios.html


咨詢
建站咨詢
