真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

java遠程連接調(diào)用Rabbitmq的實例代碼

本文介紹了java遠程連接調(diào)用Rabbitmq,分享給大家,希望此文章對各位有所幫助。

富縣網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)公司,富縣網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為富縣超過千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)營銷網(wǎng)站建設(shè)要多少錢,請找那個售后服務(wù)好的富縣做網(wǎng)站的公司定做!

打開IDEA創(chuàng)建一個maven工程(Java就可以了)。

java遠程連接調(diào)用Rabbitmq的實例代碼 

pom.xml文件如下


 4.0.0

 com.zhenqi
 rabbitmq-study
 1.0-SNAPSHOT
 jar

 rabbitmq-study
 http://maven.apache.org

 
  UTF-8
 

 
  
   junit
   junit
   4.12
   test
  

  
  
   com.rabbitmq
   amqp-client
   4.1.0
   
    
     org.slf4j
     slf4j-api
    
   
  

  
   org.slf4j
   slf4j-log4j12
   1.7.21
  

  
   commons-lang
   commons-lang
   2.6
  

 


為了能遠程訪問rabbitmq,則需要編輯 /etc/rabbitmq/rabbitmq.conf,添加以下內(nèi)容。

[
  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
 ]

添加administrator角色

rabbitmqctl set_user_tags openstack administrator

創(chuàng)建抽象隊列 EndPoint.java

package com.zhenqi;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * Created by wuming on 2017/7/16.
 */
public abstract class EndPoint {

  protected Channel channel;
  protected Connection connection;
  protected String endPointName;

  public EndPoint(String endpointName) throws Exception {

    this.endPointName = endpointName;

    //創(chuàng)建一個連接工廠 connection factory
    ConnectionFactory factory = new ConnectionFactory();

    //設(shè)置rabbitmq-server服務(wù)IP地址
    factory.setHost("192.168.146.128");
    factory.setUsername("openstack");
    factory.setPassword("rabbitmq");
    factory.setPort(5672);
    factory.setVirtualHost("/");

    //得到 連接
    connection = factory.newConnection();

    //創(chuàng)建 channel實例
    channel = connection.createChannel();

    channel.queueDeclare(endpointName, false, false, false, null);
  }

  /**
   * 關(guān)閉channel和connection。并非必須,因為隱含是自動調(diào)用的。
   * @throws IOException
   */
  public void close() throws Exception{
    this.channel.close();
    this.connection.close();
  }
}

生產(chǎn)者Producer.java

生產(chǎn)者類的任務(wù)是向隊列里寫一條消息

package com.zhenqi;

import org.apache.commons.lang.SerializationUtils;

import java.io.Serializable;

/**
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {

  public Producer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void sendMessage(Serializable object) throws Exception {
    channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
  }
}

消費者QueueConsumer.java

消費者可以以線程方式運行,對于不同的事件有不同的回調(diào)函數(shù),其中最主要的是處理新消息到來的事件。

package com.zhenqi;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

  private Logger LOG=Logger.getLogger(QueueConsumer.class);

  public QueueConsumer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void handleConsumeOk(String s) {

  }

  public void handleCancelOk(String s) {

  }

  public void handleCancel(String s) throws IOException {

  }

  public void handleShutdownSignal(String s, ShutdownSignalException e) {

  }

  public void handleRecoverOk(String s) {
    LOG.info("Consumer "+s +" registered");
  }

  public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    Map map = (HashMap) SerializationUtils.deserialize(bytes);
    LOG.info("Message Number "+ map.get("message number") + " received.");
  }

  public void run() {
    try{
      channel.basicConsume(endPointName, true,this);
    }catch(IOException e){
      e.printStackTrace();
    }
  }
}

 測試

運行一個消費者線程,然后開始產(chǎn)生大量的消息,這些消息會被消費者取走

package com.zhenqi;

import java.util.HashMap;

/**
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitmq {

  public static void main(String[] args){
    try{
      QueueConsumer consumer = new QueueConsumer("queue");
      Thread consumerThread = new Thread(consumer);
      consumerThread.start();

      Producer producer = new Producer("queue");

      for (int i = 0; i < 100000; i++){
        HashMap message = new HashMap();
        message.put("message number", i);
        producer.sendMessage(message);
        System.out.println("Message Number "+ i +" sent.");
      }
    }catch(Exception e){
      e.printStackTrace();
    }

  }
}

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。


新聞標題:java遠程連接調(diào)用Rabbitmq的實例代碼
網(wǎng)頁地址:http://weahome.cn/article/pcsods.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部