exchange服务器之Java多线程 -- JUC包源码分析16 -- Exchanger源码分析
白羽 2018-12-11 来源 :网络 阅读 1118 评论 0

摘要:本文将带你了解exchange服务器之Java多线程 -- JUC包源码分析16 -- Exchanger源码分析,希望本文对大家学Exchange有所帮助。

    本文将带你了解exchange服务器之Java多线程 -- JUC包源码分析16 -- Exchanger源码分析,希望本文对大家学Exchange有所帮助。



今天所讲的Exchange,顾明思义,是双向的数据传输,2个线程在一个同步点,交换数据。

其使用方式,大致如下:



Exchange exchange = new Exchange(); //建1个多个线程共用的exchange对象

//把exchange对象,传给4个线程对象。每个线程在自己的run函数里面,调用exchange,把自己的数据当参数传进去,返回值是另外一个线程调用exchange塞进去的参数。
ThreadA a = new ThreadA(exchange);
run()
{
   String other = exchange.exchange(self.data)  //没有别的线程调用exchange的话,自己会阻塞在这。直到有别的线程调用exchange。
}


ThreadB b = new ThreadB(exchange);
run()
{
   String other = exchange.exchange(self.data)
}


ThreadB c = new ThreadC(exchange);
run()
{
   String other = exchange.exchange(self.data)
}

ThreadC d = new ThreadD(exchange);
run()
{
   String other = exchange.exchange(self.data)
}

在上面的例子中,4个线程并发的调用exchange,会两两交互数据。可能是A/B,  C/D,也可能A/C, B/D,也可能是A/D,  B/C。



Exchange实现

从简单的角度来考虑,Exchange只需要一个互斥变量就够了。因为可以限制,任何时候,只能有1对发生交换,不是多对,同时发生交换。

但为了提高并发度,Exchange内部用了多个变量,在其内部,称之为Slot。



public class Exchanger {

    private static final class Slot extends AtomicReference {
        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
    }

   private volatile Slot[] arena = new Slot[CAPACITY];

   。。。
 }12345678910



关键技术点1:CacheLine填充

在上面的代码中,Slot其实就是一个AtomicReference,其里面的q0, q1,..qd那些变量,都是多余的,不用的。那为什么要添加这些多余的变量呢?

是为了让不同的Slot不要落在cpu的同一个CacheLine里面。因为cpu从内存读取数据的时候,不是一个字节一个字节的读,而是按块读取,这里的块也就是“CacheLine”,一般一个CacheLine大小是64Byte。

保证一个Slot的大小 >= 64Byte,这样更改一个Slot,就不会导致另外一个Slot的cpu cache失效,从而提高性能。

知道了Slot其实就是一个AtomicReference,下面讨论第2个技术点



关键技术点2:锁分离

同ConcurrentHashMap类型,Exchange没有只定义一个slot,而是定义了一个slot的数组。这样在多线程调用exchange的时候,可以各自在不同的slot里面进行匹配。

exchange的基本思路如下: 
(1)根据每个线程的thread id,  hash计算出自己所在的slot index; 
(2)如果运气好,这个slot被人占着(slot里面有node),并且有人正在等待交换,那就和它进行交换; 
(3)slot为空的(slot里面没有node),自己占着,等人交换。没人交换,向前挪个位置,把当前slot里面内容取消,index减半,再看有没有交换; 
(4)挪到0这个位置,还没有人交互,那就阻塞,一直等着。别的线程,也会一直挪动,直到0这个位置。

所以0这个位置,是一个交易的“终结点”位置!别的位置上找不到人交易,最后都会到0这个位置。

下面是exchange的源代码:



    public V exchange(V x) throws InterruptedException {
        if (!Thread.interrupted()) {
            Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
            if (v == NULL_ITEM)
                return null;
            if (v != CANCEL)
                return (V)v;
            Thread.interrupted(); // Clear interrupt status on IE throw
        }
        throw new InterruptedException();
    }

    private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);                 
        int index = hashIndex();                  //根据thread id计算出自己要去的那个交易位置(slot)
        int fails = 0;                            

        for (;;) {
            Object y;                             
            Slot slot = arena[index];
            if (slot == null)                    
                createSlot(index);     //slot = null,创建一个slot,然后会回到for循环,再次开始
            else if ((y = slot.get()) != null &&  //slot里面有人等着(有Node),则尝试和其交换
                     slot.compareAndSet(y, null)) { //关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点
                Node you = (Node)y;               
                if (you.compareAndSet(null, item)) {//把Node里面的东西,换成自己的
                    LockSupport.unpark(you.waiter); //唤醒对方
                    return you.item; //自己把对方的东西拿走
                } //关键点2:如果你运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始                          
            }
            else if (y == null &&                 //slot里面为空(没有Node),则自己把位置占住
                     slot.compareAndSet(null, me)) {
                if (index == 0)                   //如果是0这个位置,自己阻塞,等待别人来交换
                    return timed? awaitNanos(me, slot, nanos): await(me, slot);
                Object v = spinWait(me, slot);    //不是0这个位置,自旋等待
                if (v != CANCEL)  //自旋等待的时候,运气好,有人来交换了,返回
                    return v;
                me = new Node(item);     //自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环    
                int m = max.get();
                if (m > (index >>>= 1))           
                    max.compareAndSet(m, m - 1); 
            }
            else if (++fails > 1) { //失败 case1: slot有人,要交互,但被人家抢了  case2: slot没人,自己要占位置,又被人家抢了    
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                    index = m + 1;   //3次匹配失败,把index扩大,再次开始for循环             
                else if (--index < 0)
                    index = m;                   
            }
        }
    }

注意:上面唯一有点绕的地方,是Slot里面套了个Node。双方的交易地点,准确说,不是在Slot里面,是在Slot的Node里面。 
当Slot的Node = null时候,代表Slot为空,新建一个Node占住; 
当Slot的Node != null的时候,把Node拿出来,双方在Node里面交换,此时Slot已经释放出去了!



总结

Exchanger的代码虽然少,但还是蛮绕的,总结下来,有以下几个关键点: 
(1) Cache Line填充技术 
(2) 锁分离 
(3) 俩人还没交换结束的时候,就把slot给释放出去了。双方拿着那个Node,在Node里面交互的。也就是上面的for循环里面的第2个分支。 
这里面有一种情况就是:运气不好,2人正要在Node里面交换的时候,被另外一个线程抢了。那这个没抢到的,只能for循环,重新开始 
(4)0是交换的终结点位置:没有人交换的时候,会不断挪位置,挪的时候,要把当前位置的slot清空掉,一个人不能同时占2个Slot。直到挪到0这个位置,一直等待。   


本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标系统运维之Exchange频道!

本文由 @白羽 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程