Java 多线程流水线模式
流水线
什么是流水线?
在计算机中,对于一条具体的指令执行过程,通常可以分为五个部分:取指令,指令译码,取操作数,运算 (ALU),写结果。 前三步由指令控制器完成,后两步则由运算器完成。 按照传统的方式,所有指令顺序执行,那么先是指令控制器工作,完成第一条指令的前三步,然后运算器工作,完成后两步,第一条指令执行完毕。然后第二条指令又是先指令控制器工作,完成前三步,然后运算器,完成第二条指令的后两部…… 传统方式有个很大的缺点就是,指令只能一条一条地执行,仔细分析一下就会发现,这种方式存在很大的资源浪费:即同一时刻,要么指令控制器工作,运算器闲着;要么运算器工作,指令控制器闲着。这样一方面资源得不到有效利用,另一方面就是工作效率很低。 流水线的出现就是为了解决这个问题,下面我们来看一下流水线的工作模式: 假设有两个指令INS_A和INS_B,它们的执行分别要经过A,B,C,D四个过程,假设A到D四个过程分别由四个硬件元件完成。按照传统的方式,它们的流程如下: 这种方式的缺点就是,只能一条指令一条指令的执行,并且当指令执行到过程B的时候,处理过程A和CD的元件是处于空闲状态的。
流水线方式如下:
说明一下,通过流水线的方式,当INS_A指令执行完过程A之后,处理过程A的元件就空闲了,此时我们就开始处理指令INS_B的A阶段,这样一来,INS_B指令只需要等到INS_A的A过程执行完成就可以继续执行了,这样以来就在很大程度上提高了效率。
流水线中断
使用流水线能够很大程度提高程序执行效率,这点是毋庸置疑的,但是,在系统中,每当引入一个新的模式或者组件的时候,我们就需要对应处理该模式或者组件所带来的问题,那么引入流水线的一个很大的问题就是流水线中断。
产生中断一般是由两个原因造成的,一个是“相关”,一个是“条件转移”。 相关:指的是一条指令的执行需要依赖前一条指令的执行结果。拿上面的例子,假设INS_B指令在执行过程C的时候,需要使用INS_A指令过程D的结果,那么指令INS_B在执行到C的时候,由于A指令的D过程还没有执行完成,所以此时INS_B就不能继续执行,否则就会拿到错误结果。所以此时就需要将流水线中断,从而等待指令INS_A的D过程的结束。如下图所示:
条件转移:如果一条指令是条件转移指令,即指令执行结果是根据条件发生变化的,那么系统就不清楚下面应该执行哪一条指令,这时就必须等第一条指令的判断结果出来才能执行第二条指令。条件转移所造成的流水线停顿甚至比相关还要严重的多。
所以:流水线虽然能够提高整体运行效率,但是在某些情况下,需要中断流水线,以保证程序运行正确,而流水线的中断又会降低系统运行效率。
流水线在Java中的应用
目前在java提供的实现中,并没有用到流水线模式,但是我们知道JVM虚拟机在运行时对我们的代码进行的指令重排序,目的就是为了减少流水线的中断,从而提高流水线运行效率。
在实际的项目开发中,我们也可以学习流水线模式的思想,将业务流程拆分成多个子流程,然后采用流水线的方式进行,以减少程序等待。 比如有一个操作涉及到1)查询数据库,2)本地处理数据,3)远程RPC通知结果 三个过程,其中过程1和过程3都涉及到网络IO操作,所以整体运行是比较耗时的。但是我们可以采用流水线作业的方式,这样就可以充分利用三部分的资源。提高系统的整体运行效率。
Commons Pipeline
apache基金会下的一个项目,提供了流水线操作的框架。参考这里 PS:这个框架貌似好久都没有维护过了,不知道为啥。。
自己实现一个简单的流水线
在黄文海先生的《Java多线程实战指南(设计模式篇)》书中,详细讲解了流水线的java实现,另外,可以参考它书中的源码,点击这里
我也简单写了一个,假设有一个任务需要分为三步执行,那么可以将每一步抽象成一个任务阶段(TaskStage),然后每个任务阶段都被提交到队列中,然后三个线程分别处理三个队列中的数据。 TaskStage
package pipeline;
/**
* 任务阶段,比如一个任务分为三个阶段,每一个阶段都有一个TaskStage与之对应
*/
public interface TaskStage {
/**
* 处理任务,返回值为该任务阶段的下一个阶段
*/
public TaskStage process();
}
三个具体的任务阶段: 数据库查询任务阶段
package pipeline;
/**
* 查询数据库阶段
*/
public class DBQueryTaskStage implements TaskStage {
@Override
public TaskStage process() {
String result = queryDB();
System.out.println(result);
return new CalculateTaskStage(result);
}
/**
* 模拟查询数据库过程
*
* @return
*/
private String queryDB() {
try {
Thread.sleep(1000 * 20);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "DBQueryTaskStage";
}
}
计算处理结果任务阶段
package pipeline;
public class CalculateTaskStage implements TaskStage {
private String data;
public CalculateTaskStage(String data) {
super();
this.data = data;
}
@Override
public TaskStage process() {
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = calculate(data);
System.out.println(result);
return new RpcTaskStage(result);
}
private String calculate(String data2) {
return data2 + ",CaculateTaskStage";
}
}
远程RPC调用任务阶段
package pipeline;
public class RpcTaskStage implements TaskStage {
private String data;
public RpcTaskStage(String data) {
super();
this.data = data;
}
@Override
public TaskStage process() {
try {
Thread.sleep(1000 * 3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(data + ",RpcTaskStage");
return null;
}
}
接下来需要有一个流水线类,用于任务调度
package pipeline;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
* 整条流水线
*/
public class Pipeline {
// 用于存放任务的第一阶段
private BlockingDeque<TaskStage> firstStageQueue = new LinkedBlockingDeque<TaskStage>();
// 用于存储任务的第二阶段
private BlockingDeque<TaskStage> secondStageQueue = new LinkedBlockingDeque<TaskStage>();
// 用于存储任务的第三阶段
private BlockingDeque<TaskStage> thirdStageQueue = new LinkedBlockingDeque<TaskStage>();
public Pipeline() {
super();
// 启动三个线程,分别处理三个阶段的任务
new Thread() {
@Override
public void run() {
while (true) {
TaskStage taskStage = null;
try {
taskStage = firstStageQueue.poll(80, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
if (taskStage != null) {
TaskStage nextStage = taskStage.process();
try {
secondStageQueue.put(nextStage);
} catch (InterruptedException e) {
}
} else { // 取出来的数据为空,则终止(这种情况可能会有问题,这里做简单演示)
break;
}
}
}
}.start();
new Thread() {
@Override
public void run() {
while (true) {
TaskStage taskStage = null;
try {
taskStage = secondStageQueue.poll(80, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
if (taskStage != null) {
TaskStage nextStage = taskStage.process();
try {
thirdStageQueue.put(nextStage);
} catch (InterruptedException e) {
}
} else {
break;
}
}
}
}.start();
new Thread() {
@Override
public void run() {
while (true) {
TaskStage taskStage = null;
try {
taskStage = thirdStageQueue.poll(80, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (taskStage != null) {
taskStage.process();
} else {
break;
}
}
}
}.start();
}
public void process(TaskStage firstStage) {
try {
firstStageQueue.put(firstStage);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
主程序入口:
package pipeline;
import java.util.concurrent.locks.LockSupport;
public class Main {
public static void main(String[] args) {
Pipeline line = new Pipeline();
for (int i = 0; i < 10; i++) {
line.process(new DBQueryTaskStage());
}
LockSupport.park();
}
}
PS:上面的程序只是简单演示,会有一些线程调度的问题。
参考资料
- 流水线
- 《Java多线程实战指南(设计模式篇)》黄文海
推荐阅读
-
java 设计模式在行动,拟人化万物的适配器模式_java 平庸的实现模式(1)
-
35 岁实现财务*,腾讯程序员手握2300万提前退休?-1000万房产、1000万腾讯股票、加上300万的现金,一共2300万的财产。有网友算了一笔账,假设1000万的房产用于自住,剩下1300万资产按照平均税后20-50万不等进行计算,大约花上26-60年左右的时间才能赚到这笔钱。也就是说,普通人可能奋斗一辈子,才能赚到这笔钱。在很多人还在为中年危机而惶惶不可终日的时候,有的人的35岁,就已经安全着陆,试问哪个打工人不羡慕?但问题是有这样财富积累必然有像样的实力做靠山。没有人可以不劳而获。 看到这里,肯定有人说,那么对于普通人来说,卷可能真就成了唯一的出路。但是卷也有轻松的卷,“偷懒”的卷法,对于程序员而言,刨除掉一时无法改掉的开会传统占用的大部分时间,如何把有限的时间和精力放在真正重要的架构设计、需求设计上,而不是重复的造*,编码、改bug、手动测试。因此在科技改变生活的今天,学会使用AI工具成为程序员们的必备技能。 以全栈式全自动的软件开发工具飞算SoFlu软件机器人为例,作为全球首款面向微服务架构设计和最佳实践的软件机器人,SoFlu软件机器人改变了原来手工编码的作业模式,通过可视化拖拽方式以及参数配置就能实现等同于编写复杂代码的业务逻辑,在设计业务逻辑时就完成了微服务应用开发,做到“业务即图,图即代码”,实现“软件开发,十倍提效”。 而作为飞算SoFlu软件机器人的一个重要组成部分,FuncGPT(慧函数)支持所有类型函数创建。通过自然语言描述Java函数需求,实时生成高质量、高可读性的Java函数代码。生成代码可直接复制到IDEA,或一键导入Java全自动开发工具函数库。 FuncGPT(慧函数)具有五大核心能力: ● 自然语言:通过自然语言即可生成函数,降低软件开发门槛。 ● 秒级函数开发:全自动秒级开发函数,秒级完成,效率千倍提升。 ● 一站式函数开发应用:函数生成即可直接应用,效率、安全有保障。 ● 质量可靠:生成的函数代码符合业界规范、具备良好的可读性,符合最优解。 ● 灵活生成:可以根据开发人员的具体需求生成并支持调整和修改。 以“将文件流转换为File对象”这个需求为例,以下是FuncGPT(慧函数)秒级生成的代码: / 类名:FileConverter // 函数名:convertStreamToFile // 函数功能:将文件流转换为File对象 // POM依赖包:无 import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; public class FileConverter { /** * 将文件流转换为File对象 * @param inputStream 文件流 * @param filePath 文件路径 * @param fileName 文件名 * @return 转换后的File对象 * @throws IOException 如果发生I/O异常 */ public static File convertStreamToFile(InputStream inputStream, String filePath, String fileName) throws IOException { File file = new File(filePath + File.separator + fileName); // 创建File对象 try (FileOutputStream outputStream = new FileOutputStream(file)) { // 创建文件输出流 byte buffer = new byte[1024]; int bytesRead; while ((bytesRead = inputStream.read(buffer)) != -1) { // 从文件流读取数据并写入文件 outputStream.write(buffer, 0, bytesRead); } } return file; // 返回转换后的File对象 } } // 函数示例 // 将文件流转换为File对象示例 // 入参:inputStream,文件流 // 入参:filePath,文件路径 // 入参:fileName,文件名 // 出参:file,转换后的File对象 // 调用示例: // InputStream inputStream = new FileInputStream("example.txt"); // String filePath = "C:\\Users\\User\\Documents"; // String fileName = "example.txt"; // File file = FileConverter.convertStreamToFile(inputStream, filePath, fileName); // System.out.println(file.getAbsolutePath); // 输出结果:例如,将文件流转换为File对象后,文件的绝对路径为:C:\Users\User\Documents\example.txt // 则输出结果为:C:\Users\User\Documents\example.txt 通过分析,不难发现以上代码:
-
Java23 设计模式 - 策略模式的行为模式
-
用通俗易懂的语言理解 Java 设计模式的指南
-
原子类如何用于 Java 函数的并发和多线程?
-
Java23 设计模式 - 备忘录模式的行为模式
-
一种结构设计模式,允许在对象中动态添加新行为。它通过创建一个封装器来实现这一目的,即把对象放入一个装饰器类中,然后把这个装饰器类放入另一个装饰器类中,以此类推,形成一个封装器链。这样,我们就可以在不改变原始对象的情况下动态添加新行为或修改原始行为。 在 Java 中,实现装饰器设计模式的步骤如下: 定义一个接口或抽象类作为被装饰对象的基类。 公共接口 Component { void operation; } } 在本例中,我们定义了一个名为 Component 的接口,该接口包含一个名为 operation 的抽象方法,该方法定义了被装饰对象的基本行为。 定义一个实现基类方法的具体装饰对象。 公共类 ConcreteComponent 实现 Component { public class ConcreteComponent implements Component { @Override public void operation { System.out.println("ConcreteComponent is doing something...") ; } } 定义一个抽象装饰器类,该类继承于基类,并将装饰对象作为一个属性。 公共抽象类装饰器实现组件 { protected Component 组件 public Decorator(Component component) { this.component = component; } } @Override public void operation { component.operation; } } } 在这个示例中,我们定义了一个名为 Decorator 的抽象类,它继承了 Component 接口,并将被装饰对象作为一个属性。在操作方法中,我们调用了被装饰对象上的同名方法。 定义一个具体的装饰器类,继承自抽象装饰器类并实现增强逻辑。 公共类 ConcreteDecoratorA extends Decorator { public ConcreteDecoratorA(Component 组件) { super(component); } } public void operation { super.operation System.out.println("ConcreteDecoratorA 正在添加新行为......") ; } } 在本例中,我们定义了一个名为 ConcreteDecoratorA 的具体装饰器类,它继承自装饰器抽象类,并实现了操作方法的增强逻辑。在操作方法中,我们首先调用被装饰对象上的同名方法,然后添加新行为。 使用装饰器增强被装饰对象。 公共类 Main { public static void main(String args) { Component 组件 = new ConcreteComponent; component = new ConcreteDecoratorA(component); 组件操作 } } 在这个示例中,我们首先创建了一个被装饰对象 ConcreteComponent,然后通过 ConcreteDecoratorA 类创建了一个装饰器,并将被装饰对象作为参数传递。最后,调用装饰器的操作方法,实现对被装饰对象的增强。 使用场景 在 Java 中,装饰器模式被广泛使用,尤其是在 I/O 中。Java 中的 I/O 库使用装饰器模式实现了不同数据流之间的转换和增强。 让我们打开文件 a.txt,从中读取数据。InputStream 是一个抽象类,FileInputStream 是专门用于读取文件流的子类。BufferedInputStream 是一个支持缓存的数据读取类,可以提高数据读取的效率,具体代码如下: @Test public void testIO throws Exception { InputStream inputStream = new FileInputStream("C:/bbb/a.txt"); // 实现包装 inputStream = new BufferedInputStream(inputStream); byte bytes = new byte[1024]; int len; while((len = inputStream.read(bytes)) != -1){ System.out.println(new String(bytes, 0, len)); } } } } 其中 BufferedInputStream 对读取数据进行了增强。 这样看来,装饰器设计模式和代理模式似乎有点相似,接下来让我们讨论一下它们之间的区别。 第三,与代理模式的区别: 代理模式的目的是控制对对象的访问,它在对象外部提供一个代理对象来控制对原对象的访问。代理对象和原始对象通常实现相同的接口或继承相同的类,以确保两者可以相互替换。 装饰器模式的目的是动态增强对象的功能,而这是通过对象内部的包装器来实现的。在装饰器模式中,装饰器类和被装饰对象通常实现相同的接口或继承自相同的类,以确保两者可以相互替代。装饰器模式也被称为封装器模式。 在代理模式中,代理类附加了与原类无关的功能。
-
03-JAVA 设计模式 - 策略模式
-
03-JAVA 设计模式 - 状态模式
-
2024 Java 面试准备 - 多线程 (1)