Pulsar+SpringCloud 让Pulsar的配置可以热更新的方法

发布时间:2022-06-29 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了Pulsar+SpringCloud 让Pulsar的配置可以热更新的方法脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

上代码, 包括Pulsar的参数类, Pulsar Client, PRoducer和Consumer

================Pulsar参数类=====================@Data@RefreshScoPE@component@configurationProperties(prefix = "tdmq.pulsar")public class PulsarProperties {    /**     * 接入地址     */    private String serviceurl;    /**     * 命名空间tdc     */    private String tdcnamespace;    /**     * 角色tdc的token     */    private String tdcToken;    /**     * 集群name     */    private String cluster;
/** * topicMap */private Map<String, String> topicMap;/** * 订阅 */private Map<String, String> subMap;/** * 开关 on:Consumer可用 ||||| off:Consumer断路 */private String onOff;
}==================PulsarClient=======================
@Slf4j@Configuration@EnableConfigurationProperties(PulsarProperties.class)public class PulsarConfig {    @Autowired    PulsarProperties pulsarProperties;    @RefreshScope    @Bean    public PulsarClient getPulsarClient() {        try {            return PulsarClient.builder()                    .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))                    .serviceUrl(pulsarProperties.getServiceurl())                    .build();        } catch (PulsarClientException e) {            LOG.error("初始化Pulsar Client失败", e);        }        throw new RuntimeException("初始化Pulsar Client失败");    }}===========Producer&Consumer&发送消息的工具类=================
@Slf4j@Componentpublic class PulsarUtils {    @Autowired    PulsarProperties pulsarProperties;    @Autowired    PulsarClient client;    @Autowired    AudITCommentResultListener<String> auditCommentResultListener;    @Autowired    AuditReplyResultListener<String> auditReplyResultListener;    @Autowired    AuditResourceResultListener<String> auditResourceResultListener;    /**     * 创建一个生产者     *     * @param topic topic name     * @return Producer生产者     */    public Producer<byte[]> createProducer(String topic) {        try {            return client.newProducer()                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)                    .batchingMaxpublishDelay(10, TimeUnit.MILLISECONDS)                    .sendTimeout(10, TimeUnit.SECONDS)                    .blockIfQueueFull(true)                    .create();        } catch (PulsarClientException e) {            log.error("初始化Pulsar Producer失败", e);        }        throw new RuntimeException("初始化Pulsar Producer失败");    }    /**     * 创建一个消费者     *     * @param topic           topic name     * @param subscription    sub name     * @param messageListener MessageListener的自定义实现类     * @return Consumer消费者     */    public Consumer createConsumer(String topic, String subscription,                                   MessageListener messageListener) {        try {            return client.newConsumer()                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)                    .subscriptionName(subscription)                    .ackTimeout(10, TimeUnit.SECONDS)                    .subscriptionType(SubscriptionType.Shared)                    .messageListener(messageListener)                    .subscribe()                    ;        } catch (PulsarClientException e) {            log.error("初始化Pulsar Consumer失败", e);        }        throw new RuntimeException("初始化Pulsar Consumer失败");    }    /**     * 异步send一条msg     *     * @param message 消息体     */    public void sendMessage(String message, Producer<byte[]> producer) {        producer.sendAsync(message.getBytes()).thenAccept(msgId -> {            log.info("消息发送成功, MessageiD为{}", msgId);        });    }    /**     * 同步发送一条msg     *     * @param message  消息体     * @param producer 生产者实例     */    public void sendOnce(String message, Producer<byte[]> producer) throws PulsarClientException {        MessageId send = producer.send(message.getBytes());        log.info("消息成功发送, MessageId {},message {}", send, message);    }    //-----------consumer-----------    @RefreshScope    @Bean(name = "audit-resource-result-topic")    public Consumer getAuditResourceResultTopicConsumer() {        return this.createConsumer(pulsarProperties.getTopicMap().get("audit-resource-result-topic"),                pulsarProperties.getSubMap().get("resource-sub-audit-resource-result"),                auditResourceResultListener);    }    //-----------producer-----------    @RefreshScope    @Bean(name = "resource-publish-topic")    public Producer<byte[]> getResourcePublishTopicProducer() {        return this.createProducer(pulsarProperties.getTopicMap().get("resource-publish-topic"));    }}=====================AbstractListener===============================
@Slf4j@Componentpublic abstract class AbstractListener<String> implements MessageListener<String> {    @Autowired    PulsarProperties pulsarProperties;    @override    public void received(Consumer<String> consumer, Message<String> message) {    }    /**     * 判断开关     *     * @return is equals off     */    public boolean judgeIsOff() {        return pulsarProperties.getOnOff().equals("off");    }}
=================Listener自定义实现类====================
@Slf4j@Componentpublic class AuditCommentResultListener<String> extends AbstractListener<String> {    @Autowired    CommentService commentService;    @Override    public void received(Consumer consumer, Message msg) {        try {            java.lang.String data = new java.lang.String(msg.getData());            log.info("接受到消息, MessageId {} data {}", msg.getMessageId(), data);            // 添加开关            if (super.judgeIsOff()) {                consumer.negativeAcknowlEdge(msg);                log.error("当前开关为off 拒绝消费消息, MessageId {} data {}", msg.getMessageId(), data);            }            // 处理业务逻辑            consumer.acknowledge(msg);        } catch (Exception e) {            consumer.negativeAcknowledge(msg);            log.error("拒绝消费消息, MessageId {} data {}", msg.getMessageId(), new java.lang.String(msg.getData()), e);        }    }}
=========================================================================

脚本宝典总结

以上是脚本宝典为你收集整理的Pulsar+SpringCloud 让Pulsar的配置可以热更新的方法全部内容,希望文章能够帮你解决Pulsar+SpringCloud 让Pulsar的配置可以热更新的方法所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。