记录 gRPC 流操作
最编程
2024-10-01 14:13:46
...
package com.mykkhw.mykkhw_data_mq.service.grpc;
import com.mykkhw.mykkhw_data_protocols.Base.ReqDataProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultType;
import com.mykkhw.mykkhw_data_protocols.MQ.TiktokMsgProto;
import com.mykkhw.mykkhw_data_protocols.MQService.MQDataServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.redisson.api.RBlockingQueue;
import org.springframework.util.StringUtils;
/**
 * gRPC service that bridges callers to a Redisson-backed message queue.
 *
 * <p>{@link #sendRedissonMsg} publishes one message through
 * {@code RpcServicePools.mqProducer}; {@link #receiveRedissonMsg} streams messages taken from a
 * Redisson blocking queue back to the caller.
 *
 * <p>NOTE(review): {@code AdMsgProto} is not among the imports shown in this listing
 * ({@code TiktokMsgProto} is imported instead) — confirm which message type the generated
 * service base class actually uses.
 */
public class MqRpcService extends MQDataServiceGrpc.MQDataServiceImplBase {

    /**
     * Publishes a single message and replies with a success result.
     *
     * @param request          carries the message body, tag and topic handed to the producer
     * @param responseObserver receives one {@code ResultProto} with code {@code SUCCESS} and is
     *                         then completed
     */
    @Override
    public void sendRedissonMsg(AdMsgProto request, StreamObserver<ResultProto> responseObserver) {
        RpcServicePools.mqProducer.sendMsg(request.getMsg(), request.getTag(), request.getTopic());

        ResultProto result = ResultProto.newBuilder()
                .setCode(ResultType.SUCCESS)
                .build();
        responseObserver.onNext(result);
        responseObserver.onCompleted();
    }

    /**
     * Streams messages from the blocking queue named by {@code request.getName()} to the caller.
     *
     * <p>The loop runs until the handling thread is interrupted. Messages without text are
     * skipped. An interruption ends the stream normally via {@code onCompleted()}; any other
     * failure is reported via {@code onError} so the caller can tell an outage apart from a
     * normal end of stream.
     *
     * <p>NOTE(review): {@code take()} blocks until a message arrives, so this thread presumably
     * stays parked after the client goes away — consider a timed {@code poll} combined with a
     * cancellation check if that turns out to be a problem.
     *
     * @param request          names the queue to read from
     * @param responseObserver receives one {@code AdMsgProto} per non-blank queue entry
     */
    @Override
    public void receiveRedissonMsg(ReqDataProto request, StreamObserver<AdMsgProto> responseObserver) {
        try {
            RBlockingQueue<String> queue = RpcServicePools.redisson.getBlockingQueue(request.getName());
            // Keep forwarding messages until this thread is asked to stop.
            while (!Thread.currentThread().isInterrupted()) {
                // Blocking take: the thread waits here while the queue is empty.
                String message = queue.take();
                if (StringUtils.hasText(message)) {
                    AdMsgProto.Builder builder = AdMsgProto.newBuilder();
                    builder.setMsg(message);
                    // The original listing elides further statements at this point.
...
                    responseObserver.onNext(builder.build());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for code further up the stack, then end the stream normally.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // A real failure must not be disguised as a normal end of stream, and the interrupt
            // flag must not be set for something that was not an interruption.
            responseObserver.onError(
                    io.grpc.Status.INTERNAL
                            .withDescription("Failed to stream messages from queue: " + request.getName())
                            .withCause(e)
                            .asRuntimeException());
            return; // onError is terminal; onCompleted must not follow it.
        }
        responseObserver.onCompleted();
    }
}
//mq依赖
<dependency>
<!-- Redisson client library; the org.redisson.api.RBlockingQueue imported by MqRpcService comes from this artifact. -->
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.0</version>
<scope>compile</scope>
</dependency>
上一篇: 全填充透明背景二维码生成器