Time: 2024-11-05 Tuesday 11:19:01
Author: Jackasher
NIO-前锋
我发现前锋讲的还不错
如何理解这个NIO
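首先这是一个基于ServerSocketChannel的服务端代码。需要注意 int select = selector.select(); 本身是会阻塞的,它会一直等到至少有一个已注册的通道就绪才返回;所谓"非阻塞"体现在通道被设置成了configureBlocking(false),select()只负责等待并返回就绪事件,而不做具体的处理
在BIO中,客户端连接后,线程必须阻塞等待它的输入;而在NIO中,一个线程可以同时监听多个连接,只有在连接或数据就绪时才去处理,也就是把"等待事件"和"读写操作"分离开来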
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| package org.example.io_05;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set;
/**
 * Single-threaded NIO TCP server: one {@link Selector} multiplexes the listening
 * channel and every accepted client channel, and received bytes are printed to stdout.
 */
public class Server {
    /** TCP port the server listens on. */
    private static final int PORT = 9999;

    /** Size in bytes of the buffer allocated for each read. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Opens the listening channel, registers it for accept events and runs the
     * selector loop forever.
     *
     * @param args unused
     * @throws IOException if the channel or selector cannot be opened or bound,
     *                     or if a selection operation fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources so both resources are released if the loop dies with an exception.
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                System.out.println("等待事件发生");
                // Blocks until at least one registered channel is ready.
                selector.select();
                System.out.println("某个事件发生了");
                Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey next = selectionKeyIterator.next();
                    handleChannel(next, selectionKeyIterator);
                }
            }
        }
    }

    /**
     * Dispatches one ready key to the accept or read handler.
     *
     * @param next                 the key returned by the iterator
     * @param selectionKeyIterator iterator over the selected-key set, positioned on {@code next}
     */
    private static void handleChannel(SelectionKey next, Iterator<SelectionKey> selectionKeyIterator) {
        // Remove first: the selector never clears the selected-key set itself, and removing
        // up front guarantees it happens even if a handler fails.
        selectionKeyIterator.remove();
        if (!next.isValid()) {
            return;
        }
        if (next.isAcceptable()) {
            acceptClient(next);
        } else if (next.isReadable()) {
            readFromClient(next);
        }
    }

    /** Accepts a pending connection and registers it with the same selector for reads. */
    private static void acceptClient(SelectionKey key) {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when no connection is actually pending.
                return;
            }
            System.out.println("有客户端连接");
            socketChannel.configureBlocking(false);
            socketChannel.register(key.selector(), SelectionKey.OP_READ);
        } catch (IOException e) {
            System.err.println("接受客户端连接失败: " + e);
            if (socketChannel != null) {
                // Do not leak a channel that was accepted but could not be registered.
                try {
                    socketChannel.close();
                } catch (IOException closeError) {
                    System.err.println("关闭客户端通道时出错: " + closeError.getMessage());
                }
            }
        }
    }

    /** Reads once from a ready client and prints the bytes; closes the client on EOF or error. */
    private static void readFromClient(SelectionKey key) {
        System.out.println("有客户端发送数据");
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            int len = socketChannel.read(buffer);
            if (len == -1) {
                // End of stream: without closing, the key stays readable forever and the
                // selector loop spins.
                closeClient(key);
            } else if (len > 0) {
                // Decoded with the platform default charset, as before.
                System.out.println("客户端发送的数据:" + new String(buffer.array(), 0, len));
            }
        } catch (IOException e) {
            // A broken client connection must not take the whole server down.
            System.err.println("读取客户端数据失败: " + e);
            closeClient(key);
        }
    }

    /** Cancels the key and closes its channel, reporting (not propagating) close failures. */
    private static void closeClient(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            System.err.println("关闭客户端通道时出错: " + e.getMessage());
        }
    }
}
|
Buffer.flip()
buffer刚创建时limit就等于capacity(写模式);往buffer里写入数据之后调用buffer.flip(),limit会变成写入结束时的position,position归零,于是切换到读模式
•用途:切换缓冲区模式,以准备读取已经写入的数据。
•作用:将缓冲区的 limit 设置为当前位置 position,然后将 position 设置为 0。这样,缓冲区就可以从头开始读取已写入的数据。
•典型使用场景:在向缓冲区写入数据后,调用 flip() 准备将数据从缓冲区中读取出来。flip() 通常在 put()(写入数据)操作后紧接着使用,用于切换到读取模式。

非阻塞
设置为非阻塞后,ServerSocketChannel.accept()不会等待:如果当前没有待处理的连接,会立即返回null
1
| serverSocketChannel.configureBlocking(false);
|
UDP
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package org.example.io_06;
import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel;
/**
 * UDP server: binds a {@link DatagramChannel} to port 9999 and prints every datagram
 * it receives together with the sender's address.
 */
public class Server {
    /**
     * Receives datagrams forever in blocking mode.
     *
     * @param args unused
     * @throws IOException if the channel cannot be opened or bound, or a receive fails
     */
    public static void main(String[] args) throws IOException {
        try (DatagramChannel datagramChannel = DatagramChannel.open()) {
            datagramChannel.bind(new InetSocketAddress(9999));
            // One buffer is reused; a datagram longer than the buffer is silently truncated.
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            while (true) {
                // Reset position/limit so the next datagram starts at the beginning.
                byteBuffer.clear();
                // receive() is the only call needed: read() is illegal on an unconnected
                // DatagramChannel and would throw NotYetConnectedException.
                SocketAddress receive = datagramChannel.receive(byteBuffer);
                // flip() alone marks the received bytes as readable (limit = bytes received).
                // Calling clear() first would make flip() set the limit to 0 and lose the payload.
                byteBuffer.flip();
                System.out.println(receive + ":" + new String(byteBuffer.array(), 0, byteBuffer.limit()));
            }
        }
    }
}
|
Client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package org.example.io_06;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel;
/**
 * UDP client: sends "Hello Server" to 127.0.0.1:9999 twice, once with the connectionless
 * {@code send} API and once with {@code connect} + {@code write}.
 */
public class Client {
    /**
     * Sends the two datagrams and closes the channel.
     *
     * @param args unused
     * @throws IOException if the channel cannot be opened or a datagram cannot be sent
     */
    public static void main(String[] args) throws IOException {
        InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 9999);
        byte[] payload = "Hello Server".getBytes();
        // try-with-resources: the original never closed the channel.
        try (DatagramChannel datagramChannel = DatagramChannel.open()) {
            // Unconnected style: the destination is given per datagram.
            datagramChannel.send(ByteBuffer.wrap(payload), serverAddress);

            // Connected style: fix the peer once, then write() works like on a stream channel.
            datagramChannel.connect(serverAddress);
            datagramChannel.write(ByteBuffer.wrap(payload));
        }
    }
}
|
NioChat
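至此,已经完成了我Nio的学习,实现了基于Nio的Selector多路复用器的即时群聊天室的功能。之前广播一直有问题,我发现最多只有两个人能收到信息,后来发现原因是socketChannel.write(buffer)会改变buffer的position,所以每次发送前应该flip一下,把position重新置为0(更稳妥的做法是只flip一次,之后每次发送前调用rewind(),或者给每个接收者使用buffer.duplicate(),这样即使某次write没有写完也不会截断数据)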
但是同时注意到,之前的示例代码是没有flip的,因为使用的是ByteBuffer.wrap,每一次都是新的buffer,它的position为0,limit为数组长度,本身就处于可读状态
1
| client.write(ByteBuffer.wrap(msg.getBytes()));
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| package org.example.io_08;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator;
/**
 * Selector-based group chat server: every message read from one client is
 * forwarded to all other connected clients.
 */
public class NioServerWithIssues {
    /** TCP port the chat server listens on. */
    private static final int PORT = 9999;

    /** Size in bytes of the buffer allocated for each read. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Binds the listening channel and runs the selector loop forever.
     *
     * @param args unused
     * @throws RuntimeException wrapping the {@link IOException} if the server cannot be
     *                          started or a selection operation fails
     */
    public static void main(String[] args) {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                selector.select();
                Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey selectionKey = selectionKeyIterator.next();
                    // Remove before handling so a failing handler cannot leave the key
                    // behind in the selected-key set.
                    selectionKeyIterator.remove();
                    try {
                        if (!selectionKey.isValid()) {
                            continue;
                        }
                        if (selectionKey.isAcceptable()) {
                            handleAcceptable(selectionKey);
                        } else if (selectionKey.isReadable()) {
                            handleReadable(selectionKey);
                        }
                    } catch (RuntimeException e) {
                        // One misbehaving connection must not stop the remaining keys
                        // of this round from being processed.
                        System.out.println("处理事件出错: " + e);
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads one chunk from a ready client and broadcasts it to the other clients.
     * The client is closed on end-of-stream or on a read error.
     *
     * @param selectionKey key of the readable client channel
     */
    private static void handleReadable(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);

        int read;
        try {
            read = socketChannel.read(buffer);
        } catch (IOException e) {
            System.out.println("发生异常,客户端 " + remoteAddressOf(socketChannel) + " 已断开");
            closeClient(selectionKey);
            return;
        }

        if (read == -1) {
            System.out.println("客户端 " + remoteAddressOf(socketChannel) + " 已下线");
            closeClient(selectionKey);
            return;
        }
        if (read == 0) {
            // Nothing was actually available; do not broadcast an empty message.
            return;
        }

        // Decoded with the platform default charset, as before.
        System.out.println("客户端" + remoteAddressOf(socketChannel) + " 发送了:"
                + new String(buffer.array(), 0, read));

        // Switch to read mode exactly once; each recipient then gets its own view.
        buffer.flip();
        broadcast(selectionKey, buffer);
    }

    /**
     * Sends {@code message} to every registered client except the sender.
     *
     * @param sender  key of the client the message came from
     * @param message buffer in read mode; its position and limit are left untouched
     */
    private static void broadcast(SelectionKey sender, ByteBuffer message) {
        for (SelectionKey key : sender.selector().keys()) {
            if (key == sender || !key.isValid()) {
                continue;
            }
            Channel targetChannel = key.channel();
            if (!(targetChannel instanceof SocketChannel)) {
                // Skips the listening ServerSocketChannel.
                continue;
            }
            System.out.println("开始广播");
            // duplicate() shares the bytes but has an independent position/limit, so a
            // partial write to one recipient cannot shorten what the next one receives.
            ByteBuffer copy = message.duplicate();
            try {
                ((SocketChannel) targetChannel).write(copy);
                if (copy.hasRemaining()) {
                    // Non-blocking write could not take everything (peer's send buffer is
                    // full). A complete solution would queue the rest and register OP_WRITE.
                    System.err.println("消息未完整发送,剩余字节数: " + copy.remaining());
                }
            } catch (IOException e) {
                // Drop only the broken recipient and keep broadcasting to the others.
                closeClient(key);
            }
        }
    }

    /**
     * Accepts a pending connection and registers it with the selector for reads.
     *
     * @param selectionKey key of the listening channel
     * @throws RuntimeException wrapping the {@link IOException} if the connection cannot
     *                          be accepted or registered
     */
    private static void handleAcceptable(SelectionKey selectionKey) {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
            System.out.println("有客户端连接");
        } catch (IOException e) {
            if (socketChannel != null) {
                // Do not leak a channel that was accepted but could not be registered.
                try {
                    socketChannel.close();
                } catch (IOException ex) {
                    System.err.println("关闭客户端通道时出错: " + ex.getMessage());
                }
            }
            throw new RuntimeException(e);
        }
    }

    /** Cancels the key and closes its channel, reporting (not propagating) close failures. */
    private static void closeClient(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ex) {
            System.err.println("关闭客户端通道时出错: " + ex.getMessage());
        }
    }

    /** Returns the peer address as text, or a placeholder if it can no longer be queried. */
    private static String remoteAddressOf(SocketChannel channel) {
        try {
            return String.valueOf(channel.getRemoteAddress());
        } catch (IOException e) {
            return "<unknown>";
        }
    }
}
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| package org.example.io_08;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set;
/**
 * Console chat client: connects to the chat server on 127.0.0.1:9999, hands the
 * selector to a {@code ReadThread} for incoming data, and sends every line typed
 * on stdin prefixed with the user name.
 */
public class NioClient {
    /** Name prefixed to every outgoing message. */
    private static final String NAME = "sophia";

    /**
     * Starts the connection, the reader thread and the stdin send loop.
     *
     * @param args unused
     * @throws RuntimeException wrapping the {@link IOException} if the connection
     *                          cannot be set up or a write fails
     */
    public static void main(String[] args) {
        try {
            Selector selector = Selector.open();
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);

            boolean connect = socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
            if (connect) {
                // The connection completed immediately, so no OP_CONNECT event will ever
                // fire; register for reads directly or nothing would be received.
                System.out.println("连接成功");
                socketChannel.register(selector, SelectionKey.OP_READ);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }

            new Thread(new ReadThread(selector)).start();

            Scanner scanner = new Scanner(System.in);
            // Line-based input: token-based next() would split "hello world" into two messages.
            while (scanner.hasNextLine()) {
                String next = scanner.nextLine();
                if (next.isEmpty()) {
                    continue;
                }
                if (!socketChannel.isOpen()) {
                    System.out.println("连接已关闭");
                    break;
                }
                if (!socketChannel.isConnected()) {
                    // Writing before the connection is finished would throw the
                    // unchecked NotYetConnectedException.
                    System.out.println("尚未连接到服务器,消息未发送");
                    continue;
                }
                String msg = NAME + ":" + next;
                ByteBuffer outgoing = ByteBuffer.wrap(msg.getBytes());
                // A non-blocking write may accept only part of the buffer; keep going
                // until the whole message has been handed to the socket.
                while (outgoing.hasRemaining()) {
                    socketChannel.write(outgoing);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
/**
 * Selector loop run on its own thread: completes pending connections and prints
 * whatever the registered channels receive.
 */
class ReadThread implements Runnable {
    private final Selector selector;

    /**
     * @param selector selector whose registered channels this thread services
     */
    public ReadThread(Selector selector) {
        this.selector = selector;
    }

    /**
     * Processes connect and read events until the last channel has been closed by its peer.
     *
     * @throws RuntimeException wrapping the {@link IOException} if selecting,
     *                          connecting or reading fails
     */
    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey next = selectionKeyIterator.next();
                    selectionKeyIterator.remove();
                    if (!next.isValid()) {
                        continue;
                    }

                    if (next.isConnectable()) {
                        SocketChannel channel = (SocketChannel) next.channel();
                        // Only switch to reading once the connection is really established.
                        if (channel.finishConnect()) {
                            next.interestOps(SelectionKey.OP_READ);
                        }
                        continue;
                    }

                    if (next.isReadable()) {
                        SocketChannel channel = (SocketChannel) next.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int read = channel.read(buffer);
                        if (read == -1) {
                            // Peer closed the connection. Without closing here the key stays
                            // readable forever and this loop spins printing empty lines.
                            System.out.println("服务器已断开连接");
                            next.cancel();
                            channel.close();
                            if (!hasValidKeys()) {
                                return;
                            }
                        } else if (read > 0) {
                            // Decoded with the platform default charset, as before.
                            System.out.println(new String(buffer.array(), 0, read));
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Tells whether the selector still has at least one usable registration. */
    private boolean hasValidKeys() {
        for (SelectionKey key : selector.keys()) {
            if (key.isValid()) {
                return true;
            }
        }
        return false;
    }
}
|