实现步骤
构建Selector以及服务端监听通道
启动监听并处理建立连接请求
处理读数据
群发数据实现
客户端测试实现
服务端实现
2.0 服务端完整代码
服务端的主要功能如下
1.开放监听端口,方法ChatServer构造方法
2.处理链接请求,方法listener实现连接的建立
3.读取消息内容,方法readData
4. 转发消息给当前所有在线的人,方法sendData2All
package com.hgy.chat;
/**
* 群聊服务器
*/
public class ChatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
/**
* 初始化服务端
*/
public ChatServer() {
try {
// 创建Selector以及ServerSocketChannel
selector = Selector.open();
serverSocketChannel = serverSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
//将服务端监听通道注册到Selector中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 监听客户端操作
*/
public void listener() {
while (true) {
try {
if (selector.select(1000) == 0) {
continue;
}
//获得所有有事件的key
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//如果当前key是处理链接类型
if (key.isAcceptable()) {
SocketChannel socketChannel =
serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 当前链接是读数据类型
if (key.isReadable()) {
readData(key);
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 读取数据并群发给所有的用户
* @param key
*/
private void readData(SelectionKey key) {
try {
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer);
String s = new String(byteBuffer.array());
// 写到其他所有客户端
sendData2All(s);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 群发给所有的用户
* @param msg 需要发送的消息
*/
private void sendData2All(String msg) {
try {
// 当前在selector上注册的所有key就是所有用户
Set<SelectionKey> keys = selector.keys();
for (SelectionKey key : keys) {
// 获取每个用户的通道
SelectableChannel channel = key.channel();
// 实现数据发送
if (channel instanceof SocketChannel) {
System.out.println(":::" + msg);
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
SocketChannel socketChannel = (SocketChannel) channel;
socketChannel.write(byteBuffer);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ChatServer chatServer = new ChatServer();
chatServer.listener();
}
}
2.1 构建Selector以及服务端监听通道
当ChatServer对象被创建时具体实现步骤如下
1.创建serverSocketChannel对象
2.设置处理模式为非阻塞模式
3.绑定监听端口
4.将channel注册到selector中
public class ChatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
/**
* 初始化服务端
*/
public ChatServer() {
try {
// 创建Selector以及ServerSocketChannel
selector = Selector.open();
serverSocketChannel = serverSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
//将服务端监听通道注册到Selector中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.2 实现监听并处理建立连接请求
连接请求处理实现步骤
1. 获得所有有事件的key, 通过key就可以拿到用户的SocketChannel
2. 循环遍历每一个key,判断当前是读事件,还是建立连接事件
3. 如果是建立连接事件则直接将该通道注册到selector中
4. 如果是读数据事件就交给具体的读数据方法处理数据
/**
* 监听客户端操作
*/
public void listener() {
while (true) {
try {
if (selector.select(1000) == 0) {
continue;
}
//获得所有有事件的key
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//如果当前key是处理链接类型
if (key.isAcceptable()) {
SocketChannel socketChannel =
serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 当前链接是读数据类型
if (key.isReadable()) {
readData(key);
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.3 处理读数据
数据处理的具体实现步骤
1. 通过key获取和用户连接的通道(相当于输入流)
2. 获取通道的数据并打印
3. 将数据转发给其他在线用户
/**
* 读取数据并群发给所有的用户
* @param key
*/
private void readData(SelectionKey key) {
try {
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer);
String s = new String(byteBuffer.array());
// 写到其他所有客户端
sendData2All(s);
}
} catch (IOException e) {
e.printStackTrace();
}
}
2.4 群发数据实现
数据群发实现步骤
1. 当前在线用户实际上就是selector中所有注册的key,也就是在线的用户
2. 通过key拿到和用户的链接讲消息转发出去
/**
* 群发给所有的用户
* @param msg 需要发送的消息
*/
private void sendData2All(String msg) {
try {
// 当前在selector上注册的所有key就是所有用户
Set<SelectionKey> keys = selector.keys();
for (SelectionKey key : keys) {
// 获取每个用户的通道
SelectableChannel channel = key.channel();
// 实现数据发送
if (channel instanceof SocketChannel) {
System.out.println(":::" + msg);
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
SocketChannel socketChannel = (SocketChannel) channel;
socketChannel.write(byteBuffer);
}
}
} catch (Exception e) {
e.printStackTrace();
}}
2.5 启动服务端
public static void main(String[] args) {
ChatServer chatServer = new ChatServer();
chatServer.listener();
}
客户端实现
客户端实现
1. 首先创建SocketChannel对象并链接到具体的服务器
2. 将通道注册到selector中
3. 开启一个新的线程监听selector中所有key的事件
4. 在主线程中循环阻塞获取用户的输入
public class ChatClient {
public static void main(String[] args) throws Exception {
// 客户端代码, 建立连接
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open(new
InetSocketAddress("127.0.0.1", 8888));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
// 开启一个新的线程轮询当前客户是否有可读消息
new Thread(() -> {
while (true) {
try {
int select = selector.select(1000);
// 有可读消息进行解析打印
if (select > 0) {
for (SelectionKey key : selector.selectedKeys()) {
if (key.isReadable()) {
SocketChannel channel = (SocketChannel)
key.channel();
ByteBuffer byteBuffer =
ByteBuffer.allocate(1024);
channel.read(byteBuffer);
System.out.println(":==:" + new
String(byteBuffer.array()));
// 写到其他所有客户端
System.out.println(new
String(byteBuffer.array()));
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
// 主线程中循环获取用户输入的聊天消息
while(true) {
Scanner scanner = new Scanner(System.in);
//发送用户的消息
socketChannel.write(ByteBuffer.wrap(scanner.nextLine().getBytes()));
}
}
}