新聞中心
今天在寫zabbix storm job監(jiān)控腳本的時候用到了python的redis模塊,之前也有用過,但是沒有過多的了解,今天看了下相關(guān)的api和源碼,看到有ConnectionPool的實現(xiàn),這里簡單說下。

來安ssl適用于網(wǎng)站、小程序/APP、API接口等需要進行數(shù)據(jù)傳輸應(yīng)用場景,ssl證書未來市場廣闊!成為創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場價格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18982081108(備注:SSL證書合作)期待與您的合作!
在ConnectionPool之前,如果需要連接redis,我都是用StrictRedis這個類,在源碼中可以看到這個類的具體解釋:
- redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
- implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
使用的方法:
| 1 2 | r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx) r.xxxx() |
有了ConnectionPool這個類之后,可以使用如下方法
- r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
- r.xxxx()
這里Redis是StrictRedis的子類
簡單分析如下:
在StrictRedis類的__init__方法中,可以初始化connection_pool這個參數(shù),其對應(yīng)的是一個ConnectionPool的對象:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | class StrictRedis( object ): ........ def __init__( self , host = 'localhost' , port = 6379 , db = 0 , password = None , socket_timeout = None , socket_connect_timeout = None , socket_keepalive = None , socket_keepalive_options = None , connection_pool = None , unix_socket_path = None , encoding = 'utf-8' , encoding_errors = 'strict' , charset = None , errors = None , decode_responses = False , retry_on_timeout = False , ssl = False , ssl_keyfile = None , ssl_certfile = None , ssl_cert_reqs = None , ssl_ca_certs = None ): if not connection_pool: .......... connection_pool = ConnectionPool( * * kwargs) self .connection_pool = connection_pool |
在StrictRedis的實例執(zhí)行具體的命令時會調(diào)用execute_command方法,這里可以看到具體實現(xiàn)是從連接池中獲取一個具體的連接,然后執(zhí)行命令,完成后釋放連接:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # COMMAND EXECUTION AND PROTOCOL PARSING def execute_command( self , * args, * * options): "Execute a command and return a parsed response" pool = self .connection_pool command_name = args[ 0 ] connection = pool.get_connection(command_name, * * options) #調(diào)用ConnectionPool.get_connection方法獲取一個連接 try : connection.send_command( * args) #命令執(zhí)行,這里為Connection.send_command return self .parse_response(connection, command_name, * * options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance (e, TimeoutError): raise connection.send_command( * args) return self .parse_response(connection, command_name, * * options) finally : pool.release(connection) #調(diào)用ConnectionPool.release釋放連接 |
在來看看ConnectionPool類:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | class ConnectionPool( object ): ........... def __init__( self , connection_class = Connection, max_connections = None , * * connection_kwargs): #類初始化時調(diào)用構(gòu)造函數(shù) max_connections = max_connections or 2 * * 31 if not isinstance (max_connections, ( int , long )) or max_connections < 0 : #判斷輸入的max_connections是否合法 raise ValueError( '"max_connections" must be a positive integer' ) self .connection_class = connection_class #設(shè)置對應(yīng)的參數(shù) self .connection_kwargs = connection_kwargs self .max_connections = max_connections self .reset() #初始化ConnectionPool 時的reset操作 def reset( self ): self .pid = os.getpid() self ._created_connections = 0 #已經(jīng)創(chuàng)建的連接的計數(shù)器 self ._available_connections = [] #聲明一個空的數(shù)組,用來存放可用的連接 self ._in_use_connections = set () #聲明一個空的集合,用來存放已經(jīng)在用的連接 self ._check_lock = threading.Lock() ....... def get_connection( self , command_name, * keys, * * options): #在連接池中獲取連接的方法 "Get a connection from the pool" self ._checkpid() try : connection = self ._available_connections.pop() #獲取并刪除代表連接的元素,在***次獲取connectiong時,因為_available_connections是一個空的數(shù)組, 會直接調(diào)用make_connection方法 except IndexError: connection = self .make_connection() self ._in_use_connections.add(connection) #向代表正在使用的連接的集合中添加元素 return connection def make_connection( self ): #在_available_connections數(shù)組為空時獲取連接調(diào)用的方法 "Create a new connection" if self ._created_connections > = self .max_connections: #判斷創(chuàng)建的連接是否已經(jīng)達到***限制,max_connections可以通過參數(shù)初始化 raise ConnectionError( "Too many connections" ) self ._created_connections + = 1 #把代表已經(jīng)創(chuàng)建的連接的數(shù)值+1 return self .connection_class( * * self .connection_kwargs) #返回有效的連接,默認為Connection(**self.connection_kwargs) def release( self , connection): #釋放連接,鏈接并沒有斷開,只是存在鏈接池中 "Releases the connection back to the pool" self ._checkpid() if connection.pid ! = self .pid: return self ._in_use_connections.remove(connection) #從集合中刪除元素 self ._available_connections.append(connection) #并添加到_available_connections 的數(shù)組中 def disconnect( self ): #斷開所有連接池中的鏈接 "Disconnects all connections in the pool" all_conns = chain( self ._available_connections, self ._in_use_connections) for connection in all_conns: connection.disconnect() |
execute_command最終調(diào)用的是Connection.send_command方法,關(guān)閉鏈接為 Connection.disconnect方法,而Connection類的實現(xiàn):
| 1 2 3 4 5 6 7 | class Connection( object ): "Manages TCP communication to and from a Redis server" def __del__( self ): #對象刪除時的操作,調(diào)用disconnect釋放連接 try : self .disconnect() except Exception: pass |
核心的鏈接建立方法是通過socket模塊實現(xiàn):
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | def _connect( self ): err = None for res in socket.getaddrinfo( self .host, self .port, 0 , socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try : sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1 ) # TCP_KEEPALIVE if self .socket_keepalive: #構(gòu)造函數(shù)中默認 socket_keepalive=False,因此這里默認為短連接 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 ) for k, v in iteritems( self .socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # set the socket_connect_timeout before we connect sock.settimeout( self .socket_connect_timeout) #構(gòu)造函數(shù)中默認socket_connect_timeout=None,即連接為blocking的模式 # connect sock.connect(socket_address) # set the socket_timeout now that we're connected sock.settimeout( self .socket_timeout) #構(gòu)造函數(shù)中默認socket_timeout=None return sock except socket.error as _: err = _ if sock is not None : sock.close() ..... |
關(guān)閉鏈接的方法:
| 1 2 3 4 5 6 7 8 9 10 11 | def disconnect( self ): "Disconnects from the Redis server" self ._parser.on_disconnect() if self ._sock is None : return try : self ._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close self ._sock.close() except socket.error: pass self ._sock = None |
可以小結(jié)如下
1)默認情況下每創(chuàng)建一個Redis實例都會構(gòu)造出一個ConnectionPool實例,每一次訪問redis都會從這個連接池得到一個連接,操作完成后會把該連接放回連接池(連接并沒有釋放),可以構(gòu)造一個統(tǒng)一的ConnectionPool,在創(chuàng)建Redis實例時,可以將該ConnectionPool傳入,那么后續(xù)的操作會從給定的ConnectionPool獲得連接,不會再重復(fù)創(chuàng)建ConnectionPool。
2)默認情況下沒有設(shè)置keepalive和timeout,建立的連接是blocking模式的短連接。
3)不考慮底層tcp的情況下,連接池中的連接會在ConnectionPool.disconnect中統(tǒng)一銷毀。
本文出自 “菜光光的博客” 博客,請務(wù)必保留此出處http://caiguangguang.blog./1652935/1583541
分享文章:【博文推薦】Pythonredis鏈接建立實現(xiàn)分析
標題來源:http://fisionsoft.com.cn/article/cdhcigg.html


咨詢
建站咨詢
