Java并发-补充
本文最后更新于 2025-03-06,文章超过7天没更新,应该是已完结了~
两阶段终止模式
在一个线程 T1 中如何“优雅”终止线程 T2?这里的【优雅】指的是给 T2 一个料理后事的机会。
1. 错误思路
使用线程对象的 stop() 方法停止线程
stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁, 其它线程将永远无法获取锁
使用 System.exit(int) 方法停止线程
目的仅是停止一个线程,但这种做法会让整个程序都停止
2.使用isInterrupted
public class 两阶段中止模式 {
public static void main(String[] args) {
TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
twoPhaseTermination.start();
}
}
class TwoPhaseTermination{
private Thread monitor;
//启动监控线程
public void start(){
monitor=new Thread(()->{
while (true){//因为是监控线程 所以得一直运行 且为了有间隔得运行 设置sleep
Thread currentThread = Thread.currentThread();
if(currentThread.isInterrupted()){
System.out.println("料理后事");
break;
}
try {
Thread.sleep(1);//若该监控线程在sleep被打断,则会执行catch异常处理,打断标记清除 为false
System.out.println("执行监控功能");//若该监控线程正常运行,则打断标记为true
} catch (InterruptedException e) {
e.printStackTrace();
//重新设置打断标记 为true
currentThread.interrupt();
}
}
});
}
//停止监控线程
public void stop(){
monitor.interrupt();
}
}
11:49:42.915 c.TwoPhaseTermination [监控线程] - 将结果保存
11:49:43.919 c.TwoPhaseTermination [监控线程] - 将结果保存
11:49:44.919 c.TwoPhaseTermination [监控线程] - 将结果保存
11:49:45.413 c.TestTwoPhaseTermination [main] - stop
11:49:45.413 c.TwoPhaseTermination [监控线程] - 料理后事
也可以使用 volatile描述变量stop来判断程序是否该停止,volatile是为了保证该变量再多个线程之间的可见性。
public class AfterNodeAccess {
private Thread thread;
private volatile boolean stop = false;
public void start(){
thread = new Thread(() -> {
while(true) {
Thread current = Thread.currentThread();
if(stop) {
System.out.println("料理后事");
break;
}
try {
Thread.sleep(5000);
System.out.println("将结果保存");
} catch (InterruptedException e) {}
// 执行监控操作
}
},"监控线程");
thread.start();
}
public void stop() {
stop = true;
thread.interrupt();
}
public static void main(String[] args) {
AfterNodeAccess afterNodeAccess = new AfterNodeAccess();
afterNodeAccess.start();
Scanner scanner = new Scanner(System.in);
if(scanner.hasNextInt()){
afterNodeAccess.stop();
}
}
}
有个细节,就是在sleep期间,如果使用了stop程序,则不会等待睡眠时间直接退出。
统筹规划模式
public class 烧水泡茶 {
public static void main(String[] args) {
Thread t1=new Thread(()->{
try {
System.out.println("洗水壶");
Thread.sleep(1000);
System.out.println("烧开水");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"老王");
Thread t2=new Thread(()->{
try {
System.out.println("洗茶壶");
Thread.sleep(1000);
System.out.println("洗茶杯");
Thread.sleep(2000);
System.out.println("拿茶叶");
Thread.sleep(1000);
t1.join();//必须等老王烧开水完就可以泡茶了(等T1线程执行完)
System.out.println("泡茶");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"小王");
t1.start();
t2.start();
}
}
Balking(犹豫)模式
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
单例模式也是Balking模式的实现。
线程的状态
从Java层描述的六种状态:
重新理解一下线程状态(了解):
情况1:NEW->RUNNABLE
当调用t.start()方法时,由NEW->RUNNABLE
情况2:RUNNABLE->WAITING
t线程调用synchronized(obj)获得对象锁后
。调用obj.wait()方法时,t线程从RUNNABLE->WAITING
。调用obj.notify(),obj.notifyAll() ,t.interrupt() 时
竞争锁成功,t线程从WAITING->RUNNABLE
竞争锁失败,t线程从WAITING->BLOCKED
解释一下在有锁的情况下打断正在占有锁的线程会发生什么:
public class test {
public static void main(String[] args) throws InterruptedException {
Object a=1;
Thread t1 = new Thread(() -> {
synchronized (a) {
Thread currentThread = Thread.currentThread();
for (int i = 0; i < 100; i++) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("t1正在运行");
// if (currentThread.isInterrupted()){
// //让t1线程睡一会,t2线程就可以抢
// TimeUnit.SECONDS.sleep(5);
// }
} catch (InterruptedException e) {
e.printStackTrace();
//因为打断正在sleep的线程 打断标记为false 所以重新设置打断标记
// currentThread.interrupt();
}
}
}
});
Thread t2 = new Thread(() -> {
synchronized (a) {
for (int i = 0; i < 1000; i++) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("t2正在运行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
t1.start();
t2.start();
TimeUnit.SECONDS.sleep(2);
t1.interrupt();
}
}
如果t1线程(占有锁)被打断后,如果没有任何操作导致t1线程不会马上继续运行,那么它抢占锁的速度绝对比t2快。如果有操作延迟t1的打断时间,则t2会先抢占锁。
情况3: RUNNABLE -> WAITING
当前线程调用 t.join() 方法时,当前线程从 RUNNABLE --> WAITING
注意是当前线程在t 线程对象的监视器上等待
t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING --> RUNNABLE
当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE --> WAITING
调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING -> RUNNABLE
情况2*: RUNNABLE -> TIMED_WAITING(同情况2一样)
t 线程用 synchronized(obj) 获取了对象锁后
调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE --> TIMED_WAITING
t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
竞争锁成功,t 线程从 TIMED_WAITING --> RUNNABLE
竞争锁失败,t 线程从 TIMED_WAITING --> BLOCKED
情况3*:RUNNABLE -> TIMED_WAITING
当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE --> TIMED_WAITING
注意是当前线程在t 线程对象的监视器上等待
当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前程从TIMED_WAITING-> RUNNABLE
当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis时,当前线程从 RUNNABLE --> TIMED_WAITING
调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让线程从TIMED_WAITING--> RUNNABLE
情况4:RUNNABLE -> TIMED_WAITING
当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING --> RUNNABLE
保护性暂停模式(同步):
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果。
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
JDK 中,join 的实现、Future 的实现,采用的就是此模式。因为要等待另一方的结果,因此归类到同步模式。
public class 同步模式之保护性暂停 {
//线程1等下线程2下载结果
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(()->{
//等待结果
try {
System.out.println("等待结果");
int o =(int) guardedObject.get();
System.out.println("结果是:"+o);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
new Thread(()->{
System.out.println("执行下载");
guardedObject.complete(1);
},"t2").start();
}
}
class GuardedObject{
//结果
private Object response;
//获取结果
public Object get() throws InterruptedException {
synchronized (this){
while (response==null){
this.wait();
}
return response;
}
}
//产生结果
public void complete(Object response){
synchronized (this){
this.response=response;
this.notifyAll();
}
}
}
带超时版的get
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
if (waitTime <= 0) {
log.debug("break...");
break; }
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400 (关键)
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}",
timePassed, response == null);
}
return response; }
}
join原理其实就是保护性暂停模式的实现
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
这是最终走进的方法,当isAlive,就wait。注意当前锁住是整个对象。我们在A线程中调用B线程的Join方法,也就是B线程充当了这把锁,但调用者是A线程,也就是说挂起来的是A线程(谁调用wait谁休眠),当B线程还活着,就一直wait,只有当B线程执行完了,才会被唤醒,所以易推测出当B线程执行完毕会有一个收尾工作:使用notify方法,不然A线程就会一直挂着了,此代码可以在JVM源码中看到。
生产者/消费者(异步)
同步模式之顺序控制
1.固定运行顺序
比如,必须先 2 后 1 打印
Thread t1 = new Thread(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { }
// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
LockSupport.park();
System.out.println("1");
});
Thread t2 = new Thread(() -> {
System.out.println("2");
// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
LockSupport.unpark(t1);
});
t1.start();
t2.start();
2.交替输出
比如,交替输出abc
public class test {
public static void main(String[] args) {
AwaitSignal as = new AwaitSignal(5);
Condition aWaitSet = as.newCondition();
Condition bWaitSet = as.newCondition();
Condition cWaitSet = as.newCondition();
as.start(aWaitSet);
new Thread(() -> {
as.print("a", aWaitSet, bWaitSet);
}).start();
new Thread(() -> {
as.print("b", bWaitSet, cWaitSet);
}).start();
new Thread(() -> {
as.print("c", cWaitSet, aWaitSet);
}).start();
}
}
class AwaitSignal extends ReentrantLock {
public void start(Condition first) {
this.lock();
try {
System.out.println("start");
first.signal();
} finally {
this.unlock();
}
}
public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
this.lock();
try {
current.await(); //a休息室等待,经过start方法后a休息室被呼叫
System.out.println(str);
next.signal(); //呼叫b休息室
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.unlock();
}
}
}
// 循环次数
private int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
}
解决哲学家就餐问题
其实很简单,就是使用ReentrantLock的tryLock()方法
public class 哲学家就餐问题 {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
}
class Philosopher extends Thread{
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
private void eat() throws InterruptedException {
System.out.println(super.getName()+"eating...");
Thread.sleep(1000);
}
@Override
public void run() {
while (true) {
// 获得左手筷子 如果拿不到左手筷子 那就先放下来,等待下一次循环再尝试拿
if(left.tryLock()){
// 获得右手筷子 如果拿不到右手筷子 那就先放下来,等待下一次循环再尝试拿
try {
if(right.tryLock()){
try {
eat(); //都拿到了
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
right.unlock();
}
}
}finally {
left.unlock();
}
}
}
}
}
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
double-checked locking 问题
public final class Singleton {
private Singleton() { }
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
if(INSTANCE == null) { // t2
// 首次访问会同步,而之后的使用没有 synchronized
synchronized(Singleton.class) {
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
对应的字节码
0 getstatic #2 <Singleton.INSTANCE>
3 ifnonnull 37 (+34)
6 ldc #3 <Singleton>
8 dup
9 astore_0
10 monitorenter
11 getstatic #2 <Singleton.INSTANCE>
14 ifnonnull 27 (+13)
17 new #3 <Singleton>
20 dup
21 invokespecial #4 <Singleton.<init>>
24 putstatic #2 <Singleton.INSTANCE>
27 aload_0
28 monitorexit
29 goto 37 (+8)
32 astore_1
33 aload_0
34 monitorexit
35 aload_1
36 athrow
37 getstatic #2 <Singleton.INSTANCE>
40 areturn
关键在 24行 将 Singleton实例 赋给 static INSTANCE这个静态变量;21行 调用Singleton这个类的构造方法。
有一个严重的问题,第一个 if 使用了 INSTANCE 变量,是在同步块之外,说明这个 INSTANCE 变量是不受synchronized控制,再加上synchronized里面的代码执行指令可能发生重排序,比如先执行24行,再执行21行,那么多线程情况下INSTANCE 变量就获得了没有执行过构造方法的一个不完整的实例。
解决方式就是给INSTANCE 变量加 volatile修饰,加入内存屏障保证 执行该变量的操作不会发生重排序。
happens-before
happens-before 规定了对共享变量的写操作对其它线程的读操作可见,它是可见性与有序性的一套规则总结,抛开以下 happens-before 规则,JMM 并不能保证一个线程对共享变量的写,对于其它线程对该共享变量的读可见。
如何判断是否为 happens-before?
程序次序规则: 在一个单独的线程中,按照程序代码的执行流顺序,(时间上)先执行的操作happen—before(时间上)后执行的操作
(同一个线程中前面的所有写操作对后面的操作可见)管理锁定规则:一个unlock操作happen—before后面(时间上的先后顺序)对同一个锁的lock操作。
(如果线程1解锁了monitor a,接着线程2锁定了a,那么,线程1解锁a之前的写操作都对线程2可见(线程1和线程2可以是同一个线程))
volatile变量规则:对一个volatile变量的写操作happen—before后面(时间上)对该变量的读操作。
(如果线程1写入了volatile变量v(临界资源),接着线程2读取了v,那么,线程1写入v及之前的写操作都对线程2可见(线程1和线程2可以是同一个线程))
线程启动规则:Thread.start()方法happen—before调用用start的线程前的每一个操作。
(假定线程A在执行过程中,通过执行ThreadB.start()来启动线程B,那么线程A对共享变量的修改在接下来线程B开始执行前对线程B可见。注意:线程B启动之后,线程A在对变量修改线程B未必可见。)
线程终止规则:线程的所有操作都happen—before对此线程的终止检测,可以通过Thread.join()方法结束、Thread.isAlive()的返回值等手段检测到线程已经终止执行。
(线程t1写入的所有变量,在任意其它线程t2调用t1.join(),或者t1.isAlive() 成功返回后,都对t2可见。)线程中断规则:对线程interrupt()的调用 happen—before 发生于被中断线程的代码检测到中断时事件的发生。
(线程t1写入的所有变量,调用Thread.interrupt(),被打断的线程t2,可以看到t1的全部操作)
对象终结规则:一个对象的初始化完成(构造函数执行结束)happen—before它的finalize()方法的开始。
(对象调用finalize()方法时,对象初始化完成的任意操作,同步到全部主存同步到全部cache。)
传递性:如果操作A happen—before操作B,操作B happen—before操作C,那么可以得出A happen—before操作C。
(A h-b B , B h-b C 那么可以得到 A h-b C)
在程序运行过程中,所有的变更会先在寄存器或本地cache中完成,然后才会被拷贝到主存以跨越内存栅栏(本地或工作内存到主存之间的拷贝动作),此种跨越序列或顺序称为happens-before。happens-before本质是顺序,重点是跨越内存栅栏。通常情况下,写操作必须要happens-before读操作,即写线程需要在所有读线程跨越内存栅栏之前完成自己的跨越动作,其所做的变更才能对其他线程可见。
字段更新器
保证多线程访问同一个对象的成员变量时, 成员变量的线程安全性。
AtomicReferenceFieldUpdater -----引用类型的属性
AtomicIntegerFieldUpdater -----整形的属性
AtomicLongFieldUpdater -----长整形的属性
注意:利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常。
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
@Slf4j(topic = "guizy.AtomicFieldTest")
public class AtomicFieldTest {
public static void main(String[] args) {
Student stu = new Student();
// 获得原子更新器
// 泛型
// 参数1 持有属性的类 参数2 被更新的属性的类
// newUpdater中的参数:第三个为属性的名称
AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
// 期望的为null, 如果name属性没有被别的线程更改过, 默认就为null, 此时匹配, 就可以设置name为张三
System.out.println(updater.compareAndSet(stu, null, "张三"));
System.out.println(updater.compareAndSet(stu, stu.name, "王五"));
System.out.println(stu);
}
}
class Student {
volatile String name;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}
原子累加器 (LongAddr) (重要)
LongAddr
LongAccumulator
DoubleAddr
DoubleAccumulator
原子整数 AtomicLong 和原子累加器 LongAddr 在累加方面的性能比较
@Slf4j(topic = "guizy.Test")
public class Test {
public static void main(String[] args) {
System.out.println("----AtomicLong----");
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
System.out.println("----LongAdder----");
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
}
//adderSupplier 无参有返回
//Consumer 有参无返回
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 4 个线程,每人累加 50 万
for (int i = 0; i < 40; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start) / 1000_000);
}
}
LongAddr 性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
AtomicLong 都是在一个共享资源变量上进行竞争, while(true)循环进行CAS重试, 性能没有LongAdder高。
LongAdder原理 (了解)
源码之 LongAdder
public class LongAdder extends Striped64 implements Serializable{...}
LongAdder 继承了Striped64,Striped64又是什么?
Striped64是一个高并发累加的工具类。
Striped64设计核心思路就是通过内部的分散计算来避免竞争。
Striped64内部包含一个base和一个Cell[] cells数组,又叫hash表。 没有竞争的情况下,要累加的数通过cas累加到base上;如果有竞争的话,会将要累加的数累加到Cells数组中的某个cell元素里面。
那么我们看看Striped64 核心属性
// 存放cell的hash表,大小为2乘幂
transient volatile Cell[] cells;
// 基础值(1.无竞争时更新,2.cells数组初始化过程不可用时,也会通过cas累加到base)
transient volatile long base;
// 自旋锁,通过CAS操作加锁 0无锁 1获得锁(初始化cells数组,创建cell单元,cells扩容)
transient volatile int cellsBusy;
看看Striped64 中Cell类的源码
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
//最重要的方法, 采取 cas 方式进行累加
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
....
@sun.misc.Contended 是 Java 8 新增的一个注解,对某字段加上该注解则表示该字段会单独占用一个缓存行(Cache Line)。
为什么要添加@sun.misc.Contended注解?这就涉及到缓存行伪共享的原理了
原理之缓存行伪共享
缓存行伪共享得从缓存说起。
缓存与内存的速度比较
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)。缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性 (缓存一致性),如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
因为 Cell 是数组形式,在内存中是连续存储的,又因为缓存行大小够放两个Cell(一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value)),所以导致这两个Cell[0和1]都在一个缓存行。
这样问题来了:
Core-0 要修改 Cell[0]
Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效。
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将Cell对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
LongAdder的add方法源码
public void add(long x) {
// as 为cell数组的引用
// b 为基础值
// x 为累加值
//uncontended=true代表当前线程对应的cell单元格CAS成功,false代表因出现竞争CAS失败
Cell[] as; long b, v; int m; Cell a;
//1.cells数组不为空,则进行下一个if判断
//2.cells数组为空,则尝试进行cas base累加,失败则进行下一个if判断
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
/*
进入longAccumulate方法的三种状态
1.as == null || (m = as.length - 1) 当前cells数组为空,没有初始化 --->对应CASE-3
2.a = as[getProbe() & m]) == null 获取当前线程的hash值然后与数组长度进行&运算,获得对应cell单元格为空--->对应CASE-1
3.uncontended = a.cas(v = a.value, v + x) 前面判断得知当前对应的cell单元格存在,再次进行CAS,因竞争导致重试失败--->对应CASE-2
*/
longAccumulate(x, null, uncontended);
}
}
longAccumulate()源码
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//当前线程还没有对应的cell,需要随机生成一个h值用来将当前线程绑定到cell
if ((h = getProbe()) == 0) {
//初始化probe
ThreadLocalRandom.current();
//h 对应新的probe值,用来对应cell
h = getProbe();
wasUncontended = true;
}
//collide 为true 表示需要扩容
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
//CASE-1
/*
二次判断 当前cells数组存在
*/
if ((as = cells) != null && (n = as.length) > 0) {
//二次判断 没有cell
if ((a = as[(n - 1) & h]) == null) {
//当前cellBusy为0 没有线程竞争
if (cellsBusy == 0) {
Cell r = new Cell(x);
//获取当前cells数组的锁
if (cellsBusy == 0 && casCellsBusy()) {
//cell对象是否创建完成的标志位
boolean created = false;
try {
/*
再次判断,老样子,防止系统调度原因出现线程的二次修改
*/
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
// cell单元格创建完成,更新created为true
created = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
//成功创建cell则break,失败则continue继续
if (created)
break;
continue;
}
}
collide = false;
}
//CASE-2:当前线程对应的cell单元格CAS写入数据失败出现竞争
else if (!wasUncontended)
wasUncontended = true;
//CAS base尝试 成功跳出循环 失败进行下一次else-if
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//如果当前cell长度已经超过当前机器的CPU数量,拒绝扩容
else if (n >= NCPU || cells != as)
collide = false;
else if (!collide)
collide = true;
//对cell数组进行扩容
//获取到cells数组的锁,开始进行扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//还是二次判断
if (cells == as) {
//cells数组扩容为原来2倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h);
}
//CASE-3:上一步else-if失败,说明cells数组为空。对cell数组进行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
/*
cellBusy为0表示没有线程获取到当前cell的锁,通过CAS将cellBusy更新为1获取到锁,然后开始初始化cells数组
init: cells数组是否初始化完成的标志位
*/
boolean init = false;
try {
/*
再次判断当前cell数组是否为前面as引用。这里会出现一种情况,就是线程1执行到cellBusy=0后
因为系统调度让出CPU
这时线程2更新cellBusy成功并成功获取锁然后初始化cell数组成功并释放锁,最后将cellBusy置为0
然后线程1得到cpu时间继续更新cellBusy值获取锁成功,但此时cell数组已经被线程2初始化成功了,
此时线程1再次进行初始化会覆盖掉线程2已经初始化的cell数组。
声明:其他地方的判断都是防止系统调度原因防止线程再次操作而覆盖。
*/
if (cells == as) {
Cell[] rs = new Cell[2];
//当前线程的hash值和1进行&运算,结果只会是0和1所以这个线程只会定位到cell[0]或者cell[1]的单元格
rs[h & 1] = new Cell(x);
cells = rs;
// init为true表示cell数组初始化完成
init = true;
}
} finally {
// 更新cellBusy为0,释放锁
cellsBusy = 0;
}
// cell数组初始化完成,跳出当前循环,开始下一轮循环
if (init)
break;
}
//CASE-4:上一步else-if失败,说明cellBusy设置为1失败并且获取cellBusy失败,说明出现了竞争
//再次尝试CAS base(没用cell的那种)
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}
获取最终结果通过sum方法
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
遍历cell数组的value并和base进行求和,最终得到sum值。
Unsafe
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
可以发现AtomicInteger以及其他的原子类, 底层都使用的是Unsafe类
使用底层的Unsafe类实现原子操作
public class test {
public static void main(String[] args) throws Exception{
// 通过反射获得Unsafe对象
Class<Unsafe> unsafeClass = Unsafe.class;
// 获得构造函数,Unsafe的构造函数为私有的 如果是共有的不用Declared
Constructor<Unsafe> constructor = unsafeClass.getDeclaredConstructor();
// 设置为允许访问私有内容
constructor.setAccessible(true);
// 创建Unsafe对象实例
Unsafe unsafe = constructor.newInstance();
Person person = new Person();
// 获得其属性 name 的偏移量
long name = unsafe.objectFieldOffset(Person.class.getDeclaredField("name"));
long age = unsafe.objectFieldOffset(Person.class.getDeclaredField("age"));
// 通过unsafe的CAS操作改变值
unsafe.compareAndSwapObject(person,name,null,"wzc");
unsafe.compareAndSwapInt(person,age,0,22);
System.out.println(person);
unsafe.compareAndSwapObject(person,name,"wzc","wq");
System.out.println(person);
}
}
class Person {
// 配合CAS操作,必须用volatile修饰
volatile String name;
volatile int age;
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
共享模型之不可变
如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改。
这样的对象在 Java 中有很多,例如在 Java 8 后,提供了一个新的日期格式化类:
public class test {
public static void main(String[] args) throws Exception{
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {
new Thread(()->{
LocalDate date = dateTimeFormatter.parse("2022-10-01", LocalDate::from);
System.out.println(date);
}).start();
}
}
}
不可变对象,实际是另一种避免竞争的方式。
不可变设计
final 的使用
另一个大家更为熟悉的 String 类也是不可变的,以它为例,说明一下不可变设计的要素
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
/** Cache the hash code for the string */
private int hash; // Default to 0
final 修饰 String 类,保证子类不会破坏父类的方法
final 修饰 value[] 属性,保证只读,外界不可修改该属性
private 修饰 hash 属性,并且没有set方法,外界无法修改
保护性拷贝
但有同学会说,使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是如何实现的,就以 substring 为例:
public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}
发现其内部是调用 String 的构造方法创建了一个新字符串,再进入这个构造看看,是否对 fifinal char[] value 做出了修改:
public String(char value[], int offset, int count) {
if (offset < 0) {
throw new StringIndexOutOfBoundsException(offset);
}
if (count <= 0) {
if (count < 0) {
throw new StringIndexOutOfBoundsException(count);
}
if (offset <= value.length) {
this.value = "".value;
return;
}
}
// Note: offset or count might be near -1>>>1.
if (offset > value.length - count) {
throw new StringIndexOutOfBoundsException(offset + count);
}
this.value = Arrays.copyOfRange(value, offset, offset+count);
}
结果也没有。是通过Arrays.copyOfRange() 方法 生成新的char[] value,然后对原有内容进行复制。
这种通过创建符本对象来避免共享的手段称之为 保护性拷贝
final 原理
public class TestFinal {
final int a = 20; }
字节码
0 aload_0
1 invokespecial #1 <java/lang/Object.<init>>
4 aload_0
5 bipush 20
7 putfield #2 <test.a>
<-----------写屏障
10 return
发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况。
注意:
final成员变量表示常量 ,只能被赋值一次,赋值后值不再改变(final要求地址值不能改变)
final成员变量必须在声明的时候初始化或者在构造器中初始化,否则就会报编译错误
接口中声明的所有变量本身是final的(如果你在某接口的实现类A中把x改为其他值,那么另一个实现类B中对x有依赖的方法全部都出错了,这样接口还怎么能起到“模板”的作用呢)
享元设计模式
英文名称:Flyweight pattern。当需要重用数量有限的同一类对象时会用到享元模式。
包装类
在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的 valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
注意:
Byte, Short, Long 缓存的范围都是 -128~127
Character 缓存的范围是 0~127
Integer的默认范围是 -128~127。最小值不能变 ,但最大值可以通过调整虚拟机参数
-Djava.lang.Integer.IntegerCache.high
来改变Boolean 缓存了 TRUE 和 FALSE
- 感谢你赐予我前进的力量