JavaEE鸿蒙应用开发HTML&JS+前端Python+大数据开发人工智能开发电商视觉设计软件测试新媒体+短视频直播运营产品经理集成电路应用开发(含嵌入式)Linux云计算+运维开发C/C++拍摄剪辑+短视频制作PMP项目管理认证电商运营Go语言与区块链大数据PHP工程师Android+物联网iOS.NET

NIO实现群聊

来源:黑马程序员

浏览7765人

2020.10.30

实现步骤

构建Selector以及服务端监听通道 

启动监听并处理建立连接请求 

处理读数据 

群发数据实现 

客户端测试实现

服务端实现

2.0 服务端完整代码

服务端的主要功能如下 

1.开放监听端口,方法ChatServer构造方法

2.处理链接请求,方法listener实现连接的建立 

3.读取消息内容,方法readData 

4. 转发消息给当前所有在线的人,方法sendData2All

package com.hgy.chat;

/**

* 群聊服务器

*/

public class ChatServer {

private ServerSocketChannel serverSocketChannel;

private Selector selector;

/**

* 初始化服务端

*/

public ChatServer() {

try {

// 创建Selector以及ServerSocketChannel

selector = Selector.open();

serverSocketChannel = serverSocketChannel.open();

serverSocketChannel.configureBlocking(false);

serverSocketChannel.socket().bind(new InetSocketAddress(8888));

//将服务端监听通道注册到Selector中

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 监听客户端操作

*/

public void listener() {

while (true) {

try {

if (selector.select(1000) == 0) {

continue;

}

//获得所有有事件的key

Set<SelectionKey> selectionKeys = selector.selectedKeys();

Iterator<SelectionKey> iterator = selectionKeys.iterator();

while (iterator.hasNext()) {

SelectionKey key = iterator.next();

//如果当前key是处理链接类型

if (key.isAcceptable()) {

SocketChannel socketChannel =

serverSocketChannel.accept();

socketChannel.configureBlocking(false);

socketChannel.register(selector, SelectionKey.OP_READ);

}

// 当前链接是读数据类型

if (key.isReadable()) {

readData(key);

}

iterator.remove();

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

/**

* 读取数据并群发给所有的用户

* @param key

*/

private void readData(SelectionKey key) {

try {

if (key.isReadable()) {

SocketChannel channel = (SocketChannel) key.channel();

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

channel.read(byteBuffer);

String s = new String(byteBuffer.array());

// 写到其他所有客户端

sendData2All(s);

}

} catch (IOException e) {

e.printStackTrace();

}

}

/**

* 群发给所有的用户

* @param msg 需要发送的消息

*/

private void sendData2All(String msg) {

try {

// 当前在selector上注册的所有key就是所有用户

Set<SelectionKey> keys = selector.keys();

for (SelectionKey key : keys) {

// 获取每个用户的通道

SelectableChannel channel = key.channel();

// 实现数据发送

if (channel instanceof SocketChannel) {

System.out.println(":::" + msg);

ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());

SocketChannel socketChannel = (SocketChannel) channel;

socketChannel.write(byteBuffer);

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

public static void main(String[] args) {

ChatServer chatServer = new ChatServer();

chatServer.listener();

}

}

2.1 构建Selector以及服务端监听通道

当ChatServer对象被创建时具体实现步骤如下 

1.创建serverSocketChannel对象

2.设置处理模式为非阻塞模式

3.绑定监听端口

4.将channel注册到selector中

public class ChatServer {

private ServerSocketChannel serverSocketChannel;

private Selector selector;

/**

* 初始化服务端

*/

public ChatServer() {

try {

// 创建Selector以及ServerSocketChannel

selector = Selector.open();

serverSocketChannel = serverSocketChannel.open();

serverSocketChannel.configureBlocking(false);

serverSocketChannel.socket().bind(new InetSocketAddress(8888));

//将服务端监听通道注册到Selector中

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

} catch (Exception e) {

e.printStackTrace();

}

}

}

2.2 实现监听并处理建立连接请求

连接请求处理实现步骤

1. 获得所有有事件的key, 通过key就可以拿到用户的SocketChannel

2. 循环遍历每一个key,判断当前是读事件,还是建立连接事件

3. 如果是建立连接事件则直接将该通道注册到selector中

4. 如果是读数据事件就交给具体的读数据方法处理数据

/**

* 监听客户端操作

*/

public void listener() {

while (true) {

try {

if (selector.select(1000) == 0) {

continue;

}

//获得所有有事件的key

Set<SelectionKey> selectionKeys = selector.selectedKeys();

Iterator<SelectionKey> iterator = selectionKeys.iterator();

while (iterator.hasNext()) {

SelectionKey key = iterator.next();

//如果当前key是处理链接类型

if (key.isAcceptable()) {

SocketChannel socketChannel =

serverSocketChannel.accept();

socketChannel.configureBlocking(false);

socketChannel.register(selector, SelectionKey.OP_READ);

}

// 当前链接是读数据类型

if (key.isReadable()) {

readData(key);

}

iterator.remove();

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

2.3 处理读数据

数据处理的具体实现步骤

1. 通过key获取和用户连接的通道(相当于输入流)

2. 获取通道的数据并打印

3. 将数据转发给其他在线用户

/**

* 读取数据并群发给所有的用户

* @param key

*/

private void readData(SelectionKey key) {

try {

if (key.isReadable()) {

SocketChannel channel = (SocketChannel) key.channel();

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

channel.read(byteBuffer);

String s = new String(byteBuffer.array());

// 写到其他所有客户端

sendData2All(s);

}

} catch (IOException e) {

e.printStackTrace();

}

}

2.4 群发数据实现

数据群发实现步骤

1. 当前在线用户实际上就是selector中所有注册的key,也就是在线的用户

2. 通过key拿到和用户的链接讲消息转发出去

/**

* 群发给所有的用户

* @param msg 需要发送的消息

*/

private void sendData2All(String msg) {

try {

// 当前在selector上注册的所有key就是所有用户

Set<SelectionKey> keys = selector.keys();

for (SelectionKey key : keys) {

// 获取每个用户的通道

SelectableChannel channel = key.channel();

// 实现数据发送

if (channel instanceof SocketChannel) {

System.out.println(":::" + msg);

ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());

SocketChannel socketChannel = (SocketChannel) channel;

socketChannel.write(byteBuffer);

}

}

} catch (Exception e) {

e.printStackTrace();

}}

2.5 启动服务端

public static void main(String[] args) {

ChatServer chatServer = new ChatServer();

chatServer.listener();

}

客户端实现

客户端实现

1. 首先创建SocketChannel对象并链接到具体的服务器

2. 将通道注册到selector中

3. 开启一个新的线程监听selector中所有key的事件

4. 在主线程中循环阻塞获取用户的输入

public class ChatClient {

public static void main(String[] args) throws Exception {

// 客户端代码, 建立连接

Selector selector = Selector.open();

SocketChannel socketChannel = SocketChannel.open(new

InetSocketAddress("127.0.0.1", 8888));

socketChannel.configureBlocking(false);

socketChannel.register(selector, SelectionKey.OP_READ);

// 开启一个新的线程轮询当前客户是否有可读消息

new Thread(() -> {

while (true) {

try {

int select = selector.select(1000);

// 有可读消息进行解析打印

if (select > 0) {

for (SelectionKey key : selector.selectedKeys()) {

if (key.isReadable()) {

SocketChannel channel = (SocketChannel)

key.channel();

ByteBuffer byteBuffer =

ByteBuffer.allocate(1024);

channel.read(byteBuffer);

System.out.println(":==:" + new

String(byteBuffer.array()));

// 写到其他所有客户端

System.out.println(new

String(byteBuffer.array()));

}

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

}).start();

// 主线程中循环获取用户输入的聊天消息

while(true) {

Scanner scanner = new Scanner(System.in);

//发送用户的消息

socketChannel.write(ByteBuffer.wrap(scanner.nextLine().getBytes()));

}

}

}