对比了 BIO、NIO、AIO 三种 IO 方式的实现区别以及优缺点。
传统的BIO编程
网络编程的基本模型是 Client/Server 模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定的IP地址和监听端口),客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过网络套接字(Socket)进行通信。
在基于传统同步阻塞模型开发中,ServerSocket 负责绑定IP地址,启动监听端口 Socket 负责发起连接操作。连接成功之后,双方通过输入和输出流进行同步阻塞式通信。
下面是BIO实现的时间服务器(Time Server)分析
原理分析
采用BIO通信模型的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的一请求一应答通信模型。
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈 1:1 的正比关系,由于线程是 Java 虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务。
代码分析
public class TimeServer {
public static void main(String[] args) throws IOException {
new TimeServer().run();
}
public void run() throws IOException {
int port = 8080;
ServerSocket server = new ServerSocket(port);
System.out.println("The time server is start in port " + port);
Socket socket = null;
while (true) {
//阻塞直至接收到请求,然后无限循环继续阻塞等待请求
socket = server.accept();
new Thread(new TimeServerHandler(socket)).start();
}
}
static class TimeServerHandler implements Runnable {
private Socket socket;
TimeServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));;
PrintWriter out = new PrintWriter(this.socket.getOutputStream(), true);
String body;
while ((body = in.readLine()) != null) {
//如果是Query Time命令,就返回当下的时间
String currentTime = "Query Time".equalsIgnoreCase(body) ?
new Date() + " which is provided by TimeServer" : "No corresponding command";
//向客户端传输
out.println(currentTime);
}
} catch (IOException e) {
//此处省略关闭输入流、输出流、Socket代码
}
}
}
}
public class TimeClient {
public void run() {
try {
//连接服务端
Socket socket = new Socket("localhost", 8080);
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
//发送查询时间请求
out.println("Query Time");
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//读取服务端返回数据
String resp = in.readLine();
System.out.println("Now is : " + resp);
} catch (IOException e) {
//此处省略关闭输入流、输出流、Socket代码
e.printStackTrace();
}
}
}
伪异步IO编程
为了解决同步阻塞 IO 面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化——后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数 M:线程池最大线程数 N 的比例关系,其中 M 可以远远大于 N。通过线程池可以灵活地调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。
当有新的客户端接入时,将客户端的 Socket 封装成一个 Task(该任务实现java.lang.Runnable
接口)投递到后端的线程池中进行处理,JDK 的线程池维护一个消息队列和 N 个活跃线程,对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。
代码分析
对之前直接创建线程的行为改换成通过线程池执行线程的行为(此处略)。
弊端分析
对 Socket 输入流进行读取操作时,输入流会一直阻塞,直至发生以下三个条件:有数据可读,可用数据已经读取完毕,发生空指针或者 IO 异常。
这意味着当对方发送请求或者应答消息比较缓慢,或者网络传输较慢时,读取输入流一方的通信线程将被长时间阻塞,如果对方要 60s 才能够将数据发送完成,读取一方的 IO 线程也将会被同步阻塞 60s,在此期间,其他接入消息只能在消息队列中排队。
对 Socket 输出流进行写入操作时,输出流会一直阻塞,直到所有要发送的字节写入完毕,或者发生异常学习过 TCP/IP 相关知识的人都知道,当消息的接收方处理缓慢的时候,将不能及时地从 TCP 缓冲区读取数据,这将会导致发送方的 TCP window size 不断减小,直到为 0,双方处于 Keep-Alive 状态,消息发送方将不能再向 TCP 缓冲区写入消息,这时如果采用的是同步阻塞 IO,write 操作将会被无限期阻塞,直到 TCP window size 大于 0 或者发生 IO 异常。
伪异步 IO 实际上仅仅是对之前 IO 线程模型的一个简单优化,无法从根本上解决同步 IO 导致的通信线程阻塞问题。
通信对方返回应答时间过长引起的级联故障
- 服务端处理缓慢,返回应答消息耗费 60s,平时只需要 10ms。
- 采用伪异步 IO 的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,它将被同步阻塞 60s。
- 加入所有的可用线程都被故障服务器阻塞,那后续所有的 IO 消息都将在队列中排队。
- 由于线程池采用阻塞队列实现,当队列积满之后,后续入队列的操作将被阻塞。
- 由于前端只有一个 Acceptor 线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时。
- 由于几乎所有的连接都超时,调用者会认为系统已经崩溃,无法接收新的请求消息。
NIO编程
与 Socket 类和 ServerSocket 类相对应,NlO 也提供了 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现。这两种新增的通道都支持阻塞和非阻塞两种模式。阻塞模式使用非常简单,但是性能和可靠性都不好,非阻塞模式则正好相反。开发人员可以根据自己的需要来选择合适的模式。一般来说,低负载、低并发的应用程序可以选择同步阻塞 IO 以降低编程复杂度;对于高负载、高并发的网络应用,需要使用 NIO 的非阻塞模式进行开发。
NIO服务端通信序列
序列图
详细步骤演示
步骤一
打开 ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道
ServerSocketChannel acceptor = ServerSocketChannel.open();
步骤二
绑定监听端口,设置连接为非阻塞模式
acceptor.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"), port));
acceptor.configureBlocking(false);
步骤三
创建 Reactor 线程,创建多路复用器并启动线程
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
步骤四
将 ServerSocketChannel 注册到 Reactor 线程的多路复用器 Selector 上,监听 ACCEPT 事件
SelectionKey key = acceptor.register(selector, SelectionKey.OP_ACCEPT, ioHandler);
步骤五
多路复用器在线程run方法的无限循环体内轮询准备就绪的Key
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while(it.hasNext()){
SelectionKey key = (SelectionKey) it.next();
//Deal with I/O event
}
步骤六
多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路
SocketChannel channel = svrChannel.accept();
步骤七
设置客户端链路为非阻塞模式
channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
步骤八
将新接入的客户端连接注册到 Reactor 线程的多路复用器上,监听读操作,读取客户端发送的网络消息
Selectionkey key = socketChannel.register(selector, SelectionKey.OP_READ, ioHandler);
步骤九
异步读取客户端请求消息到缓冲区
int readNumber = channel.read(receivedBuffer);
步骤十
对 ByteBuffer 进行编解码,如果有半包消息指针 reset,继续读取后续的报文,将解码成功的消息封装成 Task,投递到业务线程池中,进行业务逻辑编排
Object message null;
while(buffer.hasRemain()){
byteBuffer.mark();
Object message = decode(byteBuffer);
if(message == null){
byteBuffer.reset();
break;
}
messageList.add(message);
}
if(!byteBuffer.hasRemain())
byteBuffer.clear();
else
byteBuffer.compact();
if(messageList != null && !messageList.isEmpty()){
for(Object messageE : messageList) handlerTask(messageE)
}
步骤十一
将 POJO 对象 encode 成 ByteBuffer,调用 SocketChannel 的异步 write 接口,将消息异步发送给客户端
socketChannel.write(buffer);
注意:如果发送区TCP缓冲区满,会导致写半包,此时,需要注册监听写操作位,循环写,直到整包消息写入TCP缓冲区。
NIO实现的TimeServer
public class TimeServer {
public static void main(String[] args) {
MultiplexerTimeServer server = new MultiplexerTimeServer(8080);
new Thread(server).start();
}
}
public class MultiplexerTimeServer implements Runnable{
private ServerSocketChannel serverChannel;
private Selector selector;
private boolean stop;
MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (!stop) {
try {
//无论是否有读写事件发生,selector每隔一秒都被唤醒一次
//该方法对应的的无参方法是只要有就绪的Channel就返回该Channel的SelectionKey集合
//通过对就绪的Channel集合进行迭代,可以进行网络的异步读写操作
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
//多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public void stop() {
this.stop = true;
}
public void handleInput(SelectionKey key) throws Exception{
if (key.isValid()) {
//根据SelectionKey的操作位进行判断可获知网络事件的类型
if (key.isAcceptable()) {
//Accept new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//接收客户端的连接请求并创建SocketChannel实例(相当于完成了三次握手,TCP物理链路正式建立)
SocketChannel sc = ssc.accept();
//设置异步非阻塞(同时也可以设置其他的Socket参数设置)
sc.configureBlocking(false);
//Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
//读取客户端请求信息
if (key.isReadable()) {
//Read Data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuf = ByteBuffer.allocate(1024);
// 返回值大于0:读到了字节,对字节进行编解码;
// 返回值等于0:没有读取到字节,属于正常场景,忽略;
// 返回值为-1:链路已经关闭,需要关闭 SocketChannel,释放资源。
int readBytes = sc.read(readBuf);
if (readBytes > 0) {
//读取内容
readBuf.flip();
// 由于 SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,
// 此时会出现“写半包”问题。我们需要注册写操作,不断轮询 Selector将没有发送完的
// Byte Buffer发送完毕,然后可以通过ByteBuffer的hasRemaino方法判断消息
// 是否发送完成
// 这里没有演示如何处理写半包的情况
byte[] bytes = new byte[readBuf.remaining()];
readBuf.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Time server receive order : " + body);
String currentTime = "Query time".equalsIgnoreCase(body) ? new Date().toString() : "Bad Order";
//写入返回数据
doWrite(sc, currentTime);
} else if (readBytes < 0) {
//对端链路关闭
key.channel();
sc.close();
} else {
//读到0字节,忽略
}
}
}
}
private void doWrite(SocketChannel sc, String time) throws IOException {
byte[] response = time.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(response.length);
writeBuffer.put(response);
writeBuffer.flip();
sc.write(writeBuffer);
}
}
NIO客户端通信序列
序列图
详细步骤演示
步骤一
打开 Socketchannel,绑定客户端本地地址(可选,默认系统会随机分配1个可用的本地地址)
SocketChannel clientChannel = Socketchannel.open();
步骤二
设置 SocketChannel 为非阻塞模式,同时设置客户端连接的TCP参数
clientChannel.configureBlocking(false);
socket.setReuseAddress(true);
socket.setReceiveBufferSize(BUFFER_SIZE);
socket.setSendBufferSize(BUFFER_SIZE);
步骤三
异步连接服务端
boolean connected = clientChannel.connect(new InetSocketAddress("ip", port));
步骤四
判断是否连接成功,如果连接成功,则直接注册读状态位到多路复用器中,如果当前没有连接成功(异步连接,返回 false,说明客户端已经发送sync包,服务端没有返回ack包,物理链路还没有建立)
if(connected){
clientChannel.register(selector, SelectionKey.OP_READ, ioHandler);
}else{
clientChannel.register(selector, SelectionKey.OP_CONNECT, ioHandler);
}
步骤五
向 Reactor 线程的多路复用器注册 OP_CONNECT 状态位,监听服务端的TCP ACK应答
clientChannel.register(selector, SelectionKey.OP_CONNECT, ioHandler);
步骤六
创建 Reactor线程,创建多路复用器并启动线程
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
步骤七
多路复用器在线程run方法的无限循环体内轮询准备就绪的Key
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()){
SelectionKey key = (SelectionKey) it.next();
//... deal with I/O event
}
步骤八
接收 connect 事件进行处理
if(key.isConnectable())
//handlerConnect
步骤九
判断连接结果,如果连接成功,注册读事件到多路复用器
if (channel.finishConnect())
registerRead();
步骤十
注册读事件到多路复用器
clientChannel.register(selector, SelectionKey.OP_READ, ioHandler);
步骤十一
异步读客户端请求消息到缓冲区
int readNumber = channel.read(receivedBuffer);
步骤十二
对 ByteBuffer 进行编解码,如果有半包消息接收缓冲区 Reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排。
Object message = null;
while(buffer.hasRemain()){
byteBuffer.mark();
Object message = decode(byteBuffer);
if(message == null) {
byteBuffer.reset();
break;
}
messageList.add(message);
}
if (byteBuffer.hasRemain())
byteBuffer.clear();
else
byteBuffer.compact();
if (messageList != null & !messageList.isEmpty())
for(Object messageE : messageList)
handlerTask(messageE);
步骤十三
将POJO对象 encode 成 ByteBuffer,调用 SocketChannel 的异步 write 接口,将消息异步发送给客户端。
socketChannel.write(buffer);
NIO实现的TimeClient
public class MultiplexerTimeClient implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public MultiplexerTimeClient(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
doConnect();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try{
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void doConnect() throws IOException {
//如果直接连接成功,则注册在多路复用器上,发送请求消息,读应答
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}
//如果没有直接连接成功,说明服务端没有返回TCP握手应答消息,但不代表连接失败
//需要将SocketChannel注册到多路复用器上,注册为OP_CONNECT,当服务端返回TCP
//syn-ack消息后,Selector就能够轮询到这个SocketChannel处于连接就绪状态
else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void handleInput(SelectionKey key) throws Exception{
if (key.isValid()) {
//判断是否连接成功
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
//判断是否连接成功,true为成功,false或IOException为连接失败
//将SocketChannel注册到Selector上,注册OP_READ,
//监听网络读操作,发送请求信息给服务端
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
}else {
System.exit(1);
}
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is " + body);
this.stop = true;
} else if (readBytes < 0) {
key.cancel();
sc.close();
} else {
//读到零字节,忽略
}
}
}
}
private void doWrite(SocketChannel sc) throws IOException {
byte[] req = "Query time".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("Send Success");
}
}
}
NIO编程的优点
- 客户端发起的连接操作是异步的,可以通过在多路复用器注册 OP_CONNECT 等待后续结果,不需要像之前的客户端那样被同步阻塞。
- SocketChannel 的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样IO通信线程就可以处理其他的链路,不需要同步等待这个链路可用
- 线程模型的优化:由于JDK的 Selector 在 Linux 等主流操作系统上通过 epoll 实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个 Selector 线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降。因此,它非常适合做高性能、高负载的网络服务器。
AIO编程
NIO 2.0 引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供以下两种方式获取获取操作结果
- 通过
java.util.concurrent.Future
类来表示异步操作的结果 - 在执行异步操作的时候传入一个
java.nio.channels.CompletionHandler
接口的实现类作为操作完成的回调。
NIO 2.0 的异步套接字通道是真正的异步非阻塞 IO,对应于 UNIX 网络编程中的事件驱动 IO(AIO)。它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了 NIO 的编程模型。
AIO服务端
public class AsyncTimeServer {
public static void main(String[] args) throws IOException {
new Thread(new AsyncTimeServerHandler(8080), "AIO-AsyncTimeServerHandler-001").start();
}
}
public class AsyncTimeServerHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncTimeServerHandler(int port) {
this.port = port;
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel
.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doAccept() {
asynchronousServerSocketChannel.accept(this,
new AcceptCompletionHandler());
}
}
public class AcceptCompletionHandler implements
CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {
@Override
public void completed(AsynchronousSocketChannel result,
AsyncTimeServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
有的读者可能会心存疑惑:既然已经接收客户端成功了,为什么还要再次调用 accept 方法呢?原因是这样的:调用 AsynchronousServerSocketChannel 的 accept方法后,如果有新的客户端连接接入,系统将回调我们传入的 CompletionHandler 实例的 completed 方法,表示新的客户端已经接入成功。
因为一个 AsynchronousServerSocketChannel 可以接收成千上万个客户端,所以需要继续调用它的 accept 方法,接收其他的客户端连接,最终形成一个循环。每当接收一个客户读连接成功之后,再异步接收新的客户端连接。
public class ReadCompletionHandler implements
CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null)
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("The time server receive order : " + req);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(currentTime);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
private void doWrite(String currentTime) {
if (currentTime != null && currentTime.trim().length() > 0) {
byte[] bytes = (currentTime).getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 如果没有发送完成,继续发送
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
AIO客户端
public class AsyncTimeClient {
public static void main(String[] args) {
new Thread(new AsyncTimeClientHandler("127.0.0.1", 8080),
"AIO-AsyncTimeClientHandler-001").start();
}
}
public class AsyncTimeClientHandler implements
CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(
readBuffer,
readBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result,
ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer
.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,
"UTF-8");
System.out.println("Now is : "
+ body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,
ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
exc.printStackTrace();
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}
从线程堆栈中可以发现,JDK 底层通过线程池 ThreadPoolExecutor 来执行回调通知,异步回调通知类由sun.nio.ch.AsynchronousChannelGrouplmpl
实现,它经过层层调用,最终回调com.demo.netty.aio.AsyncTimeClientHandler$1.completed
方法,完成回调通知。由此我们也可以得出结论:异步 SocketChannel 是被动执行对象,我们不需要像 NIO 编程那样创建一个独立的 IO 线程来处理读写操作。对于 AsynchronousServerSocketChannel 和 AsynchronousSocketChannel,它们都由 JDK 底层的线程池负责回调并驱动读写操作。正因为如此,基于 NIO 2.0 新的异步非阻塞 Channel 进行编程比 NIO 编程更为简单。
Netty编程
Netty TimeServer
public class NettyTimeServer {
public static void main(String[] args) {
new NettyTimeServer().bind(8080);
}
private void bind(int port) {
//创建两个NIO线程组(Reactor线程组),用于网络事件的处理
//用于服务端接受客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//用于SocketChannel网络读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//Netty用于启动NIO服务端的辅助启动类,目的是降低服务端开发复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//IO事件处理类
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture f = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放跟shutdownGracefully相关的资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer {
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new TimeServerHandler());
}
}
}
//ChannelHandlerAdapter关注的是网络事件的读写操作
public class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf request = (ByteBuf) msg;
byte[] req = new byte[request.readableBytes()];
request.readBytes(req);
String result = new String(req, "UTF-8");
System.out.println("The time server is received order : " + result);
String resp = "Query".equalsIgnoreCase(result) ?
new Date().toString() : "Bad Argument";
ByteBuf response = Unpooled.copiedBuffer(resp.getBytes());
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 调用了ChannelHandlerContext的flush方法,它的作用是将消息发送队列中的消息写入到
// SocketChannel中发送给对方。从性能角度考虑,为了防止频繁地唤醒 Selector进行消息发送,
// Netty的write方法并不直接将消息写入SocketChannel中,调用 write方法只是把待发送的消息
// 放到发送缓冲数组中,再通过调用flush方法,将发送缓冲区中的消息全部写到SocketChannel中
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放资源
ctx.close();
}
}
Netty TimeClient
public class NettyTimeClient {
public static void main(String[] args) {
new NettyTimeClient().connect("127.0.0.1", 8080);
}
private void connect(String host, int port) {
//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//客户端辅助启动类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//connect发起异步连接操作 sync同步方法等待连接成功
ChannelFuture f = bootstrap.connect(host, port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
}
public class TimeClientHandler extends ChannelHandlerAdapter {
private ByteBuf message;
public TimeClientHandler(){
byte[] req = "Query".getBytes();
message = Unpooled.buffer(req.length);
message.writeBytes(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] resp = new byte[buf.readableBytes()];
buf.readBytes(resp);
String body = new String(resp, "UTF-8");
System.out.println("Now is : " + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println(cause.getMessage());
}
}