欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

倒计时锁定学习与实践

最编程 2024-04-29 19:04:38
...

本文概览

为了方便于大家更快的知道这篇文章写了啥,这里呢说一点不太重要的废话,这边文章呢,会从 CountDownLatch 基础的知识点介绍起,然后结合一个具体的例子讲解下,最后呢,来一个实际的应用,好了,我们开始吧。另外这篇文章主要是讲的是怎么使用,并不是原理性的。文章的最后附上一个使用 CountDownLatch 实现的文件分片处理案例。

两句话的简介

CountDownLatch 是一个同步辅助操作,允许一个或者多个线程等待,直到其他线程中执行一组操作完成。
说的通俗点就是,CountDownLatch 表示所有的任务完成之后,才可以去进行下一步的执行操作。

流程是这样子的:

  1. 首先是创建实例 CountDownLatch countDown = new CountDownLatch(2)
  2. 需要同步的线程执行完之后,计数 -1; countDown.countDown()
  3. 需要等待其他线程执行完毕之后,再运行的线程,调用 countDown.await() 实现阻塞同步

简单理解原理

CountDownLatch 是通过一个计数器来实现的,初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后等待的线程就可以恢复执行任务。

核心的方法解析

// 创建实例,给定的数字 2 表示:指定有2个线程同时执行任务
CountDownLatch countDown = new CountDownLatch(2)

// 计数器计数减1
countDownLatch.countDown();

// 等待其他线程执行完毕之后,再运行的线程,需要调用 await 实现阻塞同步
countDownLatch.await()

指定线程数的原理

如果了解过 AQS 的话,应该是知道在 AQS 中有一个 int 类型的 state 值,这个 state 值就是我们这里给定的线程数(比如:我们上面指定的数字 2,那么 state 就等于 2),我们看下源码:

// 这里为了方便于大家看,就省掉了一部分的源码,我们看下我们关注的那些
public class CountDownLatch {

	
	/**
	 * CountDownLatch 内部维护了一个 Sync 的内部类,继承于 AQS
     * CountDownLatch 的同步操作,使用的是 AQS  的state 来表示 count 值
     * Uses AQS state to represent count. (源码上面的英文)
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

		// 就是在这个地方, 将我们的 count 值赋值给了 AQS 的state
        Sync(int count) {
            setState(count);	// 这里调用了 AQS 中的方法给 state 赋值 
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 这里就是将 count - 1, 当count 值为 0 的时候,就通过 singal 去唤醒线程
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    // CountDownLatch 的构造方法
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
}

通过上面的源码我们就可以看到,其实 CountDownLatch 核心还是基于 AQS来实现的。下面,我们看下具体的例子:

示例1
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchDemo {

    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread( () -> {
            try {
                System.out.println("子线程:" + Thread.currentThread().getName() + "正在执行");
                TimeUnit.SECONDS.sleep(2);
                System.out.println("子线程:" + Thread.currentThread().getName() + "执行完毕");

                countDownLatch.countDown(); // 线程执行完毕计数器 -1
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "thread A").start();


        new Thread( () -> {
            try {
                System.out.println("子线程:" + Thread.currentThread().getName() + "正在执行");
                TimeUnit.SECONDS.sleep(2);
                System.out.println("子线程:" + Thread.currentThread().getName() + "执行完毕");

                countDownLatch.countDown(); // 线程执行完毕计数器 -1
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "thread B").start();


        // 主线程阻塞
        try {
            System.out.println("main 等到 子线程执行完毕");
            countDownLatch.await();
            System.out.println("子线程已经执行完成");

            System.out.println("主线程已经执行完成。。。。。。。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

控制台输出:

main 等到 子线程执行完毕
子线程:thread B正在执行
子线程:thread A正在执行
子线程:thread B执行完毕
子线程:thread A执行完毕
子线程已经执行完成
主线程已经执行完成。。。。。。。。。
示例2
public class CountDownLatchDemo {
    private CountDownLatch countDownLatch;
 
    private int start = 10;
    private int mid = 100;
    private int end = 200;
 
    private volatile int tmpRes1, tmpRes2;
 
    private int add(int start, int end) {
        int sum = 0;
        for (int i = start; i <= end; i++) {
            sum += i;
        }
        return sum;
    }
 
 
    private int sum(int a, int b) {
        return a + b;
    }
 
    public void calculate() {
        countDownLatch = new CountDownLatch(2);
 
        Thread thread1 = new Thread(() -> {
            try {
                // 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
                Thread.sleep(100);
                System.out.println(Thread.currentThread().getName() + " : 开始执行");
                tmpRes1 = add(start, mid);
                System.out.println(Thread.currentThread().getName() +
                        " : calculate ans: " + tmpRes1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }, "线程1");
 
        Thread thread2 = new Thread(() -> {
            try {
                // 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
                Thread.sleep(100);
                System.out.println(Thread.currentThread().getName() + " : 开始执行");
                tmpRes2 = add(mid + 1, end);
                System.out.println(Thread.currentThread().getName() +
                        " : calculate ans: " + tmpRes2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }, "线程2");
 
 
        Thread thread3 = new Thread(()-> {
            try {
                // 这里获取到 线程1 与 线程2 中的结果再做操作
                System.out.println(Thread.currentThread().getName() + " : 开始执行");
                countDownLatch.await();
                int ans = sum(tmpRes1, tmpRes2);
                System.out.println(Thread.currentThread().getName() +
                        " : calculate ans: " + ans);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程3");
 
        thread3.start();
        thread1.start();
        thread2.start();
    }
 
 
    public static void main(String[] args) throws InterruptedException {
        CountDownLatchDemo demo = new CountDownLatchDemo();
        demo.calculate();
 
        Thread.sleep(1000);
    }
}

特别注意

如果使用 ExecutorService 创建线程池,而没有自定义线程池,在不再需要线程池时应该调用 shutdown() 方法。这个方法会使线程池停止接受新任务,并开始关闭现有的线程。如果不调用 shutdown() 方法,线程池中的线程将会一直运行,即使它们在程序代码中不再需要。
如果您没有调用 shutdown() 方法,那么 JVM 将无法正常退出,因为还有线程在运行。这可能会导致内存泄漏和其他问题。因此,最好在使用完线程池后及时调用 shutdown() 方法。

int threadNum = Runtime.getRuntime().availableProcessors() * 2;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch countDownLatch = new CountDownLatch((int) blockNum);

executorService.shutdown();

推荐的做法是自定义一个线程池,这样子效率最高,不用频繁的创建线程池,然后还需要销毁线程池。

文件分片处理

这里呢,附上一个使用 CountDownLatch 实现的文件分片处理示例:

import java.io.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 文件分片上传
 *
 */
public class FileSplitAndMerge {

    public static void main(String[] args) throws IOException, InterruptedException {
        // 大文件路径和名称
        String filePath = "F:\\test\\largefile\\海神唐三.txt";
        File file = new File(filePath);

        // 每个小文件的大小
        long blockSize = 1024 * 1024; // 1MB

        // 小文件存放目录
        String dirPath = "F:\\test\\smallfiles\\";
        File dir = new File(dirPath);
        if (!dir.exists()) {
            dir.mkdirs();
        }

        // 计算要分割成多少块
        long fileSize = file.length();
        long blockNum = (long) Math.ceil((double) fileSize / blockSize);

        // 将大文件分割成多个小文件
        for (int i = 0; i < blockNum; i++) {
            long startIndex = i * blockSize;
            long endIndex = Math.min(startIndex + blockSize, fileSize);
            String fileName = dirPath + "part-" + i + ".txt";

            splitFile(file, fileName, startIndex, endIndex);
        }

        // 使用多线程方式将小文件写入到同一目录下,并合并为一个文件
        int threadNum = Runtime.getRuntime().availableProcessors() * 2;
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        CountDownLatch countDownLatch = new CountDownLatch((int) blockNum);

        for (int i = 0; i < blockNum; i++) {
            String fileName = dirPath + "part-" + i + ".txt";
            executorService.execute(new WriteTask(fileName, countDownLatch));
        }
        countDownLatch.await();

        // 合并小文件
        mergeFiles(blockNum, dirPath, "merged.txt");

        // 清理临时创建的小文件
        for (int i = 0; i < blockNum; i++) {
            String fileName = dirPath + "part-" + i + ".txt";
            File smallFile = new File(fileName);
            smallFile.delete();
        }

        executorService.shutdown();
    }


    // 将大文件分割成小文件
    public static void splitFile(File file, String fileName, long startIndex, long endIndex) throws IOException {
        FileInputStream fis = new FileInputStream(file);
        FileOutputStream fos = new FileOutputStream(fileName);

        byte[] buffer = new byte[1024];
        int len;
        long count = 0;
        while ((len = fis.read(buffer)) != -1) {
            count += len;
            if (count > endIndex) {
                int overLen = (int) (count - endIndex);
                fos.write(buffer, 0, len - overLen);
                break;
            } else if (count >= startIndex) {
                fos.write(buffer, 0, len);
            }
        }
        fis.close();
        fos.close();
    }


    // 合并小文件
    public static void mergeFiles(long blockNum, String dirPath, String mergedFileName) throws IOException {
        FileInputStream[] inputArr = new FileInputStream[(int) blockNum];
        // 打开所有小文件的输入流
        for (int i = 0; i < blockNum; i++) {
            String fileName = dirPath + "part-" + i + ".txt";
            inputArr[i] = new FileInputStream(fileName);
        }

        SequenceInputStream inputStream = new SequenceInputStream(Collections.enumeration(Arrays.asList(inputArr)));
        FileOutputStream outputStream = new FileOutputStream(dirPath + mergedFileName);

        byte[] buffer = new byte[1024];
        int len;
        while ((len = inputStream.read(buffer)) != -1) 
						

上一篇: 大数据时代的电子商务风险控制简介||电子商务数据 API 接口

下一篇: 谷歌支付将于 2024 年 6 月 4 日停止使用 用户将迁移至谷歌钱包

推荐阅读