澳门新浦京8455comSpring源码分析之spring-jms模块详解

澳门新浦京8455com 22

0 概述

spring提供了二个jms集成框架,那几个框架如spring 集成jdbc
api近似,简化了jms api的选拔。

jms可以简轻便单的分为五个功用区,新闻的临蓐和音讯的耗费。JmsTemplate类用来变化音信和合营选用消息。和别的java
ee的新闻使得样式相似,对异步消息,spring也提供了成都百货上千音讯监听容器用来创制音信使得的POJO(MDPs卡塔尔。spring同不经常候也提供了创制音信监听器的扬言形式。

org.springframework.jms.core
提供了接收JMS的主导效用,它含有JmsTemplate类,该类相通于jdbc中的jdbdTemplate,它通过对财富的成立和刑释管理来简化jms开采。spring的模板类作为一种设计原则在spring框架中常见接纳,模板类对简易操作提供了助手方法;对复杂操作,通过持续回调接口提供了重大管理进度的代办。jmsTemplate雷同服从这一企划原则,它提供了发送音讯、同步开支新闻、为客户提供JMS
session和音讯临盆者的多样平价措施。

org.springframework.jms.support提供了JmsException转译功效。它将checked的JmsException档期的顺序转换到uncheckedd卓殊的镜像档案的次序。若抛出的格外不是javax.jms.JmsException的子类,那一个特别将被封装成unchecked非凡UncategorizedJmsException。

org.springframework.jms.support.converter
提供了在java对象和jms消息之间转换的架空MessageConverter。

org.springframework.jms.support.destination提供了管住jms
destination的各类主题,如对寄存在jndi的destionation提供劳动一定功效。

org.springframework.jms.annotation通过使用@JmsListener提供了对评释驱动的监听端的支撑。

org.springframework.jms.config
扶持jms命名空间的辨析,同不经常候也支撑配置监听容器和转移监听端。

末尾,org.springframework.jms.connection提供了适用于standonle应用的ConnectionFactory的贯彻。它也了spring
PlatFormTransactionManager对jms的事务管理完毕jmsTranscationmanager.
那允许jms 作为职业财富无缝的三合一到Spring事务管理机制中。

1.1     JMS简介

       JMS的完备是Java Message Service,即Java新闻服务。它根本用以在劳动者和顾客之间进行新闻传递,生产者肩负爆发音信,而购买者承当采用音讯。把它利用到骨子里的事情需求中的话我们得以在一定的时候使用临蓐者生成一音信,并张开垦送,对应的消费者在接到到对应的消息后去完结对应的作业逻辑。对于音信的传递有两种等级次序,一种是点对点的,即叁个劳动者和一个消费者一一对应;另一种是公布/订阅形式,即叁个劳动者发生新闻并开展发送后,能够由多少个客商进行收纳。

首先须求下载ActiveMQ,上边包车型客车链接给我们列出了装有版本:

种种版本为不一致的OS提供了链接:
澳门新浦京8455com 1

1. 丰盛管理类模块

澳门新浦京8455com 2

那贰个的转换在JmsUtils中来做的:

/**
     * Convert the specified checked {@link javax.jms.JMSException JMSException} to a
     * Spring runtime {@link org.springframework.jms.JmsException JmsException} equivalent.
     * @param ex the original checked JMSException to convert
     * @return the Spring runtime JmsException wrapping the given exception
     */
    public static JmsException convertJmsAccessException(JMSException ex) {
        Assert.notNull(ex, "JMSException must not be null");

        if (ex instanceof javax.jms.IllegalStateException) {
            return new org.springframework.jms.IllegalStateException((javax.jms.IllegalStateException) ex);
        }
        if (ex instanceof javax.jms.InvalidClientIDException) {
            return new InvalidClientIDException((javax.jms.InvalidClientIDException) ex);
        }
        if (ex instanceof javax.jms.InvalidDestinationException) {
            return new InvalidDestinationException((javax.jms.InvalidDestinationException) ex);
        }
        if (ex instanceof javax.jms.InvalidSelectorException) {
            return new InvalidSelectorException((javax.jms.InvalidSelectorException) ex);
        }
        if (ex instanceof javax.jms.JMSSecurityException) {
            return new JmsSecurityException((javax.jms.JMSSecurityException) ex);
        }
        if (ex instanceof javax.jms.MessageEOFException) {
            return new MessageEOFException((javax.jms.MessageEOFException) ex);
        }
        if (ex instanceof javax.jms.MessageFormatException) {
            return new MessageFormatException((javax.jms.MessageFormatException) ex);
        }
        if (ex instanceof javax.jms.MessageNotReadableException) {
            return new MessageNotReadableException((javax.jms.MessageNotReadableException) ex);
        }
        if (ex instanceof javax.jms.MessageNotWriteableException) {
            return new MessageNotWriteableException((javax.jms.MessageNotWriteableException) ex);
        }
        if (ex instanceof javax.jms.ResourceAllocationException) {
            return new ResourceAllocationException((javax.jms.ResourceAllocationException) ex);
        }
        if (ex instanceof javax.jms.TransactionInProgressException) {
            return new TransactionInProgressException((javax.jms.TransactionInProgressException) ex);
        }
        if (ex instanceof javax.jms.TransactionRolledBackException) {
            return new TransactionRolledBackException((javax.jms.TransactionRolledBackException) ex);
        }

        // fallback
        return new UncategorizedJmsException(ex);
    }

1.2     Spring整合JMS

       对JMS做了叁个简易介绍之后,接下去就讲一下Spring整合JMS的有板有眼经过。JMS只是一个正经,真正在动用它的时候大家需求有它的切切实实落到实处,这里大家就利用Apache的activeMQ来作为它的兑现。所选取的依赖性利用Maven来进行管制,具体信赖如下:

 

Xml代码  澳门新浦京8455com 3

  1. <dependencies>  
  2.         <dependency>  
  3.             <groupId>junit</groupId>  
  4.             <artifactId>junit</artifactId>  
  5.             <version>4.10</version>  
  6.             <scope>test</scope>  
  7.         </dependency>  
  8.         <dependency>  
  9.             <groupId>org.springframework</groupId>  
  10.             <artifactId>spring-context</artifactId>  
  11.             <version>${spring-version}</version>  
  12.         </dependency>  
  13.         <dependency>  
  14.             <groupId>org.springframework</groupId>  
  15.             <artifactId>spring-jms</artifactId>  
  16.             <version>${spring-version}</version>  
  17.         </dependency>  
  18.         <dependency>  
  19.             <groupId>org.springframework</groupId>  
  20.             <artifactId>spring-test</artifactId>  
  21.             <version>${spring-version}</version>  
  22.         </dependency>  
  23.         <dependency>  
  24.             <groupId>javax.annotation</groupId>  
  25.             <artifactId>jsr250-api</artifactId>  
  26.             <version>1.0</version>  
  27.         </dependency>  
  28.         <dependency>  
  29.             <groupId>org.apache.activemq</groupId>  
  30.             <artifactId>activemq-core</artifactId>  
  31.             <version>5.7.0</version>  
  32.         </dependency>  
  33. </dependencies>  

 

 

2. config模块

支持jms命名空间的剖判,同一时间也支撑配置监听容器和转移监听端。其组织如下:

澳门新浦京8455com 4

其间,在JmsNamespaceHandler中采纳到了监听容器,如下:

public class JmsNamespaceHandler extends NamespaceHandlerSupport {

    @Override
    public void init() {
        registerBeanDefinitionParser("listener-container", new JmsListenerContainerParser());
        registerBeanDefinitionParser("jca-listener-container", new JcaListenerContainerParser());
    }

}

1.2.1  activeMQ准备

       既然是采用的apache的activeMQ作为JMS的完成,那么首先咱们应该到apache官互连网下载activeMQ(http://activemq.apache.org/download.html),实行解压后运营其bin目录上边包车型客车activemq.bat文件运转activeMQ。

厂商计算机是windows的,用目录下的activemq.bat运营: 

3. connection模块

提供了适用于standonle应用的ConnectionFactory的落实。它也了spring
PlatFormTransactionManager对jms的事务管理达成jmsTranscationmanager.
那允许jms 作为职业能源无缝的合一到spring事务管理机制中。其组织如下:

澳门新浦京8455com 5

JmsTemplate须要一个ConnectionFactory的援引,ConnectionFactory作为jsm规范的一有的,是用作和jms交互作用中一个关键因素,用来作为工厂创设和jms
提供者连接客商端应用的connection,何况封装了多样类型的安排参数,这几个参数日常由jsm
提供者指明,比如ssl 配置选项。

SingleConnectionFactory
创立连接时回来同叁个老是Connectin而忽略了close(State of Qatar调用。它适用于测量检验遭遇和standalone情况,因为同一的connection能够被两个JmsTemplate调用,那一个调用能够超过多少个业务。SingleConnectionFactory有二个只怕出自于jndi的正经ConnectionFactory援引。

CachingConnectionFactory扩展了SingleConnectionFactory的效应,扩张了session的缓存,MessageProducer和messageConsumer。缓存的初步化size设置为1,使用

1.2.2配置ConnectionFactory

       ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于树立JMS服务器链接的呼吁会直接重临同二个链接,何况会忽视Connection的close方法调用。CachingConnectionFactory世袭了SingleConnectionFactory,所以它具有SingleConnectionFactory的有所机能,同时它还新扩充了缓存功能,它能够缓存Session、MessageProducer和MessageConsumer。这里大家选择SingleConnectionFactory来作为示范。

Xml代码  澳门新浦京8455com 6

  1. <bean id=”connectionFactory” class=”org.springframework.jms.connection.SingleConnectionFactory”/>  

 

       那样就定义好产生JMS服务器链接的ConnectionFactory了呢?答案是非也。Spring提供的ConnectionFactory只是Spring用于处理ConnectionFactory的,真正发出到JMS服务器链接的ConnectionFactory还得是由JMS服务商家提供,并且要求把它注入到Spring提供的ConnectionFactory中。我们这里运用的是ActiveMQ实现的JMS,所以在我们那边确确实实的可以生出Connection的就应该是由ActiveMQ提供的ConnectionFactory。所以定义多个ConnectionFactory的完整代码应该如下所示:

Xml代码  澳门新浦京8455com 7

  1. <!– 真正能够爆发Connection的ConnectionFactory,由相应的 JMS服务厂商提供–>  
  2. <bean id=”targetConnectionFactory” class=”org.apache.activemq.ActiveMQConnectionFactory”>  
  3.     <property name=”brokerURL” value=”tcp://localhost:61616″/>  
  4. </bean>  
  5.   
  6. <!– Spring用于管理真正的ConnectionFactory的ConnectionFactory –>  
  7. <bean id=”connectionFactory” class=”org.springframework.jms.connection.SingleConnectionFactory”>  
  8.     <!– 指标ConnectionFactory对应忠诚的能够生出JMS Connection的ConnectionFactory –>  
  9.     <property name=”targetConnectionFactory” ref=”targetConnectionFactory”/>  
  10. </bean>  

  

       ActiveMQ为大家提供了二个PooledConnectionFactory,通过往里面注入四个ActiveMQConnectionFactory能够用来将Connection、Session和MessageProducer池化,那样可以大大的减弱大家的财富消耗。当使用PooledConnectionFactory时,大家在概念二个ConnectionFactory时应当是之类概念:

 

Xml代码  澳门新浦京8455com 8

  1. <!– 真正能够生出Connection的ConnectionFactory,由相应的 JMS服务厂家提供–>  
  2. <bean id=”targetConnectionFactory” class=”org.apache.activemq.ActiveMQConnectionFactory”>  
  3.     <property name=”brokerURL” value=”tcp://localhost:61616″/>  
  4. </bean>  
  5.   
  6. <bean id=”pooledConnectionFactory” class=”org.apache.activemq.pool.PooledConnectionFactory”>  
  7.     <property name=”connectionFactory” ref=”targetConnectionFactory”/>  
  8.     <property name=”maxConnections” value=”10″/>  
  9. </bean>  
  10.   
  11. <bean id=”connectionFactory” class=”org.springframework.jms.connection.SingleConnectionFactory”>  
  12.     <property name=”targetConnectionFactory” ref=”pooledConnectionFactory”/>  
  13. </bean>  

澳门新浦京8455com 9

4. core模块

提供了选取JMS的为主职能。

澳门新浦京8455com 10

利用jmsTemplate时只须要后续callback接口,通过持续call接口可以定义一个高档其他左券,MessageCreator
callback接口在钦赐session创制四个新闻,它只好够被JmsTemplate调用。为满意更jms
api更目迷五色的接纳,SessionCallBack给客户提供了jms
session,ProducerCallback暴露了session和messageProducer七个。

jms
api提供三种档期的顺序的send方法,一种设置delivery格局,优先级,存活时间(time-to-live卡塔尔国作为qos参数,另一种不提供qos参数而使用暗中认可值。

1.2.3配备临蓐者

铺排好ConnectionFactory之后大家就供给配备生产者。生产者担负发生消息并发送到JMS服务器,那经常对应的是大家的二个事务逻辑服务完成类。可是大家的服务完成类是怎么开展音信的发送的啊?这日常是运用Spring为我们提供的JmsTemplate类来促成的,所以安顿生产者其实最基本的正是布署举办音信发送的JmsTemplate。对于音信发送者来说,它在出殡和安葬消息的时候要明白自个儿该往哪个地方发,为此,我们在定义JmsTemplate的时候必要往里面注入二个Spring提供的ConnectionFactory对象。

Xml代码  澳门新浦京8455com 11

  1. <!– Spring提供的JMS工具类,它能够实行音信发送、选用等 –>  
  2. <bean id=”jmsTemplate” class=”org.springframework.jms.core.JmsTemplate”>  
  3.     <!– 这么些connectionFactory对应的是大家定义的Spring提供的充足ConnectionFactory对象 –>  
  4.     <property name=”connectionFactory” ref=”connectionFactory”/>  
  5. </bean>  

 

       在真的使用JmsTemplate实行音讯发送的时候,大家供给明白音信发送的目标地,即destination。在Jms中有四个用来表示指标地的Destination接口,它个中未有任何措施定义,只是用来做一个标记而已。当大家在应用JmsTemplate实行音信发送前卫未点名destination的时候将接收私下认可的Destination。暗中认可Destination能够因而在定义jmsTemplate bean对象时经过品质defaultDestination或defaultDestinationName来拓宽注入,defaultDestinationName对应的正是一个平时字符串。在ActiveMQ中贯彻了二种等级次序的Destination,多少个是点对点的ActiveMQQueue,另贰个便是支撑订阅/公布情势的ActiveMQTopic。在概念这两种类型的Destination时我们都能够由此八个name属性来进展组织,如:

 

Xml代码  澳门新浦京8455com 12

  1. <!–那个是队列指标地,点对点的–>  
  2. <bean id=”queueDestination” class=”org.apache.activemq.command.ActiveMQQueue”>  
  3.     <constructor-arg>  
  4.         <value>queue</value>  
  5.     </constructor-arg>  
  6. </bean>  
  7. <!–这么些是大旨指标地,一对多的–>  
  8. <bean id=”topicDestination” class=”org.apache.activemq.command.ActiveMQTopic”>  
  9.     <constructor-arg value=”topic”/>  
  10. </bean>  

       要是大家定义了三个ProducerService,里面有一个向Destination发送纯文本音讯的办法sendMessage,那么我们的代码就大约是那几个样子:

 

Java代码  澳门新浦京8455com 13

  1. package com.tiantian.springintejms.service.impl;  
  2.    
  3. import javax.annotation.Resource;  
  4. import javax.jms.Destination;  
  5. import javax.jms.JMSException;  
  6. import javax.jms.Message;  
  7. import javax.jms.Session;  
  8.    
  9. import org.springframework.jms.core.JmsTemplate;  
  10. import org.springframework.jms.core.MessageCreator;  
  11. import org.springframework.stereotype.Component;  
  12.    
  13. import com.tiantian.springintejms.service.ProducerService;  
  14.    
  15. @Component  
  16. public class ProducerServiceImpl implements ProducerService {  
  17.    
  18.     private JmsTemplate jmsTemplate;  
  19.       
  20.     public void sendMessage(Destination destination, final String message) {  
  21.         System.out.println(“—————临盆者发送消息—————–“卡塔尔国;  
  22.         System.out.println(“—————临盆者发了二个新闻:” + message卡塔尔;  
  23.         jmsTemplate.send(destination, new MessageCreator() {  
  24.             public Message createMessage(Session session) throws JMSException {  
  25.                 return session.createTextMessage(message);  
  26.             }  
  27.         });  
  28.     }   
  29.   
  30.     public JmsTemplate getJmsTemplate() {  
  31.         returnjmsTemplate;  
  32.     }   
  33.   
  34.     @Resource  
  35.     public void setJmsTemplate(JmsTemplate jmsTemplate) {  
  36.         this.jmsTemplate = jmsTemplate;  
  37.     }  
  38.    
  39. }  

       我们得以看看在sendMessage方法体里面我们是由此jmsTemplate来发送信息到相应的Destination的。到此,大家调换一个简单的公文音讯并把它发送到钦点指标地Destination的劳动者就结构好了。

 

5. Listener模块

音信监听容器(MessageListenerContainer卡塔尔(قطر‎用来从jms
新闻队列中经受音讯,然后推送注册到它里面的新闻监听器(MessageListener卡塔尔国中。spring提供了三种标准的jms
音信监听容器,特色如下:

SimpleMessageListenerContainer:在运营时,创立固定数目标jms
会话和八个买主,使用专门的学业的jms
    MessageConsumer.setMessageListener(卡塔尔国方法来注册监听器,让jms
提供者来让监听器重返。

DefaultMessageListenerContainer:帮助在运作时动态适应,而且能参预到表面受拘系职业。各样选择到的音讯使用JtaTransactionManager注册为XA
事务,由此能够足够利用xa 事务语义实行管理。

1.2.4布置消费者

劳动者往钦点指标地Destination发送音信后,接下去便是消费者对点名目标地的信息举行开支了。那么消费者是何许驾驭有临蓐者发送新闻到钦点指标地Destination了吗?那是经过Spring为我们封装的新闻监听容器MessageListenerContainer实现的,它担负选择音讯,并把收到到的信息分发给真正的MessageListener进行拍卖。每种客商对应每一种指标地都亟需有照看的MessageListenerContainer。对于信息监听容器来说,除了要通晓监听哪个目标地之外,还要求掌握到哪里去监听,也正是说它还亟需驾驭去监听哪个JMS服务器,那是透过在安插MessageConnectionFactory的时候往里面注入二个ConnectionFactory来兑现的。所以大家在结构贰个MessageListenerContainer的时候有八本性情必须钦赐,一个是代表从哪里监听的ConnectionFactory;八个是意味着监听什么的Destination;三个是吸收接纳到新闻随后进行信息管理的MessageListener。Spring一共为大家提供了两连串型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。

SimpleMessageListenerContainer会在一同头的时候就创立二个会话session和顾客Consumer,并且会动用职业的JMS
MessageConsumer.setMessageListener(卡塔尔方法注册监听器让JMS提供者调用监听器的回调函数。它不会动态的适应运维时索要和涉企外部的事务管理。包容性方面,它不行挨近于独立的JMS标准,但平时不宽容Java EE的JMS节制。

非常多情形下我们依旧利用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比较,DefaultMessageListenerContainer会动态的适应运营时索要,並且能够参预外界的事务管理。它很好的平衡了对JMS提供者必要低、先进效用如职业出席和包容Java EE情况。

概念管理信息的MessageListener

       要定义管理音信的MessageListener大家只供给实现JMS标准中的MessageListener接口就足以了。MessageListener接口中唯有一个方法onMessage方法,当选用到新闻的时候会活动调用该办法。

 

Java代码  澳门新浦京8455com 14

  1. package com.tiantian.springintejms.listener;  
  2.    
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.MessageListener;  
  6. import javax.jms.TextMessage;  
  7.    
  8. public class ConsumerMessageListener implements MessageListener {  
  9.    
  10.     public void onMessage(Message message) {  
  11.         //这里大家知道坐褥者发送的正是一个纯文本音信,所以那边能够一贯开展逼迫转变  
  12.         TextMessage textMsg = (TextMessage) message;  
  13.         System.out.println(“选用到七个纯文本新闻。”卡塔尔;  
  14.         try {  
  15.             System.out.println(“新闻内容是:” + textMsg.getText(卡塔尔国卡塔尔(قطر‎;  
  16.         } catch (JMSException e) {  
  17.             e.printStackTrace();  
  18.         }  
  19.     }  
  20.    
  21. }  

 

       有了MessageListener之后大家就足以在Spring的布置文件中安顿三个音信监听容器了。

Xml代码  澳门新浦京8455com 15

  1. <!–这些是队列目标地–>  
  2. <bean id=”queueDestination” class=”org.apache.activemq.command.ActiveMQQueue”>  
  3.     <constructor-arg>  
  4.         <value>queue</value>  
  5.     </constructor-arg>  
  6. </bean>  
  7. <!– 音信监听器 –>  
  8. <bean id=”consumerMessageListener” class=”com.tiantian.springintejms.listener.ConsumerMessageListener”/>      
  9.   
  10. <!– 音讯监听容器 –>  
  11. <bean id=”jmsContainer”        class=”org.springframework.jms.listener.DefaultMessageListenerContainer”>  
  12.     <property name=”connectionFactory” ref=”connectionFactory” />  
  13.     <property name=”destination” ref=”queueDestination” />  
  14.     <property name=”messageListener” ref=”consumerMessageListener” />  
  15. </bean>  

  

       大家能够见见大家定义了叁个可以称作queue的ActiveMQQueue目标地,大家的监听器正是监听了发送到那个目标地的消息。

       至此大家的生成者和买主都陈设实现了,那也就表示大家的组成已经到位了。此时完全的Spring的配备文件应该是如此的:

Xml代码  澳门新浦京8455com 16

  1. <?xml version=”1.0″ encoding=”UTF-8″?>  
  2. <beans xmlns=””  
  3.     xmlns:xsi=”” xmlns:context=””  
  4.     xmlns:jms=””  
  5.     xsi:schemaLocation=”  
  6.        
  7.        
  8.        
  9.        
  10.      ;  
  11.    
  12.     <context:component-scan base-package=”com.tiantian” />  
  13.    
  14.     <!– Spring提供的JMS工具类,它能够展开消息发送、接受等 –>  
  15.     <bean id=”jmsTemplate” class=”org.springframework.jms.core.JmsTemplate”>  
  16.         <!– 这几个connectionFactory对应的是大家定义的Spring提供的不行ConnectionFactory对象 –>  
  17.         <property name=”connectionFactory” ref=”connectionFactory”/>  
  18.     </bean>  
  19.       
  20.     <!– 真正得以生出Connection的ConnectionFactory,由相应的 JMS服务商家提供–>  
  21.     <bean id=”targetConnectionFactory” class=”org.apache.activemq.ActiveMQConnectionFactory”>  
  22.         <property name=”brokerURL” value=”tcp://localhost:61616″/>  
  23.     </bean>  
  24.       
  25.     <!– Spring用于管理真正的ConnectionFactory的ConnectionFactory –>  
  26.     <bean id=”connectionFactory” class=”org.springframework.jms.connection.SingleConnectionFactory”>  
  27.         <!– 目的ConnectionFactory对应诚实的能够生出JMS Connection的ConnectionFactory –>  
  28.         <property name=”targetConnectionFactory” ref=”targetConnectionFactory”/>  
  29.     </bean>  
  30.       
  31.     <!–这么些是队列目标地–>  
  32.     <bean id=”queueDestination” class=”org.apache.activemq.command.ActiveMQQueue”>  
  33.         <constructor-arg>  
  34.             <value>queue</value>  
  35.         </constructor-arg>  
  36.     </bean>  
  37.     <!– 音信监听器 –>  
  38.     <bean id=”consumerMessageListener” class=”com.tiantian.springintejms.listener.ConsumerMessageListener”/>  
  39.     <!– 音信监听容器 –>  
  40.     <bean id=”jmsContainer”  
  41.         class=”org.springframework.jms.listener.DefaultMessageListenerContainer”>  
  42.         <property name=”connectionFactory” ref=”connectionFactory” />  
  43.         <property name=”destination” ref=”queueDestination” />  
  44.         <property name=”messageListener” ref=”consumerMessageListener” />  
  45.     </bean>  
  46. </beans>  

 

 

       接着大家来测验一下,看看我们的结缘是或不是真的成功了,测量试验代码如下:

 

Java代码  澳门新浦京8455com 17

  1. package com.tiantian.springintejms.test;  
  2.    
  3. import javax.jms.Destination;  
  4.    
  5. import org.junit.Test;  
  6. import org.junit.runner.RunWith;  
  7. import org.springframework.beans.factory.annotation.Autowired;  
  8. import org.springframework.beans.factory.annotation.Qualifier;  
  9. import org.springframework.test.context.ContextConfiguration;  
  10. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;  
  11. import com.tiantian.springintejms.service.ProducerService;  
  12.    
  13. @RunWith(SpringJUnit4ClassRunner.class)  
  14. @ContextConfiguration(“/applicationContext.xml”)  
  15. public class ProducerConsumerTest {  
  16.    
  17.     @Autowired  
  18.     private ProducerService producerService;  
  19.     @Autowired  
  20.     @Qualifier(“queueDestination”)  
  21.     private Destination destination;  
  22.       
  23.     @Test  
  24.     public void testSend() {  
  25.         for (int i=0; i<2; i++) {  
  26.             producer瑟维斯.sendMessage(destination, “你好,坐蓐者!这是音讯:” + (i+1卡塔尔(قطر‎State of Qatar;  
  27.         }  
  28.     }  
  29.       
  30. }  

 

       在上头的测量检验代码中我们利用生产者发送了多少个音信,符合规律的话,消费者应该能够摄取到那多少个消息。运维测量检验代码后调控台出口如下:

 
澳门新浦京8455com 18
 

       看,调控台已经张开了金科玉律的出口,那表达大家的构成确实是已经成功了。

 

转载: 

 

端口号私下认可是61616,能够在conf/activemq.xml中看见:

6. remoting模块

澳门新浦京8455com 19

org.aopalliance.intercept.MethodInterceptor 用来抽取基于jms的长间隔服务。

JmsInvokerClientInterceptor:类别化远程触发对象和反种类化远程触发结果对象,使用java类别化方法,比如RMI.

JmsInvokerProxyFactoryBean:jms触发代理的工厂bean,揭露bean引用的代办服务,使用一定的劳务接口。

JmsInvokerServiceExporter:jms音信监听器,暴光特定服务bean作为jsm音信的终端,通过jms
触发代理获取之。

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

小结

spring提供了多个jms集成框架,这么些框架如spring 集成jdbc
api相符,简化了jms api的施用。

相关的Maven dependency:

<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

动用javax.jms.Session跟JMS Provider通信,好像说了句废话…:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
        ActiveMQConnection.DEFAULT_USER,
        ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");

Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(Boolean.TRUE,
        Session.AUTO_ACKNOWLEDGE);

接下来一些指标地、发送者、发送内容怎么着的都以由session来弄的:

Destination destination = session.createQueue("this is sparta!!");

MessageProducer producer = session.createProducer(destination);

TextMessage message0 = session.createTextMessage("这是斯巴达!!!");
TextMessage message1 = session.createTextMessage("这也是斯巴达!!!");
TextMessage message2 = session.createTextMessage("这些都是斯巴达!!!");

producer.send(message0);
producer.send(message1);
producer.send(message2);

session.commit();

有了producer,相应地也许有consumer,选拔信息方法如下:

MessageConsumer consumer = session.createConsumer(destination);
System.out.println(((TextMessage) consumer.receive(10000)).getText());

结果恐怕consumer去八个个receive了,就像接纳人亲自去肯定那样。
兴许大家能够让Listener代劳:

consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
        try {
            System.out.println("listener catched:::"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

当以此consumer设置了Listener的时候就不可能再以receive(卡塔尔的艺术选用了,
再不晤面世javax.jms.IllegalStateException:Cannot synchronously receive a
message when a MessageListener is set…

一旦想利用publish/subscribe,直接将createQueue改为createTopic就可以,但供给了然Topic是无状态的。

完整code如下,发送者:

{
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
    Connection connection = connectionFactory.createConnection();
    connection.start();

    Session session = connection.createSession(Boolean.TRUE,
            Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createQueue("this is sparta!!");

    MessageProducer producer = session.createProducer(destination);
    TextMessage message0 = session.createTextMessage("这是斯巴达!!!");
    TextMessage message1 = session.createTextMessage("这也是斯巴达!!!");
    TextMessage message2 = session.createTextMessage("这些都是斯巴达!!!");
    producer.send(message0);
    producer.send(message1);
    producer.send(message2);

    session.commit();
}

接收者:

{
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");;
    Connection connection = connectionFactory.createConnection();
    connection.start();

    Session session = connection.createSession(Boolean.FALSE,
            Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createQueue("this is sparta!!");
    MessageConsumer consumer = session.createConsumer(destination);
    System.out.println(((TextMessage) consumer.receive(10000)).getText());
    System.out.println(((TextMessage) consumer.receive(10000)).getText());
    System.out.println(((TextMessage) consumer.receive(10000)).getText());
}

 

 

此番试试集成到Spring。
上边包车型客车连接是ActiveMQ官方网站提供的文书档案。

上边是本人加多的局地dependency,基本的spring依赖小编就不列举了:

<!-- jms activemq -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>${activemq.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>${activemq.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-spring</artifactId>
    <version>${activemq.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.16</version>
</dependency>
<dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>

maven中增加时要介意还会有个xbean-spring;
事情发生早先并从未放在心上,运营开采分外提示ClassNotFound:org.apache.xbean.spring.context.v2.XBeanNamespaceHandler;

新兴自家增添了xbean-v2,结果提醒v2c,于是本身增多v2c,后来以为太傻就加了xbean-spring。

陈设方面能够运用jms和activeMq的价签:

xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"

相应的xsi:schemaLocation:

http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd

关于connectionFactory的配置能够接受amq标签:

<amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost:61616" />

不过在那自身筹算试试PooledConnectionFactory;
有关org.apache.activemq.pool.PooledConnectionFactory官方网址有以下解释(简单完结,都休想翻译了卡塔尔国:

If you are not using a JCA container to manage your JMS connections,
we recommend you use our pooling JMS connection provider,
(org.apache.activemq.pool.PooledConnectionFactory) from the
activemq-pool library, which will pool the JMS resources to work
efficiently with Spring’s JmsTemplate or with EJBs.

对于其属性,上面依照javaDoc给出一些讲解:

  • MaximumActiveSessionPerConnection:每个Connection的最大Session数
  • BlockIfSessionPoolIsFull:默感觉session池满时诉求得到session会堵塞;设置false则会抛出JMSException
  • MaxConnections:最亚松森接数
  • IdleTimeout:空闲时间,默以为30秒
  • CreateConnectionOnStartup:是不是在运行时成立connection

在那间小编先用暗中同意参数评释,不理解为什么连年报MalformPrameterizedType…

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" />

上次用的行列,本次换用Topic试试…

<bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
  <constructor-arg index="0" value="spartaTopic"></constructor-arg>
</bean>

理之当然也足以使用amq标签:

<amq:topic physicalName="sparta" />

比方是运用queue的话:

<amq:queue physicalName="sparta" />

莫非作者将那个放到spring里正是为着用用标签方便DI?
用里的话来说就是jmsTemplate是<spring对jms扶植大旨的有的>
(另有jmsTemplate102为适应jms1.0.2的);
和jdbcTemplate那样 jmsTemplate也可能有提供相符的优势。
举个例子说,像jdbcTemplate管理失控的jdbc代码那样,用jmsTemplate管理失控的jms代码。
抑或,要是在动用JmsTemplate是捕捉到了JMSException,JmsTemplate将捕获该非常,然后抛出多个Spring自带的JmsException的子类相当(个人以为spring提供的不是更详细的可怜新闻,只是侧着重不一致…卡塔尔国。
澳门新浦京8455com 20

比如:

  • ListenerExecutionFailedException:监听器实行破产
  • MessageConversionException:音讯调换失利
  • SynchedLocalTransactionFailedException:同步本地职业未成功
  • UncategorizedJmsException:未有契合那几个的此外意况

一经大家catch了JMSException,大家照例能够把他转为JmsException:

catch (JMSException e) {
    e.printStackTrace();
    JmsException je = JmsUtils.convertJmsAccessException(e);
}

目前试着布置jmsTemplate:

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" >
    <property name="connectionFactory" >
        <bean class="org.apache.activemq.pool.PooledConnectionFactory" />
    </property>
    <property name="defaultDestination" >
        <amq:topic physicalName="sparta" />
    </property>
</bean>

像这种类型编写代码时就变得轻松多了,在此以前那么些connectionFactory,connection,session,consumer,producer统统不见了;
自己只必要(无名内部类也许有点碍眼卡塔尔:

ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
JmsTemplate template = (JmsTemplate)context.getBean("jmsTemplate");
template.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
        ActiveMQMapMessage msg = (ActiveMQMapMessage)session.createMapMessage();
        msg.setString("msg", "This is sparta!!");
        return msg;
    }
});

收取时只须求:

template.receive();

但要求小心!那么些receive是联名接收消息的,他会一贯不通到有音讯个选择。
大概会想到MessageListener,比如我们可以给叁个MessageConsumer对象setMessageListener:

MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
        try {
            System.out.println("listener catched:::"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

地点代码中的MessageListener实例,要是新建三个特意用来监听的类,完毕MessageListener接口并加上MessageDriven注脚就能够并发三个难题——他非常不足pojo。他有侵入性,我不想要任何达成接口的语法出现在代码中。

于是乎小编得以用listener-container;
近期本人创造一个类去监听,举例:

public class CustomedListener {
    void processHandle(HashMap<String,String> map){
        System.out.println("msg:::"+map.get("msg"));
    }
}

但须求专心的是方式的参数列表,他实际不是不管定义的。
上边包车型地铁publisher发送的message是ActiveMQMapMessage,那就要求小编把参数定义为地点那种样式。
下一场看一下spring中怎样安插那个Listener:

<bean id="myListener" class="pac.testcase.jms.CustomedListener"/>
<jms:listener-container connection-factory="connectionFactory">
    <jms:listener destination="sparta" ref="myListener" method="processHandle"/>
</jms:listener-container>

这么小编就没有须要去调用receive了,有音信就选取。

 

今后搜求通过JMS,在应用程序之间发送消息。
先看看spring提供的RPC方案(其实还也许有别的方案,只是没见过什么人用卡塔尔国。
必要运用到那八个类:

  • org.springframework.jms.remoting.JmsInvokerServiceExporter将bean导出为依照音讯的服务
  • org.springframework.jms.remoting.JmsInvokerProxyFactoryBean让顾客端调用服务

 

比较一下JmsInvokerServiceExporter和EvoquemiServiceExporter:
澳门新浦京8455com 21

自个儿成立叁个接口和落到实处类如下:

package pac.testcase.jms;
public interface JmsRmiService {
    String doServe(String requestedNum);
}

实现:

package pac.testcase.jms;
import org.springframework.stereotype.Service;
@Service
public class JmsRmiServiceImpl implements JmsRmiService {

    public String doServe(String content) {
        System.out.println(content.concat(" has been requested!!"));
        return "your message::".concat(content).concat(":::length:")+content.length();
    }
}

将那么些pojo声明为劳动,在spring配置文件中计划:

<bean id="serverService" class="org.springframework.jms.remoting.JmsInvokerServiceExporter"
    p:serviceInterface="pac.testcase.jms.JmsRmiService"
    p:service-ref="JmsRmiServiceImpl">
</bean>

需将他安装为jms监听器,配置方式和平时的jmsMessageListener的布置相近:

<amq:connectionFactory id="jmsFactory" />
<jms:listener-container
    destination-type="queue"
    connection-factory="jmsFactory"
    concurrency="3"
    container-type="simple">
    <jms:listener  destination="sparta" ref="serverService"  />
</jms:listener-container>

container-type有simple和default,依照区别的type也能够利用task-Executor,这里先轻松记录一下。
先启动jms broker再启动:

new ClassPathXmlApplicationContext("classpath:applicationContext-*.xml").getBean(JmsRmiService.class);

client那边我索要四个调用代理帮本身去调用接口,也正是JmsInvokerProxyFactoryBean;
安插如下:

<amq:connectionFactory id="connectionFactory" />
<bean id="clientService" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean"
    p:serviceInterface="pac.test.jms.SenderRmiService"
    p:connectionFactory-ref="connectionFactory"
    p:queueName="sparta"/>

布局中的serviceInterface是client端中依照要调用的不二等秘书技成立的一个接口。
main方法试着调用看看:

public static void main(String[] args) {
    ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
    SenderRmiService service = (SenderRmiService)context.getBean("clientService");
    System.out.println(service.doServe("这才是斯巴达!!"));
}

 

server端输出: 
澳门新浦京8455com 22

 

client端输出: 
澳门新浦京8455com 23

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图