NIO—前锋

Time: 2024-11-05 Tuesday 11:19:01
Author: Jackasher

NIO-前锋

我发现前锋讲得还不错

如何理解这个NIO

首先,这是一段基于 ServerSocketChannel 的服务端代码。它的"非阻塞"体现在:线程只在 int select = selector.select(); 这一处等待"有事件就绪"(select() 本身会阻塞,直到至少有一个事件就绪),而不会卡在某一个连接的读写上。在 BIO 中,客户端连接后线程必须一直等待它的输入,这就是阻塞操作;而 NIO 把"接受连接"和"读写数据"拆成了独立的事件,哪个通道就绪就处理哪个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package org.example.io_05;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded NIO demo server: listens on TCP port 9999, accepts clients through a
 * {@link Selector} and prints whatever each client sends.
 *
 * @author Jackasher
 * @version 1.0
 * @className Server
 * @since 1.0
 **/
public class Server {
    /**
     * Opens the listening channel, registers it for {@code OP_ACCEPT} and runs the
     * selector loop forever.
     *
     * @param args unused
     * @throws IOException if the channel or selector cannot be opened, the port cannot be
     *                     bound, or a selection operation fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(9999));
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            System.out.println("等待事件发生");
            // Blocks until at least one registered channel is ready.
            selector.select();
            System.out.println("某个事件发生了");
            Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
            while (selectionKeyIterator.hasNext()) {
                SelectionKey next = selectionKeyIterator.next();
                handleChannel(next, selectionKeyIterator);
            }
        }
    }

    /**
     * Handles one ready key and removes it from the selected-key set.
     *
     * @param next                 the ready key returned by the iterator
     * @param selectionKeyIterator iterator over the selector's selected-key set, positioned
     *                             on {@code next}
     */
    private static void handleChannel(SelectionKey next, Iterator<SelectionKey> selectionKeyIterator) {
        // Remove first so the key never lingers in the selected set, whatever happens below.
        selectionKeyIterator.remove();
        if (!next.isValid()) {
            return;
        }
        if (next.isAcceptable()) {
            acceptClient(next);
        } else if (next.isReadable()) {
            readClient(next);
        }
    }

    /** Accepts a pending connection and registers it with the same selector for reads. */
    private static void acceptClient(SelectionKey key) {
        System.out.println("有客户端连接");
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // Non-blocking accept() returns null when no connection is pending.
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(key.selector(), SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(socketChannel);
        }
    }

    /** Reads the available bytes from a client and prints them; closes the client on EOF or error. */
    private static void readClient(SelectionKey key) {
        System.out.println("有客户端发送数据");
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            int len = socketChannel.read(buffer);
            if (len == -1) {
                // End of stream: the channel must be closed, otherwise it stays readable
                // and the selector loop spins on it forever.
                System.out.println("客户端已断开连接");
                closeQuietly(socketChannel);
            } else if (len > 0) {
                System.out.println("客户端发送的数据:" + new String(buffer.array(), 0, len));
            }
        } catch (IOException e) {
            // A failing client must not take the whole server down; drop only this client.
            e.printStackTrace();
            closeQuietly(socketChannel);
        }
    }

    /** Closes the channel (which also cancels its selection keys), ignoring close failures. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing an already-broken connection fails.
        }
    }
}

Buffer.flip()

buffer 默认的 limit 等于 capacity。向 buffer 写入数据后调用 buffer.flip(),limit 会变成原来的 position,position 归零,这样就从写模式切换到了读模式。

​ •用途:切换缓冲区模式,以准备读取已经写入的数据。

​ •作用:将缓冲区的 limit 设置为当前位置 position,然后将 position 设置为 0。这样,缓冲区就可以从头开始读取已写入的数据。

​ •典型使用场景:在向缓冲区写入数据后,调用 flip() 准备将数据从缓冲区中读取出来。flip() 通常在 put()(写入数据)操作后紧接着使用,用于切换到读取模式。

image-20241103233809036

非阻塞

把通道设置为非阻塞后,ServerSocketChannel.accept() 不会等待:如果当前没有待处理的连接,它会立即返回 null。

1
serverSocketChannel.configureBlocking(false);

UDP

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package org.example.io_06;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

/**
 * UDP demo server: binds a {@link DatagramChannel} to port 9999 and prints the sender
 * address and payload of every datagram it receives.
 *
 * @author Jackasher
 * @version 1.0
 * @className Server
 * @since 1.0
 **/
public class Server {
    /**
     * Receives datagrams forever and prints each one.
     *
     * @param args unused
     * @throws IOException if the channel cannot be opened or bound, or a receive fails
     */
    public static void main(String[] args) throws IOException {
        try (DatagramChannel datagramChannel = DatagramChannel.open()) {
            datagramChannel.bind(new InetSocketAddress(9999));
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            while (true) {
                // Reset position/limit so the buffer can be reused for the next datagram.
                byteBuffer.clear();
                // receive() works on an unconnected channel and reports the sender;
                // read() must not be used here because it requires a connected channel.
                SocketAddress receive = datagramChannel.receive(byteBuffer);
                // Switch to read mode: limit = number of bytes received, position = 0.
                byteBuffer.flip();
                System.out.println(receive + ":" + new String(byteBuffer.array(), 0, byteBuffer.limit(),
                        java.nio.charset.StandardCharsets.UTF_8));
            }
        }
    }
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package org.example.io_06;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

/**
 * UDP demo client: sends "Hello Server" to 127.0.0.1:9999 twice, once with
 * {@code send} on an unconnected channel and once with {@code write} after {@code connect}.
 *
 * @author Jackasher
 * @version 1.0
 * @className Client
 * @since 1.0
 **/
public class Client {
    /**
     * Sends the two demo datagrams and closes the channel.
     *
     * @param args unused
     * @throws IOException if the channel cannot be opened or a datagram cannot be sent
     */
    public static void main(String[] args) throws IOException {
        InetSocketAddress server = new InetSocketAddress("127.0.0.1", 9999);
        byte[] payload = "Hello Server".getBytes(java.nio.charset.StandardCharsets.UTF_8);

        // try-with-resources releases the socket even if a send fails.
        try (DatagramChannel datagramChannel = DatagramChannel.open()) {
            // Unconnected channel: the destination is passed with every send().
            datagramChannel.send(ByteBuffer.wrap(payload), server);

            // Connected channel: write() sends to the address given to connect().
            datagramChannel.connect(server);
            datagramChannel.write(ByteBuffer.wrap(payload));
        }
    }
}

NioChat

至此,我已经完成了 NIO 的学习,实现了基于 NIO Selector(多路复用器)的即时群聊聊天室。之前广播一直有问题:最多只有两个人能收到消息。后来我发现,socketChannel.write(buffer) 会推进 buffer 的 position,所以在向下一个客户端发送之前,必须先把 position 重置回起点(例如调用 flip() 或 rewind())。

但同时也注意到,之前的示例代码并没有调用 flip(),因为它用的是 ByteBuffer.wrap():每次都会得到一个新的 buffer,其 position 为 0、limit 为数组长度,可以直接写出。

1
client.write(ByteBuffer.wrap(msg.getBytes()));

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package org.example.io_08;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * Selector-based group chat server: listens on TCP port 9999 and relays every message a
 * client sends to all other connected clients.
 *
 * @author Jackasher
 * @version 1.0
 * @className NioServerWithIssues
 * @since 1.0
 **/
public class NioServerWithIssues {
    /**
     * Opens the listening channel and runs the selector loop forever.
     *
     * @param args unused
     * @throws RuntimeException wrapping the {@link IOException} if the server cannot be set
     *                          up or a selection operation fails
     */
    public static void main(String[] args) {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(9999));
            serverSocketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                selector.select();
                Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();

                while (selectionKeyIterator.hasNext()) {
                    SelectionKey selectionKey = selectionKeyIterator.next();
                    // Remove up front so a failure below cannot leave the key in the selected set.
                    selectionKeyIterator.remove();

                    // Per-key try: one misbehaving client must not abandon the other ready keys.
                    try {
                        if (!selectionKey.isValid()) {
                            // The key was cancelled earlier in this pass (e.g. a failed broadcast write).
                            continue;
                        }
                        if (selectionKey.isAcceptable()) {
                            handleAcceptable(selectionKey);
                        } else if (selectionKey.isReadable()) {
                            handleReadable(selectionKey);
                        }
                    } catch (RuntimeException e) {
                        System.out.println("关闭出错");
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads one chunk from the client behind {@code selectionKey}, logs it and relays it to
     * every other client. Closes the client on end-of-stream or read failure.
     */
    private static void handleReadable(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // Resolve the address before any close(): it is unavailable on a closed channel.
        String remote = remoteAddressOf(socketChannel);

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read;
        try {
            read = socketChannel.read(buffer);
        } catch (IOException e) {
            System.out.println("发生异常,客户端 " + remote + " 已断开");
            closeQuietly(selectionKey);
            return;
        }

        if (read == -1) {
            System.out.println("客户端 " + remote + " 已下线");
            closeQuietly(selectionKey);
            return;
        }
        if (read == 0) {
            return;
        }

        System.out.println("客户端" + remote + " 发送了:" + new String(buffer.array(), 0, read));

        // Switch to read mode exactly once; every recipient then gets its own view of the data.
        buffer.flip();
        broadcast(selectionKey, buffer);
    }

    /**
     * Writes {@code message} to every registered client except the sender. A recipient whose
     * write fails is closed and skipped; the remaining recipients still get the message.
     */
    private static void broadcast(SelectionKey sender, ByteBuffer message) {
        Channel senderChannel = sender.channel();
        for (SelectionKey key : sender.selector().keys()) {
            Channel targetChannel = key.channel();
            if (!(targetChannel instanceof SocketChannel) || targetChannel == senderChannel || !key.isValid()) {
                continue;
            }

            System.out.println("开始广播");
            try {
                // duplicate() shares the bytes but has its own position/limit, so one
                // recipient's (possibly partial) write cannot change what the next one sees.
                // NOTE: a non-blocking write may send only part of the message to a slow
                // client; queuing the rest behind OP_WRITE is outside this demo's scope.
                ((SocketChannel) targetChannel).write(message.duplicate());
            } catch (IOException e) {
                closeQuietly(key);
            }
        }
    }

    /** Accepts a pending connection and registers it with the same selector for reads. */
    private static void handleAcceptable(SelectionKey selectionKey) {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // Non-blocking accept() returns null when no connection is pending.
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
            System.out.println("有客户端连接");
        } catch (IOException e) {
            if (socketChannel != null) {
                try {
                    // Do not leak a connection that was accepted but could not be registered.
                    socketChannel.close();
                } catch (IOException ex) {
                    System.err.println("关闭客户端通道时出错: " + ex.getMessage());
                }
            }
            throw new RuntimeException(e);
        }
    }

    /** Returns the peer address as text, or {@code "null"} when it can no longer be determined. */
    private static String remoteAddressOf(SocketChannel channel) {
        try {
            return String.valueOf(channel.getRemoteAddress());
        } catch (IOException e) {
            return "null";
        }
    }

    /** Cancels the key and closes its channel, reporting (but not propagating) a close failure. */
    private static void closeQuietly(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ex) {
            System.err.println("关闭客户端通道时出错: " + ex.getMessage());
        }
    }
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package org.example.io_08;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * Console chat client: connects to 127.0.0.1:9999 in non-blocking mode, starts a
 * {@code ReadThread} that prints incoming messages, and sends each whitespace-delimited
 * token typed on standard input prefixed with the user name.
 *
 * @author Jackasher
 * @version 1.0
 * @className NioClient
 * @since 1.0
 **/
public class NioClient {
    private static final String NAME = "sophia";

    /**
     * Connects to the chat server and forwards console input until standard input ends.
     *
     * @param args unused
     * @throws RuntimeException wrapping the {@link IOException} if the connection cannot be
     *                          started or a write fails
     */
    public static void main(String[] args) {
        try {
            Selector selector = Selector.open();
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);

            boolean connect = socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
            if (connect) {
                System.out.println("连接成功");
            }
            // connect() may complete immediately for a local peer; in that case the channel
            // has to be registered for reads directly, otherwise it would wait on a connect
            // event and never be read.
            socketChannel.register(selector, connect ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);

            new Thread(new ReadThread(selector)).start();

            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String next = scanner.next();
                if (!socketChannel.isConnected()) {
                    // Writing to an unconnected channel throws the unchecked NotYetConnectedException.
                    System.out.println("未连接到服务器,消息未发送");
                    continue;
                }
                String msg = NAME + ":" + next;
                ByteBuffer out = ByteBuffer.wrap(msg.getBytes());
                // A non-blocking write may accept only part of the buffer; keep going until
                // the whole message has been handed to the socket.
                while (out.hasRemaining()) {
                    socketChannel.write(out);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

/**
 * Selector loop for the chat client: completes the pending connection and prints every
 * message received from the server until the server closes the connection.
 */
class ReadThread implements Runnable {
    private final Selector selector;

    /**
     * @param selector selector the client's {@link SocketChannel} is registered with
     */
    public ReadThread(Selector selector) {
        this.selector = selector;
    }

    /**
     * Processes connect and read events until the server closes the connection.
     *
     * @throws RuntimeException wrapping the {@link IOException} if selecting, connecting or
     *                          reading fails
     */
    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey next = selectionKeyIterator.next();
                    selectionKeyIterator.remove();
                    if (!next.isValid()) {
                        continue;
                    }

                    SocketChannel channel = (SocketChannel) next.channel();

                    if (next.isConnectable() && channel.finishConnect()) {
                        // Connection established: from now on only incoming data matters.
                        next.interestOps(SelectionKey.OP_READ);
                    }

                    if (next.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int read = channel.read(buffer);
                        if (read == -1) {
                            // End of stream: without closing, the key stays readable and
                            // this loop would spin forever printing empty lines.
                            System.out.println("服务器已关闭连接");
                            channel.close();
                            return;
                        }
                        if (read > 0) {
                            buffer.flip();
                            System.out.println(new String(buffer.array(), 0, buffer.limit()));
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

NIO—前锋
http://example.com/2024/11/05/NIO—前锋/
作者
Jack Asher
发布于
2024年11月5日
许可协议