Logstash

A Logstash pipeline has two required elements, input and output, and one optional element, filter. The input plugins consume data from a source, the filter plugins modify the data as you specify, and the output plugins write the data to a destination.

input {
    file {
        path => "/path/to/logstash-tutorial.log"
        start_position => beginning 
    }
}

The default behavior of the file input plugin is to monitor a file for new information, in a manner similar to the UNIX tail -f command. To change this default behavior and process the entire file, we need to specify the position where Logstash starts processing the file.

2016/2/6 posted in  ELK

Introduction

RabbitMQ is a message broker. In essence, it accepts messages from producers, and delivers them to consumers.

Producing means nothing more than sending. A program that sends messages is a producer.

A queue is the name for a mailbox. It lives inside RabbitMQ. Although messages flow through RabbitMQ and your applications, they can be stored only inside a queue. A queue is not bound by any limits, it can store as many messages as you like - it's essentially an infinite buffer. Many producers can send messages that go to one queue - many consumers can try to receive data from one queue.

Consuming has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages.

Sending

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

then we can create a connection to the server:

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

The connection abstracts the socket connection, and takes care of protocol version negotiation and authentication and so on for us.

Next we create a channel, which is where most of the API for getting things done resides.

To send, we must declare a queue for us to send to; then we can publish a message to the queue:

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

Declaring a queue is idempotent - it will only be created if it doesn't exist already.

Receiving

Setting up is the same as the sender; we open a connection and a channel, and declare the queue from which we're going to consume.

Message acknowledgement

In order to make sure a message is never lost, RabbotMQ supports message acknowledgments. An ack is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.

If a consumer dies without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that meesages aren't lost: we need to make both the queue and messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

    boolean durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);

RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that.

At this point we're sure that the task_queue won't be lost even if RabbitMQ restarts. Now we need to make our messages as persistent - by setting MessageProperties to the value PERSISTENT_TEXT_PLAIN.

    channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

RabbitMQ dispatches a mesage when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);  

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives.

In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possile because we were using a default exchange, which we identify by the empty string ("")

    channel.basicPublish("", "hello", null, message.getBytes());

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.

Bindings can take an extra routingKey parameter. To avoid the confusion with a basic_publish parameter we're going to call it a bingding key.

    channel.queueBind(queueName, EXCHANGE_NAME, "black");

The meaning of a binding key depends on the exchange type. The fanout exchanges simply ignored its value.

The routing algorithm behind a direct exchange is simple - a message foes to the queues whose binding key exactly matches the routing key of the message.

Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

Messages sent to a topic exchange can't have an arbitrary routing_key - it must be a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message. There can be as many words in the routing key as you like, up to the limit of 255 bytes.

The binding key must also be in the same form. The logic behind the topic exchange is similar to a direct one - a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. However there are two important special cases for bing keys:

  • * (star) can be substitute for exactly one word
  • # (hash) can substitute for zero or more words

Exchanges

  • A producer is a user application that sends message.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. On one side the exchange receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives.

Temporary queues

Giving a queue a name is important when you want to share the queue between producers and consumers.

  • Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name.

  • Secondly, once we disconnect the consumer the queue should be automatically deleted.

发送消息

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {
            // 新建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 定义连接工厂的host地址(RabbitMQ Server)
        factory.setHost("localhost");
        // 生成一个新连接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
            // 设定指定exhange的消息发送格式
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);
            // 对exchange发送消息
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

接收消息

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 生成一个queue
    String queueName = channel.queueDeclare().getQueue();
    // 将这个queue绑定到指定的exchange上去
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}
2016/2/4 posted in  RabbitMQ

创建Spring MVC之器

HttpServletBean、FrameworkServlet和DispatcherServlet这三个类直接实现三个接口:EnvironmentCapable、EnvironmentAware和ApplicationContextAware。XXXAware在spring里表示对XXX可以感知,通俗点解释就是:如果某个类里面想要使用spring的一些东西,就可以通过实现XXXAware接口告诉spring,spring看到后就会给你送过来,而接收的方式是通过实现接口唯一的方法set-XXX。

比如,有一个类想要使用当前的ApplicationContext,那么我们只需要让它实现ApplicationContextAware接口,然后实现接口中唯一的方法void setApplicationContext(ApplicationContext applicationContext)就可以了,spring会自动调用这个方法将applicationContext传给我们,我们只需要接收就可以了。

2016/2/2 posted in  spring

`Connector`分析

Connector用于接受请求并将请求封装成RequestResponse来具体处理,最底层是使用Socket来进行连接的,RequestResponse是按照HTTP协议来封装的,所以Connector同时实现了TCP/IP协议和HTTP协议。

RequestResponse封装之后交给Container进行处理。Container就是Servlet的容器。

Container处理完之后返回给Connector,最后Connector使用Socket将处理结果返回给客户端,这样整个请求就处理完了。

Connector的结构

Connector中具体是用ProtocolHandler来处理请求的,不同的ProtocolHandler代表不同的连接类型。

ProtocolHandler里面有3个非常重要的组件:EndpointProcessorAdapter
- Endpoint用于处理底层Socket的网络连接 - 用来实现TCP/IP协议
- Processor用于将Endpoint接收到的Socket封装成Request - 用来实现HTTP协议
- Adapter用于将封装好的Request交给Container进行具体处理 - 将请求适配到Servlet容器进行具体处理

Endpoint的抽象实现AbstractEndpoint里面定义的AcceptorAsyncTimeout两个内部类和一个Handler接口。Acceptor用于监听请求,AsyncTimeout用于检查异步request的超时,Handler用于处理接收到的Socket,在内部调用了Processor进行处理。

Connector的使用方法是通过Connector标签配置在conf/server.xml文件中,所以Connector是在Catalinaload方法中根据conf/server.xml配置文件创建Server对象时创建的。Connector的生命周期方法是在Service中调用的。

Connector的创建过程主要是初始化ProtocolHandlerserver.xml配置文件中Connector标签的protocol属性会设置到Connector构造函数的参数中。

Connector的生命周期处理方法中主要调用了ProtocolHandler的相应生命周期方法。
- 在initInternal方法中首先新建了一个Adapter并设置到ProtocolHandler中,然后对ProtocolHandler进行初始化;
- 在startInternal方法中首先判断设置的端口是否小于0,如果小于0就抛出异常,否则就调用ProtocolHandlerstart方法来启动;
- 在stopInternal方法中先设置了生命周期状态,然后调用了ProtocolHandlerstop方法;
- 在destroyInternal方法中除了调用ProtocolHandlerdestroy方法,还会将当前的ConnectorService中剔除并调用父类的destroyInternal方法。

Endpoint用于处理具体连接和传输数据

2016/2/2 posted in  tomcat

深入分析Volatile的实现原理

Volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的可见性。它在某些情况下比synchronized的开销更小(因为它不会引起线程上下文的切换和调度)。

可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。

Java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量。Java提供了Volatile,在某些情况下比锁更加方便。如果一个字段被声明成VolatileJava线程内存模型确保所有线程看到这个变量的值是一致的。

Java代码: 
instance = new Singleton();//instance是volatile变量

汇编代码:   
0x01a3de1d: movb $0×0,0×1104800(%esi);
0x01a3de24: lock addl $0×0,(%esp);

volatile变量修饰的共享变量进行写操作的时候会多第二行汇编代码,lock前缀的指令在多核处理器下会引发两件事情:
- 将当前处理器缓存行的数据写回到系统内存
- 这个歇会内存的操作会引起其他CPU里缓存该内存地址的数据无效

处理器为了提高处理速度,不直接和内存进行通讯,而是先将系统内存的数据读到内部缓存(L1L2或其他)后再进行操作,但操作完之后不知道何时会写到内存,如果对声明了volatile变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在的缓存行的数据写回到系统内存。但是就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题。

所以在多处理器下,为了保证每个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线程上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自动缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置为无效状态,当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存把数据读到处理器缓存里。

2016/1/25 posted in  Java