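RabbitMQ blocking and initial optimization

When RabbitMQ's memory usage reaches its peak threshold (40% of system memory), message connections are blocked.

Scenario: messages are published in bulk and pile up in RabbitMQ, memory usage keeps growing, and RabbitMQ blocks the connections. Publishers are left waiting, CPU and memory usage become very high, and the server that publishes the messages starts to fail.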
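The threshold check described above can be sketched as a small model. This is only an illustration under stated assumptions, not RabbitMQ's own code: the class `MemoryWatermark` and its methods are hypothetical, and the 40% fraction is the figure quoted in this post.

```java
// Toy model of a memory high-watermark check; not RabbitMQ's implementation.
final class MemoryWatermark {
    private final long limitBytes;

    // fraction is the watermark as a share of total RAM, e.g. 0.4 for 40%.
    MemoryWatermark(long totalRamBytes, double fraction) {
        if (fraction <= 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in (0, 1]");
        }
        this.limitBytes = (long) (totalRamBytes * fraction);
    }

    long limitBytes() {
        return limitBytes;
    }

    // In this model, publishers are blocked while usage is above the limit.
    boolean publishersBlocked(long usedBytes) {
        return usedBytes > limitBytes;
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024 * 1024;
        MemoryWatermark watermark = new MemoryWatermark(8 * gib, 0.4);
        System.out.println("limit in bytes: " + watermark.limitBytes());
        System.out.println("blocked at 3 GiB: " + watermark.publishersBlocked(3 * gib));
        System.out.println("blocked at 4 GiB: " + watermark.publishersBlocked(4 * gib));
    }
}
```

On a host with 8 GiB of RAM the model puts the limit at roughly 3.2 GiB, so a backlog that pushes the broker past that point is what leaves publishers waiting.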

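Analysis of the fix:

Deleting the queue from the management UI sharply reduces memory usage. Once the memory is released, connections are no longer blocked, messages are received normally, and CPU usage returns to normal.

Preliminary analysis of the cause:

1. The consumer used an exclusive queue (Exclusive) together with durability (Durable)

With durability, messages are stored on disk; without it, they are kept in memory. When using a temporary queue, keep the following three points in mind:

  • An exclusive queue is visible only to the connection that declared it; different channels on that connection can access the queue at the same time

  • Different connections are not allowed to declare exclusive queues with the same name

  • Even if the queue is durable, it is deleted as soon as the connection closes or the client exits

What we did: changed the queue so that it is deleted immediately whenever the connection closes or the client exits: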

channel.queueDeclare(queueName, true, true, true, null);
  • @param queue the name of the queue
  • @param durable true if we are declaring a durable queue (the queue will survive a server restart)
  • @param exclusive true if we are declaring an exclusive queue (restricted to this connection), i.e. other connections cannot use it
  • @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use), i.e. once all of the queue's consumers are gone
  • @param arguments other properties (construction arguments) for the queue
  • @return a declaration-confirm method to indicate the queue was successfully declared
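The three rules for exclusive queues listed earlier can be made concrete with a toy in-memory model. This is a sketch, not the broker's logic: `ExclusiveQueueModel`, the connection ids and the queue name are all hypothetical, and a rejected declaration is shown as a `false` return where the real broker raises a channel error.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of exclusive-queue ownership; all names here are hypothetical.
final class ExclusiveQueueModel {
    // queue name -> id of the connection that owns the queue
    private final Map<String, String> ownerByQueue = new HashMap<>();

    // Returns true if this connection created the queue or already owns it.
    boolean declare(String connectionId, String queueName) {
        String owner = ownerByQueue.putIfAbsent(queueName, connectionId);
        return owner == null || owner.equals(connectionId);
    }

    boolean exists(String queueName) {
        return ownerByQueue.containsKey(queueName);
    }

    // Closing a connection deletes every exclusive queue it owns.
    void closeConnection(String connectionId) {
        ownerByQueue.values().removeIf(connectionId::equals);
    }

    public static void main(String[] args) {
        ExclusiveQueueModel broker = new ExclusiveQueueModel();
        System.out.println("conn-1 declares: " + broker.declare("conn-1", "reports"));
        System.out.println("conn-2 declares: " + broker.declare("conn-2", "reports"));
        broker.closeConnection("conn-1");
        System.out.println("queue still exists: " + broker.exists("reports"));
    }
}
```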

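2. Fix a bug in the program: do not create a Queue when creating an MQSender

In the earlier program, every new MQSender also created a Queue, and that Queue had no consumer. Each time the producer (P) published a message to the exchange, a copy also went to this Queue. With no consumer and a constant stream of incoming messages, the Queue's memory usage kept growing and eventually dragged down the performance of the whole RabbitMQ server.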
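The effect of that bug can be reproduced with a small simulation. It is a sketch under assumptions: `ToyExchange` and `ToyQueue` are hypothetical stand-ins for a fanout-style exchange and its queues, and a consumer is modelled as draining each message immediately.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model: the exchange copies every message into each bound queue.
final class UnconsumedQueueDemo {
    static final class ToyQueue {
        final Deque<String> messages = new ArrayDeque<>();
        final boolean hasConsumer;

        ToyQueue(boolean hasConsumer) {
            this.hasConsumer = hasConsumer;
        }
    }

    static final class ToyExchange {
        private final List<ToyQueue> bound = new ArrayList<>();

        void bind(ToyQueue queue) {
            bound.add(queue);
        }

        void publish(String message) {
            for (ToyQueue queue : bound) {
                queue.messages.addLast(message);
                // A consumer takes the message straight away in this model.
                if (queue.hasConsumer) {
                    queue.messages.removeFirst();
                }
            }
        }
    }

    // Total number of messages left in all queues after publishing `count` messages.
    static int backlogAfter(int count, boolean senderDeclaresQueue) {
        ToyExchange exchange = new ToyExchange();
        ToyQueue consumerQueue = new ToyQueue(true);
        exchange.bind(consumerQueue);
        ToyQueue senderQueue = new ToyQueue(false);
        if (senderDeclaresQueue) {
            exchange.bind(senderQueue); // the bug: a queue nobody reads from
        }
        for (int i = 0; i < count; i++) {
            exchange.publish("msg-" + i);
        }
        return consumerQueue.messages.size() + senderQueue.messages.size();
    }

    public static void main(String[] args) {
        System.out.println("backlog with sender queue: " + backlogAfter(1000, true));
        System.out.println("backlog without sender queue: " + backlogAfter(1000, false));
    }
}
```

In the model the backlog grows by one message per publish as long as the sender's queue is bound, which is the unbounded growth described above.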

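Recovering when the problem has left RabbitMQ connections in a failed state:

1. When the management page is accessible

Click Queues --> click the name of the queue --> on the queue details page, click the Delete button at the bottom of the page to delete the queue. This brings RabbitMQ's memory usage down.

2. When the management UI is inaccessible and the broker will not start: steps on the server

Delete the messages of all queues to reduce memory usage (using host 110 as the example)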

cd /var/lib/rabbitmq/mnesia/rabbit\@template-CentOS6/
rm -rf queues

Start RabbitMQ

service rabbitmq-server start

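At this point there are two ways to delete the queues:

1. Open the management page and delete the queues as in step 1 above
2. Delete them from the Linux command line
rabbitmqctl stop_app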

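Clear all queues and reset RabbitMQ to its initial state, then start the application again so that users can be created

rabbitmqctl reset
rabbitmqctl start_app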

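Create a user (username and password) for logging in to the management UI

rabbitmqctl add_user <username> <password>
rabbitmqctl set_user_tags <username> administrator
rabbitmqctl set_permissions -p / <username> ".*" ".*" ".*"

Create the user (username and password) that clients use to connect

rabbitmqctl add_user <username> <password>
rabbitmqctl set_permissions -p / <username> ".*" ".*" ".*"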
2016/3/15 posted in  RabbitMQ

Queue

channel.queueDeclare(queueName, true, true, true, null);
  • @param queue the name of the queue
  • @param durable true if we are declaring a durable queue (the queue will survive a server restart)
  • @param exclusive true if we are declaring an exclusive queue (restricted to this connection), i.e. other connections cannot use it
  • @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use), i.e. once all of the queue's consumers are gone
  • @param arguments other properties (construction arguments) for the queue
  • @return a declaration-confirm method to indicate the queue was successfully declared
2016/3/15 posted in  RabbitMQ

Introduction

RabbitMQ is a message broker. In essence, it accepts messages from producers, and delivers them to consumers.

Producing means nothing more than sending. A program that sends messages is a producer.

A queue is the name for a mailbox. It lives inside RabbitMQ. Although messages flow through RabbitMQ and your applications, they can be stored only inside a queue. A queue has no fixed size limit and can store as many messages as you like, so it behaves like an infinite buffer, although in practice it is bounded by the broker host's memory and disk. Many producers can send messages that go to one queue, and many consumers can try to receive data from one queue.

Consuming has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages.

Sending

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

Then we can create a connection to the server:

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

The connection abstracts the socket connection, and takes care of protocol version negotiation and authentication and so on for us.

Next we create a channel, which is where most of the API for getting things done resides.

To send, we must declare a queue for us to send to; then we can publish a message to the queue:

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

Declaring a queue is idempotent - it will only be created if it doesn't exist already.

Receiving

Setting up is the same as the sender; we open a connection and a channel, and declare the queue from which we're going to consume.

Message acknowledgement

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack is sent back from the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.

If a consumer dies without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
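The ack and re-queue behaviour can be illustrated with a toy queue that tracks unacknowledged deliveries. This is a minimal sketch, assuming a single consumer; `AckModel` and its methods are hypothetical and are not part of the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Toy model of acknowledgements with a single consumer; names are hypothetical.
final class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();
    // delivery tag -> message that was delivered but not yet acknowledged
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    void enqueue(String message) {
        ready.addLast(message);
    }

    // Hands the next ready message to the consumer; returns its tag, or -1 if none.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // The consumer finished processing, so the message can be forgotten.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer died: every unacked message goes back to the front of the queue,
    // newest first so that the original order is preserved.
    void consumerDied() {
        for (String message : unacked.descendingMap().values()) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    String nextReady() {
        return ready.peekFirst();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckModel queue = new AckModel();
        queue.enqueue("task-1");
        queue.enqueue("task-2");
        long first = queue.deliver();
        queue.deliver();
        queue.ack(first);
        queue.consumerDied();
        System.out.println("next message to redeliver: " + queue.nextReady());
    }
}
```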

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and the messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

    boolean durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);

RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that.
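Both declaration rules, that declaring is idempotent and that redeclaring with different parameters is an error, can be sketched in a few lines. This is a toy model, not client code: `QueueDeclareModel` is hypothetical, and the `IllegalStateException` stands in for the channel-level error the real broker reports.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of queue declaration; names are hypothetical.
final class QueueDeclareModel {
    // queue name -> [durable, exclusive, autoDelete] as first declared
    private final Map<String, List<Boolean>> queues = new HashMap<>();

    // Returns true if the queue was created, false if it already existed with
    // the same parameters. Throws if the parameters differ.
    boolean declare(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        List<Boolean> requested = Arrays.asList(durable, exclusive, autoDelete);
        List<Boolean> existing = queues.putIfAbsent(name, requested);
        if (existing == null) {
            return true;
        }
        if (!existing.equals(requested)) {
            throw new IllegalStateException(
                    "queue '" + name + "' already exists with different parameters");
        }
        return false;
    }

    public static void main(String[] args) {
        QueueDeclareModel broker = new QueueDeclareModel();
        System.out.println("created: " + broker.declare("task_queue", true, false, false));
        System.out.println("created again: " + broker.declare("task_queue", true, false, false));
        try {
            broker.declare("task_queue", false, false, false);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is why a queue that was first declared as non-durable has to be deleted, or given a new name, before it can be declared as durable.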

At this point we're sure that the task_queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent by setting MessageProperties to the value PERSISTENT_TEXT_PLAIN.

    channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

RabbitMQ dispatches a message as soon as it enters the queue. It doesn't look at the number of unacknowledged messages for a consumer; it just blindly dispatches every n-th message to the n-th consumer.

In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);  
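The difference between blind round-robin dispatch and prefetchCount = 1 can be seen in a small scheduling simulation. It is a sketch under assumptions: `DispatchModel` is hypothetical, each task has a fixed cost in arbitrary time units, and prefetchCount = 1 is modelled as "the next message goes to whichever worker is free first".

```java
import java.util.Arrays;

// Toy comparison of the two dispatch strategies; names are hypothetical.
final class DispatchModel {
    // Round-robin: the i-th message goes to worker i % workers, regardless of load.
    static long[] roundRobinLoad(long[] taskCosts, int workers) {
        long[] load = new long[workers];
        for (int i = 0; i < taskCosts.length; i++) {
            load[i % workers] += taskCosts[i];
        }
        return load;
    }

    // prefetchCount = 1: a worker only gets a new message once it has finished
    // (acknowledged) the previous one, so each task goes to the first free worker.
    static long[] prefetchOneLoad(long[] taskCosts, int workers) {
        long[] busyUntil = new long[workers];
        for (long cost : taskCosts) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[next]) {
                    next = w;
                }
            }
            busyUntil[next] += cost;
        }
        return busyUntil;
    }

    public static void main(String[] args) {
        // Heavy and light tasks alternate, the worst case for round-robin.
        long[] costs = {10, 1, 10, 1, 10, 1};
        System.out.println("round-robin: " + Arrays.toString(roundRobinLoad(costs, 2)));
        System.out.println("prefetch 1:  " + Arrays.toString(prefetchOneLoad(costs, 2)));
    }
}
```

With alternating heavy and light tasks, round-robin gives one worker all the heavy ones, while the prefetch model spreads them across both workers.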

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives.

In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using the default exchange, which we identify by the empty string (""):

    channel.basicPublish("", "hello", null, message.getBytes());

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.

Bindings can take an extra routingKey parameter. To avoid confusion with the basicPublish parameter of the same name, we're going to call it a binding key.

    channel.queueBind(queueName, EXCHANGE_NAME, "black");

The meaning of a binding key depends on the exchange type. Fanout exchanges simply ignore its value.

The routing algorithm behind a direct exchange is simple: a message goes to the queues whose binding key exactly matches the routing key of the message.

Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

Messages sent to a topic exchange can't have an arbitrary routing_key - it must be a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message. There can be as many words in the routing key as you like, up to the limit of 255 bytes.

The binding key must also be in the same form. The logic behind the topic exchange is similar to a direct one: a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. However, there are two important special cases for binding keys:

  • * (star) can substitute for exactly one word
  • # (hash) can substitute for zero or more words
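The two wildcard rules can be written down as a short matcher. This is a minimal sketch of the matching logic only, not the broker's implementation; `TopicMatcher` and the sample keys are assumptions made for illustration.

```java
// Sketch of topic-exchange matching: words are separated by dots,
// '*' stands for exactly one word and '#' for zero or more words.
final class TopicMatcher {
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // The pattern is used up: it matches only if the words are too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit"));
        System.out.println(matches("*.orange.*", "quick.orange.male.rabbit"));
        System.out.println(matches("lazy.#", "lazy"));
    }
}
```

A binding key of just "#" matches every routing key, and a key without wildcards matches only an identical routing key, which is the direct exchange's behaviour.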

Exchanges

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. On one side the exchange receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives.

Temporary queues

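Giving a queue a name is important when you want to share the queue between producers and consumers. When each consumer only needs its own short-lived queue, a temporary queue is enough, and it needs two things: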

  • Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name.

  • Secondly, once we disconnect the consumer the queue should be automatically deleted.

Sending messages

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws Exception {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the host of the RabbitMQ server
        factory.setHost("localhost");
        // Open a new connection and a channel on it
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange and its type (fanout)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);
        // Publish the message to the exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

Receiving messages

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // Declare a server-named temporary queue
    String queueName = channel.queueDeclare().getQueue();
    // Bind this queue to the exchange
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}
2016/2/4 posted in  RabbitMQ