IO 多路复用
高并发 IO
为避免用户直接操作内核,保证内核安全,操作系统将内存划分为两个部分:内核空间和用户空间,普通应用程序不允许直接在内核空间进行读写,必须通过系统调用向内核发出指令;
应用程序进行 IO 读写基本上会用到底层的 read 和 write 两大系统调用,但他们不会直接从物理设备进行读写操作,而是面向缓冲区读写;相当于 IO 操作其实是进程缓冲区和内核缓冲区的数据交换,底层的物理读写交换是由内核完成的;
为什么设置缓冲区,不嫌麻烦吗:外部设备的直接读写涉及操作系统的中断(进程是停止状态,中断前需要保存进程数据和状态等信息),中断结束后需要恢复(恢复之前保存的数据和状态等信息),频繁中断会损耗系统性能,所以出现了缓冲区。
四种主要的 IO 模型
阻塞与非阻塞:阻塞 IO 指的是需要内核 IO 操作彻底完成后才返回到用户空间执行用户程序的操作指令。
同步与异步:同步 IO 是指用户空间(进程或者线程)是主动发起 IO 请求的一方,系统内核是被动接收方。异步 IO 则反过来,系统内核是主动发起 IO 请求的一方,用户空间是被动接收方。
同步阻塞 IO(Blocking IO,BIO)
从 Java 应用程序发起 IO 系统调用开始,一直到系统调用返回,这段时间内发起 IO 请求的 Java 进程(或者线程)是阻塞的。直到返回成功后,应用进程才能开始处理用户空间的缓冲区数据。
同步阻塞 IO 一般情况下会为每个连接配备一个独立的线程,一个线程维护一个连接的 IO 操作,在高并发的应用场景下性能很低,基本上是不可用的。
同步非阻塞 IO(Non-Blocking IO,NIO)
在内核缓冲区中没有数据的情况下,系统调用会立即返回失败的信息(需要不断地发起 IO 系统调用直到获得数据)。在内核缓冲区中有数据的情况下,在数据的复制过程中系统调用是阻塞的,直到完成数据从内核缓冲区复制到用户缓冲区。
同步非阻塞 IO 的缺点是,每个线程要不断地轮询内核,这将占用大量的 CPU 时间,效率低下。在 Java 的实际开发中,不会涉及这种 IO 模型。
IO 多路复用
采用 IO 多路复用模型可避免同步非阻塞 IO 模型中轮询等待的问题;为了提高性能,操作系统引入了一种专门用于查询 IO 文件描述符(含 socket 连接)就绪状态的系统调用,在 Linux 系统中,新的系统调用为 select/epoll 系统调用;
IO 多路复用模型的 IO 涉及两种系统调用,一种是 IO 操作的系统调用,另一种是 select/epoll 就绪查询系统调用,其中 epoll 系统调用是 select 的升级版。
IO 多路复用模型:应用程序要进行 IO 操作时,先注册文件描述符到 select/epoll 系统调用(Java NIO 的 Selector 底层用的就是 epoll),然后轮询 select/epoll 查询文件的就绪状态(返回所有就绪列表),一旦就绪,就阻塞等待文件从内核缓冲区复制到用户缓冲区(数据从内核缓冲区复制到用户缓冲区线程会短暂阻塞);优点是一个选择器查询线程可以同时处理成千上万的网络连接,从而大大减少了系统的开销。
提示
简单理解,将 IO 多路复用模型想象成餐厅只有一个超级服务员(Selector),他是这样工作的:
- 登记关注名单 (注册到 Selector):注明关心的事件“A 桌点菜(OP_READ)”、“B 桌结账(OP_WRITE)”
- 巡视询问 (轮询 - select/poll/epoll):定期巡视“A 桌有菜吗?B 桌有结账吗?”(调用 select() / poll() / epoll_wait() 系统调用),或者,他去厨房看看,菜是否做好了(epoll 的“事件驱动”模式,更高效)。
- 获取就绪列表 (内核返回就绪事件):系统(内核)给他一个就绪餐桌列表“A 桌有菜了(可读),B 桌需要结账(可写)”,这个过程是非阻塞的。
- 专注服务就绪桌 (处理就绪事件):他先走到 A 桌记录客人点的菜,此时线程会短暂阻塞(内核将数据从内核缓冲区复制到用户缓冲区),然后去 B 桌结账。
- 循环处理:回到第 2 步,继续巡视询问。
IO 多路复用之所以高效,是因为一个服务员(线程)就能服务整个餐厅(成千上万连接),换句话说,一个线程利用内核的能力,同时监控多个网络连接,只在连接真正准备好读写数据时才去操作它,从而用极少的资源服务海量并发连接。
Spring Boot 程序处理 HTTP 请求(无论是通过 Tomcat NIO 还是 Netty)的核心高效通信机制,就是基于 IO 多路复用模型(Selector)。多个 HTTP 请求对应的 TCP 连接会被注册到 Selector 上,Selector 负责高效轮询或等待内核通知哪些连接有数据就绪,然后工作线程只处理这些就绪的连接,进行实际的 HTTP 请求读取、业务处理、响应写入。这就是 Spring Boot 应用能够支持高并发的底层网络基础。
异步 IO(Asynchronous IO,AIO)
理论上来说,异步 IO 是真正的异步输入输出,它的吞吐量高于 IO 多路复用模型的吞吐量。就目前而言,Windows 系统下通过 IOCP 实现了真正的异步 IO。在 Linux 系统下,异步 IO 模型在 2.6 版本才引入,JDK 对它的支持目前并不完善,因此异步 IO 在性能上没有明显的优势。
大多数高并发服务端的程序都是基于 Linux 系统的。因而,目前这类高并发网络应用程序的开发大多采用 IO 多路复用模型。大名鼎鼎的 Netty 框架使用的就是 IO 多路复用模型,而不是异步 IO 模型。
合理配置支持百万级并发连接
Linux 操作系统中文件句柄数的限制默认值为 1024,也就是说,一个进程最多可以接受 1024 个 socket 连接,这是远远不够的,没有办法支撑百万级的并发网络连接。 文件句柄也叫文件描述符,是内核为了高效管理已被打开的文件所创建的索引;
查看最大文件句柄数量
ulimit -n修改句柄数量
# 临时修改,只对当前 shell 会话有效 ulimit -n 65536 # 永久修改;编辑/etc/rc.local 开机启动文件,在文件中添加如下内容 ulimit -SHn 65536彻底解除 Linux 系统的最大文件打开数量
上面示例-S 和-H 两个命令选项,-S 表示软性极限值,-H 表示硬性极限值;硬性极限值是实际的限制,软性极限值则是系统发出警告的极限值;上面的示例中,65536 不能再多了,可以通过彻底解除 Linux 系统的最大文件打开数量的限制,来满足更高的需求:
# 编辑/etc/security/limits.conf,加入如下内容 # 在某些系统中可能存在 /etc/security/limits.d/20-nproc.conf 文件,如果存在,也需要加入下面配置 # 其中 nofile 和 nproc 分别表示 文件打开数量和进程打开数量 # * 表示所有用户和所有组,也可以具体指定,如 root@Ubuntu,或直接指定用户,如 root * soft nofile 65536 * hard nofile 65536 * soft nproc 131072 * hard nproc 131072配置完成后重新登录即可生效。
在使用和安装目前非常流行的分布式搜索引擎 ElasticSearch 时,必须修改这个文件,以增加最大的文件描述符的极限值。当然,在生产环境运行 Netty 时,也需要修改/etc/security/limits.conf 文件来增加文件描述符数量的极限值。
Java NIO
Java 从 1.4 版本开始,引进了新的异步 IO 库,被称为 Java New IO 类库,简称为 Java NIO(注意,Java NIO 其底层实现是基于 IO 多路复用,和 IO 模型中的同步非阻塞(NIO)不同)。称“老的”阻塞式 Java IO 为 OIO(Old IO)。
高性能的 Java 通信离不开 Java NIO 组件,现在主流的技术框架或中间件服务器都使用了 Java NIO 组件,譬如 Tomcat、Jetty、Netty。
Netty 的底层核心思想和技术基础源于 JDK NIO,它对 JDK NIO 进行了深度的优化、扩展和重新设计;因此,学习 Netty 之前先了解一下 JDK NIO 的核心组件和事件驱动模型。
三个核心组件
Java NIO 属于 IO 多路复用模型。只不过,Java NIO 组件提供了统一的 API,为大家屏蔽了底层的操作系统的差异。 Java NIO 类库包含以下三个核心组件:
Channel(通道)
一个网络连接使用一个通道表示,所有 NIO 的 IO 操作都是通过连接通道完成的。一个通道类似于 OIO(面向流的阻塞式 IO,输入、输出流分别单独处理)中两个流的结合体,既可以从通道读取数据,也可以向通道写入数据。
Buffer(缓冲区)
Buffer 本质上是一个内存块,既可以从中写入数据,也可以读取数据。Java NIO 中代表缓冲区的 Buffer 类是一个抽象类,位于 java.nio 包中。
通道的读取,就是将数据从通道读取到缓冲区中;所谓通道的写入,就是将数据从缓冲区写入通道中。
通道和缓冲区的联系比较密切:数据总是从通道读到缓冲区内,或者从缓冲区写入通道中
Selector(选择器)
选择器主要工作是通道的注册、监听、事件查询。简单地说,选择器的使命是完成 IO 的多路复用。
通过选择器,一个线程可以查询多个通道的 IO 事件的就绪状态。一旦其中的一个或者多个文件描述符可读或者可写,该监听进程/线程就能够进行 IO 就绪事件的操作。
选择器的最大优势是系统开销小,系统不必为每一个网络连接(文件描述符)创建进程/线程,从而大大减少了系统的开销;
Netty 核心原理与基础实战
Netty 是一个 Java NIO 客户端/服务器框架,是一个为了快速开发可维护的高性能、高可扩展的网络服务器和客户端程序而提供的异步事件驱动基础框架和工具。
基于 Netty,可以快速轻松地开发网络服务器和客户端的应用程序。与直接使用 Java NIO 相比,Netty 给大家造出了一个非常优美的轮子,它可以大大简化网络编程流程。例如,Netty 极大地简化了 TCP、UDP 套接字和 HTTP Web 服务程序的开发;除此之外,还可以做到“快速和轻松”地开发应用层协议的通信程序,如 FTP、SMTP、HTTP 以及其他的传统应用层。
Reactor 模式
高性能网络编程都绕不开 Reactor 模式。Nginx、Redi、Netty 等,都是是基于 Reactor 模式的。
Netty 的 Reactor 模式是其核心架构思想,而 Java NIO 是实现这个思想的关键底层技术支撑。就像建筑师设计酒店(Reactor 模式),但施工时仍需要砖块(NIO 组件)。
Reactor 模式核心思想是事件驱动(应用程序响应发生的事件而非主动轮询),由 Reactor 线程、Handlers 处理器两大角色组成,两大角色的职责分别如下:
Reactor 线程(反应器):负责响应 IO 事件(IO 事件查询),并且分发到 Handlers 处理器。
Handlers(处理器):非阻塞的执行具体的业务逻辑。
为便于理解,以下是一个单线程 Reactor 模式实现 的示意代码,了解即可:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
/**
* @ Description: 单线程 Reactor 模式实现 - Java NIO 示例
* 仅供学习参考,单线程 Reactor 模式中,Reactor 和 Handler 都在同一条线程中执行
* 不能充分利用多核资源,在高性能服务器应用场景中,单线程 Reactor 模式实际使用的很少
* 测试连接: cmd 窗口 输入指令: telnet localhost 8080
**/
public class SingleThreadReactor implements Runnable {
// 在网络编程中,"事件" 指的是操作系统检测到的网络状态变化。可以理解为当某个网络通道发生特定变化时,操作系统发出的通知。
// 主要的事件类型为: OP_ACCEPT(服务器准备好接受新连接)、OP_CONNECT(客户端连接建立完成)、OP_READ(通道中有数据可读)、OP_WRITE(通道中有数据可写)。
private final Selector selector; // 事件多路复用器,调度中心
// ServerSocketChannel 每个端口一个实例,用于绑定端口,而 SocketChannel 代表单个 TCP 连接, ServerSocketChannel 为每个连接创建 SocketChannel
private final ServerSocketChannel serverSocket;
// Reactor:相当于中控台,负责事件分发和事件循环;不同事件的业务由工作线程定义
public SingleThreadReactor(int port) throws IOException {
// 1. 创建选择器(Selector) 和 通道(ServerSocketChannel)
selector = Selector.open(); // 创建 Selector
serverSocket = ServerSocketChannel.open(); // 创建 ServerSocketChannel,绑定端口
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false); // 设置为非阻塞
// 2. 将通道(这里是 ServerSocketChannel)注册到选择器(Selector)上
// OP_ACCEPT 表示关注"接受新连接"事件,当有新的客户端连接请求到达时,选择器会检测到这个事件,
// 返回的 SelectionKey 对象是注册关系的凭证,包含三个重要信息:通道、选择器、关注的操作集(这里只有 OP_ACCEPT)
SelectionKey key = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
key.attach(new Acceptor()); // 建立事件与处理器的关联
System.out.println("服务器启动,监听端口:" + port);
}
@Override
public void run() {
try {
// 3. 事件循环 - Reactor 核心
while (!Thread.interrupted()) { // 只要当前线程没有被中断,就继续执行事件循环
selector.select(); // 阻塞等待事件,不断调用 selector.select() 等待事件发生
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// 4. 分发事件到绑定的 Handler
dispatch(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 事件分发方法
private void dispatch(SelectionKey key) {
// 5. 通过 attachment() 获取之前绑定的 Handler
Runnable handler = (Runnable) key.attachment(); // 事件触发时获取处理器(这里底层关系到系统调用,由操作系统通知具体事件)
if (handler != null) {
// !!! 这里非常关键!handler.run() 执行的事件处理器,可能是 Acceptor,也可能是 EchoHandler,由 SelectionKey 决定
// 观察整个代码,在注册 ServerSocketChannel 时,SelectionKey 绑定了 Acceptor 处理器;在注册 SocketChannel 时,绑定了 EchoHandler 处理器
// 两个处理器的 run 方法执行时机,取决于 SelectionKey 绑定的事件类型
handler.run();
}
}
// 处理【新连接事件】的处理器,当有新连接到达时,Acceptor 负责处理
class Acceptor implements Runnable {
@Override
public void run() {
try {
// 6. 接受新连接, SocketChannel 代表每一个新连接
SocketChannel client = serverSocket.accept(); // 这里和 OP_ACCEPT 对应,表示处理【新连接】事件
if (client != null) {
System.out.println("接受新连接:" + client.getRemoteAddress());
// 7. 为新连接创建 Handler 并注册 READ 事件
new EchoHandler(selector, client); // 创建业务处理器
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 处理【每个连接的数据读写事件】的处理器;当 OP_READ 或 OP_WRITE 事件发生时被触发
static class EchoHandler implements Runnable {
private static final int MAX_IN = 256;
private final SocketChannel socket;
private final SelectionKey key;
private final ByteBuffer input = ByteBuffer.allocate(MAX_IN); // 初始化通道缓冲区大小
private final ByteBuffer output = ByteBuffer.allocate(MAX_IN);
// 状态机:0=读取,1=写入,2=关闭
private int state = 0;
EchoHandler(Selector selector, SocketChannel socket) throws IOException {
this.socket = socket;
socket.configureBlocking(false);
// 8. 注册 READ 事件并附加自身 Handler 实例
key = socket.register(selector, SelectionKey.OP_READ);
key.attach(this); // 关键:将 EchoHandler 附加到 SelectionKey, 注册处理器后,处理器的 run 方法执行时机 由 dispatch 方法决定
}
// 具体业务逻辑
@Override
public void run() {
try {
if (state == 0) {
read(); // 读取数据
} else if (state == 1) {
write(); // 写入数据
}
} catch (IOException e) {
close();
}
}
// 读取客户端数据
private void read() throws IOException {
int bytesRead = socket.read(input);
if (bytesRead == -1) {
close();
return;
}
if (bytesRead > 0) {
System.out.println("接收到数据:" + new String(input.array(), 0, bytesRead));
// 9. 准备回写相同数据
// 说明: 网络读写需要缓冲区在"读模式"和"写模式"间切换,这样数据的读取和写入都复用同一块内存区域,避免重复创建/销毁缓冲区,提升数据传输性能
// flip():写模式 → 读模式(准备读取已写入的数据); clear():读模式 → 写模式(准备写入新数据)
input.flip(); // 将 input 缓冲区切换到"读模式",读取客户端数据到 input 缓冲区
output.clear(); // 清空输出缓冲区,准备写入
output.put("your send data is: ".getBytes(StandardCharsets.UTF_8)); // 加句话描述一下
output.put(input); // 将 input 缓冲区的内容复制到 output 缓冲区,相当于将数据原样返回给客户端
input.clear(); // 重置输入缓冲区,准备接收新数据
output.flip(); // 将输出缓冲区从写模式切换到读模式
// 10. 切换状态并注册 WRITE 事件
state = 1;
key.interestOps(SelectionKey.OP_WRITE);
}
}
// 向客户端写数据
private void write() throws IOException {
socket.write(output);
if (!output.hasRemaining()) {
// 11. 数据写完,切换回 READ 状态
state = 0;
key.interestOps(SelectionKey.OP_READ);
}
}
private void close() {
try {
System.out.println("关闭连接:" + socket.getRemoteAddress());
key.cancel();
socket.close();
} catch (IOException e) {
// 忽略关闭异常
}
}
}
public static void main(String[] args) throws IOException {
// 启动服务器
new Thread(new SingleThreadReactor(8080)).start();
}
}Netty 中的 Reactor 模式
反应器
因为 Netty 是基于 Reactor 模式实现的,自然包含两大角色——反应器和处理器;
在 Netty 的 Reactor 模式中,一个反应器(或者 SubReactor 子反应器)会由一个事件处理线程负责事件查询和分发。该线程不断进行轮询,通过 Selector 选择器不断查询注册过的 IO 事件(选择键)。如果查询到 IO 事件,就分发给 Handler 业务处理器。Netty 中的反应器组件有多个实现类,这些实现类与其通道类型相互匹配。对应于 NioSocketChannel 通道,Netty 的反应器类为 NioEventLoop(NIO 事件轮询)。
Netty 的程序开发不会直接使用单个 EventLoop(事件轮询器),而是使用 EventLoopGroup,它是一个 EventLoop 的集合,负责创建并管理多个 EventLoop(一个线程对应一个 EventLoop 子反应器),完成多线程版本的 Reactor 模式实现。
Netty 中反应器的查询和分发的 IO 事件都来自 Channel 组件,对应到不同的协议,Netty 实现了对应的通道,每一种协议基本上都有 NIO 和 OIO 两个版本:
通道类型 描述 NioSocketChannel 异步非阻塞 TCP Socket 传输通道 NioServerSocketChannel 异步非阻塞 TCP Socket 服务器端通道 NioDatagramChannel 异步非阻塞的 UDP 传输通道 NioSctpChannel 异步非阻塞 Sctp 传输通道 NioSctpServerChannel 异步非阻塞 Sctp 服务端监听通道 OioSocketChannel 同步阻塞式 TCP Socket 传输通道 OioServerSocketChannel 同步阻塞式 TCP Socket 服务端监听通道 OioDatagramChannel 同步阻塞式 UDP 传输通道 OioSctpChannel 同步阻塞式 Sctp 传输通道 OioSctpServerChannel 同步阻塞式 Sctp 服务端监听通道 提示
一般来说,服务端编程用到最多的通信协议还是 TCP,对应的 Netty 传输通道类型为 NioSocketChannel 类、Netty 服务器监听通道类型为 NioServerSocketChannel
Netty 中一个反应器可以注册成千上万的通道,每个通道都由一个 Handler 业务处理器负责处理。
处理器
在 Netty 中,EventLoop 反应器内部有一个线程负责 Java NIO 选择器的事件的轮询,然后进行对应的事件分发。事件分发 (Dispatch) 的目标就是执行不同的自定义的业务处理器 Handler。Netty 的 Handler 分为两大类:第一类是 ChannelInboundHandler 入站处理器;第二类是 ChannelOutboundHandler 出站处理器。简单来说,数据从网络来到你的应用程序就是入站(解码,将数据转为对象),数据从你的应用程序发往网络就是出站(编码,将对象转为适合网络传输的字节数据 ByteBuf)。
在 Netty 中,所有数据的入站和出站操作,以及绝大多数事件,都必须流经 ChannelPipeline(通道流水线),这是 Netty 架构设计的核心原则之一。
总结
- EventLoop 反应器(或者 SubReactor 子反应器)和 Channel 通道(代表一个开放的连接,如 TCP 连接或 UDP 会话)之间是一对多的关系,一个反应器可以查询很多个通道的 IO 事件;
- Channel 通道和 Handler 处理器实例之间是多对多的关系,一个通道的 IO 事件可以被多个 Handler 实例处理;一个 Handler 处理器实例也能绑定到很多通道,处理多个通道的 IO 事件。
- ChannelPipeline(通道流水线)是与 Channel 绑定的处理器链,维护了通道和 Handler 处理器实例之间的绑定关系,它负责在 IO 事件到达时,按照注册顺序将事件依次传递给各个 ChannelHandler 处理器,以实现事件的处理与转发。
Bootstrap 工厂类
Bootstrap 类是 Netty 提供的一个便利的工厂类,可以通过它来完成 Netty 的客户端或服务端的 Netty 组件的组装。自己可以手动一点点去手动创建通道、完成各种设置和启动注册到 EventLoop 反应器,然后开始事件的轮询和处理,但是这个过程会非常麻烦,使用这个便利的 Bootstrap 工具类的效率会更高。
Netty 中有两个引导类,分别用于服务器(Bootstrap)和客户端(ServerBootstrap),下面以 ServerBootstrap 类作为重点介绍对象,可以参考以下代码:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ReferenceCountUtil;
/**
* @ Description: 创建一个 TCP 服务端,当接收到客户端数据时,在控制台打印丢弃的数据
* @ Date: 2025/7/22 17:25:25
* @ Author: ZJX
**/
public class NettyDiscardServer {
private final int serverPort;
public NettyDiscardServer(int port) {
this.serverPort = port;
}
ServerBootstrap b = new ServerBootstrap(); // 创建一个服务端的引导类
public void runServer() {
// 创建反应器轮询组
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); // boss 组用于处理【连接监听】
EventLoopGroup workerLoopGroup = new NioEventLoopGroup(); // work 组用于处理【数据传输】和【事件处理】
try {
// 1. 使用引导类设置反应器轮询组
b.group(bossLoopGroup, workerLoopGroup);
// 2. 设置 nio 类型的通道
b.channel(NioServerSocketChannel.class);
// 3. 设置监听端口
b.localAddress(serverPort);
// 4. 设置通道的参数,ChannelOption 有多个配置,其他请参考源码或查阅资料
b.option(ChannelOption.SO_KEEPALIVE, true); // 开启 TCP 底层心跳机制
// 5. 装配子通道流水线,可同时添加多个
b.childHandler(new ChannelInitializer<SocketChannel>() {
// 有连接到达时会创建一个通道
protected void initChannel(SocketChannel ch) {
// 流水线的职责:负责管理通道中的处理器
// 向“子通道”(传输通道)流水线添加一个处理器
ch.pipeline().addLast(new NettyDiscardHandler());
// ch.pipeline().addLast(new OtherHandler());
}
});
// 6. 开始绑定服务器
ChannelFuture channelFuture = b.bind().sync(); // 调用 sync 同步方法阻塞直到绑定成功
System.out.println(" 服务器启动成功,监听端口:" + channelFuture.channel().localAddress());
// 7. 等待通道关闭的异步任务结束
ChannelFuture closeFuture = channelFuture.channel().closeFuture(); // 服务监听通道会一直等待通道关闭的异步任务结束
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8. 优雅关闭 EventLoopGroup
// 释放掉所有资源,包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
// 配置一个入站通道处理器
class NettyDiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
System.out.println("收到消息,丢弃如下:");
while (in.isReadable()) {
System.out.print((char) in.readByte());
}
System.out.println(); // 换行
} finally {
ReferenceCountUtil.release(msg);
}
}
}
public static void main(String[] args) {
int port = 8080;
new NettyDiscardServer(port).runServer();
}
}<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.2.3.Final</version>
</dependency>解码、编码、序列化、反序列化
解码: 在入站处理过程中,需要将 ByteBuf 二进制类型解码为更有意义的类型(如 POJO)。这个解码过程可以通过 Netty 的 Decoder(解码器)去完成。
编码: 在出站处理过程中,需要将有意义的类型(如 POJO)编码成更适合传输的类型 (常为 ByteBuf)。这个编码过程可以通过 Netty 的 Encoder(编码器)去完成。
序列化:将内存中的 Java 对象转换成适合网络传输或存储的字节序列 (ByteBuf),如 JSON、XML、Protobuf 等。
反序列化:将从网络接收到的字节序列 (ByteBuf) 转换回内存中的 Java 对象。
提示
编码和解码包含或依赖于序列化、反序列化;序列化/反序列化解决对象与原始字节的转换问题,编码/解码(特别是其中的粘包处理部分)解决原始字节在流式传输中的边界识别问题,是可靠传输的保障。
粘包和半包:粘包(多条应用层消息的数据粘在一起,出现在同一个 ByteBuf 中)和半包(一条应用层消息的数据被拆分成多个部分,分次到达)会导致解码器无法正确识别消息的边界,导致反序列化失败;
简单 IM 系统
以下是一个使用 Netty 实现的简单的 IM 系统示例,客户端发送消息,服务端控制台上打印出收到的内容。
更多内容如使用 JSON 封装消息体、聊天室分组、私聊请参考 其他示例。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ Description: 服务端:接受连接、转发消息
* 测试连接: 使用客户端测试连接;或 cmd 窗口 输入指令: telnet localhost 8080
**/
public class NettyIMServer {
// 存储在线客户端,收到消息时,将消息广播给每个客户端
private static final ConcurrentHashMap<ChannelId, Channel> clients = new ConcurrentHashMap<>();
public static void main(String[] args) throws Exception {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // 创建一个服务端的引导类
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder(StandardCharsets.UTF_8));
p.addLast(new StringEncoder(StandardCharsets.UTF_8));
p.addLast(new SimpleChannelInboundHandler<String>() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
clients.put(ctx.channel().id(), ctx.channel());
System.out.println("客户端连接:" + ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
clients.remove(ctx.channel().id());
System.out.println("客户端断开:" + ctx.channel().remoteAddress());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("收到消息:" + msg);
// 广播消息给其他客户端
for (Channel channel : clients.values()) {
if (channel != ctx.channel()) {
channel.writeAndFlush(msg + "\n");
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
});
}
});
ChannelFuture f = b.bind(8080).sync();
System.out.println("IM 服务器已启动,端口 8080");
f.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
/**
* @ Description: 连接服务器、发送消息、接收其他用户的消息
**/
public class NettyIMClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // 创建一个客户端的引导类
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder(StandardCharsets.UTF_8));
p.addLast(new StringEncoder(StandardCharsets.UTF_8));
p.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("收到:" + msg);
}
});
}
});
Channel channel = b.connect("localhost", 8080).sync().channel();
System.out.println("连接到 IM 服务器,输入格式 username: message");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
channel.writeAndFlush(line + "\n");
}
} finally {
group.shutdownGracefully();
}
}
}<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.2.3.Final</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.1</version>
</dependency>
