您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

并发包阻塞队列之ArrayBlockingQueue

2024/3/1 9:57:17发布17次查看
jdk1.7.0_79 
上一节中对并发包中的非阻塞队列concurrentlinkedqueue的入队、出队做了一个简要的分析,本文将对并发包中的阻塞队列做一个简要分析。
java并发包中的阻塞队列一共7个,当然他们都是线程安全的。
arrayblockingqueue:一个由数组结构组成的有界阻塞队列。
linkedblockingqueue:一个由链表结构组成的有界阻塞队列。
priorityblockingqueue:一个支持优先级排序的无界阻塞队列。
dealyqueue:一个使用优先级队列实现的无界阻塞队列。
synchronousqueue:一个不存储元素的阻塞队列。
linkedtransferqueue:一个由链表结构组成的无界阻塞队列。
linkedblockingdeque:一个由链表结构组成的双向阻塞队列。(摘自《java并发编程的艺术》)
在本文对arrayblockingqueue阻塞队列做一个简要解析
对于arraylinkedqueue,放眼看过去其安全性的保证是由reentrantlock保证的,有关reentrantlock的解析可参考《5.lock接口及其实现reentrantlock》,在下文我也会适当的提及。
首先来查看其构造函数: 
构造方法
public arrayblockingqueue(int capacity)
构造指定大小的有界队列
public arrayblockingqueue(int capacity, boolean fair)
构造指定大小的有界队列,指定为公平或非公平锁
public arrayblockingqueue(int capacity, boolean fair, collection<? extends e> c)
构造指定大小的有界队列,指定为公平或非公平锁,指定在初始化时加入一个集合
1 public arrayblockingqueue(int capacity) {   2 this(capacity, false);//默认构造非公平锁的阻塞队列  3 }   4 public arrayblockingqueue(int capacity, boolean fair) {   5 if (capacity <= 0) 6 throw new illegalargumentexception(); 7 this.items = new object[capacity]; 8 lock = new reentrantlock(fair);//初始化reentrantlock重入锁,出队入队拥有这同一个锁 9 notempty = lock.newcondition;//初始化非空等待队列,有关condition可参考《6.类似object监视器方法的condition接口》10 notfull = lock.newcondition;//初始化非满等待队列 11 } 12 public arrayblockingqueue(int capacity, boolean fair, collecation<? extends e> c) {  13 this(capacity, fair);  14 final reentrantlock lock = this.lock;  15 lock.lock();//注意在这个地方需要获得锁,这为什么需要获取锁的操作呢? 16 try {  17 int i = 0;  18 try {  19 for (e e : c) {  20 checknotnull(e);  21 item[i++] = e;//将集合添加进数组构成的队列中 22 }  23 } catch (arrayindexoutofboundsexception ex) {  24 throw new illegalargumentexception();  25 }  26 count = i;//队列中的实际数据数量 27 putindex = (i == capacity) ? 0 : i;  28 } finally {  29 lock.unlock();  30 }  31 }
在第15行,源码里给了一句注释: lock only for visibility, not mutual exclusion。这句话的意思就是给出,这个锁的操作并不是为了互斥操作,而是保证其可见性。线程t1是实例化arrayblockingqueue对象,t2是对实例化的arrayblockingqueue对象做入队操作(当然要保证t1和t2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),t1的集合c有可能只存在t1线程维护的缓存中,并没有写回主存,t2中实例化的arrayblockingqueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。
以下是arrayblockingqueue的一些出队入队操作。
队列元素的插入
抛出异常
返回值(非阻塞)
一定时间内返回值
返回值(阻塞)
插入
add(e)//队列未满时,返回true;队列满则抛出illegalstateexception(“queue full”)异常——abstractqueue
offer(e)//队列未满时,返回true;队列满时返回false。非阻塞立即返回。
offer(e, time, unit)//设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。
put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。
//arrayblockingqueue#add public boolean add(e e) {  return super.add(e);  }
//abstractqueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出illegalstateexception异常,具体offer实现交给子类实现。 public boolean add(e e) {  if (offer(e))//offer方法由queue接口定义 return true;  else throw new illegalstateexception();  }
//arrayblockingqueue#offer,队列未满时返回true,满时返回false public boolean offer(e e) {  checknotnull(e);//检查入队元素是否为空 final reentrantlock lock = this.lock;  lock.lock();//获得锁,线程安全 try {  if (count == items.length)//队列满时,不阻塞等待,直接返回false return false;  else {  insert(e);//队列未满,直接插入 return true;  }  } finally { lock.unlock(); } }
//arrayblockingqueue#insert private void insert(e e) {  items[putindex] = x;  putindex = inc(putindex);  ++count;  notempty.signal();//唤醒非空等待队列中的线程,有关condition可参考《6.类似object监视器方法的condition接口》
}
在这里有几个arrayblockingqueue成员变量。items即队列的数组引用,putindex表示等待插入的数组下标位置。当items[putindex] = x将新元素插入队列中后,调用inc将数组下标向后移动,如果队列满则将putindex置为0:
//arrayblockingqueue#inc private int inc(int i) {  return (++i == items.length) ? 0 : i;  }
接着解析下put方法,阻塞插入队列,当队列满时不会返回false,也不会抛出异常,而是一直阻塞等待,直到有空位可插入,但它可被中断返回。
//arrayblockingqueue#put public void put(e e) throws interruptedexception {  checknotnull(e);//同样检查插入元素是否为空 final reentrantlock lock = this.lock;  lock.lockinterruptibly();//这里并没有调用lock方法,而是调用了可被中断的lockinterruptibly,该方法可被线程中断返回,lock不能被中断返回。 try {  while (count == items.length)  notfull.await();//当队列满时,使非满等待队列休眠 insert(e);//此时表示队列非满,故插入元素,同时在该方法里唤醒非空等待队列 } finally {  lock.unlock();  }  }
队列元素的删除 抛出异常
返回值(非阻塞)
一定时间内返回值
返回值(阻塞)
remove()//队列不为空时,返回队首值并移除;队列为空时抛出nosuchelementexception()异常——abstractqueue
poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。
poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值
take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。
//abstractqueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,队列中元素时返回具体元素,元素为空时抛出异常,具体实现poll由子类实现, public e remove() {  e x = poll();//poll方法由queue接口定义 if (x != null)  return x;  else throw new nosuchelementexception();  }
//arrayblockingqueue#poll,队列中有元素时返回元素,不为空时返回null public e poll() {  final reentrantlock lock = this.lock;  lock.lock();  try {  return (count == 0) ? null : extract();  } finally {  lock.unlock();  }  }
//arrayblockingqueue#extract private e extract() {  final object[] items = this.items;  e x = this.<e>cast(items[takeindex]);//移除队首元素 items[takeindex] = null;//将队列数组中的第一个元素置为null,便于gc回收 takeindex = inc(takeindex);  --count;  notfull.signal();//唤醒非满等待队列线程 return x;  }
对比add和offer方法,理解了上两个方法后remove和poll实际不难理解,同理在理解了put阻塞插入队列后,对比take阻塞删除队列元素同样也很好理解。
//arrayblockqueue#take public e take() throws interruptedexception {  final reentrantlock lock = this.lock();  lock.lockinterrupted();//这里并没有调用lock方法,而是调用了可被中断的lockinterruptibly,该方法可被线程中断返回,lock不能被中断返回。 try {  while (count == 0)//队列元素为空 notempty.await();//非空等待队列休眠 return extract();//此时表示队列非空,故删除元素,同时在里唤醒非满等待队列 } finally {  lock.unlock();  }  }
最后一个方法size。
public int size() {  final reentrantlock lock = this.lock;  lock.lock();  try {  return count;  } finally {  lock.unlock();  }  }
可以看到arrayblockingqueue队列的size方法,是直接返回的count变量,它不像concurrentlinkedqueue,concurrentlinkedqueue的size则是每次会遍历这个队列,故arrayblockingqueue的size方法比concurrentlinkedqueue的size方法效率高。而且concurrentlinkedqueue的size方法并没有加锁!也就是说很有可能其size并不准确,这在它的注释中说明了concurrentlinkedqueue的size并没有多大的用处。
以上就是并发包阻塞队列之arrayblockingqueue的详细内容。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product