新聞中心
Java中的阻塞隊列接口BlockingQueue繼承自Queue接口。

創(chuàng)新互聯(lián)建站主要從事成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)上思,十年網(wǎng)站建設(shè)經(jīng)驗,價格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18982081108
BlockingQueue接口提供了3個添加元素方法:
- add:添加元素到隊列里,添加成功返回true,由于容量滿了添加失敗會拋出IllegalStateException異常;
- offer:添加元素到隊列里,添加成功返回true,添加失敗返回false;
- put:添加元素到隊列里,如果容量滿了會阻塞直到容量不滿。
3個刪除方法:
- poll:刪除隊列頭部元素,如果隊列為空,返回null。否則返回元素;
- remove:基于對象找到對應(yīng)的元素,并刪除。刪除成功返回true,否則返回false;
- take:刪除隊列頭部元素,如果隊列為空,一直阻塞到隊列有元素并刪除。
常用的阻塞隊列具體類有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。
本文以ArrayBlockingQueue和LinkedBlockingQueue為例,分析它們的實現(xiàn)原理。
ArrayBlockingQueue
ArrayBlockingQueue的原理就是使用一個可重入鎖和這個鎖生成的兩個條件對象進(jìn)行并發(fā)控制(classic two-condition algorithm)。
ArrayBlockingQueue是一個帶有長度的阻塞隊列,初始化的時候必須要指定隊列長度,且指定長度之后不允許進(jìn)行修改。
它帶有的屬性如下:
- // 存儲隊列元素的數(shù)組,是個循環(huán)數(shù)組
- final Object[] items;
- // 拿數(shù)據(jù)的索引,用于take,poll,peek,remove方法
- int takeIndex;
- // 放數(shù)據(jù)的索引,用于put,offer,add方法
- int putIndex;
- // 元素個數(shù)
- int count;
- // 可重入鎖
- final ReentrantLock lock;
- // notEmpty條件對象,由lock創(chuàng)建
- private final Condition notEmpty;
- // notFull條件對象,由lock創(chuàng)建
- private final Condition notFull;
數(shù)據(jù)的添加
ArrayBlockingQueue有不同的幾個數(shù)據(jù)添加方法,add、offer、put方法。
add方法:
- public boolean add(E e) {
- if (offer(e))
- return true;
- else
- throw new IllegalStateException("Queue full");
- }
add方法內(nèi)部調(diào)用offer方法如下:
- public boolean offer(E e) {
- checkNotNull(e); // 不允許元素為空
- final ReentrantLock lock = this.lock;
- lock.lock(); // 加鎖,保證調(diào)用offer方法的時候只有1個線程
- try {
- if (count == items.length) // 如果隊列已滿
- return false; // 直接返回false,添加失敗
- else {
- insert(e); // 數(shù)組沒滿的話調(diào)用insert方法
- return true; // 返回true,添加成功
- }
- } finally {
- lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用offer方法
- }
- }
insert方法如下:
- private void insert(E x) {
- items[putIndex] = x; // 元素添加到數(shù)組里
- putIndex = inc(putIndex); // 放數(shù)據(jù)索引+1,當(dāng)索引滿了變成0
- ++count; // 元素個數(shù)+1
- notEmpty.signal(); // 使用條件對象notEmpty通知,比如使用take方法的時候隊列里沒有數(shù)據(jù),被阻塞。這個時候隊列insert了一條數(shù)據(jù),需要調(diào)用signal進(jìn)行通知
- }
put方法:
- public void put(E e) throws InterruptedException {
- checkNotNull(e); // 不允許元素為空
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly(); // 加鎖,保證調(diào)用put方法的時候只有1個線程
- try {
- while (count == items.length) // 如果隊列滿了,阻塞當(dāng)前線程,并加入到條件對象notFull的等待隊列里
- notFull.await(); // 線程阻塞并被掛起,同時釋放鎖
- insert(e); // 調(diào)用insert方法
- } finally {
- lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用put方法
- }
- }
ArrayBlockingQueue的添加數(shù)據(jù)方法有add,put,offer這3個方法,總結(jié)如下:
add方法內(nèi)部調(diào)用offer方法,如果隊列滿了,拋出IllegalStateException異常,否則返回true
offer方法如果隊列滿了,返回false,否則返回true
add方法和offer方法不會阻塞線程,put方法如果隊列滿了會阻塞線程,直到有線程消費了隊列里的數(shù)據(jù)才有可能被喚醒。
這3個方法內(nèi)部都會使用可重入鎖保證原子性。
數(shù)據(jù)的刪除
ArrayBlockingQueue有不同的幾個數(shù)據(jù)刪除方法,poll、take、remove方法。
poll方法:
- public E poll() {
- final ReentrantLock lock = this.lock;
- lock.lock(); // 加鎖,保證調(diào)用poll方法的時候只有1個線程
- try {
- return (count == 0) ? null : extract(); // 如果隊列里沒元素了,返回null,否則調(diào)用extract方法
- } finally {
- lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用poll方法
- }
- }
poll方法內(nèi)部調(diào)用extract方法:
- private E extract() {
- final Object[] items = this.items;
- E x = this.
cast(items[takeIndex]); // 得到取索引位置上的元素 - items[takeIndex] = null; // 對應(yīng)取索引上的數(shù)據(jù)清空
- takeIndex = inc(takeIndex); // 取數(shù)據(jù)索引+1,當(dāng)索引滿了變成0
- --count; // 元素個數(shù)-1
- notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數(shù)據(jù)的時候隊列已滿,被阻塞。這個時候消費了一條數(shù)據(jù),隊列沒滿了,就需要調(diào)用signal進(jìn)行通知
- return x; // 返回元素
- }
take方法:
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly(); // 加鎖,保證調(diào)用take方法的時候只有1個線程
- try {
- while (count == 0) // 如果隊列空,阻塞當(dāng)前線程,并加入到條件對象notEmpty的等待隊列里
- notEmpty.await(); // 線程阻塞并被掛起,同時釋放鎖
- return extract(); // 調(diào)用extract方法
- } finally {
- lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用take方法
- }
- }
remove方法:
- public boolean remove(Object o) {
- if (o == null) return false;
- final Object[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lock(); // 加鎖,保證調(diào)用remove方法的時候只有1個線程
- try {
- for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素
- if (o.equals(items[i])) { // 兩個對象相等的話
- removeAt(i); // 調(diào)用removeAt方法
- return true; // 刪除成功,返回true
- }
- }
- return false; // 刪除成功,返回false
- } finally {
- lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用remove方法
- }
- }
removeAt方法:
- void removeAt(int i) {
- final Object[] items = this.items;
- if (i == takeIndex) { // 如果要刪除數(shù)據(jù)的索引是取索引位置,直接刪除取索引位置上的數(shù)據(jù),然后取索引+1即可
- items[takeIndex] = null;
- takeIndex = inc(takeIndex);
- } else { // 如果要刪除數(shù)據(jù)的索引不是取索引位置,移動元素元素,更新取索引和放索引的值
- for (;;) {
- int nexti = inc(i);
- if (nexti != putIndex) {
- items[i] = items[nexti];
- i = nexti;
- } else {
- items[i] = null;
- putIndex = i;
- break;
- }
- }
- }
- --count; // 元素個數(shù)-1
- notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數(shù)據(jù)的時候隊列已滿,被阻塞。這個時候消費了一條數(shù)據(jù),隊列沒滿了,就需要調(diào)用signal進(jìn)行通知
- }
ArrayBlockingQueue的刪除數(shù)據(jù)方法有poll,take,remove這3個方法,總結(jié)如下:
poll方法對于隊列為空的情況,返回null,否則返回隊列頭部元素。
remove方法取的元素是基于對象的下標(biāo)值,刪除成功返回true,否則返回false。
poll方法和remove方法不會阻塞線程。
take方法對于隊列為空的情況,會阻塞并掛起當(dāng)前線程,直到有數(shù)據(jù)加入到隊列中。
這3個方法內(nèi)部都會調(diào)用notFull.signal方法通知正在等待隊列滿情況下的阻塞線程。
LinkedBlockingQueue
LinkedBlockingQueue是一個使用鏈表完成隊列操作的阻塞隊列。鏈表是單向鏈表,而不是雙向鏈表。
內(nèi)部使用放鎖和拿鎖,這兩個鎖實現(xiàn)阻塞(“two lock queue” algorithm)。
它帶有的屬性如下:
- // 容量大小
- private final int capacity;
- // 元素個數(shù),因為有2個鎖,存在競態(tài)條件,使用AtomicInteger
- private final AtomicInteger count = new AtomicInteger(0);
- // 頭結(jié)點
- private transient Node
head; - // 尾節(jié)點
- private transient Node
last; - // 拿鎖
- private final ReentrantLock takeLock = new ReentrantLock();
- // 拿鎖的條件對象
- private final Condition notEmpty = takeLock.newCondition();
- // 放鎖
- private final ReentrantLock putLock = new ReentrantLock();
- // 放鎖的條件對象
- private final Condition notFull = putLock.newCondition();
ArrayBlockingQueue只有1個鎖,添加數(shù)據(jù)和刪除數(shù)據(jù)的時候只能有1個被執(zhí)行,不允許并行執(zhí)行。
而LinkedBlockingQueue有2個鎖,放鎖和拿鎖,添加數(shù)據(jù)和刪除數(shù)據(jù)是可以并行進(jìn)行的,當(dāng)然添加數(shù)據(jù)和刪除數(shù)據(jù)的時候只能有1個線程各自執(zhí)行。
數(shù)據(jù)的添加
LinkedBlockingQueue有不同的幾個數(shù)據(jù)添加方法,add、offer、put方法。
add方法內(nèi)部調(diào)用offer方法:
- public boolean offer(E e) {
- if (e == null) throw new NullPointerException(); // 不允許空元素
- final AtomicInteger count = this.count;
- if (count.get() == capacity) // 如果容量滿了,返回false
- return false;
- int c = -1;
- Node
node = new Node(e); // 容量沒滿,以新元素構(gòu)造節(jié)點 - final ReentrantLock putLock = this.putLock;
- putLock.lock(); // 放鎖加鎖,保證調(diào)用offer方法的時候只有1個線程
- try {
- if (count.get() < capacity) { // 再次判斷容量是否已滿,因為可能拿鎖在進(jìn)行消費數(shù)據(jù),沒滿的話繼續(xù)執(zhí)行
- enqueue(node); // 節(jié)點添加到鏈表尾部
- c = count.getAndIncrement(); // 元素個數(shù)+1
- if (c + 1 < capacity) // 如果容量還沒滿
- notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列里面加數(shù)據(jù)了,隊列還沒滿
- }
- } finally {
- putLock.unlock(); // 釋放放鎖,讓其他線程可以調(diào)用offer方法
- }
- if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費數(shù)據(jù),count會變化。這里的if條件表示如果隊列中還有1條數(shù)據(jù)
- signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列里還有1條數(shù)據(jù),可以進(jìn)行消費
- return c >= 0; // 添加成功返回true,否則返回false
- }
put方法:
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException(); // 不允許空元素
- int c = -1;
- Node
node = new Node(e); // 以新元素構(gòu)造節(jié)點 - final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count;
- putLock.lockInterruptibly(); // 放鎖加鎖,保證調(diào)用put方法的時候只有1個線程
- try {
- while (count.get() == capacity) { // 如果容量滿了
- notFull.await(); // 阻塞并掛起當(dāng)前線程
- }
- enqueue(node); // 節(jié)點添加到鏈表尾部
- c = count.getAndIncrement(); // 元素個數(shù)+1
- if (c + 1 < capacity) // 如果容量還沒滿
- notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列里面加數(shù)據(jù)了,隊列還沒滿
- } finally {
- putLock.unlock(); // 釋放放鎖,讓其他線程可以調(diào)用put方法
- }
- if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費數(shù)據(jù),count會變化。這里的if條件表示如果隊列中還有1條數(shù)據(jù)
- signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列里還有1條數(shù)據(jù),可以進(jìn)行消費
- }
LinkedBlockingQueue的添加數(shù)據(jù)方法add,put,offer跟ArrayBlockingQueue一樣,不同的是它們的底層實現(xiàn)不一樣。
ArrayBlockingQueue中放入數(shù)據(jù)阻塞的時候,需要消費數(shù)據(jù)才能喚醒。
而LinkedBlockingQueue中放入數(shù)據(jù)阻塞的時候,因為它內(nèi)部有2個鎖,可以并行執(zhí)行放入數(shù)據(jù)和消費數(shù)據(jù),不僅在消費數(shù)據(jù)的時候進(jìn)行喚醒插入阻塞的線程,同時在插入的時候如果容量還沒滿,也會喚醒插入阻塞的線程。
數(shù)據(jù)的刪除
LinkedBlockingQueue有不同的幾個數(shù)據(jù)刪除方法,poll、take、remove方法。
poll方法:
- public E poll() {
- final AtomicInteger count = this.count;
- if (count.get() == 0) // 如果元素個數(shù)為0
- return null; // 返回null
- E x = null;
- int c = -1;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock(); // 拿鎖加鎖,保證調(diào)用poll方法的時候只有1個線程
- try {
- if (count.get() > 0) { // 判斷隊列里是否還有數(shù)據(jù)
- x = dequeue(); // 刪除頭結(jié)點
- c = count.getAndDecrement(); // 元素個數(shù)-1
- if (c > 1) // 如果隊列里還有元素
- notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列里還有數(shù)據(jù),可以再次消費
- }
- } finally {
- takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調(diào)用poll方法
- }
- if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數(shù)據(jù),count會變化。這里的if條件表示如果隊列中還可以再插入數(shù)據(jù)
- signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列里還能再次添加數(shù)據(jù)
- return x;
- }
take方法:
- public E take() throws InterruptedException {
- E x;
- int c = -1;
- final AtomicInteger count = this.count;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lockInterruptibly(); // 拿鎖加鎖,保證調(diào)用take方法的時候只有1個線程
- try {
- while (count.get() == 0) { // 如果隊列里已經(jīng)沒有元素了
- notEmpty.await(); // 阻塞并掛起當(dāng)前線程
- }
- x = dequeue(); // 刪除頭結(jié)點
- c = count.getAndDecrement(); // 元素個數(shù)-1
- if (c > 1) // 如果隊列里還有元素
- notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列里還有數(shù)據(jù),可以再次消費
- } finally {
- takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調(diào)用take方法
- }
- if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數(shù)據(jù),count會變化。這里的if條件表示如果隊列中還可以再插入數(shù)據(jù)
- signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列里還能再次添加數(shù)據(jù)
- return x;
- }
remove方法:
- public boolean remove(Object o) {
- if (o == null) return false;
- fullyLock(); // remove操作要移動的位置不固定,2個鎖都需要加鎖
- try {
- for (Node
trail = head, p = trail.next; // 從鏈表頭結(jié)點開始遍歷 - p != null;
- trail = p, p = p.next) {
- if (o.equals(p.item)) { // 判斷是否找到對象
- unlink(p, trail); // 修改節(jié)點的鏈接信息,同時調(diào)用notFull的signal方法
- return true;
- }
- }
- return false;
- } finally {
- fullyUnlock(); // 2個鎖解鎖
- }
- }
LinkedBlockingQueue的take方法對于沒數(shù)據(jù)的情況下會阻塞,poll方法刪除鏈表頭結(jié)點,remove方法刪除指定的對象。
需要注意的是remove方法由于要刪除的數(shù)據(jù)的位置不確定,需要2個鎖同時加鎖。
新聞名稱:Java阻塞隊列實現(xiàn)原理分析
標(biāo)題路徑:http://fisionsoft.com.cn/article/dpsipig.html


咨詢
建站咨詢
