脚本宝典收集整理的这篇文章主要介绍了Pulsar+SpringCloud 让Pulsar的配置可以热更新的方法,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
上代码, 包括Pulsar的参数类, Pulsar Client, PRoducer和Consumer
================Pulsar参数类=====================@Data@RefreshScoPE@component@configurationProperties(prefix = "tdmq.pulsar")public class PulsarProperties { /** * 接入地址 */ private String serviceurl; /** * 命名空间tdc */ private String tdcnamespace; /** * 角色tdc的token */ private String tdcToken; /** * 集群name */ private String cluster;
/** * topicMap */private Map<String, String> topicMap;/** * 订阅 */private Map<String, String> subMap;/** * 开关 on:Consumer可用 ||||| off:Consumer断路 */private String onOff;
}==================PulsarClient=======================
@Slf4j@Configuration@EnableConfigurationProperties(PulsarProperties.class)public class PulsarConfig { @Autowired PulsarProperties pulsarProperties; @RefreshScope @Bean public PulsarClient getPulsarClient() { try { return PulsarClient.builder() .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken())) .serviceUrl(pulsarProperties.getServiceurl()) .build(); } catch (PulsarClientException e) { LOG.error("初始化Pulsar Client失败", e); } throw new RuntimeException("初始化Pulsar Client失败"); }}===========Producer&Consumer&发送消息的工具类=================
@Slf4j@Componentpublic class PulsarUtils { @Autowired PulsarProperties pulsarProperties; @Autowired PulsarClient client; @Autowired AudITCommentResultListener<String> auditCommentResultListener; @Autowired AuditReplyResultListener<String> auditReplyResultListener; @Autowired AuditResourceResultListener<String> auditResourceResultListener; /** * 创建一个生产者 * * @param topic topic name * @return Producer生产者 */ public Producer<byte[]> createProducer(String topic) { try { return client.newProducer() .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic) .batchingMaxpublishDelay(10, TimeUnit.MILLISECONDS) .sendTimeout(10, TimeUnit.SECONDS) .blockIfQueueFull(true) .create(); } catch (PulsarClientException e) { log.error("初始化Pulsar Producer失败", e); } throw new RuntimeException("初始化Pulsar Producer失败"); } /** * 创建一个消费者 * * @param topic topic name * @param subscription sub name * @param messageListener MessageListener的自定义实现类 * @return Consumer消费者 */ public Consumer createConsumer(String topic, String subscription, MessageListener messageListener) { try { return client.newConsumer() .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic) .subscriptionName(subscription) .ackTimeout(10, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Shared) .messageListener(messageListener) .subscribe() ; } catch (PulsarClientException e) { log.error("初始化Pulsar Consumer失败", e); } throw new RuntimeException("初始化Pulsar Consumer失败"); } /** * 异步send一条msg * * @param message 消息体 */ public void sendMessage(String message, Producer<byte[]> producer) { producer.sendAsync(message.getBytes()).thenAccept(msgId -> { log.info("消息发送成功, MessageiD为{}", msgId); }); } /** * 同步发送一条msg * * @param message 消息体 * @param producer 生产者实例 */ public void sendOnce(String message, Producer<byte[]> producer) throws PulsarClientException { MessageId send = producer.send(message.getBytes()); log.info("消息成功发送, MessageId {},message {}", send, message); } //-----------consumer----------- @RefreshScope @Bean(name = "audit-resource-result-topic") public Consumer getAuditResourceResultTopicConsumer() { return this.createConsumer(pulsarProperties.getTopicMap().get("audit-resource-result-topic"), pulsarProperties.getSubMap().get("resource-sub-audit-resource-result"), auditResourceResultListener); } //-----------producer----------- @RefreshScope @Bean(name = "resource-publish-topic") public Producer<byte[]> getResourcePublishTopicProducer() { return this.createProducer(pulsarProperties.getTopicMap().get("resource-publish-topic")); }}=====================AbstractListener===============================
@Slf4j@Componentpublic abstract class AbstractListener<String> implements MessageListener<String> { @Autowired PulsarProperties pulsarProperties; @override public void received(Consumer<String> consumer, Message<String> message) { } /** * 判断开关 * * @return is equals off */ public boolean judgeIsOff() { return pulsarProperties.getOnOff().equals("off"); }}
=================Listener自定义实现类====================
@Slf4j@Componentpublic class AuditCommentResultListener<String> extends AbstractListener<String> { @Autowired CommentService commentService; @Override public void received(Consumer consumer, Message msg) { try { java.lang.String data = new java.lang.String(msg.getData()); log.info("接受到消息, MessageId {} data {}", msg.getMessageId(), data); // 添加开关 if (super.judgeIsOff()) { consumer.negativeAcknowlEdge(msg); log.error("当前开关为off 拒绝消费消息, MessageId {} data {}", msg.getMessageId(), data); } // 处理业务逻辑 consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); log.error("拒绝消费消息, MessageId {} data {}", msg.getMessageId(), new java.lang.String(msg.getData()), e); } }}
=========================================================================
以上是脚本宝典为你收集整理的Pulsar+SpringCloud 让Pulsar的配置可以热更新的方法全部内容,希望文章能够帮你解决Pulsar+SpringCloud 让Pulsar的配置可以热更新的方法所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。