DoubleAccumulator 源码详解

news2025/8/7 19:20:49

DoubleAccumulator

简介

这个类是新增的并发统计工具,可以多线程安全计数。

他的构造方法有两个参数,分别是统计方法和初始值。所以具体的统计是加减乘除是由传入的操作方法决定的。

    public DoubleAccumulator(DoubleBinaryOperator accumulatorFunction,
                             double identity) {
        this.function = accumulatorFunction;
      	//identity参数是用来存储初始值的 例如identity=1 那么新创建完对象 get() 方法获得的值就是1
        base = this.identity = Double.doubleToRawLongBits(identity);
    }

DoubleAccumulator 继承了Striped64,Striped64的核心存储结构是Cell数组

 @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
              //获取value的地址
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

方法详解

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tWJaBGrX-1678182390812)(/Users/cool/Documents/门票md/学习/java/concurrent/atomic/DoubleAccumulator & LongAccumulator.assets/image-20230306193305640.png)]

accumulate()

统计计算,核心计算方法。

    public void accumulate(double x) {
        Cell[] as; long b, v, r; int m; Cell a;
      
        if (
          //首先判断cells是不是null 如果不是null就可以继续进行
          (as = cells) != null ||
          //如果是cells是null 
          //判断fun.apply(x)之后 是不是数据没有变化 例如 1+0 还是等于1 因为数值属于无效操作,所以不进行初始化
          //否则进行cas替换数据 如果替换成功 则进行初始化操作, 
          //   所以这个位置存在一个问题如果cas替换失败 本次操作就失败了 也不会有任何提示
            (r = Double.doubleToRawLongBits
             (function.applyAsDouble
              (Double.longBitsToDouble(b = base), x))) != b  && !casBase(b, r)) {
            //标记数据是否是无效操作
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                //看到这个方法大概就知道是怎么实现线程安全的了
                //getProbe()获取当前线程hash值 然后&上数组长度len-1 获取slot的位置
                //也就是每个线程单独操作一个cell 这样就不会多线程冲突了,java中还有很多这样的例子 例如ConcurrentHashMap
                (a = as[getProbe() & m]) == null ||
                //下面这个判断和上面的判断逻辑一样 就是看操作x是不是无效操作 
                //重新对uncontended 进行赋值
                !(uncontended =
                  (r = Double.doubleToRawLongBits
                   (function.applyAsDouble
                    (Double.longBitsToDouble(v = a.value), x))) == v ||
    							//如果不是无效操作 就替换v(原始值)和r(算后值)的值 如果替换失败了 就会执行doubleAccumulate方法
                  a.cas(v, r)))
                doubleAccumulate(x, function, uncontended);
        }
    }
    final void doubleAccumulate(double x, DoubleBinaryOperator fn,
                                boolean wasUncontended) {
        int h;
      	//getProbe()获取为0说明还没有初始化线程变量
        if ((h = getProbe()) == 0) {
            //初始化线程随机数变量
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
          	//线程都未初始化 所以强制true  
            //正常这个字段 是 调用方法cas失败了 才会进入 这个值会固定式false 是true的时候走不到这个方法
            wasUncontended = true;
        }
        //官方备注 如果最后一个slot不为空 则改为true
        boolean collide = false;                // True if last slot nonempty
        //这里就开始标准的cas操作 无限for循环
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //cells不为空的情况
            if ((as = cells) != null && (n = as.length) > 0) {
                //如果线程没有初始化过cell
                if ((a = as[(n - 1) & h]) == null) {
                  	//cellsBusy是一个锁 判断当前是否存在cell创建 0代表当前没有创建竞争
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(Double.doubleToRawLongBits(x));
                        //double-check 并且进行cas加锁 cellsBusy 设置成1 
                        //加锁的目的就是为了防止并发创建cell
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                //cells没有满就进行创建
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //释放锁
                                cellsBusy = 0;
                            }
                           //创建成功就跳出
                            if (created)
                                break;
                           //失败就继续
                           // 一般失败的情况就是cells[]满了 需要扩容了
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //上面的调用链路中 cas执行失败才会进入doubleAccumulate
                //失败的话wasUncontended就是false
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //cell存在的情况下进行数据cas替换
                else if (a.cas(v = a.value,
                               ((fn == null) ?
                                Double.doubleToRawLongBits
                                (Double.longBitsToDouble(v) + x) :
                                Double.doubleToRawLongBits
                                (fn.applyAsDouble
                                 (Double.longBitsToDouble(v), x)))))
                    break;
                //如果长度n>cpu的个数 或者 cells已经被扩容了
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                //进行扩容
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            //每次扩容 容量翻倍
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
              	//对线程的随机值进行修改 尝试修改当前线程slot的位置 看是否存在空缺
                h = advanceProbe(h);
            }
            //如果数据为空进行初始化
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                      	//初始化容量是2
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
           //尝试进行替换
            else if (casBase(v = base,
                             ((fn == null) ?
                              Double.doubleToRawLongBits
                              (Double.longBitsToDouble(v) + x) :
                              Double.doubleToRawLongBits
                              (fn.applyAsDouble
                               (Double.longBitsToDouble(v), x)))))
                break;                          // Fall back on using base
        }
    }

get()

    public double get() {
        Cell[] as = cells; Cell a;
        double result = Double.longBitsToDouble(base);
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    result = function.applyAsDouble
                        (result, Double.longBitsToDouble(a.value));
            }
        }
        return result;
    }

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ea0vgNKO-1678182390814)(/Users/cool/Documents/门票md/学习/java/concurrent/atomic/DoubleAccumulator & LongAccumulator.assets/image-20230307173115829.png)]

这个模式和ConcurrentHashMap的size()方法非常类似

public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
//size方法调用了此方法
final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

大家注意一点哈 volatile 只是保证了可见性 而不是保证原子性,所以如果在get的时候存在并发修改,最终的结果可能是不同步的。

reset()

    public void reset() {
        Cell[] as = cells; Cell a;
       //进行重置  为什么base需要重置 看下面的序列化介绍
        base = identity;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                  //进行重置
                    a.value = identity;
            }
        }
    }

getThenReset()

这个就是get和reset的结合体

 public double getThenReset() {
        Cell[] as = cells; Cell a;
        double result = Double.longBitsToDouble(base);
        base = identity;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    double v = Double.longBitsToDouble(a.value);
                    a.value = identity;
                    result = function.applyAsDouble(result, v);
                }
            }
        }
        return result;
    }

关于序列化

此类单独实现了一个序列化代理SerializationProxy,重写序列化的目的 主要是为了节省序列化的大小,我们看代码可以看到Striped64类中的字段都被transient修饰了,被transient修饰的数据是不参与序列化的,类似的还有常见的HashTable。

    private static class SerializationProxy implements Serializable {
        private static final long serialVersionUID = 7249069246863182397L;
        //真正序列化 只会序列化下面的几个字段
        private final double value;
        private final DoubleBinaryOperator function;
        private final long identity;
        SerializationProxy(DoubleAccumulator a) {
            function = a.function;
            identity = a.identity;
            value = a.get();
        }

        private Object readResolve() {
            double d = Double.longBitsToDouble(identity);
            DoubleAccumulator a = new DoubleAccumulator(function, d);
            //最终读取来的数据最终集复制给了base 所以上面reset的时候需要重新初始化base字段
          	//这个位置赋值 不用去重新创建大量的cell了起到了节省内存的目的,同时cell变少了 get()方法的速度 对应的就变快了
            a.base = Double.doubleToRawLongBits(value);
            return a;
        }
    }

总结

  • 初始cells size大小为2,创建cell会加锁
  • Cells最大size受到cpu核数限制,最大为N(cpu核数)向上取2的n次幂
  • get方法是实时计算总值,如果存在并发修改可能感知不到。
  • volatile只是保证了可见性,不保证原子性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/393976.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

​力扣解法汇总1599. 经营摩天轮的最大利润

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a;力扣 描述&#xff1a; 你正在经营一座摩天轮&#xff0c;该摩天轮共有 4 个座舱 &#xff0c;每个座舱…

Kubernetes调度之Pod亲和性

Kubernetes调度中的Pod亲和性abstract.pngPod亲和性节点亲和性&#xff0c;是基于节点的标签对Pod进行调度。而Pod亲和性则可以实现基于已经在节点上运行Pod的标签来约束新Pod可以调度到的节点。具体地&#xff0c;如果X上已经运行了一个或多个满足规则Y的Pod&#xff0c;则这个…

在ubuntu上(docker虚拟环境)部署完laravel的环境后如何运行一个基础的laravel项目

先测试laravel有没有安装成功 laravel如果报laravel command not found&#xff0c;先测试是否安装成功 find / -name laravel出现结果&#xff1a; 说明已经安装成功只是没有配环境变量 要么进这些文件夹测试那个路径下有真的laravel可执行文件&#xff0c;要么每个分别配置…

MCP2515国产替代DP2515带有SPI 接口的独立CAN 控制器

DP2515是一款独立控制器局域网络&#xff08;Controller AreaNetwork&#xff0c; CAN&#xff09;协议控制器&#xff0c;完全支持CAN V2.0B 技术规范。该器件能发送和接收标准和扩展数据帧以及远程帧。DP2515自带的两个验收屏蔽寄存器和六个验收滤波寄存器可以过滤掉不想要的…

图像处理--基于像素层面

python 图像锐化 图像锐化可以使图像的边缘更加清晰&#xff0c;增强图像的细节。常见的图像锐化算法有拉普拉斯算子、Sobel算子、Prewitt算子等。下面是使用拉普拉斯算子实现图像锐化的Python代码&#xff1a; import cv2 import numpy as npdef laplacian_sharpen(img, ksi…

MySQL日志文件

文章目录1.MySQL中的日志文件2.bin log的作用3.redo log的作用4.bin log和redo log的区别&#xff08;1&#xff09;存储的内容&#xff08;2&#xff09;功能&#xff08;3&#xff09;写入时间&#xff08;4&#xff09;写入方式5.两阶段提交6.undo log的作用1.MySQL中的日志…

springcloud3 fegin实现服务调用1

一 Fegin的作用 1.1 fegin的作用 fegin是一个声明式的web服务客户端&#xff0c;让编写web服务器客户端变得非常容易&#xff0c;只需创建一个接口并在接口中添加FeginClients注解即可。 Fegin的使用方式&#xff1a;使用fegin的注解定义接口&#xff0c;调用这个接口&#…

BI软件工具也有ChatGPT

ChatGPT最近大火&#xff0c;朋友圈、聊天群啊到处都在分享它、讨论它。我也凑了个热闹&#xff0c;先和它聊了一下孩子学习上的困惑&#xff0c;然后用它给孩子出了一套易错题型的练习题&#xff0c;缓解了我做为熊孩子家长的压力。ChatGET能做的可不止这些&#xff0c;还能写…

MTK平台开发入门到精通(休眠唤醒篇)休眠唤醒LPM框架

文章目录 一、lpm驱动源码分析二、设备属性调试文件沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇文章将介绍 lpm 驱动源码分析。 mtk 平台下,其默认的 lpm 机制的源码位置:drivers/misc/mediatek/lpm/ 一、lpm驱动源码分析 目录:drivers/misc/mediatek/lpm/…

aardio 编程语言

今天看到一篇文章《独自坚持 17 年&#xff0c;aardio 作者&#xff1a;“因妻子患癌&#xff0c;再无精力维护项目”》&#xff0c;才知道有个这个项目&#xff0c;也算是很有情怀的一个开发者&#xff0c;对程序有着真挚的热忱&#xff0c;aardio &#x1f50a; 专注于桌面软…

Unity Avatar Cover System - 如何实现一个Avatar角色的智能掩体系统

文章目录简介变量说明实现动画准备动画状态机State 状态NoneStand To CoverIs CoveringCover To Stand高度适配高度检测脚部IK简介 本文介绍如何在Unity中实现一个Avatar角色的智能掩体系统&#xff0c;效果如图所示&#xff1a; 初版1.0.0代码已上传至SKFramework框架Package…

数据库系统-关系模式

二、DB的抽象与演变 2.1 DB的标准结构 DBMS管理数据的三个层次 ● (局部模式)Entrenal Level 外部层次 User Level 用户层次 ○ 用户能够看到雨处理的数据&#xff0c;全局数据中的某一部分 ● (全局模式)Conceptual Level 概念层次 Logic Level 逻辑层次 ○ 从全局角度理解…

【Linux】编译器gcc g++和调试器gdb的使用

文章目录1.编译器gcc/g1.1C语言程序的翻译过程1.预处理2.编译3.汇编4. 链接1.2 链接方式与函数库1.动态链接与静态链接2.动态库与静态库1.3 gcc与g的使用2.调试器gdb2.1debug和release2.2gdb的安装2.3gdb的使用2.4gdb的常用指令3.总结1.编译器gcc/g 1.1C语言程序的翻译过程 1…

一个ELF文件分析和逆向的过程

CrackMe1、2分析和逆向的过程 1. CrackMe1、2相关信息 CrackMe1 1、CrackMe1是一个ELF可执行文件&#xff0c;可在Android中独立执行 2、通过adb(Android SDK中默认带有adb工具)将CrackMe1 push到远程Android设备中&#xff0c;建议放在/data/local/tmp目录下 3、打开adb shel…

关于MSVCR100.dll、MSVCR100d.dll、Msvcp100.dll、abort()R6010等故障模块排查及解决方法

一、常见故障介绍  最近在开发相机项目&#xff08;项目细节由于公司保密就不介绍了&#xff09;&#xff0c;程序运行5个来月以来首次出现msvcr100.dll故障等问题&#xff0c;于是乎开始了分析之路&#xff0c;按照度娘上的一顿操作&#xff0c;期间也是出现了各种不一样的问…

Lombok常见用法总结

目录一、下载和安装二、常见注释&#xff08;一&#xff09;Data&#xff08;二&#xff09;Getter和Setter&#xff08;三&#xff09;NonNull和NotNull&#xff08;不常用&#xff09;&#xff08;四&#xff09;ToString&#xff08;不常用&#xff09;&#xff08;五&#…

一款丧心病狂的API测试工具:Apifox!

你好&#xff0c;我是测试开发工程师——凡哥。欢迎和我交流测试领域相关问题&#xff08;测试入门、技术、python交流都可以&#xff09; 我们平时在做接口测试的时候&#xff0c;对于一些常用的接口测试工具的使用应该都非常熟悉了&#xff1a; 接口文档&#xff1a;Swagge…

Databend 开源周报 第 83 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.com 。Whats New探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。Support for WebHDFSHDFS 是大数…

ArrayList集合底层原理

ArrayList集合底层原理ArrayList集合底层原理1.介绍2.底层实现3.构造方法3.1集合的属性4.扩容机制5.其他方法6.总结ArrayList集合底层原理 1.介绍 ​ ArrayList是List接口的可变数组的实现。实现了所有可选列表操作&#xff0c;并允许包括 null 在 内的所有元素。 每个 Array…

静态库和动态库的打包与使用

静态库和动态库 静态库和动态库的打包 生成可执行程序时链接使用 运行可执行程序时加载使用 提前声明&#xff0c;笔者示例的文件有mian.c/child.c/child.h。OK&#xff0c;我们先了解一下&#xff0c;库文件是什么&#xff1f;它其实就是打包了一堆实现常用功能的代码文件. ⭐…