Java NIO杂2

scatter和gather用于描述从Channel中读取或者写入到Channel的操作

分散(scatter)从Channel中读取是指在读操作时将读取的数据写入多个buffer中。

聚集(gather)写入Channel是指在写操作时将多个buffer的数据写入同一个Channel。

scatter和gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。

Scattering Reads

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { hearder, body};

channel.read(bufferArray);

read()方法按照buffer在数组中的顺序将从channel中读取的数据写入到buffer,当一个buffer被写满后,channel紧接着向另一个buffer中写。

Scattering Reads在移动到下一个buffer前,必须填满当前的buffer,这也意味着它不适用于动态消息(消息大小不固定)。换句话说,如果存在消息头和消息体,消息头必须完成填充,Scattering Reads才能正常工作。

Gathering Writes

Gathering Writes是指数据从多个buffer写入到同一个channel

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

//write data into buffers

ByteBuffer[] bufferArray = { header, body };

channel.write(bufferArray);

write()方法会按照buffer在数组中的顺序,将数据写入到channel,注意只有position和limit之间的数据才能被写入。

Gathering Writes能较好的处理动态消息。

在Java NIO中,如果两个通道中有一个是FileChannel,那你可以直接将数据从一个channel传输到另一个channel。

transferFrom()

FileChannel的transferFrom()方法可以将数据从源通道传输到FileChannel中。

RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel      fromChannel = fromFile.getChannel();

RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel      toChannel = toFile.getChannel();

long position = 0;
long count = fromChannel.size();

toChannel.transferFrom(position, count, fromChannel);

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样一个单独的线程可以管理多个channel,从而管理多个网络连接。

为什么使用Selector

可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源。

下面是单线程使用一个Selector处理3个channel的示例:

Selector的创建

通过调用Selector.open()方法创建一个Selector,如下:

Selector selector = Selector.open();
向Selector注册通道

为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下:

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);

与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。

register()方法的第二个参数。这是一个interest集合,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:

  • Connect
  • Accept
  • Read
  • Write

某个channel成功连接到另一个服务器称为连接就绪SelectionKey.OP_CONNECT

一个server socket channel准备好接收新进入的连接称为接收就绪SelectionKey.OP_ACCEPT

一个有数据可读的通道可以说是读就绪SelectionKey.OP_READ

等待写数据的通道可以说是写就绪SelectionKey.OP_WRITE

如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

SelectionKey

当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含了一些你感兴趣的属性:

  • interest集合
  • ready集合
  • Channel
  • Selector
  • 附加的对象(可选)
interest集合

interest集合是你所选择的感兴趣的事件集合。

可以通过SelectionKey读写interest集合:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

用'位与'操作interest集合和给定的selectionKey常量,可以确定某个确定的事件是否在interest集合中。

ready集合

ready集合是通道已经准备就绪的操作的集合。在一次选择(selection)之后,你会首先访问这个ready set。

int readySet = selectionKey.readyOps();
Channel + Selector

从SelectionKey访问Channel和Selector:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();
附加的对象

可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。

例如,可以附加与通道一起使用的Buffer,或者包含聚集数据的某个对象,使用方法:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

还可以在用register()方法向Selector注册Channel的时候附加对象。

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
通过Selector选择通道

一旦向Selector注册了一个或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件、已经准备就绪的那些通道。

也就是,如果你对'读就绪'的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。

  • select()阻塞到至少有一个通道在你注册的事件上就绪了。

  • select(long timeout)select()一样,除了最长会阻塞timeout毫秒(参数)。

  • selectNow()不会阻塞,不管什么通道就绪都立刻返回(译者注:此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。)。

select()方法返回的int值表示有多少通道已经就绪。自上次调用select()方法后有多少通道变成就绪状态。

如果调用`select()`方法,因为有一个通道变成就绪状态,返回了1
若再次调用`select()`方法,如果另一个通道就绪了,它会再次返回1。

如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,
但在每次`select()`方法调用之间,只有一个通道就绪了。
selectedKeys()

一旦调用了select()方法,并且返回值表明有一个或者多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问'已选择键集(selected key set)'中的就绪通道。如下所示:

Set selectedKeys = selector.selectedKeys();

当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

这个循环遍历已选择键集中的每个键,并检测各个键所对应的通道的就绪事件。

注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。

SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。

wakeUp()

某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。

如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即'醒来(wake up)'

close()

用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

完整的示例

这里是一个完整的示例,打开一个Selector,注册一个通道注册到这个Selector上,然后持续监控这个Selector的四种事件(接受,连接,读,写)是否就绪

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
  }
}

Reference

http://ifeve.com/java-nio-scattergather/

http://ifeve.com/java-nio-channel-to-channel/

http://ifeve.com/selectors/

2015/10/25

Java NIO杂1

Channel和Buffer

基本上,所有的IO在NIO中都从一个Channel开始。

数据可以从Channel读到Buffer中,也可以从Buffer写到Channel中。

Selector

Selector允许单线程处理多个Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便。

img/javaio1.png

要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,

Channel

Java NIO的通道类似流,但又有些不同:

  • 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
  • 通道可以异步地读写。
  • 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。

Channel的实现

这些是Java NIO中最重要的通道的实现:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

FileChannel 从文件中读写数据。

DatagramChannel 能通过UDP读写网络中的数据。

SocketChannel 能通过TCP读写网络中的数据。

ServerSocketChannel可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

基本的Channel示例

RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

//create buffer with capacity of 48 bytes
ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = inChannel.read(buf); //read into buffer.
while (bytesRead != -1) 
{
  buf.flip();  //make buffer ready for read

  while(buf.hasRemaining())
  {
      System.out.print((char) buf.get()); // read 1 byte at a time
  }

  buf.clear(); //make buffer ready for writing
  bytesRead = inChannel.read(buf);
}
aFile.close();

注意buf.flip()的调用,首先读取数据到Buffer,然后反转Buffer,接着再从Buffer中读取数据

Buffer

Java NIO中的Buffer用于和NIO通道进行交互。数据是从通道读入缓冲区,从缓冲区写入到通道中。

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。

Buffer的基本用法

使用Buffer读写数据一般遵循以下四个步骤:

  1. 写入数据到buffer
  2. 调用flip()方法
  3. 从buffer中读取数据
  4. 调用clear()方法或者compact()方法

当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将buffer从写模式切换到读模式在读模式下,可以读取之前写入到buffer的所有数据

一旦读完了所有的数据,就需要清空缓冲区,让它们可以再次被写入。有两种方式能清空缓冲区:调用clear()或者compact()方法。clear()方法会清空整个缓冲区compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。

Buffer的capacity, position和limit

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。

position和limit的含义取决于Buffer处在读模式还是写模式。不管Buffer处在什么模式,capacity的含义总是一样的。

img/javaio2.png

capacity

作为一个内存块,Buffer有一个固定的大小值。一旦Buffer满了,需要将其清空(通过读数据或者清楚数据)才能继续写数据往里写数据。

position

当你写数据到Buffer中时,position表示当前的位置。初始的position值为0,当一个byte, long等数据写到Buffer后,position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity-1

当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0.当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。

limit

在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。在写模式下,limit等于Buffer的capacity。

当切换Buffer到读模式的,limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。

Buffer的分配

要想获得一个Buffer对象首先要进行分配。每一个Buffer类都有一个allocate方法

向Buffer中写数据

写数据到Buffer有两种方式:

  • 从Channel写到Buffer
  • 通过Buffer的put()方法写到Buffer里

从Channel写到Buffer的例子

int bytesRead = inChannel.read(buf); // read into buffer

通过put方法写Buffer的例子:

buf.put(127);
flip()方法

flip方法将Buffer从写模式切换到读模式。调用flip()方法会将position设回0,并将limit设置成之前position的值。

从Buffer中读取数据

从Buffer中读取数据有两种方式:

  • 从Buffer读取数据到Channel
  • 使用get()方法从Buffer中读取数据

从Buffer读取数据到Channel的例子

int bytesWritten = inChannel.write(buf);

使用get()方法从Buffer中读取数据的例子

byte aByte = buf.get();
rewind()方法

Buffer.rewind()将position设为0,所以你可以重读Buffer中的所有数据。limit保持不变,任何表示能从Buffer读取多少元素。

clear()与compact()方法

一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()或者compact()方法来完成。

如果调用的是clear()方法,position将被设回0,limit被设置为capacity的值。换句话说,Buffer被清空了。Buffer中的数据并未清除,只是这些标记告诉我们可以从哪里开始往Buffer里写数据。

如果Buffer中有一些未读的数据,调用clear()方法,数据将‘被遗忘’,意味着不再有任何标记会告诉你哪些数据被读过,哪些还没有。

如果Buffer中仍有未读的数据,且后续还需要这些数据,但是此时想要先写些数据,那么使用compact()方法

compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。

mark()和reset()方法

通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。

buffer.mark();
// call buffer.get() a couple of times, e.g. during parsing
buffer.reset();
equals()与compareTo()方法

equals()方法

当满足下列条件时,表示两个Buffer相等:

  • 有相同的类型
  • Buffer中剩余的byte、char等的个数相等
  • Buffer中所有剩余的byte、char等都相同

equals只比较Buffer中的剩余元素

compareTo()方法

compareTo()方法比较两个Buffer的剩余元素。如果满足下列条件,则认为一个Buffer小于另一个Buffer:

  • 第一个不相等的元素小于另一个Buffer中对应的元素
  • 所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素比另一个少)

Reference

http://ifeve.com/overview/

http://ifeve.com/channels/

http://ifeve.com/buffers/

2015/10/25

Redis杂

为什么Redis不支持回滚(rollback)

  • Redis命令只会因为错误的语法而失败,或者是命令用在了错误类型的键上
  • 因为不需要对回滚进行支持,所以Redis的内部可以保持简单而快速

在通常情况下, 回滚并不能解决编程错误带来的问题。 举个例子, 如果你本来想通过 INCR 命令将键的值加上 1 , 却不小心加上了 2 , 又或者对错误类型的键执行了 INCR , 回滚是没有办法处理这些情况的。

鉴于没有任何机制能避免程序员自己造成的错误, 并且这类错误通常不会在生产环境中出现, 所以 Redis 选择了更简单、更快速的无回滚方式来处理事务。


watch的作用是什么,跟直接用multi有什么区别

http://stackoverflow.com/questions/10750626/transactions-and-watch-statement-in-redis

Redis持久化

Redis提供了多种不同级别的持久化方式:

  • RDB持久化可以在指定的时间间隔内生成数据集的时间点快照(point-in-time snapshot)
  • AOF持久化记录服务器执行的所有写操作命令,并在服务器启动时,通过重新执行这些命令来还原数据集。AOF文件中的命令全部以Redis协议的格式来保存,新命令会被追加到文件的末尾。Redis还可以在后台对AOF文件进行重写,使得AOF文件的体积不会超出保存数据集状态所需的实际大小。
  • Redis还可以同时使用AOF持久化和RDB持久化。在这种情况下,当Redis重启时,它会优先使用AOF文件来还原数据集,因为AOF文件保存的数据集通常比RDB文件所保存的数据集更完整。
  • 持久化功能可以被关闭,数据只在服务器运行时存在。
RDB的优点
  • RDB是一个非常紧凑(compact)的文件,它保存了Redis在某个时间点上的数据集。这种文件非常适合于进行备份。
  • RDB非常适合用于灾难恢复(disaster recovery):它只有一个文件,并且内容都非常紧凑,可以(在加密后)将它传送到别的数据中心
  • RDB可以最大化Redis的性能:父进程在保存RDB文件时唯一要做的就是fork出一个子进程,然后这个子进程就会处理接下来的所有保存工作,父进程无需执行任何磁盘I/O操作。
  • RDB在恢复大数据集时的速度比AOF的恢复速度要快。
RDB的缺点
  • 如果你需要尽量避免在服务器故障时丢失数据,那么RDB不适合你。因为RDB文件需要保存整个数据集的状态,所以它并不是一个轻松的操作。因此可能会至少5分钟才保存一次RDB文件。在这种情况下,一旦发生故障停机,你就可能会丢失几分钟的数据。
  • 每次保存RDB的时候,Redis都要fork()出一个子进程,并由子进程来进行实际的持久化工作。当数据集比较庞大时,fork()可能非常耗时,造成服务器可能长达一秒停止处理客户端。
AOF的优点
  • 使用AOF持久化会让Redis变得非常耐久(much more durable):你可以设置不同的fsync策略,比如无fsync,每秒钟一次fsync,或者每次执行写入命令时fsync。AOF的默认策略为每秒钟fsync一次,在这种配置下,Redis仍然可以保持良好的性能,并且就算发生故障停机,也最多只会丢失一秒种的数据。
  • AOF文件是一个只进行追加操作的日志文件(append only log),因此对AOF文件的写入不需要进行seek,即使日志因为某些原因而包含了未写入完整的命令(比如磁盘已满),redis-check-aof工具也可以轻易地修复这种问题。
  • Redis可以在AOF文件体积变得过大时,自动地在后台对AOF进行重写:重写后的新AOF文件包含了恢复当前数据集所需的最小命令集合。整个重写操作是绝对安全的,因为Redis在创建新AOF文件的过程中,会继续将命令追加到现有的AOF里面,即使重写过程中发生停机,现有的AOF文件也不会丢失。一旦新AOF文件创建完毕,Redis就会从旧AOF文件切换到新AOF文件,并开始对新AOF文件进行追加操作。

    举个例子, 如果你对一个计数器调用了 100 次 INCR , 那么仅仅是为了保存这个计数器的当前值, AOF 文件就需要使用 100 条记录(entry)。

    然而在实际上, 只使用一条 SET 命令已经足以保存计数器的当前值了, 其余 99 条记录实际上都是多余的。

    为了处理这种情况, Redis 支持一种有趣的特性: 可以在不打断服务客户端的情况下, 对 AOF 文件进行重建(rebuild)。

  • AOF文件有序地保存了对数据库执行的所有写入操作,这些写入操作以Redis协议的格式保存,因此AOF文件的内容非常容易被人读懂,对文件进行分析(parse)也很轻松。导出(export)AOF文件也非常简单。

    举个例子,如果你不小心执行了FLUSHALL命令,只要AOF文件未被重写,那么只要停止服务器,移除AOF文件末尾的FLUSHALL命令,并重启Redis,就可以将数据集恢复到FLUSHALL执行之前的状态。

AOF的缺点
  • 对于相同的数据集来说,AOF文件的体积通常要大于RDB文件的体积
  • 根据所使用的fsync策略,AOF的速度可能会慢于RDB。在一般情况下,每秒fsync的性能依然非常高,而关闭fsync可以让AOF的速度和RDB一样快,即使在高负荷之下也是如此。不过在处理巨大的写入载入时,RDB可以提供更有保证的最大延迟时间(latency)
2015/10/25

NIO入门

NIO入门

I/O简介

I/O指的是计算机与外部世界或者一个程序与计算机的其余部分之间的接口。

在Java编程中,直到最近一直使用流的方式完成I/O。所有I/O都被视为单个的字节的流动,通过一个称为Stream的对象一次移动一个字节。流I/O用于与外部世界接触。它也在内部使用,用于将对象转换为字节,然后再转换回对象。

NIO与原来的I/O有同样的作用和目的,但是它使用不同的方式:块I/O

块I/O的效率比流I/O高很多

流与块的比较

面向流的I/O系统一次一个字节地处理数据。一个输入流产生一个字节的数据,一个输出流消费一个字节的数据。为流式数据创建过滤器非常容易。链接几个过滤器,以便每个过滤器只负责单个复杂处理机制的一部分。不利的一面是,面向流的I/O通常相当慢。

一个面向块的I/O系统以块的形式处理数据。每一个操作都在一步中产生或者消费一个数据库。按块处理数据比按(流式的)字节处理数据要快得多。但是面向块的I/O缺少一些面向流的I/O所具有的优雅型和简单性。

通道和缓冲区是NIO中的核心对象,几乎在每一个I/O操作中都要使用它们。

通道是对原I/O包中的流的模拟。到任何目的地的所有数据都必须通过一个Channel对象。一个Buffer实质上是一个容器对象。发送给一个通道的所有对象都必须首先放到缓冲区中;同样地,从通道中读取的任何数据都要读到缓冲区中。

什么是缓冲区

Buffer是一个对象,它包含一些要写入或者刚读出的数据。

在面向流的I/O中,你是将数据直接写入或者将数据直接读到Stream对象中。

在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的。在写入数据时,它是写入缓冲区中的。

缓冲区实质上是一个数组。通常它是一个字节数组,但是也可以使用其他种类的数组。但是一个缓冲区不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。

什么是通道

Channel是一个对象,可以通过它读取和写入数据。

所有数据都通过Buffer对象来处理。你不会将字节直接写入通道中,相反,你是将数据写入包含一个或者多个字节的缓冲区。

通道与流的不同之处在于通道是双向的。而流只是在一个方向上移动(一个流必须是InputStream或者OutputStream的子类),而通道可以用于读、写或者同时用于读写。

从文件中读取

我们将从一个文件中读取一些数据。如果使用原来的 I/O,那么我们只需创建一个 FileInputStream 并从它那里读取。而在 NIO 中,情况稍有不同:我们首先从 FileInputStream 获取一个 Channel 对象,然后使用这个通道来读取数据。

在 NIO 系统中,任何时候执行一个读操作,您都是从通道中读取,但是您不是 直接 从通道读取。因为所有数据最终都驻留在缓冲区中,所以您是从通道读到缓冲区中。

因此读取文件涉及三个步骤:(1) 从 FileInputStream 获取 Channel,(2) 创建 Buffer,(3) 将数据从 Channel 读到 Buffer 中。

读文件

第一步是获取通道。我们从 FileInputStream 获取通道:

FileInputStream fin = new FileInputStream( "readandshow.txt" );
FileChannel fc = fin.getChannel();

下一步是创建缓冲区:

ByteBuffer buffer = ByteBuffer.allocate( 1024 );

最后,需要将数据从通道读到缓冲区中,如下所示:

fc.read( buffer );

我们不需要告诉通道要读多少数据到缓冲区中。

写入文件

首先从 FileOutputStream 获取一个通道:

FileOutputStream fout = new FileOutputStream( "writesomebytes.txt" );
FileChannel fc = fout.getChannel();

下一步是创建一个缓冲区并在其中放入一些数据 - 在这里,数据将从一个名为 message 的数组中取出,这个数组包含字符串 "Some bytes" 的 ASCII 字节(本教程后面将会解释 buffer.flip() 和 buffer.put() 调用)。

ByteBuffer buffer = ByteBuffer.allocate( 1024 );

for (int i=0; i<message.length; ++i) {
     buffer.put( message[i] );
}
buffer.flip();

最后一步是写入缓冲区中:

fc.write( buffer );

不需要告诉通道要写入多数据。

缓冲区内部细节

每一个读/写操作都会改变缓冲区的状态。通过记录和跟踪这些变化,缓冲区就能内部地管理自己的资源

状态变量

可以用三个值指定缓冲区在任意时刻的状态:

  • position
  • limit
  • capacity

这三个变量一起可以跟踪缓冲区的状态和它所包含的数据。

Position

缓冲区实际上就是美化了的数组。在从通道读取是,你将所读取的数据放到底层的数组中。position变量跟踪已经写了多少数据,即它制定了下一个字节将放到数组的哪一个元素中。

limit

limit变量表面还有多少数据需要取出,或者还有多少空间可以放入数据

position 总是小于或者等于 limit

capacity

缓冲区的capacity表明可以储存在缓冲区中的最大数据容量。实际上,它指定了底层数组的大小,或者至少指定了准许我们使用的底层数组的容量

limit绝不能大于capacity

flip

flip() 方法。这个方法做两件非常重要的事:

  • 它将 limit 设置为当前 position
  • 它将 position 设置为 0

clear

clear() 方法。这个方法重设缓冲区以便接收更多的字节。 Clear 做两种非常重要的事情:

  • 它将 limit 设置为与 capacity 相同。
  • 它设置 position 为 0。

缓冲区分配和包装

在能够读和写之前,必须有一个缓冲区。要创建缓冲区,您必须 分配 它。我们使用静态方法 allocate() 来分配缓冲区:

ByteBuffer buffer = ByteBuffer.allocate( 1024 );

allocate() 方法分配一个具有指定大小的底层数组,并将它包装到一个缓冲区对象中 ― 在本例中是一个 ByteBuffer。
您还可以将一个现有的数组转换为缓冲区,如下所示:

byte array[] = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap( array );
缓冲区分片

slice()方法根据现有的缓冲区创建一种子缓冲区。也就是说,它创建一个新的缓冲区,新缓冲区与原来的缓冲区的一部分共享数据。

我们首先创建一个长度为 10 的 ByteBuffer:

ByteBuffer buffer = ByteBuffer.allocate( 10 );

然后使用数据来填充这个缓冲区,在第 n 个槽中放入数字 n:

for (int i=0; i<buffer.capacity(); ++i) {
     buffer.put( (byte)i );
}

现在我们对这个缓冲区 分片 ,以创建一个包含槽 3 到槽 6 的子缓冲区。在某种意义上,子缓冲区就像原来的缓冲区中的一个 窗口 。
窗口的起始和结束位置通过设置 position 和 limit 值来指定,然后调用 Buffer 的 slice() 方法:

buffer.position( 3 );
buffer.limit( 7 );
ByteBuffer slice = buffer.slice();

片 是缓冲区的 子缓冲区 。不过, 片段 和 缓冲区 共享同一个底层数据数组

缓冲区分片和数据共享

我们遍历子缓冲区,将每一个元素乘以 11 来改变它。例如,5 会变成 55。

for (int i=0; i<slice.capacity(); ++i) {
     byte b = slice.get( i );
     b *= 11;
     slice.put( i, b );
}

最后,再看一下原缓冲区中的内容:

buffer.position( 0 );
buffer.limit( buffer.capacity() );

while (buffer.remaining()>0) {
     System.out.println( buffer.get() );
}

结果表明只有在子缓冲区窗口中的元素被改变了:

$ java SliceBuffer
0
1
2
33
44
55
66
7
8
9
只读缓冲区

通过调用缓冲区的 asReadOnlyBuffer() 方法,将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区(并与其共享数据),只不过它是只读的。

直接和间接缓冲区

直接缓冲区 是为加快 I/O 速度,而以一种特殊的方式分配其内存的缓冲区。

内存映射文件I/O

内存映射文件 I/O 是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的 I/O 快得多。

将文件映射到内存

下面代码行将文件的前 1024 个字节映射到内存中:

MappedByteBuffer mbb = fc.map( FileChannel.MapMode.READ_WRITE,
    0, 1024 );

map() 方法返回一个 MappedByteBuffer,它是 ByteBuffer 的子类

分散和聚集

分散/聚集 I/O 是使用多个而不是单个缓冲区来保存数据的读写方法。

文件锁定

要获取文件的一部分上的锁,您要调用一个打开的 FileChannel 上的 lock() 方法。注意,如果要获取一个排它锁,您必须以写方式打开文件。

RandomAccessFile raf = new RandomAccessFile( "usefilelocks.txt", "rw" );
FileChannel fc = raf.getChannel();
FileLock lock = fc.lock( start, end, false );

在拥有锁之后,您可以执行需要的任何敏感操作,然后再释放锁:

lock.release();

在释放锁后,尝试获得锁的其他任何程序都有机会获得它。

异步I/O

异步 I/O 是一种没有阻塞地读写数据的方法。通常,在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样, write() 调用将会阻塞直至数据能够写入。

异步 I/O 的一个优势在于,它允许您同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,您可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程

异步 I/O 中的核心对象名为 Selector。Selector 就是您注册对各种 I/O 事件的兴趣的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。

我们需要做的第一件事就是创建一个 Selector:

Selector selector = Selector.open();

然后,我们将对不同的通道对象调用 register() 方法,以便注册我们对这些对象中发生的 I/O 事件的兴趣。register() 的第一个参数总是这个 Selector。

为了接收连接,我们需要一个 ServerSocketChannel。事实上,我们要监听的每一个端口都需要有一个 ServerSocketChannel 。对于每一个端口,我们打开一个 ServerSocketChannel,如下所示:

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking( false );

ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress( ports[i] );
ss.bind( address );

第一行创建一个新的 ServerSocketChannel ,最后三行将它绑定到给定的端口。第二行将 ServerSocketChannel 设置为 非阻塞的 。

下一步是将新打开的 ServerSocketChannels 注册到 Selector上。为此我们使用 ServerSocketChannel.register() 方法,如下所示:

SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );

SelectionKey 代表这个通道在此 Selector 上的这个注册。当某个 Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。

现在已经注册了我们对一些 I/O 事件的兴趣,下面将进入主循环。使用 Selectors 的几乎每个程序都像下面这样使用内部循环:

int num = selector.select();

Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();

while (it.hasNext()) {
     SelectionKey key = (SelectionKey)it.next();
     // ... deal with I/O event ...
}

首先,我们调用 Selector 的 select() 方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时, select() 方法将返回所发生的事件的数量。
接下来,我们调用 Selector 的 selectedKeys() 方法,它返回发生了事件的 SelectionKey 对象的一个 集合 。
我们通过迭代 SelectionKeys 并依次处理每个 SelectionKey 来处理事件。对于每一个 SelectionKey,您必须确定发生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。

因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心 accept() 操作会阻塞:

ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();

下一步是将新连接的 SocketChannel 配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注册到 Selector上,如下所示:

sc.configureBlocking( false );
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );

Reference

http://www.ibm.com/developerworks/cn/education/java/j-nio/section3.html

http://blog.csdn.net/sgbfblog/article/details/7904757

2015/10/25

Tomcat源码阅读

Tomcat

Server

Server是Tomcat中最顶层的组件,它可以包含多个Service组件。

在Tomcat源码中Server组件对应源码中的org.apache.catalina.core.StandardServer

StandardServer的继承关系图如下图所示:

Service

Service组件相当于Connector和Engine组件的包装器,它将一个或者多个Connector组件和一个Engine建立关联。

在缺省的配置文件中,定义了一个叫Catalina的Service,并将Http,AJP这两个Connector关联到了一个名为Catalina的Engine。

Service组件对应Tomcat源码中的org.apache.catalina.core.StandardService

StandardService的继承关系图如下图所示:

Connector

Connector是Tomcat中监听TCP网络连接的组件,一个Connector会监听一个独立的端口来处理来自客户端的连接。

缺省的情况下Tomcat提供了如下两个Connector:

  • HTTP/1.1

    <Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
    

    上面定义了一个Connector,它缺省监听端口8080,这个端口我们可以根据具体情况进行改动。connectionTimeout定义了连接超时时间,单位是毫秒,redirectPort定义了ssl的重定向接口,根据缺省的配置,Connector会将ssl请求重定向到8443端口。

  • AJP/1.3

    AJP表示Apache Jserv Protocol,此连接器将处理Tomcat和Aapache http服务器之间的交互,这个连机器是用来处理我们将Tomcat和Apache http服务器结合使用的情况。假如在同样的一台物理Server上面部署了一台Apache http服务器和多台Tomcat服务器,通过Apache服务器来处理静态资源以及负载均衡的时候,针对不同的Tomcat实例需要AJP监听不同的端口。

Connector对应源代码中的org.apache.catalina.connector.Connector

它的继承关系图如下所示:

Engine

Tomcat中有一个容器的概念,而Engine,Host,Context都属于Container。

一个Engine可以包含一个或者多个Host,也就是说我们一个Tomcat的实例可以配置多个虚拟主机。

缺省的情况下<Engine name="Catalina" defaultHost="localhost">定义了一个名称为Cataline的Engine。

Engine对应源代码中的org.apache.catalina.core.StandardEngine

它的继承关系图如下图所示:

Host

Host定义了一个虚拟主机,一个虚拟主机可以有多个Context。

缺省的配置为<Host name="localhost" appBase="webapps" unpackWARs="true" autoDeploy="true">….</Host>

其中appBase为webapps,也就是<CATALINA_HOME>\webapps目录

unpackingWARS属性指定在appBase指定的目录中的war包都自动的解压,缺省配置为true

autoDeploy属性指定是否对加入到appBase目录的war包进行自动的部署,缺省为true。

Host对应源代码中的org.apache.catalina.core.StandardHost,它的继承关系图如下所示:

Context

在Tomcat中,每一个运行的webapp其实最终都是以Context的形成存在,每个Context都有一个根路径和请求URL路径。

Context对应源代码中的org.apache.catalina.core.StandardContext

它的继承关系图如下图所示:

在Tomcat中我们通常采用如下的两种方式创建一个Context

这样的话,我们就可以通过http://host:port/mypath访问上面配置的context了。

Valve

Valve是Tomcat中责任链模式的实现,通过链接多个Valve对请求进行处理。

其中Valve可以定义在任何的Container(Engine,Host,Context)中。

默认定义了一个名为org.apache.catalina.valves.AccessLogValve的Valve,这个Valve负责拦截每个请求,然后记录一条访问日志。

通过上面的分析,我们发现Server,Service,Engine,Host,Context都实现了org.apache.catalina.Lifecycle接口,通过这个接口管理了这些核心组件的生命周期。

生命周期方法的总体的骨架,如果用伪代码来表示可以简化为如下:
org.apache.catalina.util.LifecycleBase#lifeCycleMethod

public final synchronized void lfieCycleMethod() throws LifecycleException {
    stateCheck();//状态检查
    //设置为进入相应的生命周期之前的状态
    setStateInternal(LifecycleState.BEFORE_STATE, null, false);
    lfieCycleMethodInternal();//钩子方法
    //进入相应的生命周期之后的状态
    setStateInternal(LifecycleState.AFTER_STATE, null, false);
}

通用调用过程:

org.apache.catalina.core.StandardServer#init
->org.apache.catalina.core.StandardService#init
-->org.apache.catalina.connector.Connector#init
-->org.apache.catalina.core.StandardEngine#init

StandardEngine#init方法的时候,发现并没有进行StandardHost的初始化

StarndardHost的init方法是在调用start方法的时候被初始化

org.apache.catalina.startup.Bootstrap#start
->org.apache.catalina.startup.Catalina#start 通过反射调用
-->org.apache.catalina.core.StandardServer#start
--->org.apache.catalina.core.StandardService#start
---->org.apache.catalina.core.StandardEngine#start
---->org.apache.catalina.Executor#start
---->org.apache.catalina.connector.Connector#start


org.apache.catalina.startup.Bootstrap#main
->org.apache.catalina.startup.Bootstrap#init
->org.apache.catalina.startup.Bootstrap#load
-->org.apache.catalina.startup.Catalina#load
--->org.apache.catalina.core.StandardServer#init
---->org.apache.catalina.core.StandardService#init
----->org.apache.catalina.connector.Connector#init
----->org.apache.catalina.core.StandardEngine#init
->org.apache.catalina.startup.Bootstrap#start
-->org.apache.catalina.startup.Catalina#start 通过反射调用
--->org.apache.catalina.core.StandardServer#start
---->org.apache.catalina.core.StandardService#start
----->org.apache.catalina.core.StandardEngine#start
----->org.apache.catalina.Executor#start
----->org.apache.catalina.connector.Connector#start

StandardEngine初始化的时候,也是初始化了一个线程池;而StandardHost也初始化了一个线程池。他们的不同点在与创建线程的工厂方法不同,在采用缺省配置的情况下,StandardEngine的线程池中的线程是以Catalina-startStop的形式命名的,而StandardHost是以localhost-startStop的方式进行命名的。

对于StandardEngineStandardHost的启动,父容器在init的时候创建一个启动和停止子容器的线程池,然后父容器启动的时候首先通过异步的方式将子容器的启动通过org.apache.catalina.core.ContainerBase.StartChild提交到父容器中对应的线程池中进行启动,而子容器启动的时候首先会初始化,然后再启动。

webapps目录的应用又是如何启动的呢?webapps目录下面的应用其实是属于Context的,而Context对应Tomcat中的StandardContext类。

webapps目录中应用的启动在StandardHost#start的时候,通过Lifecycle.START_EVENT这个事件的监听器HostConfig进行进一步的启动。

Dameon线程

Dameon线程又叫后台或者守护线程,它负责在程序运行期提供一种通用服务的线程,比如垃圾收集线程。

非Dameon线程Dameon线程的区别就在于当程序中所有的非Daemon线程都终止的时候,Jvm会杀死余下的Dameon线程,然后退出。

Tomcat启动以后,会启动6条线程,他们分别如下:
"ajp-bio-8009-AsyncTimeout" daemon prio=5 tid=7f8738afe000 nid=0x115ad6000 waiting on condition [115ad5000]

"ajp-bio-8009-Acceptor-0" daemon prio=5 tid=7f8738b05800 nid=0x1159d3000 runnable [1159d2000]

"http-bio-8080-AsyncTimeout" daemon prio=5 tid=7f8735acb800 nid=0x1158d0000 waiting on condition [1158cf000]

"http-bio-8080-Acceptor-0" daemon prio=5 tid=7f8735acd000 nid=0x1157cd000 runnable [1157cc000]

"ContainerBackgroundProcessor[StandardEngine[Catalina]]" daemon prio=5 tid=7f8732850800 nid=0x111203000 waiting on condition [111202000]

"main" prio=5 tid=7f8735000800 nid=0x10843e000 runnable [10843c000]

其中5条是Dameon线程,而对于Java程序来说,当所有非Dameon程序都终止的时候,Jvm就会退出,因此要想终止Tomcat就只需要将main这一条非Dameon线程终止了即可。

Tomcat启动的时候的主线程会在8005端口(默认配置,可以更改)上建立socket监听,当关闭的时候,最终其实就是新起了一个进程然后向Tomcat主线程监听的8005端口发送了一个SHUTDOWN字符串,这样主线程就会结束了,主线程结束了以后,因为其它的线程都是dameon线程,这样依赖Jvm就会退出了。

当用户请求服务器的时候,Connector会接受请求,从Socket连接中根据http协议解析出对应的数据,构造RequestResponse对象,然后传递给后面的容器处理,顶层容器是StandardEngineStandardEngine处理请求其实是通过容器的Pipeline进行的,而Pipeline其实最终是通过管道上的各个阀门进行的,当请求到达StandardEngineValve的时候,此阀门会将请求转发给对应StandardHost的Pipeline的第一个阀门处理,然后以此最终到达StandardHostValve阀门,它又会将请求转发StandardContext的Pipeline的第一个阀门,这样以此类推,最后到达StandardWrapperValve,此阀门会根据Request来构建对应的Servelt,并将请求转发给对应的HttpServlet处理。

HTTP请求行和请求头

GET /contextpath/querystring HTTP/1.1

Host: 127.0.0.1:8080

User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:23.0) Gecko/20100101 Firefox/23.0

Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8

Accept-Language: en-US,en;q=0.5

Accept-Encoding: gzip, deflate

Cookie: JSESSIONID=9F5897FEF3CDBCB234C050C132DCAE52; __atuvc=384%7C39; __utma=96992031.358732763.1380383869.1381468490.1381554710.38; __utmz=96992031.1380383869.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); Hm_lvt_21e144d0df165d6556d664e2836dadfe=1381330561,1381368826,1381395666,1381554711

Connection: keep-alive

Cache-Control: max-age=0

其中请求行就是第一行,GET /contextpath/querystring HTTP/1.1,余下的都是请求头。

请求行末尾必须是CRLF,而请求行与请求头,以及请求头之间必须用空行隔开,而空行也必须只包含CRLF。

一个请求解析流程:

org.apache.tomcat.util.net.JIoEndpoint.Acceptor#run
->org.apache.tomcat.util.net.JIoEndpoint.SocketProcessor#run(请求处理线程池中运行)
-->org.apache.coyote.AbstractProtocol.AbstractConnectionHandler#process
--->org.apache.coyote.http11.AbstractHttp11Processor#process
---->org.apache.coyote.http11.InternalInputBuffer#parseRequestLine
---->org.apache.coyote.http11.InternalInputBuffer#parseHeaders
---->org.apache.catalina.connector.CoyoteAdapter#service

对于StandardEngine来说有一个与之对应的StandardEngineValve,对于StandardHost有一个StandardHostValve与之对应,StandardContext有一个StandardContextValve与之对应,StandardWrapper与StandardWrapperValve对应

调用链

->org.apache.catalina.core.StandardEngineValve#invoke
-->org.apache.catalina.valves.AccessLogValve#invoke
--->org.apache.catalina.valves.ErrorReportValve#invoke
---->org.apache.catalina.core.StandardHostValve#invoke
----->org.apache.catalina.authenticator.AuthenticatorBase#invoke
------>org.apache.catalina.core.StandardContextValve#invoke
------->org.apache.catalina.core.StandardWrapperValve#invoke

最后会调用到StandardWrapperValve,它其实也是最终调用Servlet的地方

一个请求过来以后,Tomcat是如何一步步处理的:

  • 用户浏览器发送请求,请求会发送到对应的Connector监听的Socket端口。
  • Connector从Socket流中获取数据,然后根据Http协议将其解析为Request和Reponse对象
  • 找到Request对象对应的Host,Context,Wrapper
  • 调用最终的Servelt的service进行处理。
Tomcat中ClassLoader的总体结构,

总结如下:

  • 在Tomcat存在common,cataina,shared三个公共的classloader,

    默认情况下,这三个classloader其实是同一个,都是common classloader,

  • 而针对每个webapp,也就是context(对应代码中的StandardContext类),都有自己的WebappClassLoader来加载每个应用自己的类。

Tomcat在加载webapp级别的类的时候,默认是不遵守parent-first的,这样做的好处是更好的实现了应用的隔离,但是坏处就是加大了内存浪费,同样的类库要在不同的app中都要加载一份。

们可以看出每一个StandardContext会关联一个Manager,默认情况下Manager的实现类是StandardManager,而StandardManager内部会聚合多个Session,其中StandardSession是Session的默认实现类,当我们调用Request.getSession的时候,Tomcat通过StandardSessionFacade这个外观类将StandardSession包装以后返回。

如果采用LAST_ACCESS_AT_START的时候,那么请求本身的处理时间将不算在内。比如一个请求处理开始的时候是10:00,请求处理花了1分钟,那么如果LAST_ACCESS_AT_STARTtrue,则算是否超期的时候,是从10:00算起,而不是10:01

每一个容器都有一个pipeline,而一个pipeline又有多个Valve阀门,其中StandardEngine对应的阀门是StandardEngineValveStandardHost对应的阀门是StandardHostValveStandardContext对应的阀门是StandardContextValveStandardWrapper对应的阀门是StandardWrapperValve

对于阀门来说有两种,一种阀门在处理完自己的事情以后,只需要将工作委托给下一个和自己在同一管道的阀门,第二种阀门是负责衔接各个管道的,它负责将请求传递给下个管道的第一个阀门处理,这种阀门叫Basic阀门,它是每个管道中最后一个阀门,上面的Standard*Valve都属于第二种阀门。

Reference:

http://imtiger.net/blog/2013/10/16/tomcat-architecture/

2015/10/25