這篇文章將為大家詳細(xì)講解有關(guān)JMS消息activemq的使用分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

十多年的且末網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都營(yíng)銷網(wǎng)站建設(shè)的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整且末建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)公司從事“且末網(wǎng)站設(shè)計(jì)”,“且末網(wǎng)站推廣”以來(lái),每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
對(duì)于JMS,百度百科,是這樣介紹的:JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。
簡(jiǎn)短來(lái)說(shuō),JMS是一種與廠商無(wú)關(guān)的 API,用來(lái)訪問(wèn)消息收發(fā)系統(tǒng)消息。它類似于JDBC(Java Database Connectivity),提供了應(yīng)用程序之間異步通信的功能。
JMS1.0是jsr 194里規(guī)定的規(guī)范(關(guān)于jsr規(guī)范,請(qǐng)點(diǎn)擊)。目前最新的規(guī)范是JSR 343,JMS2.0。
好了,說(shuō)了這么多,其實(shí)只是在說(shuō),JMS只是sun公司為了統(tǒng)一廠商的接口規(guī)范,而定義出的一組api接口。
描述如下:
JMS提供者(JMS的實(shí)現(xiàn)者,比如activemq jbossmq等)
JMS客戶(使用提供者發(fā)送消息的程序或?qū)ο螅缭?2306中,負(fù)責(zé)發(fā)送一條購(gòu)票消息到處理隊(duì)列中,用來(lái)解決購(gòu)票高峰問(wèn)題,那么,發(fā)送消息到隊(duì)列的程序和從隊(duì)列獲取消息的程序都叫做客戶)
JMS生產(chǎn)者,JMS消費(fèi)者(生產(chǎn)者及負(fù)責(zé)創(chuàng)建并發(fā)送消息的客戶,消費(fèi)者是負(fù)責(zé)接收并處理消息的客戶)
JMS消息(在JMS客戶之間傳遞數(shù)據(jù)的對(duì)象)
JMS隊(duì)列(一個(gè)容納那些被發(fā)送的等待閱讀的消息的區(qū)域)
JMS主題(一種支持發(fā)送消息給多個(gè)訂閱者的機(jī)制)
連接工廠(connectionfactory)客戶端使用JNDI查找連接工廠,然后利用連接工廠創(chuàng)建一個(gè)JMS連接。
JMS連接 表示JMS客戶端和服務(wù)器端之間的一個(gè)活動(dòng)的連接,是由客戶端通過(guò)調(diào)用連接工廠的方法建立的。
JMS會(huì)話 session 標(biāo)識(shí)JMS客戶端和服務(wù)端的會(huì)話狀態(tài)。會(huì)話建立在JMS連接上,標(biāo)識(shí)客戶與服務(wù)器之間的一個(gè)會(huì)話進(jìn)程。
JMS目的 Destinatio 又稱為消息隊(duì)列,是實(shí)際的消息源
生產(chǎn)者和消費(fèi)者
消息類型,分為隊(duì)列類型(優(yōu)先先進(jìn)先出)以及訂閱類型
從官網(wǎng)下載安裝包, http://activemq.apache.org/download.html 賦予運(yùn)行權(quán)限 chmod +x,windows可以忽略此步 運(yùn)行 ./active start | stop 啟動(dòng)后,activeMQ會(huì)占用兩個(gè)端口,一個(gè)是負(fù)責(zé)接收發(fā)送消息的tcp端口:61616,一個(gè)是基于web負(fù)責(zé)用戶界面化管理的端口:8161。這兩個(gè)端口可以在conf下面的xml中找到。http服務(wù)器使用了jettry。這里有個(gè)問(wèn)題是啟動(dòng)mq后,很長(zhǎng)時(shí)間管理界面才可以顯示出來(lái)。
==》啟動(dòng)MQ服務(wù)器:
根據(jù)操作系統(tǒng)不同,進(jìn)入相應(yīng)win64/win32位目錄,雙擊activemq.bat啟動(dòng)MQ。
ActiveMQ默認(rèn)啟動(dòng)時(shí),啟動(dòng)了內(nèi)置的jetty服務(wù)器,提供一個(gè)用于監(jiān)控ActiveMQ的admin應(yīng)用 進(jìn)入ActionMQ服務(wù)監(jiān)控地址:瀏覽器輸入http://127.0.0.1:8161/admin
Spring最厲害的地方就是它的Bean了,還有它特有的IOC(控制反轉(zhuǎn))和AOP(面向切面編程)技術(shù)。有了這些,我們就可以不用new關(guān)鍵字構(gòu)造對(duì)象,同時(shí),可以方便地使用注入往類中的屬性進(jìn)行初始化。如果你編寫過(guò)ActiveMQ之類的JMS應(yīng)用程序,無(wú)論對(duì)于消息的生產(chǎn)者還是消費(fèi)者,最重要的接口有以下兩個(gè): 1.ConnectionFactory 2.Destination
ConnectionFactory是一切的基礎(chǔ),有了它才有了Connection,然后才有Session,只有通過(guò)Session對(duì)象,我們才能創(chuàng)建消息隊(duì)列、構(gòu)建生產(chǎn)者/消費(fèi)者,繼而發(fā)送/接收消息。 Destination是一切的歸宿,它就像總線一樣,生產(chǎn)者發(fā)出消息要發(fā)到它上面,消費(fèi)者取消息也要從這上面取。
試想,如果這一切都能借助Spring強(qiáng)大的Bean管理的話,我們?cè)诰帉懗绦虻臅r(shí)候會(huì)更加的方便簡(jiǎn)潔。幸運(yùn)的是,ActiveMQ官方提供了完美的Spring框架支持,一切只需要在xml文件中配置即可~
Spring官方提供了一個(gè)叫JmsTemplate的類,這個(gè)類就專門用來(lái)處理JMS的,在該類的Bean配置標(biāo)簽中有兩個(gè)屬性connectionFactory-ref和defaultDestination-ref正好對(duì)應(yīng)JMS中的ConnectionFactory和Destination,如果你有興趣查看源碼的話,就可以發(fā)現(xiàn)JmsTemplate幫我們做了大量創(chuàng)建的工作,我們只需要用它來(lái)進(jìn)行收發(fā)信息就ok了,而ActiveMQ官方也提供了對(duì)應(yīng)的實(shí)現(xiàn)包。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd"> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"></bean> <bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" p:targetConnectionFactory-ref="jmsConnectionFactory" p:sessionCacheSize="10"></bean> <bean id="queue_test" class="org.apache.activemq.command.ActiveMQQueue"> <!--消息隊(duì)列名稱 --> <constructor-arg value="queue_test" /> </bean> <!-- Spring JMS Template --> <!-- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="cachedConnectionFactory" p:defaultDestination-ref="queue_test"></bean> --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="jmsConnectionFactory" /> <!-- pub/sub模型(發(fā)布/訂閱) --> <property name="pubSubDomain" value="true" /> <property name="messageConverter"> <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" /> </property> <!-- Session.AUTO_ACKNOWLEDGE 1消息自動(dòng)簽收 Session.CLIENT_ACKNOWLEDGE 2客戶端調(diào)用acknowledge方法手動(dòng)簽收 Session.DUPS_OK_ACKNOWLEDGE 3不必必須簽收,消息可能會(huì)重復(fù)發(fā)送 --> <property name="sessionAcknowledgeMode" value="2" /> </bean> <jms:listener-container container-type="default" connection-factory="jmsConnectionFactory" acknowledge="auto"> <jms:listener destination="queue_test" ref="simpleMsgListener" method="onMessage"></jms:listener> </jms:listener-container> </beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:util="http://www.springframework.org/schema/util"
xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.14.5.xsd
">
<context:annotation-config />
<context:component-scan base-package="com.troy" />
<!-- 讀取配置文件 -->
<bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<array>
<value>classpath:conf/spring-config.properties</value>
</array>
</property>
</bean>
<!-- 連接 activemq -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq_url}" userName="${activemq_username}" password="${activemq_password}" />
<!-- 這里可以采用連接池的方式連接PooledConnectionFactoryBean -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 配置連接 -->
<property name="targetConnectionFactory" ref="amqConnectionFactory" />
<!-- 會(huì)話的最大連接數(shù) -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 定義消息隊(duì)列topic類型,queue的方式差不多 -->
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 定義名稱 -->
<constructor-arg index="0" value="topic" />
</bean>
<!-- 配置JMS模板(topic),Spring提供的JMS工具類,它發(fā)送、接收消息。 -->
<!-- 為了測(cè)試發(fā)送消息,保留jmsTemplate的配置,實(shí)際不存在發(fā)送,只需要配置監(jiān)聽(tīng)即可 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="topic" />
<!-- 非pub/sub模型(發(fā)布/訂閱),true為topic,false為queue -->
<property name="pubSubDomain" value="true" />
</bean>
<!-- 監(jiān)聽(tīng)方式,這種方式更實(shí)用,可以一直監(jiān)聽(tīng)消息 -->
<bean id="topicMessageListen" class="com.troy.activemq.TopicMessageListen" />
<bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<!-- 注冊(cè)activemq名稱 -->
<property name="destination" ref="topic" />
<property name="messageListener" ref="topicMessageListen" />
</bean>
</beans>import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
@Component(value = "simpleMsgListener")
public class SimpleMsgListener implements MessageListener {
//收到信息時(shí)的動(dòng)作
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的信息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}spring boot jsm官網(wǎng)
springboot與ActiveMQ整合
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
# ActiveMQ-------------------------------------------------------------------------------------------------------------- # Specify if the default broker URL should be in memory. Ignored if an explicit broker has been specified. #spring.activemq.in-memory=false # URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616` spring.activemq.broker-url=tcp://127.0.0.1:61616 # Login user of the broker. spring.activemq.user=admin # Login password of the broker. spring.activemq.password=admin # Trust all packages. #spring.activemq.packages.trust-all=false # Comma-separated list of specific packages to trust (when not trusting all packages). #spring.activemq.packages.trusted= # See PooledConnectionFactory. #spring.activemq.pool.configuration.*= # Whether a PooledConnectionFactory should be created instead of a regular ConnectionFactory. spring.activemq.pool.enabled=true # Maximum number of pooled connections. spring.activemq.pool.max-connections=50 # Connection expiration timeout in milliseconds. spring.activemq.pool.expiry-timeout=10000 # Connection idle timeout in milliseconds. spring.activemq.pool.idle-timeout=30000 spring.jms.pub-sub-domain=false
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@Configuration
public class ActiveMQConfig {
@Value("${queueName}")
private String queueName;
@Value("${topicName}")
private String topicName;
@Value("${spring.activemq.user}")
private String usrName;
@Value("${spring.activemq.password}")
private String password;
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
/**
* <p>@describe: 配置隊(duì)列 </p>
* <p>@author: whh </p>
* <p>@date: 2019年8月9日 下午5:56:22</p>
* @return
*/
@Bean
public Queue queue() {
return new ActiveMQQueue(queueName);
}
/**
* <p>@describe:配置主題 </p>
* <p>@author: whh </p>
* <p>@date: 2019年8月9日 下午5:56:40</p>
* @return
*/
@Bean
public Topic topic() {
return new ActiveMQTopic(topicName);
}
/**
* <p>@describe:連接工廠 </p>
* <p>@author: whh </p>
* <p>@date: 2019年8月9日 下午5:57:14</p>
* @return
*/
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
}
/**
* <p>@describe: 監(jiān)聽(tīng)隊(duì)列bean </p>
* <p>@author: whh </p>
* <p>@date: 2019年8月9日 下午5:57:43</p>
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
return bean;
}
/**
* <p>@describe:監(jiān)聽(tīng)主題 bean </p>
* <p>@author: whh </p>
* <p>@date: 2019年8月9日 下午5:58:17</p>
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//設(shè)置為發(fā)布訂閱方式, 默認(rèn)情況下使用的生產(chǎn)消費(fèi)者方式
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
}import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
@Service
public class JmsSendDemo {
private static final Logger LOGGER = LoggerFactory.getLogger(JmsSendDemo.class);
@Autowired
private JmsTemplate jmsTemplate;
public void execute(){
LOGGER.debug("執(zhí)行定時(shí)任務(wù)-----------------直接違規(guī)數(shù)據(jù)定時(shí):下發(fā)至醫(yī)院端首頁(yè)進(jìn)行展示-----------------");
Destination destination = (Destination) AppContext.getBean("queue_test");
jmsTemplate.convertAndSend("mailbox", new Email("info@example.com", "Hello"));
//啟動(dòng)另一個(gè)方法
boolean flag=true;
LOGGER.debug("執(zhí)行定時(shí)任務(wù)結(jié)束------------------------------");
}
public void sendMessage(Destination destination,final String message) {
LOGGER.debug("發(fā)送消息到AMQ,消息內(nèi)容為:"+message);
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
@JmsListener(destination = "mailbox", containerFactory = "myFactory")
public void receiveMessage(Email email) {
System.out.println("Received <" + email + ">");
}
}關(guān)于JMS消息activemq的使用分析就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
本文名稱:JMS消息activemq的使用分析
網(wǎng)頁(yè)鏈接:http://www.chinadenli.net/article22/isjccc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、網(wǎng)頁(yè)設(shè)計(jì)公司、企業(yè)建站、全網(wǎng)營(yíng)銷推廣、關(guān)鍵詞優(yōu)化、網(wǎng)站策劃
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)