下面来提供几个更细的粒度锁:
1. 分段锁
借鉴concurrenthashmap的分段思想,先生成一定数量的锁,具体使用的时候再根据key来返回对应的lock。这是几个实现里最简单,性能最高,也是最终被采用的锁策略,代码如下:
/** * 分段锁,系统提供一定数量的原始锁,根据传入对象的哈希值获取对应的锁并加锁 * 注意:要锁的对象的哈希值如果发生改变,有可能导致锁无法成功释放!!! */ public class segmentlock<t> { private integer segments = 16;//默认分段数量 private final hashmap<integer, reentrantlock> lockmap = new hashmap<>(); public segmentlock() { init(null, false); } public segmentlock(integer counts, boolean fair) { init(counts, fair); } private void init(integer counts, boolean fair) { if (counts != null) { segments = counts; } for (int i = 0; i < segments; i++) { lockmap.put(i, new reentrantlock(fair)); } } public void lock(t key) { reentrantlock lock = lockmap.get(key.hashcode() % segments); lock.lock(); } public void unlock(t key) { reentrantlock lock = lockmap.get(key.hashcode() % segments); lock.unlock(); } }
2. 哈希锁
上述分段锁的基础上发展起来的第二种锁策略,目的是实现真正意义上的细粒度锁。每个哈希值不同的对象都能获得自己独立的锁。在测试中,在被锁住的代码执行速度飞快的情况下,效率比分段锁慢 30% 左右。如果有长耗时操作,感觉表现应该会更好。代码如下:
public class hashlock<t> { private boolean isfair = false; private final segmentlock<t> segmentlock = new segmentlock<>();//分段锁 private final concurrenthashmap<t, lockinfo> lockmap = new concurrenthashmap<>(); public hashlock() { } public hashlock(boolean fair) { isfair = fair; } public void lock(t key) { lockinfo lockinfo; segmentlock.lock(key); try { lockinfo = lockmap.get(key); if (lockinfo == null) { lockinfo = new lockinfo(isfair); lockmap.put(key, lockinfo); } else { lockinfo.count.incrementandget(); } } finally { segmentlock.unlock(key); } lockinfo.lock.lock(); } public void unlock(t key) { lockinfo lockinfo = lockmap.get(key); if (lockinfo.count.get() == 1) { segmentlock.lock(key); try { if (lockinfo.count.get() == 1) { lockmap.remove(key); } } finally { segmentlock.unlock(key); } } lockinfo.count.decrementandget(); lockinfo.unlock(); } private static class lockinfo { public reentrantlock lock; public atomicinteger count = new atomicinteger(1); private lockinfo(boolean fair) { this.lock = new reentrantlock(fair); } public void lock() { this.lock.lock(); } public void unlock() { this.lock.unlock(); } } }
3. 弱引用锁
哈希锁因为引入的分段锁来保证锁创建和销毁的同步,总感觉有点瑕疵,所以写了第三个锁来寻求更好的性能和更细粒度的锁。这个锁的思想是借助java的弱引用来创建锁,把锁的销毁交给jvm的垃圾回收,来避免额外的消耗。
有点遗憾的是因为使用了concurrenthashmap作为锁的容器,所以没能真正意义上的摆脱分段锁。这个锁的性能比 hashlock 快10% 左右。锁代码:
/** * 弱引用锁,为每个独立的哈希值提供独立的锁功能 */ public class weakhashlock<t> { private concurrenthashmap<t, weaklockref<t, reentrantlock>> lockmap = new concurrenthashmap<>(); private referencequeue<reentrantlock> queue = new referencequeue<>(); public reentrantlock get(t key) { if (lockmap.size() > 1000) { clearemptyref(); } weakreference<reentrantlock> lockref = lockmap.get(key); reentrantlock lock = (lockref == null ? null : lockref.get()); while (lock == null) { lockmap.putifabsent(key, new weaklockref<>(new reentrantlock(), queue, key)); lockref = lockmap.get(key); lock = (lockref == null ? null : lockref.get()); if (lock != null) { return lock; } clearemptyref(); } return lock; } @suppresswarnings("unchecked") private void clearemptyref() { reference<? extends reentrantlock> ref; while ((ref = queue.poll()) != null) { weaklockref<t, ? extends reentrantlock> weaklockref = (weaklockref<t, ? extends reentrantlock>) ref; lockmap.remove(weaklockref.key); } } private static final class weaklockref<t, k> extends weakreference<k> { final t key; private weaklockref(k referent, referencequeue<? super k> q, t key) { super(referent, q); this.key = key; } } }
4.基于key(主键)的互斥锁
keylock是对所需处理的数据的key(主键)进行加锁,只要是对不同key操作,其就可以并行处理,大大提高了线程的并行度
keylock有如下几个特性:
1、细粒度,高并行性
2、可重入
3、公平锁
4、加锁开销比reentrantlock大,适用于处理耗时长、key范围大的场景
public class keylock<k> { // 保存所有锁定的key及其信号量 private final concurrentmap<k, semaphore> map = new concurrenthashmap<k, semaphore>(); // 保存每个线程锁定的key及其锁定计数 private final threadlocal<map<k, lockinfo>> local = new threadlocal<map<k, lockinfo>>() { @override protected map<k, lockinfo> initialvalue() { return new hashmap<k, lockinfo>(); } }; /** * 锁定key,其他等待此key的线程将进入等待,直到调用{@link #unlock(k)} * 使用hashcode和equals来判断key是否相同,因此key必须实现{@link #hashcode()}和 * {@link #equals(object)}方法 * * @param key */ public void lock(k key) { if (key == null) return; lockinfo info = local.get().get(key); if (info == null) { semaphore current = new semaphore(1); current.acquireuninterruptibly(); semaphore previous = map.put(key, current); if (previous != null) previous.acquireuninterruptibly(); local.get().put(key, new lockinfo(current)); } else { info.lockcount++; } } /** * 释放key,唤醒其他等待此key的线程 * @param key */ public void unlock(k key) { if (key == null) return; lockinfo info = local.get().get(key); if (info != null && --info.lockcount == 0) { info.current.release(); map.remove(key, info.current); local.get().remove(key); } } /** * 锁定多个key * 建议在调用此方法前先对keys进行排序,使用相同的锁定顺序,防止死锁发生 * @param keys */ public void lock(k[] keys) { if (keys == null) return; for (k key : keys) { lock(key); } } /** * 释放多个key * @param keys */ public void unlock(k[] keys) { if (keys == null) return; for (k key : keys) { unlock(key); } } private static class lockinfo { private final semaphore current; private int lockcount; private lockinfo(semaphore current) { this.current = current; this.lockcount = 1; } } }
keylock使用示例:
private int[] accounts; private keylock<integer> lock = new keylock<integer>(); public boolean transfer(int from, int to, int money) { integer[] keys = new integer[] {from, to}; arrays.sort(keys); //对多个key进行排序,保证锁定顺序防止死锁 lock.lock(keys); try { //处理不同的from和to的线程都可进入此同步块 if (accounts[from] < money) return false; accounts[from] -= money; accounts[to] += money; return true; } finally { lock.unlock(keys); } }
测试代码如下:
//场景:多线程并发转账 public class test { private final int[] account; // 账户数组,其索引为账户id,内容为金额 public test(int count, int money) { account = new int[count]; arrays.fill(account, money); } boolean transfer(int from, int to, int money) { if (account[from] < money) return false; account[from] -= money; try { thread.sleep(2); } catch (exception e) { } account[to] += money; return true; } int getamount() { int result = 0; for (int m : account) result += m; return result; } public static void main(string[] args) throws exception { int count = 100; //账户个数 int money = 10000; //账户初始金额 int threadnum = 8; //转账线程数 int number = 10000; //转账次数 int maxmoney = 1000; //随机转账最大金额 test test = new test(count, money); //不加锁 // runner runner = test.new nonlockrunner(maxmoney, number); //加synchronized锁 // runner runner = test.new synchronizedrunner(maxmoney, number); //加reentrantlock锁 // runner runner = test.new reentrantlockrunner(maxmoney, number); //加keylock锁 runner runner = test.new keylockrunner(maxmoney, number); thread[] threads = new thread[threadnum]; for (int i = 0; i < threadnum; i++) threads[i] = new thread(runner, "thread-" + i); long begin = system.currenttimemillis(); for (thread t : threads) t.start(); for (thread t : threads) t.join(); long time = system.currenttimemillis() - begin; system.out.println("类型:" + runner.getclass().getsimplename()); system.out.printf("耗时:%dms\n", time); system.out.printf("初始总金额:%d\n", count * money); system.out.printf("终止总金额:%d\n", test.getamount()); } // 转账任务 abstract class runner implements runnable { final int maxmoney; final int number; private final random random = new random(); private final atomicinteger count = new atomicinteger(); runner(int maxmoney, int number) { this.maxmoney = maxmoney; this.number = number; } @override public void run() { while(count.getandincrement() < number) { int from = random.nextint(account.length); int to; while ((to = random.nextint(account.length)) == from) ; int money = random.nextint(maxmoney); dotransfer(from, to, money); } } abstract void dotransfer(int from, int to, int money); } // 不加锁的转账 class nonlockrunner extends runner { nonlockrunner(int maxmoney, int number) { super(maxmoney, number); } @override void dotransfer(int from, int to, int money) { transfer(from, to, money); } } // synchronized的转账 class synchronizedrunner extends runner { synchronizedrunner(int maxmoney, int number) { super(maxmoney, number); } @override synchronized void dotransfer(int from, int to, int money) { transfer(from, to, money); } } // reentrantlock的转账 class reentrantlockrunner extends runner { private final reentrantlock lock = new reentrantlock(); reentrantlockrunner(int maxmoney, int number) { super(maxmoney, number); } @override void dotransfer(int from, int to, int money) { lock.lock(); try { transfer(from, to, money); } finally { lock.unlock(); } } } // keylock的转账 class keylockrunner extends runner { private final keylock<integer> lock = new keylock<integer>(); keylockrunner(int maxmoney, int number) { super(maxmoney, number); } @override void dotransfer(int from, int to, int money) { integer[] keys = new integer[] {from, to}; arrays.sort(keys); lock.lock(keys); try { transfer(from, to, money); } finally { lock.unlock(keys); } } } }
测试结果:
(8线程对100个账户随机转账总共10000次):
类型:nonlockrunner(不加锁)
耗时:2482ms
初始总金额:1000000
终止总金额:998906(无法保证原子性)
类型:synchronizedrunner(加synchronized锁)
耗时:20872ms
初始总金额:1000000
终止总金额:1000000
类型:reentrantlockrunner(加reentrantlock锁)
耗时:21588ms
初始总金额:1000000
终止总金额:1000000
类型:keylockrunner(加keylock锁)
耗时:2831ms
初始总金额:1000000
终止总金额:1000000