NIO模型
1. 概述
1.1 翻译翻译?什么叫NIO?
NIO:我认为翻译成Non-Blocking
,更加的通俗直白,相比于BIO,也有一个对比,叫他非阻塞IO最好不过了
- 它和BIO有以下的区别
![在这里插入图片描述]()
- Channel是
双向
的,即可以读又可以写,相比于Stream,它并不区分出输入流和输出流,而且Channel可以完成非阻塞的读写,也可以完成阻塞的读写
1.2 Buffer简介
![在这里插入图片描述]()
- Channel的读写是离不开Buffer的,Buffer实际上内存上一块用来读写的区域。
1.2.1 写模式
![在这里插入图片描述]()
- 其中三个指针我们要了解一下,
position
为当前指针位置,limit
用于读模式,用它来标记可读的最大范围,capacity
是最大的可写范围阈值
![在这里插入图片描述]()
当我们写数据写了四个格子时,我们执行flip()
方法,即可转变为读模式
,limit指针就直接变到了我们刚刚写数据的极限位置,position指针回到初始位置,这样我们就可以将数据读出来了
![在这里插入图片描述]()
1.2.2 读模式到写模式的两种切换
![在这里插入图片描述]()
- 当我们将数据全部读完时,切换到写模式
调用clear()
方法,它会使position指针回到初始位置,limit回到最远端,这样就可以重新开始数据了,虽然clear意为清除,但是其实它只是将指针的位置移动了,并没有将数据清除,而是会覆盖原来的位置
![在这里插入图片描述]()
- 只读了部分数据,我想将未读的部分保留,而现在我又要开始先进行写模式的操作了,这样可以执行
compact()
方法
这个方法会将没有读到的数据保存到初始位置
,而position指针的位置将会移动到这些数据的后面位置
,从未读的数据后开始进行写数据
![在这里插入图片描述]()
之后再读数据的时候,我们就能将上次没有读到的数据读出来了
1.3 Channel简介
Channel间的数据交换,都需要依赖Buffer
![在这里插入图片描述]()
1.3.1 几个重要的Channel
![在这里插入图片描述]()
- FileChannel:用于文件传输
- ServerSocketChannel和SocketChannel:用于网络编程的传输
2. 文件拷贝实战
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
| import java.io.*; import java.nio.ByteBuffer; import java.nio.channels.FileChannel;
interface FileCopyRunner{ void copyFile(File source,File target); }
public class FileCopyDemo {
private static void close(Closeable closeable){ if(closeable != null) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } }
private static FileCopyRunner noBufferStreamCopy = new FileCopyRunner() { @Override public void copyFile(File source, File target) { InputStream fin = null; OutputStream fout = null; try { fin = new FileInputStream(source); fout = new FileOutputStream(target); int result; while((result = fin.read()) != - 1){ fout.write(result); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally { close(fin); close(fout); } } };
private static FileCopyRunner bufferStreamCopy = new FileCopyRunner() { @Override public void copyFile(File source, File target) { InputStream fin = null; OutputStream fout = null; try { fin = new FileInputStream(source); fout = new FileOutputStream(target); byte[] buffer = new byte[1024]; int result; while((result = fin.read(buffer)) != -1){ fout.write(buffer,0,result); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally { close(fin); close(fout); } } };
private static FileCopyRunner nioBufferCopy = new FileCopyRunner() { @Override public void copyFile(File source, File target) { FileChannel fin = null; FileChannel fout = null;
try { fin = new FileInputStream(source).getChannel(); fout = new FileOutputStream(target).getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while(fin.read(byteBuffer) != -1){ byteBuffer.flip(); while (byteBuffer.hasRemaining()){ fout.write(byteBuffer); } byteBuffer.clear(); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally { close(fin); close(fout); } } };
private static FileCopyRunner nioTransferCopy = ((source, target) -> { FileChannel fin = null; FileChannel fout = null;
try { fin = new FileInputStream(source).getChannel(); fout = new FileOutputStream(target).getChannel();
long transferred = 0L; long size = fin.size(); while(transferred != size){ transferred += fin.transferTo(0,size,fout); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally { close(fin); close(fout); } });
public static void main(String[] args) { File source = new File("J:\\StudySpace\\Java秒杀系统方案优化-高性能高并发实战\\project.zip"); File target = new File("J:\\StudySpace\\Java秒杀系统方案优化-高性能高并发实战\\p1.zip"); File target2 = new File("J:\\StudySpace\\Java秒杀系统方案优化-高性能高并发实战\\p2.zip"); File target3 = new File("J:\\StudySpace\\Java秒杀系统方案优化-高性能高并发实战\\p3.zip"); File target4 = new File("J:\\StudySpace\\Java秒杀系统方案优化-高性能高并发实战\\p4.zip");
new Thread(() -> noBufferStreamCopy.copyFile(source,target)).start(); new Thread(() -> bufferStreamCopy.copyFile(source,target2)).start(); new Thread(() -> nioBufferCopy.copyFile(source,target3)).start(); new Thread(() -> nioTransferCopy.copyFile(source,target4)).start(); } }
|
3. Selector概述
- Channel需要在Selector上注册
![在这里插入图片描述]()
- 注册的同时,要告诉Selector监听的状态
![在这里插入图片描述]()
- Channel对应的状态有:
CONNECT
:socketChannel已经与服务器建立连接的状态;ACCEPT
:serverSocketChannel已经与客户端建立连接的状态;READ
:可读状态;WRITE
:可写状态
![在这里插入图片描述]()
- Channel在Selector上注册完成后,会返回一个SelectKey对象,其中有几个重要的方法:
interestOps
:查看注册的Channel绑定的状态;readyOps
:查看哪些是可操作的状态;channel
:返回channel对象;selector
:返回selector对象;attachment
:附加其他对象![在这里插入图片描述]()
- 调用Selector的select方法,返回它监听的事件的数量,可同时响应多个事件。不过它是阻塞式的调用,当监听的事件中没有可以用来响应请求的,则会被阻塞,直到有可用的channel能够响应该请求,才会返回
![在这里插入图片描述]()
实战
1. NIO模型分析
![在这里插入图片描述]()
- 在服务器端创建一个
Selector
,将ServerSocketChannel注册到Selector上
,被Selector监听的事件为Accept
![在这里插入图片描述]()
- Client1请求与服务器建立连接,Selector接收到Accept事件,服务器端对其进行处理(handles),服务器与客户端连接成功
![在这里插入图片描述]()
- 建立连接过程中,服务器通道(ServerSocketChannel)调用
accept方法
,获取到与客户端进行连接的通道(SocketChannel
),也将其注册到Selector
上,监听READ事件
,这样,客户端向服务器发送消息,就能触发该READ事件进行响应,读取该消息。
Tips: 我们处理这个建立连接并接收从客户端传过来的消息,都是在一个线程
内完成的。在bio中,则会为单个客户端单独开辟一个线程,用于处理消息,并且客户端在不发送消息的过程中,该线程一直是阻塞的。
![在这里插入图片描述]()
- 同样,两个客户连接过来也是一个线程在起作用,将客户端2的SocketChannel注册到服务器的Selector,并监听READ事件,随时响应随时处理。即一个客户端有一个SocketChannel,两个客户端就有两个SocketChannel,这个就是我们使用nio编程模型来用一个selector对象在一个线程里边监听以及处理多个通道的io的操作
各个Channel是被配置为非阻塞式
的(configureBlocking(false)),但是Selector本身调用的select()方法
,它是阻塞式
的,当监听在Selector上的事件都没有触发时,那么它就会被阻塞,直到有事件对其进行响应
2. 聊天室项目代码重点知识
2.1 服务器端
2.1.1 字段
![在这里插入图片描述]()
2.1.2 主方法
![在这里插入图片描述]()
2.1.3 处理方法
![在这里插入图片描述]()
2.1.4 转发消息方法
![在这里插入图片描述]()
2.1.5 接收消息方法
![在这里插入图片描述]()
2.2 客户端
2.2.1 字段
![在这里插入图片描述]()
2.2.2 主方法
![在这里插入图片描述]()
2.2.3 处理方法
![在这里插入图片描述]()
2.2.4 接收方法
![在这里插入图片描述]()
2.2.5 发送方法
![在这里插入图片描述]()
3. 测试结果
- 服务器端显示信息正确
![在这里插入图片描述]()
4. 完整代码
4.1 服务器端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| package server;
import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set;
public class ChatServer { private static final int DEFAULT_PORT = 8888; private static final String QUIT = "quit"; private static final int BUFFER = 1024; private int port;
private ServerSocketChannel serverSocketChannel; private Selector selector; private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER); private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER); private Charset charset = Charset.forName(String.valueOf(StandardCharsets.UTF_8));
public ChatServer(){ this(DEFAULT_PORT); }
public ChatServer(int port) { this.port = port; }
public boolean readyToQuit(String msg){ return QUIT.equals(msg); }
public void close(Closeable closeable){ if(closeable != null) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } }
public void start(){ try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(port)); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务器启动成功,监听端口号:" + port + "...");
while(true){ selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { handles(selectionKey); } selectionKeys.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { close(selector); } }
private void handles(SelectionKey selectionKey) throws IOException { if(selectionKey.isAcceptable()){ ServerSocketChannel server =(ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = server.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ); System.out.println("客户端" + socketChannel.socket().getPort() + ":已经连接"); }else if(selectionKey.isReadable()){ SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); String fwdMsg = receive(clientChannel);
if(fwdMsg.isEmpty()){ selectionKey.cancel(); selector.wakeup(); }else { forwardMessage(clientChannel,fwdMsg); if(readyToQuit(fwdMsg)){ selectionKey.cancel(); selector.wakeup(); } } } }
private void forwardMessage(SocketChannel clientChannel, String fwdMsg) throws IOException { for (SelectionKey selectionKey : selector.keys()) { SelectableChannel channel = selectionKey.channel(); if(channel instanceof ServerSocketChannel) System.out.println("客户端" + clientChannel.socket().getPort() + ":" + fwdMsg); else if(selectionKey.isValid() && !channel.equals(clientChannel)){ wBuffer.clear(); wBuffer.put(charset.encode("客户端" + clientChannel.socket().getPort() + ":" + fwdMsg)); wBuffer.flip(); while(wBuffer.hasRemaining()) ((SocketChannel)channel).write(wBuffer); } } }
private String receive(SocketChannel clientChannel) throws IOException { rBuffer.clear(); while(clientChannel.read(rBuffer) > 0); rBuffer.flip(); return String.valueOf(charset.decode(rBuffer)); }
public static void main(String[] args) { ChatServer chatServer = new ChatServer(); chatServer.start(); } }
|
4.2 客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
| package client;
import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set;
public class ChatClient {
private static final String DEFAULT_SERVER_HOST = "127.0.0.1"; private static final int DEFAULT_SERVER_PORT = 8888; private static final String QUIT = "quit";
private static final int BUFFER = 1024; private String host; private int port; private SocketChannel clientChannel; private Selector selector; private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER); private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER); private Charset charset = Charset.forName(String.valueOf(StandardCharsets.UTF_8));
public ChatClient(String host, int port) { this.host = host; this.port = port; }
public ChatClient() { this(DEFAULT_SERVER_HOST,DEFAULT_SERVER_PORT); }
public boolean readyToQuit(String msg){ return QUIT.equals(msg); }
public void close(Closeable closeable){ if(closeable != null){ try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } }
public void start(){ try { clientChannel = SocketChannel.open(); clientChannel.configureBlocking(false); selector = Selector.open(); clientChannel.register(selector, SelectionKey.OP_CONNECT); clientChannel.connect(new InetSocketAddress(host,port));
while (selector.isOpen()){ selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { handles(selectionKey); } selectionKeys.clear(); } } catch (IOException e) { e.printStackTrace(); }catch (ClosedSelectorException e){
} finally { close(selector); } }
private void handles(SelectionKey selectionKey) throws IOException { if(selectionKey.isConnectable()){ SocketChannel channel = (SocketChannel) selectionKey.channel(); if(channel.isConnectionPending()){ channel.finishConnect(); new Thread(new UserInputHandler(this)).start(); } channel.register(selector,SelectionKey.OP_READ); }else if(selectionKey.isReadable()){ String msg = receive(clientChannel); SocketChannel channel = (SocketChannel) selectionKey.channel(); if(msg.isEmpty()){ close(selector); }else { System.out.println(msg); }
} }
private String receive(SocketChannel clientChannel) throws IOException { rBuffer.clear(); while (clientChannel.read(rBuffer) > 0); rBuffer.flip(); return String.valueOf(charset.decode(rBuffer)); }
public void send(String msg) throws IOException { if(msg.isEmpty()) return;
wBuffer.clear(); wBuffer.put(charset.encode(msg)); wBuffer.flip(); while(wBuffer.hasRemaining()){ clientChannel.write(wBuffer); }
if(QUIT.equals(msg)) close(selector); }
public static void main(String[] args) { ChatClient chatClient = new ChatClient(); chatClient.start(); } }
|
4.3 客户端监听用户输入进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package client;
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader;
public class UserInputHandler implements Runnable{
private ChatClient chatClient;
public UserInputHandler(ChatClient chatClient) { this.chatClient = chatClient; }
@Override public void run() { BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
while(true){ try { String msg = consoleReader.readLine(); chatClient.send(msg); if(chatClient.readyToQuit(msg)) break; } catch (IOException e) { e.printStackTrace(); } } } }
|