欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

记录 gRpc 流操作(jedis 版本)

最编程 2024-09-30 15:51:42
...
@Override public void sendFacebookAndroidMsg(StringValue request, StreamObserver<ResultProto> responseObserver) { CacheKey cacheKey= AppKey.appReport; String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC) .replace("{APPTYPE}", "0"); RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(),cacheKey.get_expireSecondTime()); ResultProto.Builder builder = ResultProto.newBuilder(); builder.setCode(ResultType.SUCCESS); responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } @Override public void receiveFacebookAndroidMsg(empty request, StreamObserver<StringValue> responseObserver) { MQListener mqListener=new MQListener(responseObserver); try { CacheKey cacheKey= AppKey.appReport; String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC) .replace("{APPTYPE}","0"); RedissonFactory.getRedis().subscribe(mqListener,key); } catch (Exception e) { } finally { responseObserver.onCompleted(); } } // 消息监听响应 public class MQListener extends JedisPubSub { public MQListener(StreamObserver<StringValue> responseObserver) { _responseObserver=responseObserver; } private StreamObserver<StringValue> _responseObserver; // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { if(!StringUtil.isNullOrEmpty(message)){ StringValue.Builder builder = StringValue.newBuilder(); builder.setValue(message); _responseObserver.onNext(builder.build()); } } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { ... } // 取消订阅时候的处理 public void onUnsubscribe(String channel, int subscribedChannels) { ... } // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { ... } // 取消按表达式的方式订阅时候的处理 public void onPUnsubscribe(String pattern, int subscribedChannels) { ... } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { ... } }

推荐阅读