exchange服务器之Java多线程 -- JUC包源码分析16 -- Exchanger源码分析
白羽
2018-12-11
来源 :网络
阅读 1118
评论 0
摘要:本文将带你了解exchange服务器之Java多线程 -- JUC包源码分析16 -- Exchanger源码分析,希望本文对大家学Exchange有所帮助。
本文将带你了解exchange服务器之Java多线程 -- JUC包源码分析16 -- Exchanger源码分析,希望本文对大家学Exchange有所帮助。
今天所讲的Exchange,顾明思义,是双向的数据传输,2个线程在一个同步点,交换数据。 其使用方式,大致如下: Exchange exchange = new Exchange(); //建1个多个线程共用的exchange对象 //把exchange对象,传给4个线程对象。每个线程在自己的run函数里面,调用exchange,把自己的数据当参数传进去,返回值是另外一个线程调用exchange塞进去的参数。 ThreadA a = new ThreadA(exchange); run() { String other = exchange.exchange(self.data) //没有别的线程调用exchange的话,自己会阻塞在这。直到有别的线程调用exchange。 } ThreadB b = new ThreadB(exchange); run() { String other = exchange.exchange(self.data) } ThreadB c = new ThreadC(exchange); run() { String other = exchange.exchange(self.data) } ThreadC d = new ThreadD(exchange); run() { String other = exchange.exchange(self.data) } 在上面的例子中,4个线程并发的调用exchange,会两两交互数据。可能是A/B, C/D,也可能A/C, B/D,也可能是A/D, B/C。 Exchange实现 从简单的角度来考虑,Exchange只需要一个互斥变量就够了。因为可以限制,任何时候,只能有1对发生交换,不是多对,同时发生交换。 但为了提高并发度,Exchange内部用了多个变量,在其内部,称之为Slot。 public class Exchanger { private static final class Slot extends AtomicReference { long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe; } private volatile Slot[] arena = new Slot[CAPACITY]; 。。。 }12345678910 关键技术点1:CacheLine填充 在上面的代码中,Slot其实就是一个AtomicReference,其里面的q0, q1,..qd那些变量,都是多余的,不用的。那为什么要添加这些多余的变量呢? 是为了让不同的Slot不要落在cpu的同一个CacheLine里面。因为cpu从内存读取数据的时候,不是一个字节一个字节的读,而是按块读取,这里的块也就是“CacheLine”,一般一个CacheLine大小是64Byte。 保证一个Slot的大小 >= 64Byte,这样更改一个Slot,就不会导致另外一个Slot的cpu cache失效,从而提高性能。 知道了Slot其实就是一个AtomicReference,下面讨论第2个技术点 关键技术点2:锁分离 同ConcurrentHashMap类型,Exchange没有只定义一个slot,而是定义了一个slot的数组。这样在多线程调用exchange的时候,可以各自在不同的slot里面进行匹配。 exchange的基本思路如下: (1)根据每个线程的thread id, hash计算出自己所在的slot index; (2)如果运气好,这个slot被人占着(slot里面有node),并且有人正在等待交换,那就和它进行交换; (3)slot为空的(slot里面没有node),自己占着,等人交换。没人交换,向前挪个位置,把当前slot里面内容取消,index减半,再看有没有交换; (4)挪到0这个位置,还没有人交互,那就阻塞,一直等着。别的线程,也会一直挪动,直到0这个位置。 所以0这个位置,是一个交易的“终结点”位置!别的位置上找不到人交易,最后都会到0这个位置。 下面是exchange的源代码: public V exchange(V x) throws InterruptedException { if (!Thread.interrupted()) { Object v = doExchange(x == null? NULL_ITEM : x, false, 0); if (v == NULL_ITEM) return null; if (v != CANCEL) return (V)v; Thread.interrupted(); // Clear interrupt status on IE throw } throw new InterruptedException(); } private Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); int index = hashIndex(); //根据thread id计算出自己要去的那个交易位置(slot) int fails = 0; for (;;) { Object y; Slot slot = arena[index]; if (slot == null) createSlot(index); //slot = null,创建一个slot,然后会回到for循环,再次开始 else if ((y = slot.get()) != null && //slot里面有人等着(有Node),则尝试和其交换 slot.compareAndSet(y, null)) { //关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点 Node you = (Node)y; if (you.compareAndSet(null, item)) {//把Node里面的东西,换成自己的 LockSupport.unpark(you.waiter); //唤醒对方 return you.item; //自己把对方的东西拿走 } //关键点2:如果你运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始 } else if (y == null && //slot里面为空(没有Node),则自己把位置占住 slot.compareAndSet(null, me)) { if (index == 0) //如果是0这个位置,自己阻塞,等待别人来交换 return timed? awaitNanos(me, slot, nanos): await(me, slot); Object v = spinWait(me, slot); //不是0这个位置,自旋等待 if (v != CANCEL) //自旋等待的时候,运气好,有人来交换了,返回 return v; me = new Node(item); //自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环 int m = max.get(); if (m > (index >>>= 1)) max.compareAndSet(m, m - 1); } else if (++fails > 1) { //失败 case1: slot有人,要交互,但被人家抢了 case2: slot没人,自己要占位置,又被人家抢了 int m = max.get(); if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; //3次匹配失败,把index扩大,再次开始for循环 else if (--index < 0) index = m; } } } 注意:上面唯一有点绕的地方,是Slot里面套了个Node。双方的交易地点,准确说,不是在Slot里面,是在Slot的Node里面。 当Slot的Node = null时候,代表Slot为空,新建一个Node占住; 当Slot的Node != null的时候,把Node拿出来,双方在Node里面交换,此时Slot已经释放出去了! 总结 Exchanger的代码虽然少,但还是蛮绕的,总结下来,有以下几个关键点: (1) Cache Line填充技术 (2) 锁分离 (3) 俩人还没交换结束的时候,就把slot给释放出去了。双方拿着那个Node,在Node里面交互的。也就是上面的for循环里面的第2个分支。 这里面有一种情况就是:运气不好,2人正要在Node里面交换的时候,被另外一个线程抢了。那这个没抢到的,只能for循环,重新开始 (4)0是交换的终结点位置:没有人交换的时候,会不断挪位置,挪的时候,要把当前位置的slot清空掉,一个人不能同时占2个Slot。直到挪到0这个位置,一直等待。
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标系统运维之Exchange频道!
本文由 @白羽 发布于职坐标。未经许可,禁止转载。
看完这篇文章有何感觉?已经有0 人表态,0% 的人喜欢
快给朋友分享吧~
评论(0)