记录一次gRpc流式操作(jedis版)
使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)
gRpc协议类定义
service方法定义
service MQDataService{
rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto);
rpc receiveFacebookAndroidMsg(empty)returns (stream google.protobuf.StringValue);
}
服务端写法
@Override
public void sendFacebookAndroidMsg(StringValue request, StreamObserver<ResultProto> responseObserver) {
CacheKey cacheKey= AppKey.appReport;
String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC)
.replace("{APPTYPE}", "0");
RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(),cacheKey.get_expireSecondTime());
ResultProto.Builder builder = ResultProto.newBuilder();
builder.setCode(ResultType.SUCCESS);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
@Override
public void receiveFacebookAndroidMsg(empty request, StreamObserver<StringValue> responseObserver) {
MQListener mqListener=new MQListener(responseObserver);
try {
CacheKey cacheKey= AppKey.appReport;
String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC)
.replace("{APPTYPE}","0");
RedissonFactory.getRedis().subscribe(mqListener,key);
} catch (Exception e) {
}
finally {
responseObserver.onCompleted();
}
}
// 消息监听响应
public class MQListener extends JedisPubSub {
public MQListener(StreamObserver<StringValue> responseObserver)
{
_responseObserver=responseObserver;
}
private StreamObserver<StringValue> _responseObserver;
// 取得订阅的消息后的处理
public void onMessage(String channel, String message) {
if(!StringUtil.isNullOrEmpty(message)){
StringValue.Builder builder = StringValue.newBuilder();
builder.setValue(message);
_responseObserver.onNext(builder.build());
}
}
// 初始化订阅时候的处理
public void onSubscribe(String channel, int subscribedChannels) {
...
}
// 取消订阅时候的处理
public void onUnsubscribe(String channel, int subscribedChannels) {
...
}
// 初始化按表达式的方式订阅时候的处理
public void onPSubscribe(String pattern, int subscribedChannels) {
...
}
// 取消按表达式的方式订阅时候的处理
public void onPUnsubscribe(String pattern, int subscribedChannels) {
...
}
// 取得按表达式的方式订阅的消息后的处理
public void onPMessage(String pattern, String channel, String message) {
...
}
}
客户端写法
public static void receiveFacebookAndroidMsg() {
try {
log.info("facebook android msg");
// 接收消息
StreamObserver<StringValue> responseObserver = new StreamObserver<StringValue>() {
@Override
public void onNext(StringValue msgProto) {
try {
log.info("facebook android msg 接收到消息: {}", msgProto.getValue());
JSONObject jsonObject = JSONObject.parseObject(msgProto.getValue());
...
} catch (Exception e) {
log.error("facebook ios msg 消费失败{}", e.getMessage());
// 发给mq重新消费
...
}
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error occurred: " + throwable.getMessage());
log.info("facebook android Error occurred: {}", throwable.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Stream completed.");
log.info("facebook android Stream completed.");
}
};
log.info("接收fb android msg 开始");
ClientManager.getMqDataServiceStub().receiveFacebookAndroidMsg(empty.newBuilder().build(), responseObserver);
log.info("接收fb android msg 成功");
} catch (Exception e) {
log.info("出错了");
}
}
原文地址:https://blog.csdn.net/laozengsky/article/details/142656685
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!