# Java拾遗-并发-Lock
一个个小例子深入了解知识点
代码地址
# 5. synchronized
/**
* 线程锁 - synchronized - 可重入 - 不能重入就会导致死锁
*
* 锁升级 - 偏向锁 -> 轻量级锁 -> 重量级锁
*
* 偏向锁没有实际加锁,记录最初的线程ID,每次判断是不是最初的那个线程,是的话直接放行
* 不是就把当前线程升级轻量级锁,进行CAS获取,没获取到再进入自旋等待
* 自旋10圈后,还没获取到锁就升级重量级锁
*
* 执行时间短(加锁代码),线程数少,用自旋锁(自旋占用CPU)
* 执行时间长,线程数多,用系统锁
*
* synchronized锁细化,控制代码少,性能更好
* synchronized锁粗化,一个方法里太多synchronized细化锁,还不如直接当前方法一个大锁即可
*
* 下面的代码
* m1()方法用的object对象锁,这样需要去创建很多对象,直接使用this即可
* 所以m1()和m2()是相等的
*
* 如果m1()和m2()方法是整个锁,可以直接写在方法上,参考m3()
* 所以m1()和m2()和m3()是相等的
*
* m4()和m5()是相等的,m4是静态方法,不能使用this,只能使用类.class
* 其实本身还是Object(this),类.class本身在初始化时会有一个Object的对象
*
* m6()方法object对象锁不能变化,object改变了,锁就变了,这个必须注意
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/15 10:20
*/
public class T05_synchronized {
private static class SynchronizedTest {
private static Integer count = 10;
private Object object = new Object();
public void m1() {
synchronized (object) {
count--;
System.out.println(Thread.currentThread().getName() + ",count = " + count);
}
}
public void m2() {
synchronized (this) {
count--;
System.out.println(Thread.currentThread().getName() + ",count = " + count);
}
}
public synchronized void m3() {
count--;
System.out.println(Thread.currentThread().getName() + ",count = " + count);
}
public static void m4() {
synchronized (SynchronizedTest.class) {
count--;
System.out.println(Thread.currentThread().getName() + ",count = " + count);
}
}
public synchronized static void m5() {
count--;
System.out.println(Thread.currentThread().getName() + ",count = " + count);
}
public void m6() {
synchronized (object) {
count--;
System.out.println(Thread.currentThread().getName() + ",count = " + count);
}
object = new Object();
}
}
}
# 6. DeadLock
/**
* 线程死锁 - synchronized
*
* 两个线程互相持有对方需要锁,导致锁住,无法继续往下执行
*
* 两个对象锁Object是static的,所以下面New了两次DeadLockTest类,两个对象锁Object还是同一个
* 如果两个对象锁Object不是static,那下面New了两次DeadLockTest类,就会有四个对象锁,不会出现死锁
*
* 嵌套死锁,A拿了B需要的锁,B拿了C需要的锁,C拿了D需要的锁,D...
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/15 10:20
*/
public class T06_DeadLock {
private static class DeadLockTest implements Runnable {
private static Object object1 = new Object();
private static Object object2 = new Object();
private String name = "default";
public DeadLockTest(String name) {
this.name = name;
}
@Override
public void run() {
if ("WangMing".equals(name)) {
synchronized (object1) {
try {
Thread.sleep(1100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (object2) {
System.out.println(name);
}
}
}
if ("XiaoMing".equals(name)) {
synchronized (object2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (object1) {
System.out.println(name);
}
}
}
}
}
public static void main(String[] args) {
Thread deadLock1 = new Thread(new DeadLockTest("WangMing"));
Thread deadLock2 = new Thread(new DeadLockTest("XiaoMing"));
deadLock1.start();
deadLock2.start();
}
}
# 7. volatile
volatile 的几个例子
# 7.1. volatile介绍
/**
* 关键字 - volatile
* https://blog.csdn.net/u012723673/article/details/80682208
* https://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html
*
* 两大作用 1-保证线程可见性(使一个变量在多个线程间可见) 2-防止指令重排序
*
* 为什么需要保证线程可见性
*
* 共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory),
* 本地内存保存了被该线程使用到的主内存的副本拷贝,线程对变量的所有操作都必须在工作内存中进行,
* 而不能直接读写主内存中的变量
*
* 对于普通的共享变量来讲,线程A将其修改为某个值发生在线程A的本地内存中,此时还未同步到主内存中去,
* 而线程B已经缓存了该变量的旧值,所以就导致了共享变量值的不一致,
* 解决这种共享变量在多线程模型中的不可见性问题,较粗暴的方式自然就是加锁,
* 但是此处使用synchronized或者Lock这些方式太重量级了,比较合理的方式其实就是volatile
*
* 简单来说,使用volatile关键字,会让所有线程都会读到变量的修改值
* 在下面的代码中,mark是存在于堆内存的volatileTest对象中
* 当线程t开始运行的时候,会把mark值从内存中读到t线程的工作区,在运行过程中直接使用这个copy,并不会每次都去
* 读取堆内存,这样,当主线程修改mark的值之后,t线程感知不到,所以不会停止运行
* 使用volatile,将会强制所有线程都去堆内存中读取mark的值
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/16 10:20
*/
public class T07_volatile_1 {
private static class VolatileTest {
public /*volatile*/ Boolean mark = Boolean.TRUE;
public void m() {
while (mark) {
}
}
}
public static void main(String[] args) {
VolatileTest volatileTest = new VolatileTest();
// lambda表达式 new Thread(new Runnable( run() {volatileTest.m()}
new Thread(volatileTest::m, "t").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
volatileTest.mark = Boolean.FALSE;
}
}
# 7.2. volatile修饰引用
/**
* 关键字 - volatile
* https://blog.csdn.net/u012723673/article/details/80682208
* https://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html
*
* 两大作用 1-保证线程可见性(使一个变量在多个线程间可见) 2-防止指令重排序
*
* volatile修饰引用类型(包括数组)只能保证引用本身的可见性,不能保证内部字段的可见性
* 下面代码有两个例子
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/16 10:20
*/
public class T07_volatile_2 {
private static class VolatileTest {
public volatile static VolatileTest volatileTest = new VolatileTest();
public /*volatile*/ Boolean mark = Boolean.TRUE;
public void m() {
while (mark) {
}
}
public static void main(String[] args) {
// VolatileTest volatileTest = new VolatileTest();
new Thread(volatileTest::m, "t").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
volatileTest.mark = Boolean.FALSE;
}
}
// 分割线
private static class Data {
int a, b;
public Data(int a, int b) {
this.a = a;
this.b = b;
}
}
volatile static Data data;
public static void main(String[] args) {
Thread writer = new Thread(()->{
for (int i = 0; i < 10000; i++) {
data = new Data(i, i);
}
});
Thread reader = new Thread(()->{
while (data == null) {}
int x = data.a;
int y = data.b;
if(x != y) {
System.out.printf("a = %s, b = %s%n", x, y);
}
});
reader.start();
writer.start();
try {
reader.join();
writer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
# 7.3. volatile不能代替sync
/**
* 关键字 - volatile
* https://blog.csdn.net/u012723673/article/details/80682208
* https://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html
*
* 两大作用 1-保证线程可见性(使一个变量在多个线程间可见) 2-防止指令重排序
*
* volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized
*
* 在下面的代码中,count是永远到不了10000的,因为count++不是原子性的
* 但是给count++加上synchronized就是10000
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/16 10:20
*/
public class T07_volatile_3 {
private static class VolatileTest {
public volatile static Integer count = 0;
public /*synchronized*/ void m() {
for (int i = 0; i < 1000; i++) {
/*synchronized (this) {*/
count++;
/*}*/
}
}
public static void main(String[] args) {
VolatileTest volatileTest = new VolatileTest();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threadList.add(new Thread(volatileTest::m));
}
threadList.forEach(thread -> {
thread.start();
});
threadList.forEach(thread -> {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 输入永远到不了1000
System.out.println(count);
}
}
}
# 8. CAS
CAS 的几个例子,JUC 里 Atomic 包下的类底层都是 CAS 实现的
# 8.1. CAS介绍
/**
* CAS - 无锁优化(自旋) - 乐观锁
* 判断内存中某个地址的值是否为预期值,如果是就改变成新值,整个过程具有原子性
* https://www.cnblogs.com/fengzheng/p/9018152.html
* https://www.jianshu.com/p/db8dce09232d
*
* CAS操作包含三个操作数:内存位置、预期原值和新值。
* 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。
*
* 举例,3个参数,当前值(实际值)value,预期值(旧值)expect,要修改的值(更新后的值)update
* 先判断当前值value(1)是不是预期值expect(1),是的话就当前值value改成要修改的值update(2),操作结束
* 如果当前值value(2)不是预期值expect(1),说明当前值value(1)被其他线程改了变成value(2),就继续自旋
* 重新判断当前值value(2)是不是预期值expect(2),是的话就改成要修改的值update(3),如此直到成功
*
* 借用AtomicXXX类(JUC并发包下的原子类)来了解CAS,因为AtomicXXX类底层用的都是CAS实现的
* JDK8查看incrementAndGet方法内部实现,是使用Unsafe(操作内存)下的CompareAndSwap(比较并交换)(native)方法
*
* AtomicXXX类本身方法都是原子性的,但不能保证多个方法连续调用是原子性的
* CAS适合冲突较少的情况,如果太多线程在同时自旋,那么长时间循环会导致CPU开销很大
*
* 下面的代码是之前 T07_volatile_3 的改版,改用AtomicInteger类,可以不再使用volatile和synchronized
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/17 14:42
*/
public class T08_CAS_1_Atomic {
private static class AtomicIntegerTest {
public AtomicInteger count = new AtomicInteger(0);
public void m() {
for (int i = 0; i < 1000; i++) {
count.incrementAndGet();
}
}
public static void main(String[] args) {
AtomicIntegerTest atomicIntegerTest = new AtomicIntegerTest();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threadList.add(new Thread(atomicIntegerTest::m));
}
threadList.forEach(thread -> {
thread.start();
});
threadList.forEach(thread -> {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 输入永远到不了1000
System.out.println(atomicIntegerTest.count);
}
}
}
# 8.2. CAS的ABA问题
/**
* CAS - 无锁优化(自旋) - 乐观锁
*
* ABA问题
* https://www.cnblogs.com/dream2true/archive/2019/04/23/10759763.html
* 如果内存地址V初次读取的值是A,在CAS等待期间它的值曾经被改成了B
* 后来又被改回为A,那CAS操作就会误认为它从来没有被改变过
*
* 举例,本来值是1,当前线程打算改为2,还没改的时候,前面两个线程,其中一个改为3,另一个又改回1,
* 当前线程继续修改,发现是1,就感觉之前没人修改一样
*
* 对于简单对象没什么影响,像布尔值、整型值等
* 引用对象可能会存在问题
* 举例一,你和女朋友1分手,你又找了个女朋友2,再分手,再和女朋友1复合,结果女朋友2怀孕了
* 举例二,你和你女友分手,女友又找了个男朋友后,和你复合,女朋友怀孕发现不是你的孩子
* 所以引用对象可能会存在问题
*
* ABA问题以及解决
* 使用带版本号的原子引用AtomicStampedRefence<V>,或者叫时间戳的原子引用,类似于乐观锁
*
* 下面的代码
* 普通原子引用类在另一个线程完成ABA之后继续修改(把A改成了C),带版本号原子引用有效的解决了这个问题
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/17 14:42
*/
public class T08_CAS_2_AtomicStamped {
private static AtomicReference<String> atomicReference = new AtomicReference<>("A");
private static AtomicStampedReference<String> stampReference = new AtomicStampedReference<>("A", 1);
public static void main(String[] args) {
new Thread(() -> {
// 获取到版本号1
int stamp = stampReference.getStamp();
System.out.println("t1获取到的版本号:" + stamp);
try {
// 暂停1秒,确保t1,t2版本号相同
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// ABA
atomicReference.compareAndSet("A", "B");
atomicReference.compareAndSet("B", "A");
// ABA
stampReference.compareAndSet("A", "B", stamp, stamp + 1);
stampReference.compareAndSet("B", "A", stamp + 1, stamp + 2);
// 输出版本号
System.out.println("t1线程ABA之后的版本号:" + stampReference.getStamp());
}, "t1").start();
new Thread(() -> {
// 获取到版本号为1
int stamp = stampReference.getStamp();
System.out.println("t2获取到的版本号:" + stamp);
try {
// 暂停2秒,等待t1执行完成ABA,版本号为3
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 普通原子类直接修改成功为C
System.out.print("普通原子类无法解决ABA问题");
System.out.print(",操作结果: " + atomicReference.compareAndSet("A", "C"));
System.out.println(",值结果: " + atomicReference.get());
// 版本号的原子类无法修改成功,因为版本号已经变3了,当前版本号传的是1
System.out.print("版本号的原子类解决ABA问题");
System.out.print(",操作结果: " + stampReference.compareAndSet("A", "C", stamp, stamp + 1));
System.out.println(",值结果: " + stampReference.getReference());
}, "t2").start();
}
}
# 8.3. LongAdder效率对比
/**
* CAS - 无锁优化(自旋) - 乐观锁
*
* AtomicLong,synchronized,LongAdder效率对比
*
* 各有各的快,线程太多的话,LongAdder快,synchronized慢
*
* AtomicLong底层CAS
* synchronized底层锁升级,偏向锁 -> 轻量级锁 -> 重量级锁
* LongAdder底层分段数组CAS,Striped64类
*
* LongAdder有一个根据当前并发状况动态改变的Cell数组,Cell对象里面有一个long类型的value用来存储值,
* 开始没有并发争用的时候或者是cells数组正在初始化的时候,会使用CAS来将值累加到成员变量的base上,
* 在并发争用的情况下,LongAdder会初始化cells数组,,在Cell数组中选定一个Cell加锁,
* 数组有多少个cell,就允许同时有多少线程进行修改,最后将数组中每个Cell中的value相加,在加上base的值,为最终值
* 可以看到获取值的时候调用sum方法,进行base + 数组遍历的值
* cell数组还能根据当前线程争用情况进行扩容,初始长度为2,每次扩容会增长一倍,直到扩容到大于等于CPU数量就不再扩容
*
* LongAdder类与AtomicLong类的区别在于高并发时,将对单一变量的CAS操作分散为对数组cells中多个元素的CAS操作,
* 取值时进行求和;而在并发较低时仅对base变量进行CAS操作,与AtomicLong类原理相同
*
* https://www.jianshu.com/p/ec045c38ef0c
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/17 14:42
*/
public class T08_CAS_3_LongAdder {
private static AtomicLong count1 = new AtomicLong(0L);
private static long count2 = 0L;
private static LongAdder count3 = new LongAdder();
public static void main(String[] args) throws Exception {
Thread[] threads = new Thread[1000];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int k = 0; k < 100000; k++) {
count1.incrementAndGet();
}
});
}
long start = System.currentTimeMillis();
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
long end = System.currentTimeMillis();
TimeUnit.SECONDS.sleep(10);
System.out.println("Atomic: " + count1.get() + " time " + (end - start));
// -----------------------------------------------------------
Object lock = new Object();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int k = 0; k < 100000; k++) {
synchronized (lock) {
count2++;
}
}
}
});
}
start = System.currentTimeMillis();
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
end = System.currentTimeMillis();
System.out.println("Sync: " + count2 + " time " + (end - start));
// ----------------------------------
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int k = 0; k < 100000; k++) {
count3.increment();
}
});
}
start = System.currentTimeMillis();
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
end = System.currentTimeMillis();
TimeUnit.SECONDS.sleep(10);
System.out.println("LongAdder: " + count1.longValue() + " time " + (end - start));
}
}
# 9. JUC
JUC 下提供的一些类
# 9.1. ReentrantLock
/**
* ReentrantLock - 底层使用的AQS
*
* 下面代码使用ReentrantLock锁代替了synchronized锁
* 使用ReentrantLock需要手动释放锁,再下面使用公平锁和非公平锁交替执行
*
* 使用synchronized锁定的话如果遇到异常,JVM会自动释放锁,但是Lock必须手动释放锁,因此要在finally中进行锁的释放
*
* 公平锁能保证:老的线程排队使用锁,新线程仍然排队使用锁
* 非公平锁保证:老的线程排队使用锁,但是无法保证新线程抢占已经在排队的线程的锁
* https://blog.csdn.net/m47838704/article/details/80013056
* https://www.jianshu.com/p/2ada27eee90b
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/20 11:25
*/
public class T09_JUC_1_ReentrantLock {
private static class ReentrantLockTest {
// 构造参数为true为公平锁,默认为非公平锁
public Lock lock = new ReentrantLock();
public void m1() {
try {
lock.lock();
for (int i = 0; i < 5; i++) {
System.out.println(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
public void m2() {
try {
lock.lock();
System.out.println("m2");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
// 启动一个线程执行m1
new Thread(reentrantLockTest::m1).start();
// 启动一个线程执行m2
new Thread(reentrantLockTest::m2).start();
}
}
public static void main(String[] args) {
// 构造参数为true为公平锁,默认为非公平锁
Lock lock = new ReentrantLock(true);
new Thread(() -> {
for (int i = 0; i < 1000; i++) {
try {
lock.lock();
System.out.println("t1 " + i);
} finally {
lock.unlock();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 1000; i++) {
try {
lock.lock();
System.out.println("t2 " + i);
} finally {
lock.unlock();
}
}
}).start();
}
}
# 9.2. ReentrantLock2
/**
* ReentrantLock - 底层使用的AQS
*
* 下面代码使用ReentrantLock锁代替了synchronized锁
* 使用ReentrantLock需要手动释放锁
*
* 使用synchronized锁定的话如果遇到异常,JVM会自动释放锁,但是Lock必须手动释放锁,因此要在finally中进行锁的释放
*
* 还可以使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
* 可以根据tryLock的返回值来判定是否锁定
* 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/20 11:25
*/
public class T09_JUC_2_ReentrantLock {
private static class ReentrantLockTest {
// 构造参数为true为公平锁,默认为非公平锁
public Lock lock = new ReentrantLock(true);
public void m1() {
try {
lock.lock();
for (int i = 5; i < 10; i++) {
System.out.println("m1 " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
public void m2() {
boolean lockMark = Boolean.FALSE;
try {
// 使用tryLock进行尝试锁定,3s内一直尝试获取锁,获取到直接往下执行,
// 超过3s没获取到也往下执行
lockMark = lock.tryLock(3, TimeUnit.SECONDS);
if (lockMark) {
System.out.println("m2 Lock");
} else {
System.out.println("m2 NotLock");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 锁定了才进行释放
if (lockMark) {
lock.unlock();
}
}
}
public void m3() {
// 尝试获取锁
if (lock.tryLock()) {
try {
// 拿到锁
System.out.println("m3 Lock");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} else {
// 不能获取锁
System.out.println("m3 NotLock");
}
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
// 启动一个线程执行m1
new Thread(reentrantLockTest::m1).start();
// 启动一个线程执行m2
new Thread(reentrantLockTest::m2).start();
// 启动一个线程执行m3
new Thread(reentrantLockTest::m3).start();
}
}
}
# 9.3. ReentrantLock3
/**
* ReentrantLock - 底层使用的AQS
*
* 下面代码使用ReentrantLock锁代替了synchronized锁
* 使用ReentrantLock需要手动释放锁
*
* 使用synchronized锁定的话如果遇到异常,JVM会自动释放锁
* 但是Lock必须手动释放锁,因此要在finally中进行锁的释放
*
* lockInterruptibly() - 待补充
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/20 15:36
*/
public class T09_JUC_3_ReentrantLock {
private static class ReentrantLockTest {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
// 起一个线程一直等待
Thread t1 = new Thread(() -> {
try {
lock.lock();
System.out.println("t1 start");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
System.out.println("t1 end");
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("t1 interrupt");
} finally {
lock.unlock();
}
});
t1.start();
// 再起一个线程打断t1
Thread t2 = new Thread(() -> {
try {
// 强制打断拿到锁的线程,并且获取锁
lock.lockInterruptibly();
System.out.println("t2 start");
TimeUnit.SECONDS.sleep(1);
System.out.println("t2 end");
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("t2 interrupt");
} finally {
lock.unlock();
}
});
t2.start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.interrupt();
}
}
}
# 9.4. CountDownLatch
/**
* CountDownLatch - 底层使用的AQS
*
* 下面代码使用CountDownLatch和join
* CountDownLatch看做一个计数器,初始化给一个数
* countDown()计数减一
* await()方法必须计数为0了才能继续执行
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/20 17:00
*/
public class T09_JUC_4_CountDownLatch {
public static void main(String[] args) {
usingJoin();
usingCountDownLatch();
}
private static void usingCountDownLatch() {
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int j = 0; j < 10000; j++) result += j;
latch.countDown();
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end latch");
}
private static void usingJoin() {
Thread[] threads = new Thread[100];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int j = 0; j < 10000; j++) result += j;
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("end join");
}
}
# 9.5. CyclicBarrier
/**
* CyclicBarrier - 底层使用的ReentrantLock
* 循环栅栏 - 给定一个线程数,参与线程执行到了这个数量就执行特定方法
* https://www.jianshu.com/p/333fd8faa56e
*
* 一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务
* 比如同时三个线程去读取数据,必须这三个线程读取完了才能把三个线程的数据合并为一个文件
*
* CyclicBarrier与CountDownLatch 区别
* CountDownLatch是一次性的,CyclicBarrier是可循环利用的
* CountDownLatch参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束
* CyclicBarrier参与的线程职责是一样的
*
* 下面代码使用CyclicBarrier设定20个线程一次执行特定方法输出20个线程了
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/20 18:50
*/
public class T09_JUC_5_CyclicBarrier {
public static void main(String[] args) {
// CyclicBarrier barrier = new CyclicBarrier(20);
CyclicBarrier barrier = new CyclicBarrier(20,
() -> System.out.println("20个线程了"));
/*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
@Override
public void run() {
System.out.println("20个线程了");
}
});*/
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
System.out.println(i);
}
}
}
# 9.6. Phaser
/**
* Phaser - 阶段器 - 用来解决控制多个线程分阶段共同完成任务的情景问题
* 其作用相比CountDownLatch和CyclicBarrier更加灵活
*
* https://blog.csdn.net/u010739551/article/details/51083004
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/20 18:50
*/
public class T09_JUC_6_Phaser {
static MarriagePhaser marriagePhaser = new MarriagePhaser();
public static void main(String[] args) {
marriagePhaser.bulkRegister(7);
for (int i = 0; i < 5; i++) {
new Thread(new Person("p" + i)).start();
}
new Thread(new Person("新郎")).start();
new Thread(new Person("新娘")).start();
}
static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("所有人到齐了!" + registeredParties);
System.out.println();
return false;
case 1:
System.out.println("所有人吃完了!" + registeredParties);
System.out.println();
return false;
case 2:
System.out.println("所有人离开了!" + registeredParties);
System.out.println();
return false;
case 3:
System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
return true;
default:
return true;
}
}
}
static class Person implements Runnable {
String name;
public Person(String name) {
this.name = name;
}
public void arrive() {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s 到达现场!\n", name);
marriagePhaser.arriveAndAwaitAdvance();
}
public void eat() {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s 吃完!\n", name);
marriagePhaser.arriveAndAwaitAdvance();
}
public void leave() {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s 离开!\n", name);
marriagePhaser.arriveAndAwaitAdvance();
}
private void hug() {
if (name.equals("新郎") || name.equals("新娘")) {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s 洞房!\n", name);
marriagePhaser.arriveAndAwaitAdvance();
} else {
// 结束
marriagePhaser.arriveAndDeregister();
// marriagePhaser.register()
}
}
@Override
public void run() {
arrive();
eat();
leave();
hug();
}
}
}
# 9.7. ReadWriteLock
/**
* ReadWriteLock - 底层使用的AQS
* 读写锁 - Synchronized存在明显的一个性能问题就是读与读之间互斥,简言之就是,
* 可以做到读和读互不影响,读和写互斥,写和写互斥,提高读写效率
* https://www.jianshu.com/p/9cd5212c8841
*
* 下面代码18个读取线程,2个写入线程
* 使用reentrantLock执行的话读与读也存在互斥,执行了近10S
* 而使用ReadWriteLock,读与读没有互斥,只需要1S
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/21 10:11
*/
public class T09_JUC_7_ReadWriteLock {
public static ReentrantLock reentrantLock = new ReentrantLock();
public static int value = 0;
public static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public static Lock readLock = readWriteLock.readLock();
public static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(499);
System.out.println(Thread.currentThread().getName() + ":" + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(500);
value = v;
System.out.println(Thread.currentThread().getName() + ":" + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
Runnable readRunnable = () -> {
// read(reentrantLock);
read(readLock);
};
Runnable writeRunnable = () -> {
// write(reentrantLock, new Random().nextInt());
write(writeLock, new Random().nextInt());
};
// 18个线程读
for (int i = 0; i < 18; i++) {
new Thread(readRunnable).start();
}
// 2个线程写
for (int i = 0; i < 2; i++) {
new Thread(writeRunnable).start();
}
}
}
# 9.8. Semaphore
/**
* Semaphore - 底层使用的AQS
* 计数信号量 - 常用于限制可以访问某些资源的线程数量(控制线程的并发数量),例如限流
* https://www.cnblogs.com/klbc/p/9500947.html
*
* 下面代码中
* 在semaphore.acquire()和semaphore.release()之间的代码,同一时刻只允许制定个数的线程进入
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/21 10:45
*/
public class T09_JUC_8_Semaphore {
public static void main(String[] args) {
// 参数只允许一个线程同时执行,并且是公平锁
Semaphore semaphore = new Semaphore(1, true);
new Thread(() -> {
try {
// 开始
semaphore.acquire();
System.out.println("t1 start");
Thread.sleep(1000);
System.out.println("t1 end");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放
semaphore.release();
}
}).start();
new Thread(() -> {
try {
// 开始
semaphore.acquire();
System.out.println("t2 start");
Thread.sleep(1000);
System.out.println("t2 end");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放
semaphore.release();
}
}).start();
new Thread(() -> {
try {
// 开始
semaphore.acquire();
System.out.println("t3 start");
Thread.sleep(1000);
System.out.println("t3 end");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放
semaphore.release();
}
}).start();
}
}
# 9.9. Exchanger
/**
* Exchanger - 用于两个工作线程之间交换数据的封装工具类
* https://www.jianshu.com/p/990ae2ab1ae0
* 简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,
* 则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据
*
* 下面代码将两个线程里的字符串进行了交换
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/21 10:45
*/
public class T09_JUC_9_Exchanger {
public static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(() -> {
String s = "T1";
System.out.println(Thread.currentThread().getName() + ":" + s);
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":" + s);
}, "t1").start();
new Thread(() -> {
String s = "T2";
System.out.println(Thread.currentThread().getName() + ":" + s);
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":" + s);
}, "t2").start();
}
}
# 9.10. LockSupport
/**
* LockSupport - 线程阻塞工具类 - 底层Unsafe的native方法
* https://www.jianshu.com/p/1f16b838ccd8
* https://www.jianshu.com/p/f1f2cd289205
*
* park()和unpark()可以实现类似wait()和notify()的功能,但是并不和wait()和notify()交叉,
* 也就是说unpark()不会对wait()起作用,notify()也不会对park()起作用
*
* park()和unpark()的使用不会出现死锁的情况
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/21 10:45
*/
public class T09_JUC_10_LockSupport {
public static void main(String[] args) {
Thread t = new Thread(() -> {
// 每次输出停止500ms,到5的时候park停止
for (int i = 0; i < 10; i++) {
System.out.println(i);
if (i == 5) {
LockSupport.park();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
// 停止5s后unpark线程t,让t继续执行
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(t);
}
}
# 10. AQS
/**
* AQS - AbstractQueuedSynchronizer
* https://www.cnblogs.com/qlsem/p/11487783.html
* https://blog.csdn.net/qq_36520235/article/details/81263037
*
* 在Lock中,用到了一个同步队列AQS,全称AbstractQueuedSynchronizer,
* 它是一个同步工具也是Lock用来实现线程同步的核心组件
*
* 从使用层面来说,AQS的功能分为两种:独占和共享
* 独占锁: 每次只能有一个线程持有锁,比如前面给大家演示的ReentrantLock就是以独占方式实现的互斥锁
* 共享锁: 允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock
*
* AQS队列内部维护的是一个FIFO的双向链表(CLH同步队列),这种结构的特点是每个数据结构都有两个指针,
* 分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。
* 每个Node其实是由线程封装,当线程争抢锁失败后会封装成Node加入到AQS队列中去
* 当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)
*
* CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,
* AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,
* 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
* 在CLH同步队列中,一个节点表示一个线程,
* 它保存着线程的引用(thread)、状态(status)、前驱节点(prev)、后继节点(next)
*
* 下面代码自行实现了一把锁
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/21 16:25
*/
public class T10_AQS {
public static class CustomSync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
public static class CustomLock implements Lock {
private CustomSync sync = new CustomSync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
public static int m = 0;
public static Lock lock = new CustomLock();
public static void main(String[] args) throws Exception {
Thread[] threads = new Thread[100];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
try {
// lock.lock();
for (int j = 0; j < 100; j++) {
m++;
}
} finally {
// lock.unlock();
}
});
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
System.out.println(m);
}
}