本文介紹了java遠程連接調(diào)用Rabbitmq,分享給大家,希望此文章對各位有所幫助。
富縣網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)公司,富縣網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為富縣超過千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)營銷網(wǎng)站建設(shè)要多少錢,請找那個售后服務(wù)好的富縣做網(wǎng)站的公司定做!
打開IDEA創(chuàng)建一個maven工程(Java就可以了)。
pom.xml文件如下
4.0.0 com.zhenqi rabbitmq-study 1.0-SNAPSHOT jar rabbitmq-study http://maven.apache.org UTF-8 junit junit 4.12 test com.rabbitmq amqp-client 4.1.0 org.slf4j slf4j-api org.slf4j slf4j-log4j12 1.7.21 commons-lang commons-lang 2.6
為了能遠程訪問rabbitmq,則需要編輯 /etc/rabbitmq/rabbitmq.conf,添加以下內(nèi)容。
[ {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]} ]
添加administrator角色
rabbitmqctl set_user_tags openstack administrator
創(chuàng)建抽象隊列 EndPoint.java
package com.zhenqi; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * Created by wuming on 2017/7/16. */ public abstract class EndPoint { protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws Exception { this.endPointName = endpointName; //創(chuàng)建一個連接工廠 connection factory ConnectionFactory factory = new ConnectionFactory(); //設(shè)置rabbitmq-server服務(wù)IP地址 factory.setHost("192.168.146.128"); factory.setUsername("openstack"); factory.setPassword("rabbitmq"); factory.setPort(5672); factory.setVirtualHost("/"); //得到 連接 connection = factory.newConnection(); //創(chuàng)建 channel實例 channel = connection.createChannel(); channel.queueDeclare(endpointName, false, false, false, null); } /** * 關(guān)閉channel和connection。并非必須,因為隱含是自動調(diào)用的。 * @throws IOException */ public void close() throws Exception{ this.channel.close(); this.connection.close(); } }
生產(chǎn)者Producer.java
生產(chǎn)者類的任務(wù)是向隊列里寫一條消息
package com.zhenqi; import org.apache.commons.lang.SerializationUtils; import java.io.Serializable; /** * Created by wuming on 2017/7/16. */ public class Producer extends EndPoint { public Producer(String endpointName) throws Exception { super(endpointName); } public void sendMessage(Serializable object) throws Exception { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } }
消費者QueueConsumer.java
消費者可以以線程方式運行,對于不同的事件有不同的回調(diào)函數(shù),其中最主要的是處理新消息到來的事件。
package com.zhenqi; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import org.apache.commons.lang.SerializationUtils; import org.apache.log4j.Logger; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * Created by wuming on 2017/7/16. */ public class QueueConsumer extends EndPoint implements Runnable, Consumer { private Logger LOG=Logger.getLogger(QueueConsumer.class); public QueueConsumer(String endpointName) throws Exception { super(endpointName); } public void handleConsumeOk(String s) { } public void handleCancelOk(String s) { } public void handleCancel(String s) throws IOException { } public void handleShutdownSignal(String s, ShutdownSignalException e) { } public void handleRecoverOk(String s) { LOG.info("Consumer "+s +" registered"); } public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { Map map = (HashMap) SerializationUtils.deserialize(bytes); LOG.info("Message Number "+ map.get("message number") + " received."); } public void run() { try{ channel.basicConsume(endPointName, true,this); }catch(IOException e){ e.printStackTrace(); } } }
測試
運行一個消費者線程,然后開始產(chǎn)生大量的消息,這些消息會被消費者取走
package com.zhenqi; import java.util.HashMap; /** * Created by wuming on 2017/7/16. */ public class TestRabbitmq { public static void main(String[] args){ try{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer producer = new Producer("queue"); for (int i = 0; i < 100000; i++){ HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); } }catch(Exception e){ e.printStackTrace(); } } }
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。