Java NIO 使用实例

分享到:

     在JDK1.4之前,Java OutputStream的write方法、InputStream的read方法和ServerSocket的accept()方法都是阻塞方法;从JDK1.4开始,Java引入了新的输入输出系统(New Input/Output,NIO),非阻塞是Java NIO实现的重要功能之一。

 1、Buffer

缓冲区,传输数据使用,本质是一个数组,Channel中读数据和写数据都只能通过Buffer传输。

2、Channel

通道,所有的IO流在NIO中都是从Channel开始的,数据可以从Channel读到Buffer中,也可以从Buffer写到Channel中,是Buffer对象的唯一接口。

3、Selector

选择器,它能检测一个或多个通道 (channel) 上的事件,并将事件分发出去。

使用一个 select 线程就能监听多个通道上的事件,并基于事件驱动触发相应的响应。而不需要为每个 channel 去分配一个线程。

4、SelectionKey

包含了事件的状态信息以及事件对应的通道的绑定。

使用NIO的步骤:

1. 创建一个Selector实例

2. 将各个通道注册到该Selector实例,并指定每个通道上感兴趣的IO操作

3. 重复执行选择器循环:

    3.1 调用一种select()方法

    3.2 获取已选键集

    3.3 对于已选键集中的每一个键:

        3.3.1 将已选键从键集中移除

        3.3.2 获取信道,并从键中获取附件

        3.3.3 确定准备就绪的操作并执行;对于accept操作获得的SocketChannel对象,需将信道设为非阻塞模式,并将其注册到选择器中

        3.3.4 根据需要,修改键的兴趣操作集

如下代码:

NIOServer.java:
package org.hadoopinternal.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Single-threaded NIO server: one {@link Selector} multiplexes the listening
 * channel and every accepted connection. Each accepted channel gets a
 * {@link NIOServerConnection} attached to its selection key, and read/write
 * readiness is delegated to that attachment.
 */
public class NIOServer {
    /** Upper bound, in milliseconds, on how long one select() call blocks. */
    private static final int TIMEOUT = 300;
    /** TCP port the server listens on. */
    private static final int PORT = 12112;

    /**
     * Opens the selector and listening channel, then runs the select loop
     * until a fatal error occurs. A "." is printed whenever a select() call
     * times out without any ready key.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Selector selector = null;
        ServerSocketChannel listenChannel = null;
        try {
            selector = Selector.open();
            listenChannel = ServerSocketChannel.open();
            listenChannel.configureBlocking(false);
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                if (selector.select(TIMEOUT) == 0) {
                    System.out.print(".");
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // The selected-key set is never cleared by the selector itself.
                    iter.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    // Server socket channel has pending connection request?
                    if (key.isAcceptable()) {
                        acceptConnection(selector, listenChannel);
                        continue;
                    }
                    serviceConnection(key);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the listening socket and the selector on the way out.
            if (listenChannel != null) {
                try {
                    listenChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Accepts one pending connection, switches it to non-blocking mode and
     * registers it for reads with a fresh {@link NIOServerConnection} attached.
     *
     * @throws IOException if accept() itself fails on the listening channel
     */
    private static void acceptConnection(Selector selector, ServerSocketChannel listenChannel)
            throws IOException {
        SocketChannel channel = listenChannel.accept();
        if (channel == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        try {
            channel.configureBlocking(false);
            SelectionKey connkey = channel.register(selector, SelectionKey.OP_READ);
            connkey.attach(new NIOServerConnection(connkey));
        } catch (IOException e) {
            // A failure while setting up one client must not stop the server.
            System.err.println("Dropping new connection after setup error: " + e);
            closeChannel(channel);
        }
    }

    /**
     * Runs the read and/or write handler of the connection attached to
     * {@code key}. An I/O error closes only that connection.
     */
    private static void serviceConnection(SelectionKey key) {
        NIOServerConnection conn = (NIOServerConnection) key.attachment();
        try {
            if (key.isReadable()) {
                conn.handleRead();
            }
            // handleRead() may have closed the channel, which cancels the key.
            if (key.isValid() && key.isWritable()) {
                conn.handleWrite();
            }
        } catch (IOException e) {
            System.err.println("Closing connection after I/O error: " + e);
            // Closing the channel also cancels its selection key.
            closeChannel((SocketChannel) key.channel());
        }
    }

    /** Closes a client channel, reporting (not propagating) any failure. */
    private static void closeChannel(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            System.err.println("Error while closing connection: " + e);
        }
    }
}
NIOServerConnection.java:
package org.hadoopinternal.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
 * Per-client echo state: the selection key, its socket channel and one
 * fixed-size buffer. Bytes read from the channel are accumulated in the
 * buffer and written back to the same channel.
 */
public class NIOServerConnection {
    /** Capacity, in bytes, of the per-connection buffer. */
    private static final int BUFFSIZE = 1024;

    SelectionKey key;
    SocketChannel channel;
    ByteBuffer buffer;

    /**
     * Binds this connection to {@code key} and to the socket channel that
     * the key was registered for.
     *
     * @param key selection key of an accepted {@link SocketChannel}
     */
    public NIOServerConnection(SelectionKey key) {
        this.key = key;
        this.channel = (SocketChannel) key.channel();
        this.buffer = ByteBuffer.allocate(BUFFSIZE);
    }

    /**
     * Reads whatever is available into the buffer. At end of stream the
     * channel is closed; otherwise write interest is added alongside read
     * interest.
     *
     * @throws IOException if the read or the close fails
     */
    public void handleRead() throws IOException {
        final boolean endOfStream = channel.read(buffer) == -1;
        if (endOfStream) {
            channel.close();
            return;
        }
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    /**
     * Writes buffered bytes back to the channel. Once everything has been
     * written, interest drops back to read-only; unwritten bytes are kept at
     * the front of the buffer for the next call.
     *
     * @throws IOException if the write fails
     */
    public void handleWrite() throws IOException {
        // Switch the buffer from filling to draining.
        buffer.flip();
        channel.write(buffer);
        final boolean drained = !buffer.hasRemaining();
        if (drained) {
            key.interestOps(SelectionKey.OP_READ);
        }
        // Back to filling mode, preserving any bytes not yet written.
        buffer.compact();
    }
}

如下代码为Hadoop IPC Server中的Listener,它是一个标准的NIO应用:

/**
 * Listens on the socket. Creates jobs for the handler threads.
 *
 * <p>The listener thread owns the accept selector. Every accepted channel is
 * handed, round-robin, to one of the {@link Reader}s, each of which runs its
 * own selector on the read pool. After every select pass the listener also
 * gets a chance to close idle connections.
 */
private class Listener extends Thread {
  private ServerSocketChannel acceptChannel = null; // the accept channel
  private Selector selector = null; // the selector that we use for the server
  private Reader[] readers = null;
  private int currentReader = 0;
  private InetSocketAddress address; // the address we bind at
  private Random rand = new Random();
  private long lastCleanupRunTime = 0; // the last time when a cleanup
                                       // (for idle connections) ran
  private long cleanupInterval = 10000; // the minimum interval between
                                        // two cleanup runs
  private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
  private ExecutorService readPool;

  /**
   * Opens and binds the non-blocking accept channel, starts one
   * {@link Reader} per configured read thread, and registers the accept
   * channel with this listener's selector.
   *
   * @throws IOException if a channel or selector cannot be opened, bound or
   *         registered
   */
  public Listener() throws IOException {
    address = new InetSocketAddress(bindAddress, port);
    // Create a new server socket and set to non blocking mode
    acceptChannel = ServerSocketChannel.open();
    acceptChannel.configureBlocking(false);
    // Bind the server socket to the local host and port
    bind(acceptChannel.socket(), address, backlogLength);
    port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
    // create a selector;
    selector = Selector.open();
    readers = new Reader[readThreads];
    readPool = Executors.newFixedThreadPool(readThreads);
    for (int i = 0; i < readThreads; i++) {
      Selector readSelector = Selector.open();
      Reader reader = new Reader(readSelector);
      readers[i] = reader;
      readPool.execute(reader);
    }
    // Register accepts on the server socket with the selector.
    acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
    this.setName("IPC Server listener on " + port);
    this.setDaemon(true);
  }

  /**
   * Select loop for a subset of the accepted connections. The loop holds
   * this reader's monitor the whole time, so the synchronized
   * {@link #registerChannel} and {@link #finishAdd} can only proceed while
   * the loop is parked in {@code wait()} inside the {@code adding} check.
   */
  private class Reader implements Runnable {
    private volatile boolean adding = false;
    private Selector readSelector = null;

    Reader(Selector readSelector) {
      this.readSelector = readSelector;
    }

    public void run() {
      LOG.info("Starting SocketReader");
      synchronized (this) {
        while (running) {
          SelectionKey key = null;
          try {
            readSelector.select();
            // Give the listener a window to register a new channel.
            while (adding) {
              this.wait(1000);
            }
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key);
                }
              }
              key = null;
            }
          } catch (InterruptedException e) {
            if (running) {
              // unexpected -- log it
              LOG.info(getName() + " caught: " +
                  StringUtils.stringifyException(e));
            }
          } catch (IOException ex) {
            LOG.error("Error in Reader", ex);
          }
        }
      }
    }

    /**
     * This gets reader into the state that waits for the new channel
     * to be registered with readSelector. If it was waiting in select()
     * the thread will be woken up, otherwise whenever select() is called
     * it will return even if there is nothing to read and wait
     * in while(adding) for finishAdd call
     */
    public void startAdd() {
      adding = true;
      readSelector.wakeup();
    }

    /**
     * Registers {@code channel} for reads on this reader's selector.
     *
     * @return the selection key created by the registration
     * @throws IOException if the registration fails
     */
    public synchronized SelectionKey registerChannel(SocketChannel channel)
        throws IOException {
      return channel.register(readSelector, SelectionKey.OP_READ);
    }

    /** Ends the add window opened by {@link #startAdd} and wakes the loop. */
    public synchronized void finishAdd() {
      adding = false;
      this.notify();
    }
  }

  /** cleanup connections from connectionList. Choose a random range
   * to scan and also have a limit on the number of the connections
   * that will be cleanedup per run. The criteria for cleanup is the time
   * for which the connection was idle. If 'force' is true then all
   * connections will be looked at for the cleanup.
   */
  private void cleanupConnections(boolean force) {
    // Work from one snapshot so the random bound and the scan range agree.
    int total = numConnections;
    if (force || total > thresholdIdleConnections) {
      long currentTime = System.currentTimeMillis();
      if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
        return;
      }
      int start = 0;
      int end = total - 1;
      if (!force && total > 0) {
        // nextInt(bound) is always within [0, bound). The former
        // "nextInt() % n" could be negative, which made the get() below
        // throw and silently abandoned the whole cleanup pass.
        start = rand.nextInt(total);
        end = rand.nextInt(total);
        if (end < start) {
          int temp = start;
          start = end;
          end = temp;
        }
      }
      int i = start;
      int numNuked = 0;
      while (i <= end) {
        Connection c;
        synchronized (connectionList) {
          try {
            c = connectionList.get(i);
          } catch (Exception e) {
            // Index no longer present in the list; give up on this pass.
            return;
          }
        }
        if (c.timedOut(currentTime)) {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
          closeConnection(c);
          numNuked++;
          // One fewer connection to scan; index i is re-examined without
          // being advanced.
          end--;
          if (!force && numNuked == maxConnectionsToNuke) break;
        }
        else i++;
      }
      lastCleanupRunTime = System.currentTimeMillis();
    }
  }

  /**
   * Accept loop: selects on the accept channel, dispatches acceptable keys
   * to {@link #doAccept}, and runs a non-forced idle cleanup after every
   * pass. When {@code running} turns false, closes the accept channel, the
   * selector and every remaining connection.
   */
  @Override
  public void run() {
    LOG.info(getName() + ": starting");
    SERVER.set(Server.this);
    while (running) {
      SelectionKey key = null;
      try {
        selector.select();
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
          key = iter.next();
          iter.remove();
          try {
            if (key.isValid()) {
              if (key.isAcceptable())
                doAccept(key);
            }
          } catch (IOException e) {
            // Keep serving the remaining keys, but leave a trace instead
            // of dropping the failure silently.
            if (LOG.isDebugEnabled())
              LOG.debug(getName() + ": exception in doAccept: " + e);
          }
          key = null;
        }
      } catch (OutOfMemoryError e) {
        // we can run out of memory if we have too many threads
        // log the event and sleep for a minute and give
        // some thread(s) a chance to finish
        LOG.warn("Out of Memory in server select", e);
        closeCurrentConnection(key, e);
        cleanupConnections(true);
        // Any exception from the back-off sleep is ignored on purpose; the
        // loop condition decides whether to continue.
        try { Thread.sleep(60000); } catch (Exception ie) {}
      } catch (Exception e) {
        closeCurrentConnection(key, e);
      }
      cleanupConnections(false);
    }
    LOG.info("Stopping " + this.getName());

    synchronized (this) {
      // Close the two resources independently so that a failure on the
      // first does not leave the second one open.
      try {
        acceptChannel.close();
      } catch (IOException e) {
        LOG.info(getName() + ":Exception in closing accept channel. " + e);
      }
      try {
        selector.close();
      } catch (IOException e) {
        LOG.info(getName() + ":Exception in closing selector. " + e);
      }

      selector = null;
      acceptChannel = null;

      // clean up all connections
      while (!connectionList.isEmpty()) {
        closeConnection(connectionList.remove(0));
      }
    }
  }

  /**
   * Closes the connection attached to {@code key}, if there is one.
   *
   * @param key key being processed when the failure happened; may be null
   * @param e the failure that triggered the close (not used here)
   */
  private void closeCurrentConnection(SelectionKey key, Throwable e) {
    if (key != null) {
      Connection c = (Connection)key.attachment();
      if (c != null) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
        closeConnection(c);
      }
    }
  }

  /** Returns the local socket address of the accept channel. */
  InetSocketAddress getAddress() {
    return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
  }

  /**
   * Accepts every connection currently pending on the key's server channel.
   * Each one is made non-blocking, registered with the next reader inside
   * that reader's startAdd/finishAdd window, wrapped in a {@code Connection}
   * attached to the read key, and appended to {@code connectionList}.
   *
   * @throws IOException if accepting, configuring or registering fails
   */
  void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
    Connection c = null;
    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    SocketChannel channel;
    while ((channel = server.accept()) != null) {
      channel.configureBlocking(false);
      channel.socket().setTcpNoDelay(tcpNoDelay);
      Reader reader = getReader();
      try {
        reader.startAdd();
        SelectionKey readKey = reader.registerChannel(channel);
        c = new Connection(readKey, channel, System.currentTimeMillis());
        readKey.attach(c);
        synchronized (connectionList) {
          connectionList.add(numConnections, c);
          numConnections++;
        }
        if (LOG.isDebugEnabled())
          LOG.debug("Server connection from " + c.toString() +
              "; # active connections: " + numConnections +
              "; # queued calls: " + callQueue.size());
      } finally {
        // Always release the reader, even when registration failed.
        reader.finishAdd();
      }
    }
  }

  /**
   * Lets the connection attached to {@code key} read and process input. A
   * negative count, or any exception other than InterruptedException,
   * closes the connection.
   *
   * @throws InterruptedException rethrown from {@code readAndProcess}
   */
  void doRead(SelectionKey key) throws InterruptedException {
    int count = 0;
    Connection c = (Connection)key.attachment();
    if (c == null) {
      return;
    }
    c.setLastContact(System.currentTimeMillis());

    try {
      count = c.readAndProcess();
    } catch (InterruptedException ieo) {
      LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
      throw ieo;
    } catch (Exception e) {
      LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
      count = -1; //so that the (count < 0) block is executed
    }
    if (count < 0) {
      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": disconnecting client " +
            c + ". Number of active connections: "+
            numConnections);
      closeConnection(c);
    }
    else {
      c.setLastContact(System.currentTimeMillis());
    }
  }

  /**
   * Wakes the accept selector, closes the listening socket and shuts the
   * read pool down.
   */
  synchronized void doStop() {
    if (selector != null) {
      selector.wakeup();
      Thread.yield();
    }
    if (acceptChannel != null) {
      try {
        acceptChannel.socket().close();
      } catch (IOException e) {
        LOG.info(getName() + ":Exception in closing listener socket. " + e);
      }
    }
    readPool.shutdown();
  }

  // The method that will return the next reader to work with
  // Simplistic implementation of round robin for now
  Reader getReader() {
    currentReader = (currentReader + 1) % readers.length;
    return readers[currentReader];
  }
}

昵    称:
验证码:

相关文档: