欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

记录 gRpc 流操作

最编程 2024-10-01 14:13:46
...
package com.mykkhw.mykkhw_data_mq.service.grpc; import com.mykkhw.mykkhw_data_protocols.Base.ReqDataProto; import com.mykkhw.mykkhw_data_protocols.Base.ResultProto; import com.mykkhw.mykkhw_data_protocols.Base.ResultType; import com.mykkhw.mykkhw_data_protocols.MQ.TiktokMsgProto; import com.mykkhw.mykkhw_data_protocols.MQService.MQDataServiceGrpc; import io.grpc.stub.StreamObserver; import org.redisson.api.RBlockingQueue; import org.springframework.util.StringUtils; public class MqRpcService extends MQDataServiceGrpc.MQDataServiceImplBase { @Override public void sendRedissonMsg(AdMsgProto request, StreamObserver<ResultProto> responseObserver) { RpcServicePools.mqProducer.sendMsg(request.getMsg(), request.getTag(), request.getTopic()); ResultProto.Builder builder = ResultProto.newBuilder(); builder.setCode(ResultType.SUCCESS); responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } @Override public void receiveRedissonMsg(ReqDataProto request, StreamObserver<AdMsgProto> responseObserver) { try { RBlockingQueue<String> queue = RpcServicePools.redisson.getBlockingQueue(request.getName()); // 循环处理消息 while (!Thread.currentThread().isInterrupted()) { // 阻塞式获取消息,没有消息时线程会等待 String message = queue.take(); if(StringUtils.hasText(message)){ AdMsgProto.Builder builder = AdMsgProto.newBuilder(); builder.setMsg(message); ... responseObserver.onNext(builder.build()); } } } catch (Exception e) { Thread.currentThread().interrupt(); // 重新设置线程的中断标志 } responseObserver.onCompleted(); } } //mq依赖 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.15.0</version> <scope>compile</scope> </dependency>