ActiveMQ+jms实战.docx
- 文档编号:10321974
- 上传时间:2023-05-25
- 格式:DOCX
- 页数:24
- 大小:27.74KB
ActiveMQ+jms实战.docx
《ActiveMQ+jms实战.docx》由会员分享,可在线阅读,更多相关《ActiveMQ+jms实战.docx(24页珍藏版)》请在冰点文库上搜索。
ActiveMQ+jms实战
ActiveMQ+jms实战
在介绍在介绍ActiveMQ之前,首先简要介绍一下JMS规范。
JMS的简介:
(1)
JMS(JavaMessageService,Java消息服务)是一组Java应用程序接口(JavaAPI),它提供创建、发送、接收、读取消息的服务。
JMS使您能够通过消息收发服务从一个JMS客户机向另一个JML客户机交流消息。
JMS是一种与厂商无关的API,用来访问消息收发系统。
它类似于JDBC(JavaDatabaseConnectivity):
这里,JDBC是可以用来访问许多不同关系数据库的API,而JMS则提供同样与厂商无关的访问方法,以访问消息收发服务。
许多厂商目前都支持JMS,包括IBM的MQSeries、BEA的WeblogicJMSservice和Progress的SonicMQ.
(2)
JMS典型的应用场景:
操作可异步执行.
发email了, 发msn消息了.
或者一些比较耗时的操作, 比如要在某目录下生成一个大报表. 操作者把指令发出去就完事.
[2]
JMS的基本构件:
(1)
Broker
什么是Broker呢?
可以把JMSBrokers看成是服务器端。
这个服务器可以独立运行.也可以随着其他容器
以内嵌方式云心,如下配置:
使用显示的Java代码创建
BrokerServicebroker=newBrokerService();
//configurethebroker
broker.addConnector("tcp:
//localhost:
61616");
broker.start();
使用BrokerFacotry创建
BrokerServicebroker=BrokerFactory.getInstance().createBroker(someURI);
使用SpringBean创建
org/apache/activemq/xbean/activemq.xml”/> 还可以使用XBean或Spring2.0等多种配置方式配置, 通过ActiveMQConnectionFactory还可以隐含的创建内嵌的broker,这个broker就不是一个独立的服务了。 false是QueneDestination,true是TopicDestination 上面的defaultDestination是指默认发送和接收的目的地,我们也可以不指定,而是通过目的地名称让jmsTemplate自动帮我们创建. (2) 1连接工厂 连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。 2连接 JMSConnection封装了客户与JMS提供者之间的一个虚拟的连接。 3会话 JMSSession是生产和消费消息的一个单线程上下文。 会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。 会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。 (3) 目的地: 目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。 JMS1.0.2规范中定义了两种 消息传递域: Point-to-Point消息(P2P),点对点;发布订阅消息(PublishSubscribemessaging,简称Pub/Sub) 两者的区别: P2P消息模型是在点对点之间传递消息时使用。 如果应用程序开发者希望每一条消息都能够被处理,那么应该使用P2P消息模型。 与Pub/Sub消息模型不同,P2P消息总是能够被传送到指定的位置。 P2P消息,每个消息只能有一个消费者。 Pub/Sub模型在一到多的消息广播时使用。 如果一定程度的消息传递的不可靠性可以被接受的话,那么应用程序开发者也可以使用Pub/Sub消息模型。 换句话说,它适用于所有的消息消费程序并不要求能够收到所有的信息或者消息消费程序并不想接收到任何消息的情况。 Pub/Sub,每个消息可以有多个消费者。 在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。 (3) 3.1 消息生产者 消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。 3.2 消息消费者 消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。 消息的消费可以采用以下两种 方法之一: ? 异步消费。 客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。 (异步操作) ? 同步消费。 通过调用消费者的receive方法从目的地中显式提取消息。 receive方法可以一直阻塞到消息到达。 3.3 消息是JMS中的一种类型对象,由两部分组成: 报头和消息主体。 报头由路由信息以及有关该消息的元数据组成。 消息主体则携带着应用程序的数据或有效负载。 根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带: 简单文本(TextMessage)、可序列化的对象(ObjectMessage)、属性集合 (MapMessage)、字节流(BytesMessage)、原始值流(StreamMessage),还有无有效负载的消息(Message)。 (4) JMS定义了从0到9的优先级路线级别,0是最低的优先级而9则是最高的。 更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。 [3] ActiveMQ简介: ActiveMQ是开源的JMS实现,Geronimo应用服务器就是使用的ActiveMQ提供JMS服务。 安装 在http: //activemq.apache.org/download.html下载5.0.0发行包,解压即可, 启动 window环境运行解压目录下的/bin/activemq.bat 测试 ActiveMQ默认使用的TCP连接端口是61616,通过查看该端口的信息可以测试ActiveMQ是否成功启动 window环境运行 netstat-an|find"61616" 监控 ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。 admin: http: //127.0.0.1: 8161/admin/ demo: http: //127.0.0.1: 8161/demo/ 点击demo应用中的“Marketdatapublisher”,就会发一些测试的消息。 转到admin页面的topicsmenu下面(queue和topic的区别见),可以看到消息在增长。 ActiveMQ5.0的配置文件在解压目录下的/conf目录下面。 主要配置文件为activemq.xml. [4] 实例一: (没有结合spring框架) publicclassQueueProducer{ /* *创建的简图 ConnectionFactory—->Connection—>Session—>Message Destination+Session————————————>Producer Destination+Session————————————>MessageConsumer */ publicstaticvoidmain(String[]args){ //ConnectionFactory: 连接工厂,JMS用它创建连接 ConnectionFactoryconnectionFactory; //Connection: JMS客户端到JMSProvider的连接 Connectionconnection=null; //Session: 一个发送或接收消息的线程 Sessionsession; //Destination: 消息的目的地;消息发送给谁. Queuequeue; //设置回复的目的地 QueuereplyQueue; //MessageProducer: 消息发送者 MessageProducerproducer; MessageConsumerreplyer; connectionFactory=newActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp: //192.168.1.191: 61616"); try{ //构造从工厂得到连接对象 connection=connectionFactory.createConnection(); //启动 connection.start(); //获取操作连接 session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); //创建队列: 可以在http: //localhost: 8161/admin/queue.jsp中看到 queue=newActiveMQQueue("jason.queue2"); replyQueue=newActiveMQQueue("jason.replyQueue"); //得到消息生成者【发送者】: 需要由Session和Destination来创建 producer=session.createProducer(queue); //创建消息 TextMessagemessage=session.createTextMessage("jason学习ActiveMq发送的消息"); //在消息中设置回复的目的地, //对方用MessageProducersender=session.createProducer(message.getJMSReplyTo());创建回复者 message.setJMSReplyTo(replyQueue); //发送一个non-Persistent的消息 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.send(message); //发送一个Persistent的消息 producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.send(session.createTextMessage("这是一个Persistent的消息! 重启JMS,仍可获取")); System.out.println("发送消息: jason学习ActiveMq发送的消息"); System.out.println("这是一个Persistent的消息! 重启JMS,仍可获取"); //用回复的目的地定义回复接收者,且设置侦听 replyer=session.createConsumer(replyQueue); replyer.setMessageListener ( newMessageListener() { publicvoidonMessage(Messagemessage) { try{ TextMessagetxtmess=(TextMessage)message; System.out.println("consumer的回复内容是: "+txtmess.getText()); }catch(Exceptione){ e.printStackTrace(); } } } ); mit(); }catch(Exceptione){ e.printStackTrace(); }finally{ try{ if(null! =connection) connection.close(); }catch(Throwableignore){ } } } } 接收者: //publicclassReceiver{ publicclassQueueConsumerimplementsMessageListener{ publicstaticvoidmain(String[]args) { QueueConsumerre=newQueueConsumer(); //循环只是为了让程序每2秒进行一次连接侦听是否有消息可以获取. while(true) { re.consumeMessage(); try{ Thread.sleep(2000); }catch(InterruptedExceptione){ e.printStackTrace(); } } //对于主动接收的,只须直接执行: re.consumeMessage();即可. //其中的while(true),会一次性将所有的消息获取过来. } publicvoidconsumeMessage() { ConnectionFactoryconnectionFactory; Connectionconnection=null; Sessionsession; Queuequeue; MessageConsumerconsumer; connectionFactory=newActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp: //192.168.1.191: 61616"); try{ connection=connectionFactory.createConnection(); connection.start(); session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); queue=newActiveMQQueue("jason.queue2"); consumer=session.createConsumer(queue); //接受消息方式一: 主动的去接受消息,用consumer.receive //只能获取一条消息–>不采用 // TextMessagemessage=(TextMessage)consumer.receive(1000); // if(null! =message){ // System.out.println("收到消息"+message.getText()); // } //可以不断循环,获取所有的消息.—>关键. // while(true){ // TextMessagemessage=(TextMessage)consumer.receive(1000); // if(null! =message){ // System.out.println("收到消息"+message.getText()); // }else{ // break; //没有消息时,退出 // } // } /*接受消息方式二: 基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。 */ consumer.setMessageListener(this); }catch(Exceptione){ e.printStackTrace(); }finally{ try{ if(null! =connection) connection.close(); }catch(Throwableignore){ } } } publicvoidonMessage(Messagemessage) { try{ if(messageinstanceofTextMessage){ TextMessagetxtmess=(TextMessage)message; System.out.println("收到的消息是: "+txtmess.getText()); //回复发送者 MessageProducersender=session.createProducer(message.getJMSReplyTo()); sender.send(session.createTextMessage("已收到你的消息")); } else System.out.println("收到的消息是: "+message); }catch(Exceptione){ } } } 说明: (2) VMTransport VMtransport允许在VM内部通信,从而避免了网络传输的开销。 这时候采用的连接不是socket连接,而是直接地方法调用。 第一个创建VM连接的客户会启动一个embedVMbroker,接下来所有使用相同的brokername的VM连接都会使用这个broker。 当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。 TCPTransport TCPtransport允许客户端通过TCPsocket连接到远程的broker。 以下是配置语法: tcp: //hostname: port? transportOptions tcp: //localhost: 61616 (3) 3.1 启动activeMQ后,用户创建的queues会被保存在activeMQ解压目录下的\data\kr-store\data中. 3.2 创建queue,可以在代码中创建,也可以直接进入http: //localhost: 8161/admin—>点击queue—>在上面的field中填下你要创建的queue名–>点击创建即可. 3.3 若用户创建的queue,不是持久化的,则在重启activeMQ后,数据文件中的内容会被清空,但文件仍存在. (4) messageProducer发送消息后,会在保存在目的地,即上面的queue中,也即就是在\data\kr-store\data目录下的文件中. messageReceiver(连接到相同目的地的接收者),不需要立即接收.只要activeMQ的服务端不关闭,当运行接收者,连接到activeMQ的服务端时,就可以获取activeMQ服务端上已发送的消息. 发送/接收的消息情况及数量及消息的内容与处理(删除),可以在 http: //localhost: 8161/admin/queue.jsp中查看,操作. (5) 可以将activeMQ的服务端放于一PC中,发送者位于另一PC,接收者也位于另一PC中. 只要: tcp: //activeMQ的服务端IP: activeMQ的服务端口,进行连接即可. (6) queue消息,只被消费一次. topic的实例(无结合spring) publicclassTopicTest{ publicstaticvoidmain(String[]args)throwsException{ ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory("vm: //localhost"); Connectionconnection=factory.createConnection(); connection.start();
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- ActiveMQ jms 实战
![提示](https://static.bingdoc.com/images/bang_tan.gif)