EventLoop事情循环器

  1. 单线程执行器
  2. 处理 Channel 上源源不断的 io 事件:Channel 上通过selector去监听accept(建立连接)、read(读)、write(可写)等事件,通过EventLoop去处理这些事件

我们上面使用的就是EventLoopGroup,包含很多个EventLoop,我们每创建一个连接,就需要绑定到一个EventLoop上,之后EventLoop就会开始监听这个连接(只要连接不关闭,一直都是这个EventLoop负责此Channel),而一个EventLoop可以同时监听很多个Channel,实际上就是我们之前学习的Selector罢了。

当然,EventLoop并不只是用于网络操作的,我们前面所说的EventLoop其实都是NioEventLoop,它是专用于网络通信的,除了网络通信之外,我们也可以使用普通的EventLoop来处理一些其他的事件。

这样会卡住:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup) //指定事件循环组
.channel(NioServerSocketChannel.class) //指定为NIO的ServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() { //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
Thread.sleep(10000); //这里我们直接卡10秒假装在处理任务
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});
}
});
bootstrap.bind(8080);
}

可以创建一个普通的EventLoop来处理专门读写之外的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
EventLoopGroup handlerGroup = new DefaultEventLoopGroup(); //使用DefaultEventLoop来处理其他任务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
handlerGroup.submit(() -> {
//由于继承自ScheduledExecutorService,我们直接提交任务就行了,是不是感觉贼方便
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
});
}
});
}
});
bootstrap.bind(8080);
}

也可以创建一条流水线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
EventLoopGroup handlerGroup = new DefaultEventLoopGroup(); //使用DefaultEventLoop来处理其他任务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg);
}
}).addLast(handlerGroup, new ChannelInboundHandlerAdapter(){ //在添加时,可以直接指定使用哪个EventLoopGroup
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});
}
});
bootstrap.bind(8080);
}

Future和Promise

我们接着来看ChannelFuture,前面我们提到,Netty中Channel的相关操作都是异步进行的,并不是在当前线程同步执行,我们不能立即得到执行结果,如果需要得到结果,那么我们就必须要利用到Future。

我们先来看看ChannelFutuer接口怎么定义的:

1
2
3
4
5
6
7
8
9
10
11
12
public interface ChannelFuture extends Future<Void> {
Channel channel(); //我们可以直接获取此任务的Channel
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1); //当任务完成时,会直接执行GenericFutureListener的任务,注意执行的位置也是在EventLoop中
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
ChannelFuture sync() throws InterruptedException; //在当前线程同步等待异步任务完成,任务失败会抛出异常
ChannelFuture syncUninterruptibly(); //同上,但是无法响应中断
ChannelFuture await() throws InterruptedException; //同上,但是任务中断不会抛出异常,需要手动判断
ChannelFuture awaitUninterruptibly();
boolean isVoid(); //返回类型是否为void
}
1
2
3
4
5
// 以下是两种解决办法
future.sync();
ChannelFuture future = bootstrap.bind(8080);
//直接添加监听器,当任务完成时自动执行,但是注意执行也是异步的,不是在当前线程
future.addListener(f -> System.out.println("我是服务端启动完成之后要做的事情!"));

Promise接口,它支持手动设定成功和失败的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//此接口也是继承自Netty中的Future接口
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V var1); //手动设定成功
boolean trySuccess(V var1);
Promise<V> setFailure(Throwable var1); //手动设定失败
boolean tryFailure(Throwable var1);
boolean setUncancellable();
//这些就和之前的Future是一样的了
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Promise<V> await() throws InterruptedException;
Promise<V> awaitUninterruptibly();
Promise<V> sync() throws InterruptedException;
Promise<V> syncUninterruptibly();
}