概述
java nio的核心有以下几部分组成:
Buffer
一个 Buffer
本质上是内存中的一块,我们可以将数据写入这块内存,之后从这块内存获取数据
关键的Buffer实现:
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
- MappedByteBuffer
核心的还是 ByteBuffer
, 其他的 Char、Integer 等只是包装了一下它而已, 通常我们是直接使用 ByteBuffer
。
MappedByteBuffer
用于实现内存映射文件,这里暂时不讨论
capacity, position, limit
可以将 Buffer
理解成一个数组, 数组有容量, 每次访问需要指定下标
Buffer
中有3个很重要的变量,它们是理解Buffer工作机制的关键,分别是:
- capacity: 代表这个缓冲区的容量,一旦设定就不可以更改
- position: 初始值是 0, 每写入 Buffer 一个值, position 就自动加 1, 代表下一次写入的位置。读也是类似,每读一个值,position 就加 1
- limit: 写操作模式下,limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity。写结束后,切换到读模式,此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了
初始化 Buffer
Buffer 的实现类提供了一个静态的方法 allocate(int capacity)
, 来实例化 Buffer, 如:
1 2 3
| ByteBuffer byteBuf = ByteBuffer.allocate(1024) IntBuffer intBuf = IntBuffer.allocate(1024) LongBuffer longBuf = LongBuffer.allocate(1024)
|
实际场景中,也会经常使用 wrap
来初始化:
1
| public static ByteBuffer wrap(byte[] array)
|
Buffer 的读写操作
往Buffer中写
Buffer
提供了一些 put 方法用于填充数据到 Buffer
中, 另外常见的操作就是将 Channel
中的数据读入 Buffer
, 代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
public abstract ByteBuffer put(byte b);
public abstract ByteBuffer put(int index, byte b);
public final ByteBuffer put(byte[] src) {...} public ByteBuffer put(byte[] src, int offset, int length) {...}
int num = channel.read(buf);
|
读取Buffer的值
如果要读 Buffer 中的值,需要切换模式,从写入模式切换到读出模式。调用 Buffer 的 flip()
方法,可以从写入模式切换到读取模式。其实这个方法也就是设置了一下 position 和 limit 值:
1 2 3 4 5 6 7
| public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
|
对应写的put方法, Buffer 也提供了一系列的 get 方法用来读, 同样也经常将写入 Buffer 的数据传输到 Channel 中:
1 2 3 4 5 6 7 8 9
| public abstract byte get();
public abstract byte get(int index);
public ByteBuffer get(byte[] dst)
int num = channel.write(buf);
|
mark 和 reset
mark 用于临时保存 position 的值,每次调用 mark() 方法都会将 mark 设值为当前的 position,便于后续需要的时候使用:
1 2 3 4
| public final Buffer mark() { mark = position; return this; }
|
reset 用于将 position 设回 mark 标记的值:
1 2 3 4 5 6 7
| public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; }
|
如果需要重新读数据可以考虑使用 mark&reset
rewind 和 clear
rewind():会重置 position 为 0,通常用于重新从头读写 Buffer:
1 2 3 4 5
| public final Buffer rewind() { position = 0; mark = -1; return this; }
|
clear(): 重置 Buffer, 一般数据重新写入 Buffer 前调用clear:
1 2 3 4 5 6
| public final Buffer clear() { position = 0 limit = capacity mark = -1 return this }
|
Channel
Channel 是数据来源或写入的目的地,NIO中主要的channel:
- FileChannel: 文件通道,用于文件读写(常用的文件操作, 是阻塞的)
- DatagramChannel: UDP 连接的接收和发送
- SocketChannel: TCP 客户端
- SeverSocketChannel: TCP 服务端
通道(Channel)用于和 Buffer 一起操作,读操作将 Channel 的数据写入 Buffer (Channel.read(buffer)
), 写操作将 Buffer 的数据写入到 Channel 中(channel.write(buffer)
)
FileChannel
FileChannel 主要用于一些文件的操作,常见的操作如下:
初始化
可以从 inputstream
或 RandomAccessFile 获取一个 FileChannel:
1 2 3 4 5 6
| FileInputStream inputStream = new FileInputStream(new File("/data.txt")); FileChannel fileChannel = inputStream.getChannel();
FileChannel fileChannel = new RandomAccessFile(new File("/data.txt"), "rw").getChannel();
|
读取/写入文件内容
前面说过,Channel 的数据操作是和 Buffer 打交道的:
1 2 3 4 5 6 7 8 9 10 11
| ByteBuffer buffer = ByteBuffer.allocate(1024);
int num = fileChannel.read(buffer);
buffer.flip(); while(buffer.hasRemaining()) { fileChannel.write(buffer); }
|
SocketChannel
SocketChannel 可以理解成一个 TCP 的客户端.
打开一个TCP连接:
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 80));
读写数据与 FileChannel 没什么区别, 都是通过操作 Buffer 缓冲区:
1 2 3 4 5 6 7
| socketChannel.read(buffer);
while(buffer.hasRemaining()) { socketChannel.write(buffer); }
|
ServerSocketChannel
ServerSocketChannel 可以理解成TCP的服务端。
打开一个TCP服务操作如下:
1 2 3 4 5 6 7 8 9
| ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true) { SocketChannel socketChannel = serverSocketChannel.accept(); }
|
ServerSocketChannel 不和 Buffer 打交道了,因为它并不实际处理数据,它一旦接收到请求后,实例化 SocketChannel,之后在这个连接通道上的数据传递它就不管了,因为它需要继续监听端口,等待下一个连接
DatagramChannel
UDP 和 TCP 不一样,DatagramChannel 一个类处理了服务端和客户端。
监听端口操作:
1 2 3 4 5
| DatagramChannel channel = DatagramChannel.open() channel.socket().bind(new InetSocketAddress(9090)) // 接收数据 ByteBuffer buf = ByteBuffer.allocate(48) channel.receive(buf)
|
发送数据操作:
1 2 3 4 5 6
| String data = "hello world";
ByteBuffer buf = ByteBuffer.allocate(11); buf.put(newData.getBytes()); buf.flip(); int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));
|
UDP 是无连接的,不需要握手,也不需要通知对方。所以他无法保证数据的送达
Selector
Selector(选择器)是一个特殊的组件,用于采集各个通道的状态(或者说事件)。我们先将通道注册到选择器,并设置好关心的事件,然后就可以通过调用select()方法,静静地等待事件发生,通道有四个事件供我们监听:
- Accept:有可以接受的连接
- Connect:连接成功
- Read:有数据可读
- Write:可写入数据了
Selector的优势
如果用阻塞I/O,需要多线程(浪费内存),如果用非阻塞I/O,需要不断重试(耗费CPU)。Selector的出现解决了这个问题,非阻塞模式下,通过Selector,我们的线程只为已就绪的通道工作,不用盲目的重试了。比如,当所有通道都没有数据到达时,也就没有Read事件发生,我们的线程会在select()方法处被挂起,从而让出了CPU资源。
Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便。
这是在一个单线程中使用一个Selector处理3个Channel的图示:
要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等
使用方式
创建一个Selector,并注册一个Channel。
注意:要将 Channel 注册到 Selector,首先需要将 Channel 设置为非阻塞模式,否则会抛异常。
FileChannel 是不能够使用选择器的, 因为 FileChannel 都是阻塞的
1 2 3
| Selector selector = Selector.open(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
|
register()方法的第二个参数名叫“interest set”,也就是你所关心的事件集合。如果你关心多个事件,用一个“按位或运算符”分隔,比如
1
| SelectionKey.OP_READ | SelectionKey.OP_WRITE
|
Selector的基本使用流程
- 通过 Selector.open() 打开一个 Selector.
- 将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)
- 不断重复:
- 调用 select() 方法
- 调用 selector.selectedKeys() 获取 selected keys
- 迭代每个 selected key:
- 从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)
- 判断是哪些 IO 事件已经就绪了, 然后处理它们. 如果是 OP_ACCEPT 事件, 则调用 “SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()” 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.
- 根据需要更改 selected key 的监听事件.
- 将已经处理过的 key 从 selected keys 集合中删除.
完整Selector示例
当调用了 Selector.close()方法时, 我们其实是关闭了 Selector 本身并且将所有的 SelectionKey 失效, 但是并不会关闭 Channel.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000;
public static void main(String args[]) throws Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) { if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; }
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) { SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE)); }
if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } }
if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel();
clientChannel.write(buf);
if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } }
|
SelectionKey
- register 注册一个 Channel 时, 会返回一个 SelectionKey 对象, 这个对象包含了如下内容:
- interest set, 即我们感兴趣的事件集, 即在调用 register 注册 channel 时所设置的 interest set.
- ready set, 代表了 Channel 所准备好了的操作
- channel
- selector
- attached object, 可选的附加对象
interest set
我们可以通过如下方式获取 interest set:
1 2 3 4 5 6
| int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
|
ready set
代表了 Channel 所准备好了的操作.
我们可以像判断 interest set 一样操作 Ready set, 但是我们还可以使用如下方法进行判断:
1 2 3 4 5 6
| int readySet = selectionKey.readyOps()
selectionKey.isAcceptable() selectionKey.isConnectable() selectionKey.isReadable() selectionKey.isWritable()
|
Channel 和 Selector
我们可以通过 SelectionKey 获取相对应的 Channel 和 Selector:
1 2
| Channel channel = selectionKey.channel() Selector selector = selectionKey.selector()
|
Attaching Object
我们可以在selectionKey中附加一个对象:
1 2
| selectionKey.attach(theObject) Object attachedObj = selectionKey.attachment()
|
或者在注册时直接附加:
1
| SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
|
参考资料