This article takes a look at rocketmq's ExtProducerResetConfiguration.
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@Configuration
public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
        StandardEnvironment environment,
        RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);
        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerTemplate);
        }
    }

    private void registerTemplate(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
        }
        ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
        validate(annotation, genericApplicationContext);
        DefaultMQProducer mqProducer = createProducer(annotation);
        // Set instanceName same as the beanName
        mqProducer.setInstanceName(beanName);
        try {
            mqProducer.start();
        } catch (MQClientException e) {
            throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
                beanName), e);
        }
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
        rocketMQTemplate.setProducer(mqProducer);
        rocketMQTemplate.setObjectMapper(objectMapper);
        log.info("Set real producer to :{} {}", beanName, annotation.value());
    }

    private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
        DefaultMQProducer producer = null;
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        if (producerConfig == null) {
            producerConfig = new RocketMQProperties.Producer();
        }
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        String groupName = environment.resolvePlaceholders(annotation.group());
        groupName = StringUtils.isEmpty(groupName) ? producerConfig.getGroup() : groupName;
        String ak = environment.resolvePlaceholders(annotation.accessKey());
        ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : annotation.accessKey();
        String sk = environment.resolvePlaceholders(annotation.secretKey());
        sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : annotation.secretKey();
        String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
        customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;

        if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
                annotation.enableMsgTrace(), customizedTraceTopic);
            producer.setVipChannelEnabled(false);
        } else {
            producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
        }

        producer.setNamesrvAddr(nameServer);
        producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());
        producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());
        producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
        producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
        producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
        producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());
        return producer;
    }

    private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
        if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
            throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
                    "please check the @ExtRocketMQTemplateConfiguration",
                annotation.value()));
        }

        if (rocketMQProperties.getNameServer() == null ||
            rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
            throw new BeanDefinitionValidationException(
                "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
                    "global property, please use the default RocketMQTemplate!");
        }
    }
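}
ExtProducerResetConfiguration implements the ApplicationContextAware and SmartInitializingSingleton interfaces; its afterSingletonsInstantiated method fetches the beans annotated with ExtRocketMQTemplateConfiguration and calls registerTemplate on each of them.
registerTemplate first checks that the bean is an instance of RocketMQTemplate and throws IllegalStateException if it is not. It then reads the ExtRocketMQTemplateConfiguration annotation and validates it: validate throws BeanDefinitionValidationException if the bean name given by annotation.value() is already in use, or if the global name server is unset or equal to the nameServer given in the annotation.
Finally it creates a DefaultMQProducer through createProducer, which falls back to the global rocketMQProperties.getProducer() settings for group, accessKey, secretKey, customizedTraceTopic and any numeric attribute left at -1. It sets the producer's instanceName to the bean name, calls start, assigns the producer to the bean, and sets the objectMapper. Note that in the listing above setRetryTimesWhenSendFailed reads annotation.retryTimesWhenSendAsyncFailed(), so the annotation's retryTimesWhenSendFailed attribute is never consulted.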
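The registration flow summarized above can be sketched in plain Java without Spring or RocketMQ on the classpath. This is only an illustration under stated assumptions: RegistrationSketch, ExtTemplate, Template, OrderTemplate and NotATemplate are hypothetical stand-ins (for the configuration class, the annotation, RocketMQTemplate and two beans), the exception type for the name server check is simplified to IllegalStateException, and the "producer" is just a label rather than a started DefaultMQProducer.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;

class RegistrationSketch {

    /** Hypothetical marker annotation standing in for @ExtRocketMQTemplateConfiguration. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ExtTemplate {
        String nameServer();
    }

    /** Hypothetical stand-in for RocketMQTemplate: it only remembers which producer it was given. */
    static class Template {
        String producer;
    }

    @ExtTemplate(nameServer = "10.0.0.2:9876")
    static class OrderTemplate extends Template {
    }

    /** Annotated, but not a Template subclass, so registration must reject it. */
    @ExtTemplate(nameServer = "10.0.0.3:9876")
    static class NotATemplate {
    }

    /** Plays the role of afterSingletonsInstantiated: visit every annotated bean once. */
    static void registerAll(Map<String, Object> beans, String globalNameServer) {
        beans.forEach((name, bean) -> {
            ExtTemplate annotation = bean.getClass().getAnnotation(ExtTemplate.class);
            if (annotation != null) {
                register(name, bean, annotation, globalNameServer);
            }
        });
    }

    /** Plays the role of registerTemplate: type check, validation, then assignment. */
    private static void register(String beanName, Object bean, ExtTemplate annotation, String globalNameServer) {
        if (!(bean instanceof Template)) {
            throw new IllegalStateException(bean.getClass() + " is not instance of " + Template.class.getName());
        }
        if (globalNameServer == null || globalNameServer.equals(annotation.nameServer())) {
            throw new IllegalStateException("nameServer is the same as the global one, use the default template");
        }
        // The real class creates and starts a producer here; the sketch only records a label.
        ((Template) bean).producer = beanName + "@" + annotation.nameServer();
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        OrderTemplate order = new OrderTemplate();
        beans.put("orderTemplate", order);
        beans.put("plainBean", new Object()); // not annotated, so it is skipped
        registerAll(beans, "10.0.0.1:9876");
        System.out.println(order.producer);
    }
}
```

Running the checks after all singletons exist, rather than in a constructor, means every candidate bean is already fully built when it is inspected, which is presumably why the real class hooks into SmartInitializingSingleton.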
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface ExtRocketMQTemplateConfiguration {
    /**
     * The component name of the Producer configuration.
     */
    String value() default "";

    /**
     * The property of "name-server".
     */
    String nameServer();

    /**
     * Name of producer.
     */
    String group() default "${rocketmq.producer.group:}";

    /**
     * Millis of send message timeout.
     */
    int sendMessageTimeout() default -1;

    /**
     * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
     */
    int compressMessageBodyThreshold() default -1;

    /**
     * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    int retryTimesWhenSendFailed() default -1;

    /**
     * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    int retryTimesWhenSendAsyncFailed() default -1;

    /**
     * Indicate whether to retry another broker on sending failure internally.
     */
    boolean retryNextServer() default false;

    /**
     * Maximum allowed message size in bytes.
     */
    int maxMessageSize() default -1;

    /**
     * The property of "access-key".
     */
    String accessKey() default "${rocketmq.producer.accessKey:}";

    /**
     * The property of "secret-key".
     */
    String secretKey() default "${rocketmq.producer.secretKey:}";

    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;

    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default "${rocketmq.producer.customized-trace-topic:}";
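}
The ExtRocketMQTemplateConfiguration annotation defines the attributes value, nameServer, group, sendMessageTimeout, compressMessageBodyThreshold, retryTimesWhenSendFailed, retryTimesWhenSendAsyncFailed, retryNextServer, maxMessageSize, accessKey, secretKey, enableMsgTrace and customizedTraceTopic. Only nameServer has no default; the numeric attributes default to -1, and group, accessKey, secretKey and customizedTraceTopic default to placeholders for the corresponding rocketmq.producer.* properties.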
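Annotation attributes cannot be null, so "not set" has to be encoded as a sentinel such as -1 or an empty string. The following is a minimal sketch of that convention, assuming a hypothetical two-attribute annotation ExtConfig and a hypothetical Settings holder for the global values; it mirrors the ternary expressions in createProducer but is not the library's code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class FallbackSketch {

    /** Hypothetical annotation, reduced to two attributes with "unset" sentinels as defaults. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ExtConfig {
        String group() default "";

        int sendMessageTimeout() default -1;
    }

    /** Hypothetical holder for either the global or the effective producer settings. */
    static final class Settings {
        final String group;
        final int sendMessageTimeout;

        Settings(String group, int sendMessageTimeout) {
            this.group = group;
            this.sendMessageTimeout = sendMessageTimeout;
        }
    }

    /** An attribute left at its sentinel takes the global value; anything else overrides it. */
    static Settings effectiveSettings(Class<?> annotatedClass, Settings global) {
        ExtConfig annotation = annotatedClass.getAnnotation(ExtConfig.class);
        if (annotation == null) {
            throw new IllegalArgumentException(annotatedClass + " is not annotated with @ExtConfig");
        }
        String group = annotation.group().isEmpty() ? global.group : annotation.group();
        int timeout = annotation.sendMessageTimeout() == -1
            ? global.sendMessageTimeout
            : annotation.sendMessageTimeout();
        return new Settings(group, timeout);
    }

    @ExtConfig(sendMessageTimeout = 5000)
    static class OrderTemplate {
    }

    @ExtConfig(group = "audit-group")
    static class AuditTemplate {
    }

    public static void main(String[] args) {
        Settings global = new Settings("default-group", 3000);
        Settings order = effectiveSettings(OrderTemplate.class, global);
        Settings audit = effectiveSettings(AuditTemplate.class, global);
        System.out.println(order.group + " " + order.sendMessageTimeout);
        System.out.println(audit.group + " " + audit.sendMessageTimeout);
    }
}
```

One consequence of the sentinel approach is that -1 itself can never be passed through as an explicit override, which is harmless for timeouts, sizes and retry counts.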
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
Here the ExtRocketMQTemplate class is defined; it extends RocketMQTemplate and carries the ExtRocketMQTemplateConfiguration annotation.
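The nameServer value in this example is a placeholder rather than a literal address, and several annotation defaults use the `${key:}` form with an empty default. A simplified sketch of how such strings resolve is given below. It is not Spring's implementation: PlaceholderSketch and its resolve method are hypothetical, nested placeholders are not handled, and properties come from a plain Map instead of an Environment. It does follow the documented behavior of resolvePlaceholders in that a placeholder with no value and no default is passed through unchanged.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {

    // Matches ${key} or ${key:default}; nested placeholders are deliberately not handled.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String text, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        int last = 0;
        while (matcher.find()) {
            out.append(text, last, matcher.start());
            String value = properties.get(matcher.group(1));
            if (value == null) {
                // group(2) is null when no ":default" part was written; keep the placeholder as-is then.
                value = matcher.group(2) != null ? matcher.group(2) : matcher.group();
            }
            out.append(value);
            last = matcher.end();
        }
        out.append(text, last, text.length());
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> properties = Map.of("demo.rocketmq.extNameServer", "127.0.0.1:9876");
        // Property present: the placeholder is replaced by its value.
        System.out.println(resolve("${demo.rocketmq.extNameServer}", properties));
        // Property absent, empty default: resolves to an empty string.
        System.out.println("[" + resolve("${rocketmq.producer.group:}", properties) + "]");
    }
}
```

The empty-default case is what lets createProducer treat an unset group, accessKey, secretKey or customizedTraceTopic as "use the global producer setting".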