倒计时锁定学习与实践
本文概览
为了方便于大家更快的知道这篇文章写了啥,这里呢说一点不太重要的废话,这边文章呢,会从 CountDownLatch
基础的知识点介绍起,然后结合一个具体的例子讲解下,最后呢,来一个实际的应用,好了,我们开始吧。另外这篇文章主要是讲的是怎么使用,并不是原理性的。文章的最后附上一个使用 CountDownLatch 实现的文件分片处理案例。
两句话的简介
CountDownLatch
是一个同步辅助操作,允许一个或者多个线程等待,直到其他线程中执行一组操作完成。
说的通俗点就是,CountDownLatch 表示所有的任务完成之后,才可以去进行下一步的执行操作。
流程是这样子的:
- 首先是创建实例
CountDownLatch countDown = new CountDownLatch(2)
- 需要同步的线程执行完之后,计数 -1;
countDown.countDown()
- 需要等待其他线程执行完毕之后,再运行的线程,调用
countDown.await()
实现阻塞同步
简单理解原理
CountDownLatch 是通过一个计数器来实现的,初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后等待的线程就可以恢复执行任务。
核心的方法解析
// 创建实例,给定的数字 2 表示:指定有2个线程同时执行任务
CountDownLatch countDown = new CountDownLatch(2)
// 计数器计数减1
countDownLatch.countDown();
// 等待其他线程执行完毕之后,再运行的线程,需要调用 await 实现阻塞同步
countDownLatch.await()
指定线程数的原理
如果了解过 AQS 的话,应该是知道在 AQS 中有一个 int 类型的 state 值,这个 state 值就是我们这里给定的线程数(比如:我们上面指定的数字 2,那么 state 就等于 2),我们看下源码:
// 这里为了方便于大家看,就省掉了一部分的源码,我们看下我们关注的那些
public class CountDownLatch {
/**
* CountDownLatch 内部维护了一个 Sync 的内部类,继承于 AQS
* CountDownLatch 的同步操作,使用的是 AQS 的state 来表示 count 值
* Uses AQS state to represent count. (源码上面的英文)
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 就是在这个地方, 将我们的 count 值赋值给了 AQS 的state
Sync(int count) {
setState(count); // 这里调用了 AQS 中的方法给 state 赋值
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 这里就是将 count - 1, 当count 值为 0 的时候,就通过 singal 去唤醒线程
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
// CountDownLatch 的构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
}
通过上面的源码我们就可以看到,其实 CountDownLatch
核心还是基于 AQS来实现的。下面,我们看下具体的例子:
示例1
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread( () -> {
try {
System.out.println("子线程:" + Thread.currentThread().getName() + "正在执行");
TimeUnit.SECONDS.sleep(2);
System.out.println("子线程:" + Thread.currentThread().getName() + "执行完毕");
countDownLatch.countDown(); // 线程执行完毕计数器 -1
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thread A").start();
new Thread( () -> {
try {
System.out.println("子线程:" + Thread.currentThread().getName() + "正在执行");
TimeUnit.SECONDS.sleep(2);
System.out.println("子线程:" + Thread.currentThread().getName() + "执行完毕");
countDownLatch.countDown(); // 线程执行完毕计数器 -1
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thread B").start();
// 主线程阻塞
try {
System.out.println("main 等到 子线程执行完毕");
countDownLatch.await();
System.out.println("子线程已经执行完成");
System.out.println("主线程已经执行完成。。。。。。。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
控制台输出:
main 等到 子线程执行完毕
子线程:thread B正在执行
子线程:thread A正在执行
子线程:thread B执行完毕
子线程:thread A执行完毕
子线程已经执行完成
主线程已经执行完成。。。。。。。。。
示例2
public class CountDownLatchDemo {
private CountDownLatch countDownLatch;
private int start = 10;
private int mid = 100;
private int end = 200;
private volatile int tmpRes1, tmpRes2;
private int add(int start, int end) {
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
}
private int sum(int a, int b) {
return a + b;
}
public void calculate() {
countDownLatch = new CountDownLatch(2);
Thread thread1 = new Thread(() -> {
try {
// 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + " : 开始执行");
tmpRes1 = add(start, mid);
System.out.println(Thread.currentThread().getName() +
" : calculate ans: " + tmpRes1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}, "线程1");
Thread thread2 = new Thread(() -> {
try {
// 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + " : 开始执行");
tmpRes2 = add(mid + 1, end);
System.out.println(Thread.currentThread().getName() +
" : calculate ans: " + tmpRes2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}, "线程2");
Thread thread3 = new Thread(()-> {
try {
// 这里获取到 线程1 与 线程2 中的结果再做操作
System.out.println(Thread.currentThread().getName() + " : 开始执行");
countDownLatch.await();
int ans = sum(tmpRes1, tmpRes2);
System.out.println(Thread.currentThread().getName() +
" : calculate ans: " + ans);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程3");
thread3.start();
thread1.start();
thread2.start();
}
public static void main(String[] args) throws InterruptedException {
CountDownLatchDemo demo = new CountDownLatchDemo();
demo.calculate();
Thread.sleep(1000);
}
}
特别注意
如果使用 ExecutorService
创建线程池,而没有自定义线程池,在不再需要线程池时应该调用 shutdown() 方法。这个方法会使线程池停止接受新任务,并开始关闭现有的线程。如果不调用 shutdown() 方法,线程池中的线程将会一直运行,即使它们在程序代码中不再需要。
如果您没有调用 shutdown() 方法,那么 JVM 将无法正常退出,因为还有线程在运行。这可能会导致内存泄漏和其他问题。因此,最好在使用完线程池后及时调用 shutdown() 方法。
int threadNum = Runtime.getRuntime().availableProcessors() * 2;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch countDownLatch = new CountDownLatch((int) blockNum);
executorService.shutdown();
推荐的做法是自定义一个线程池,这样子效率最高,不用频繁的创建线程池,然后还需要销毁线程池。
文件分片处理
这里呢,附上一个使用 CountDownLatch 实现的文件分片处理示例:
import java.io.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 文件分片上传
*
*/
public class FileSplitAndMerge {
public static void main(String[] args) throws IOException, InterruptedException {
// 大文件路径和名称
String filePath = "F:\\test\\largefile\\海神唐三.txt";
File file = new File(filePath);
// 每个小文件的大小
long blockSize = 1024 * 1024; // 1MB
// 小文件存放目录
String dirPath = "F:\\test\\smallfiles\\";
File dir = new File(dirPath);
if (!dir.exists()) {
dir.mkdirs();
}
// 计算要分割成多少块
long fileSize = file.length();
long blockNum = (long) Math.ceil((double) fileSize / blockSize);
// 将大文件分割成多个小文件
for (int i = 0; i < blockNum; i++) {
long startIndex = i * blockSize;
long endIndex = Math.min(startIndex + blockSize, fileSize);
String fileName = dirPath + "part-" + i + ".txt";
splitFile(file, fileName, startIndex, endIndex);
}
// 使用多线程方式将小文件写入到同一目录下,并合并为一个文件
int threadNum = Runtime.getRuntime().availableProcessors() * 2;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch countDownLatch = new CountDownLatch((int) blockNum);
for (int i = 0; i < blockNum; i++) {
String fileName = dirPath + "part-" + i + ".txt";
executorService.execute(new WriteTask(fileName, countDownLatch));
}
countDownLatch.await();
// 合并小文件
mergeFiles(blockNum, dirPath, "merged.txt");
// 清理临时创建的小文件
for (int i = 0; i < blockNum; i++) {
String fileName = dirPath + "part-" + i + ".txt";
File smallFile = new File(fileName);
smallFile.delete();
}
executorService.shutdown();
}
// 将大文件分割成小文件
public static void splitFile(File file, String fileName, long startIndex, long endIndex) throws IOException {
FileInputStream fis = new FileInputStream(file);
FileOutputStream fos = new FileOutputStream(fileName);
byte[] buffer = new byte[1024];
int len;
long count = 0;
while ((len = fis.read(buffer)) != -1) {
count += len;
if (count > endIndex) {
int overLen = (int) (count - endIndex);
fos.write(buffer, 0, len - overLen);
break;
} else if (count >= startIndex) {
fos.write(buffer, 0, len);
}
}
fis.close();
fos.close();
}
// 合并小文件
public static void mergeFiles(long blockNum, String dirPath, String mergedFileName) throws IOException {
FileInputStream[] inputArr = new FileInputStream[(int) blockNum];
// 打开所有小文件的输入流
for (int i = 0; i < blockNum; i++) {
String fileName = dirPath + "part-" + i + ".txt";
inputArr[i] = new FileInputStream(fileName);
}
SequenceInputStream inputStream = new SequenceInputStream(Collections.enumeration(Arrays.asList(inputArr)));
FileOutputStream outputStream = new FileOutputStream(dirPath + mergedFileName);
byte[] buffer = new byte[1024];
int len;
while ((len = inputStream.read(buffer)) != -1)
推荐阅读
-
倒计时锁定学习与实践
-
英语学习工具:词典笔的建模创新与工程实践
-
深度学习的力量:手写数学表达式识别与前景代码实践与应用
-
第五项修炼--学习型组织的艺术与实践》。彼得-圣乔治
-
深度学习原理与实践:深度学习在图像分割中的应用
-
[学习笔记Ⅰ] 第 2 章 Stack Overflow 原理与实践(上机操作)
-
深度学习原理与实践:图像透视变形中的深度学习
-
[姿势估计] 实践记录:使用 Dlib 和 mediapipe 进行人脸姿势估计 - 本文重点介绍方法 2):方法 1:基于深度学习的方法:。 基于深度学习的方法:基于深度学习的方法利用深度学习模型,如卷积神经网络(CNN)或递归神经网络(RNN),直接从人脸图像中学习姿势估计。这些方法能够学习更复杂的特征表征,并在大规模数据集上取得优异的性能。方法二:基于二维校准信息估计三维姿态信息(计算机视觉 PnP 问题)。 特征点定位:人脸姿态估计的第一步是通过特征点定位来检测和定位人脸的关键点,如眼睛、鼻子和嘴巴。这些关键点提供了人脸的局部结构信息,可用于后续的姿势估计。 旋转表示:常见的旋转表示方法包括欧拉角和旋转矩阵。欧拉角通过三个旋转角度(通常是俯仰、偏航和滚动)描述头部的旋转姿态。旋转矩阵是一个 3x3 矩阵,表示头部从一个坐标系到另一个坐标系的变换。 三维模型重建:根据特征点的定位结果,三维人脸模型可用于姿势估计。通过将人脸的二维图像映射到三维模型上,可以估算出人脸的旋转和平移信息。这就需要建立人脸的三维模型,然后通过优化方法将模型与特征点对齐,从而获得姿势估计结果。 特征点定位 特征点定位是用于检测人脸关键部位的五官基础部分,还有其他更多的特征点表示方法,大家可以参考我上一篇文章中介绍的特征点检测方案实践:人脸校正二次定位操作来解决人脸校正的问题,客户在检测关键点的代码上略有修改,坐标转换部分客户见上图 def get_face_info(image). img_copy = image.copy image.flags.writeable = False image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) results = face_detection.process(image) # 在图像上绘制人脸检测注释。 image.flags.writeable = True image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) box_info, facial = None, None if results.detections: for detection in results. for detection in results.detections: mp_drawing.Drawing.detection = 无 mp_drawing.draw_detection(image, detection) 面部 = detection.location_data.relative_keypoints 返回面部 在上述代码中,返回的数据是五官(6 个关键点的坐标),这是用 mediapipe 库实现的,下面我们可以尝试用另一个库:dlib 来实现。 使用 dlib 使用 Dlib 库在 Python 中实现人脸关键点检测的步骤如下: 确保已安装 Dlib 库,可使用以下命令: pip install dlib 导入必要的库: 加载 Dlib 的人脸检测器和关键点检测器模型: 读取图像并将其灰度化: 使用人脸检测器检测图像中的人脸: 对检测到的人脸进行遍历,并使用关键点检测器检测人脸关键点: 显示绘制了关键点的图像: 以下代码将参数 landmarks_part 添加到要返回的关键点坐标中。
-
深度学习神经网络(CNN RNN GAN)算法原理与实践
-
unity 着色器学习与实践日记(一)