我们用一段代码证明下hashmap的线程不安全,以及concurrenthashmap的线程安全性。代码逻辑很简单,开启10000个线程,每个线程做很简单的操作,就是put一个key,然后删除一个key,理论上线程安全的情况下,最后map的size()肯定为0。
推荐教程:《java学习》
map<object,object> mymap=new hashmap<>(); // concurrenthashmap mymap=new concurrenthashmap(); for (int i = 0; i <10000 ; i++) { new thread(new runnable() { @override public void run() { double a=math.random(); mymap.put(a,a); mymap.remove(a); } }).start(); } thread.sleep(15000l);//多休眠下,保证上面的线程操作完毕。 system.out.println(mymap.size());
这里显示map的size=13,但是实际上map里还有一个key。 同样的代码我们用concurrenthashmap来运行下,结果map ==0
这里也就证明了concurrenthashmap是线程安全的,我们接下来从源码分析下concurrenthashmap是如何保证线程安全的,本次源码jdk版本为1.8。
二、concurrenthashmap源码分析
3.1 concurrenthashmap的基础属性
//默认最大的容量 private static final int maximum_capacity = 1 << 30;//默认初始化的容量private static final int default_capacity = 16;//最大的数组可能长度static final int max_array_size = integer.max_value - 8;//默认的并发级别,目前并没有用,只是为了保持兼容性private static final int default_concurrency_level = 16;//和hashmap一样,负载因子private static final float load_factor = 0.75f;//和hashmap一样,链表转换为红黑树的阈值,默认是8static final int treeify_threshold = 8;//红黑树转换链表的阀值,默认是6static final int untreeify_threshold = 6;//进行链表转换最少需要的数组长度,如果没有达到这个数字,只能进行扩容static final int min_treeify_capacity = 64;//table扩容时, 每个线程最少迁移table的槽位个数private static final int min_transfer_stride = 16;//感觉是用来计算偏移量和线程数量的标记private static int resize_stamp_bits = 16;//能够调整的最大线程数量private static final int max_resizers = (1 << (32 - resize_stamp_bits)) - 1;//记录偏移量private static final int resize_stamp_shift = 32 - resize_stamp_bits;//值为-1, 当node.hash为moved时, 代表着table正在扩容static final int moved = -1;//treebin, 置为-2, 代表此元素后接红黑树static final int treebin = -2;//感觉是占位符,目前没看出来明显的作用static final int reserved = -3;//主要用来计算hash值的static final int hash_bits = 0x7fffffff; //节点数组transient volatile node<k,v>[] table;//table迁移过程临时变量, 在迁移过程中将元素全部迁移到nexttable上private transient volatile node<k,v>[] nexttable;//基础计数器private transient volatile long basecount;//table扩容和初始化的标记,不同的值代表不同的含义,默认为0,表示未初始化//-1: table正在初始化;小于-1,表示table正在扩容;大于0,表示初始化完成后下次扩容的大小private transient volatile int sizectl;//table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferindex代表当前已经迁移的元素下标private transient volatile int transferindex;//扩容时候,cas锁标记private transient volatile int cellsbusy;//计数器表,大小是2次幂private transient volatile countercell[] countercells;
上面就是concurrenthashmap的基本属性,我们大部分和hashmap一样,只是增加了部分属性,后面我们来分析增加的部分属性是起到如何的作用的。
2.2 concurrenthashmap的常用方法属性
put方法
final v putval(k key, v value, boolean onlyifabsent) { //key和value不允许为null if (key == null || value == null) throw new nullpointerexception(); //计算hash值 int hash = spread(key.hashcode()); int bincount = 0; for (node<k,v>[] tab = table;;) { node<k,v> f; int n, i, fh; //如果table没有初始化,进行初始化 if (tab == null || (n = tab.length) == 0) tab = inittable(); //计算数组的位置 else if ((f = tabat(tab, i = (n - 1) & hash)) == null) { //如果该位置为空,构造新节点添加即可 if (castabat(tab, i, null, new node<k,v>(hash, key, value, null))) break; // no lock when adding to empty bin }//如果正在扩容 else if ((fh = f.hash) == moved) //帮着一起扩容 tab = helptransfer(tab, f); else { //开始真正的插入 v oldval = null; synchronized (f) { if (tabat(tab, i) == f) { //如果已经初始化完成了 if (fh >= 0) { bincount = 1; for (node<k,v> e = f;; ++bincount) { k ek; //这里key相同直接覆盖原来的节点 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldval = e.val; if (!onlyifabsent) e.val = value; break; } node<k,v> pred = e; //否则添加到节点的最后面 if ((e = e.next) == null) { pred.next = new node<k,v>(hash, key, value, null); break; } } }//如果节点是树节点,就进行树节点添加操作 else if (f instanceof treebin) { node<k,v> p; bincount = 2; if ((p = ((treebin<k,v>)f).puttreeval(hash, key,alue)) != null) { oldval = p.val; if (!onlyifabsent) p.val = value; } } } }//判断节点是否要转换成红黑树 if (bincount != 0) { if (bincount >= treeify_threshold) treeifybin(tab, i);//红黑树转换 if (oldval != null) return oldval; break; } } } //计数器,采用cas计算size大小,并且检查是否需要扩容 addcount(1l, bincount); return null; }
我们发现concurrenthashmap的put方法和hashmap的逻辑相差不大,主要是新增了线程安全部分,在添加元素时候,采用synchronized来保证线程安全,然后计算size的时候采用cas操作进行计算。整个put流程比较简单,总结下就是:
1.判断key和vaule是否为空,如果为空,直接抛出异常。
2.判断table数组是否已经初始化完毕,如果没有初始化,进行初始化。
3.计算key的hash值,如果该位置为空,直接构造节点放入。
4.如果table正在扩容,进入帮助扩容方法。
5.最后开启同步锁,进行插入操作,如果开启了覆盖选项,直接覆盖,否则,构造节点添加到尾部,如果节点数超过红黑树阈值,进行红黑树转换。如果当前节点是树节点,进行树插入操作。
6.最后统计size大小,并计算是否需要扩容。
get方法
public v get(object key) { node<k,v>[] tab; node<k,v> e, p; int n, eh; k ek; //计算hash值 int h = spread(key.hashcode()); //如果table已经初始化,并且计算hash值的索引位置node不为空 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabat(tab, (n - 1) & h)) != null) { //如果hash相等,key相等,直接返回该节点的value if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; }//如果hash值为负值表示正在扩容,这个时候查的是forwardingnode的find方法来定位到节点。 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //循环遍历链表,查询key和hash值相等的节点。 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
get方法比较简单,主要流程如下:
1.直接计算hash值,查找的节点如果key和hash值相等,直接返回该节点的value就行。
2.如果table正在扩容,就调用forwardingnode的find方法查找节点。
3.如果没有扩容,直接循环遍历链表,查找到key和hash值一样的节点值即可。
concurrenthashmap的扩容
concurrenthashmap的扩容相对于hashmap的扩容相对复杂,因为涉及到了多线程操作,这里扩容方法主要是transfer,我们来分析下这个方法的源码,研究下是如何扩容的。
private final void transfer(node<k,v>[] tab, node<k,v>[] nexttab) { int n = tab.length, stride; //保证每个线程扩容最少是16, if ((stride = (ncpu > 1) ? (n >>> 3) / ncpu : n) < min_transfer_stride) stride = min_transfer_stride; // subdivide range if (nexttab == null) { // initiating try { //扩容2倍 @suppresswarnings("unchecked") node<k,v>[] nt = (node<k,v>[])new node<?,?>[n << 1]; nexttab = nt; } catch (throwable ex) { // try to cope with oome //出现异常情况就不扩容了。 sizectl = integer.max_value; return; } //用新数组对象接收 nexttable = nexttab; //初始化扩容下表为原数组的长度 transferindex = n; } int nextn = nexttab.length; //扩容期间的过渡节点 forwardingnode<k,v> fwd = new forwardingnode<k,v>(nexttab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nexttab for (int i = 0, bound = 0;;) { node<k,v> f; int fh; while (advance) { int nextindex, nextbound; //如果该线程已经完成了 if (--i >= bound || finishing) advance = false; //设置扩容转移下标,如果下标小于0,说明已经没有区间可以操作了,线程可以退出了 else if ((nextindex = transferindex) <= 0) { i = -1; advance = false; }cas操作设置区间 else if (u.compareandswapint (this, transferindex, nextindex, nextbound = (nextindex > stride ? nextindex - stride : 0))) { bound = nextbound; i = nextindex - 1; advance = false; } } //如果计算的区间小于0了,说明区间分配已经完成,没有剩余区间分配了 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) {//如果扩容完成了,进行收尾工作 nexttable = null;//清空临时数组 table = nexttab;//赋值原数组 sizectl = (n << 1) - (n >>> 1);//重新赋值sizectl return; }//如果扩容还在进行,自己任务完成就进行sizectl-1,这里是因为,扩容是通过helptransfer()和addcount()方法来调用的,在调用transfer()真正扩容之前,sizectl都会+1,所以这里每个线程完成后就进行-1。 if (u.compareandswapint(this, sizectl, sc = sizectl, sc - 1)) { //这里应该是判断扩容是否结束 if ((sc - 2) != resizestamp(n) << resize_stamp_shift) return; //结束,赋值状态 finishing = advance = true; i = n; // recheck before commit } }//如果在table中没找到,就用过渡节点 else if ((f = tabat(tab, i)) == null) //成功设置就进入下一个节点 advance = castabat(tab, i, null, fwd); else if ((fh = f.hash) == moved) //如果节点不为空,并且该位置的hash值为-1,表示已经处理了,直接进入下一个循环即可 advance = true; // already processed else { //这里说明老table该位置不为null,也没有被处理过,进行真正的处理逻辑。进行同步锁 synchronized (f) { if (tabat(tab, i) == f) { node<k,v> ln, hn; //如果hash值大于0 if (fh >= 0) { //为运算结果 int runbit = fh & n; node<k,v> lastrun = f; for (node<k,v> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runbit) { runbit = b; lastrun = p; } } if (runbit == 0) { ln = lastrun; hn = null; } else { hn = lastrun; ln = null; } for (node<k,v> p = f; p != lastrun; p = p.next) { int ph = p.hash; k pk = p.key; v pv = p.val; //这里的逻辑和hashmap是一样的,都是采用2个链表进行处理,具体分析可以查看我分析hashmap的文章 if ((ph & n) == 0) ln = new node<k,v>(ph, pk, pv, ln); else hn = new node<k,v>(ph, pk, pv, hn); } settabat(nexttab, i, ln); settabat(nexttab, i + n, hn); settabat(tab, i, fwd); advance = true; }//如果是树节点,执行树节点的扩容数据转移 else if (f instanceof treebin) { treebin<k,v> t = (treebin<k,v>)f; treenode<k,v> lo = null, lotail = null; treenode<k,v> hi = null, hitail = null; int lc = 0, hc = 0; for (node<k,v> e = t.first; e != null; e = e.next) { int h = e.hash; treenode<k,v> p = new treenode<k,v> (h, e.key, e.val, null, null); //也是通过位运算判断两个链表的位置 if ((h & n) == 0) { if ((p.prev = lotail) == null) lo = p; else lotail.next = p; lotail = p; ++lc; } else { if ((p.prev = hitail) == null) hi = p; else hitail.next = p; hitail = p; ++hc; } } //这里判断是否进行树转换 ln = (lc <= untreeify_threshold) ? untreeify(lo) : (hc != 0) ? new treebin<k,v>(lo) : t; hn = (hc <= untreeify_threshold) ? untreeify(hi) : (lc != 0) ? new treebin<k,v>(hi) : t; //这里把新处理的链表赋值到新数组中 settabat(nexttab, i, ln); settabat(nexttab, i + n, hn); settabat(tab, i, fwd); advance = true; } } } } } }
concurrenthashmap的扩容还是比较复杂,复杂主要表现在,控制多线程扩容层面上,扩容的源码我没有解析的很细,一方面是确实比较复杂,本人有某些地方也不是太明白,另一方面是我觉得我们研究主要是弄懂其思想,能搞明白关键代码和关键思路即可,只要不是重新实现一套类似的功能,我想就不用纠结其全部细节了。总结下concurrenthashmap的扩容步骤如下:
1.获取线程扩容处理步长,最少是16,也就是单个线程处理扩容的节点个数。
2.新建一个原来容量2倍的数组,构造过渡节点,用于扩容期间的查询操作。
3.进行死循环进行转移节点,主要根据finishing变量判断是否扩容结束,在扩容期间通过给不同的线程设置不同的下表索引进行扩容操作,就是不同的线程,操作的数组分段不一样,同时利用synchronized同步锁锁住操作的节点,保证了线程安全。
4.真正进行节点在新数组的位置是和hashmap扩容逻辑一样的,通过位运算计算出新链表是否位于原位置或者位于原位置+扩容的长度位置,具体分析可以查看我的这篇文章。
三、总结
1.concurrenthashmap大部分的逻辑代码和hashmap是一样的,主要通过synchronized和来保证节点插入扩容的线程安全,这里肯定有同学会问,为啥使用synchronized呢?而不用采取乐观锁,或者lock呢?我个人觉得可能原因有2点:
a.乐观锁比较适用于竞争冲突比较少的场景,如果冲突比较多,那么就会导致不停的重试,这样反而性能更低。
b.synchronized在经历了优化之后,其实性能已经和lock没啥差异了,某些场景可能还比lock快。所以,我觉得这是采用synchronized来同步的原因。
2.concurrenthashmap的扩容核心逻辑主要是给不同的线程分配不同的数组下标,然后每个线程处理各自下表区间的节点。同时处理节点复用了hashmap的逻辑,通过位运行,可以知道节点扩容后的位置,要么在原位置,要么在原位置+oldlength位置,最后直接赋值即可。
以上就是concurrenthashmap为什么是线程安全的详细内容。
