欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

重新学习 Java 高并发 "线程间如何协作(父子线程如何优雅地交互)--两个线程如何协作

最编程 2024-07-03 22:55:56
...

这里我想拿RocketMQ中的一个场景与大家来分享,通常这也是两个线程进行协作十分经典的使用示例


在RocketMQ消费端,PUSH模式消费者在消费之前需要提前做好准备:


  • 队列负载均衡
  • 消息拉取


这里简单介绍一下相关实现:在RocketMQ消费者PUSH模式启动后,消费组中的成员发生了变化,需要进行重平衡,即进行队列重新负载,主要的依据是查询当前队列的总个数与当前消费者个数,然后使用负载均衡算法(例如平均分配),各个消费者获取分配后的队列后,依次向Broker服务端拉取消息,然后提交消费线程池消费。


单一指责设计原则出发,根据负载均衡、消息拉取两个不通的职责来看,需要设计两个线程分别处理,并且这两个线程必须相互协作,因为负责队列负载的线程需要指引消息拉取线程具体拉取哪些队列。

网络异常,图片无法展示
|

从上述流程图中可以看出,RocketMQ的给出的实践要点如下:


  • 引入一个阻塞队列(多线程安全),充当任务队列,两个线程都可以访问。
  • PullMessageService通过调用阻塞队列的take()方法获取任务,如果阻塞队列(任务队列)中没有待处理任务,线程则阻塞,非常关键,避免线程空轮询,造成CPU飙升。
  • RebalanceService线程的职责是根据任务负载算法生成任务,放入任务队列中,从而能够唤醒PullMessageService线程。

大家有没有发现,这不就是典型的生产者-消费者模式吗?没错,这就是。


介绍了线程之间如何协作,接下来再介绍一下如何复用一个线程。


我们还是以RocketMQ中PullMessageService线程的实现为例进行阐述。

网络异常,图片无法展示
|

实现经典步骤:


  • 使用 while() 进行循环,这里通常有两种实现方法:
  • while(Thread.interrupted()),即检测当前线程的中断状态,如果要停止该线程,通常是在其他线程中调用要中断线程的interrupt()方法。
  • while(!this.stoped) stoped会使用volatile关键字首饰,如果要停止该线程,并且需要暴露一个方法,用于将stoped设置为true,从而跳出循环。
  • 在while方法中从一个阻塞队列中获取待执行的任务,没有任务执行时当前线程阻塞,不消耗CPU资源。


推荐阅读