博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
vert.x线程安全的线程模型详解
阅读量:4039 次
发布时间:2019-05-24

本文共 32001 字,大约阅读时间需要 106 分钟。

线程模型概述

Vert.x的线程模型设计的非常巧妙。总的来说,Vert.x中主要有两种线程:Event Loop线程 和 Worker线程。其中,Event Loop线程结合了Netty的EventLoop,用于处理事件。每一个EventLoop都与唯一的线程相绑定,这个线程就叫Event Loop线程。Event Loop线程不能被阻塞,否则事件将无法被处理。

Worker线程用于执行阻塞任务,这样既可以执行阻塞任务而又不阻塞Event Loop线程。

如果像Node.js一样只有单个Event Loop的话就不能充分利用多核CPU的性能了。为了充分利用多核CPU的性能,Vert.x中提供了一组Event Loop线程。每个Event Loop线程都可以处理事件。为了保证线程安全,防止资源争用,Vert.x保证了某一个Handler总是被同一个Event Loop线程执行,这样不仅可以保证线程安全,而且还可以在底层对锁进行优化提升性能。所以,只要开发者遵循Vert.x的线程模型,开发者就不需要再担心线程安全的问题,这是非常方便的。

本篇文章将底层的角度来解析Vert.x的线程模型。对应的Vert.x版本为3.3.3

Event Loop线程

首先回顾一下Event Loop线程,它会不断地轮询获取事件,并将获取到的事件分发到对应的事件处理器中进行处理:

Vert.x线程模型中最重要的一点就是:永远不要阻塞Event Loop线程。因为一旦处理事件的线程被阻塞了,事件就会一直积压着不能被处理,整个应用也就不能正常工作了。

Vert.x中内置一种用于检测Event Loop是否阻塞的线程:vertx-blocked-thread-checker。一旦Event Loop处理某个事件的时间超过一定阈值(默认为2000ms)就会警告,如果阻塞的时间过长就会抛出异常。Block Checker的实现原理比较简单,底层借助了JUC的TimerTask,定时计算每个Event Loop线程的处理事件消耗的时间,如果超时就进行相应的警告。

Vert.x Thread

Vert.x中的Event Loop线程及Worker线程都用VertxThread类表示,并通过VertxThreadFactory线程工厂来创建。VertxThreadFactory创建Vert.x线程的过程非常简单:

1        
2
3
4
5
6
7
8
9
10
11
12
@Override        
public Thread newThread(Runnable runnable) {
VertxThread t =
new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime);
if (checker !=
null) {
checker.registerThread(t);
}
addToMap(t);
t.setDaemon(
false);
return t;
}

除了创建VertxThread线程之外,VertxThreadFactory还会将此线程注册至Block Checker线程中以监视线程的阻塞情况,并且将此线程添加至内部的weakMap中。这个weakMap作用只有一个,就是在注销对应的Verticle的时候可以将每个VertxThread中的Context实例清除(unset)。为了保证资源不被一直占用,这里使用了WeakHashMap来存储每一个VertxThread。当里面的VertxThread的引用不被其他实例持有的时候,它就会被标记为可清除的对象,等待GC。

至于VertxThread,它其实就是在普通线程的基础上存储了额外的数据(如对应的Vert.x Context,最大执行时长,当前执行时间,是否为Worker线程等),这里就不多讲了。

Vert.x Context

Vert.x底层中一个重要的概念就是Context,每个Context都会绑定着一个Event Loop线程(而一个Event Loop线程可以对应多个Context)。我们可以把Context看作是控制一系列的Handler的执行作用域及顺序的上下文对象。

每当Vert.x底层将事件分发至Handler的时候,Vert.x都会给此Handler钦点一个Context用于处理任务:

  • 如果当前线程是Vert.x线程(VertxThread),那么Vert.x就会复用此线程上绑定的Context;如果没有对应的Context就创建新的
  • 如果当前线程是普通线程,就创建新的Context

Vert.x中存在三种Context,与之前的线程种类相对应:

  • EventLoopContext
  • WorkerContext
  • MultiThreadedWorkerContext

Event loop context

每个Event Loop Context都会对应着唯一的一个EventLoop,即一个Event Loop Context只会在同一个Event Loop线程上执行任务。在创建Context的时候,Vert.x会自动根据轮询策略选择对应的EventLoop:

1        
2
3
4
5
6
7
8
9
10
11
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,        
ClassLoader tccl) {
// ...
EventLoopGroup group = vertx.getEventLoopGroup();
if (group !=
null) {
this.eventLoop = group.next();
}
else {
this.eventLoop =
null;
}
// ...
}

在Netty中,EventLoopGroup代表一组EventLoop,而从中获取EventLoop的方法则是next方法。EventLoopGroupEventLoop的数量由CPU内核数目所确定。Vert.x这里使用了Netty NIO对应的NioEventLoop

1        
2
eventLoopGroup =          
new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

对应的轮询算法:

1        
2
3
4
5
6
7
8
@Override        
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return
new PowerOfTowEventExecutorChooser(executors);
}
else {
return
new GenericEventExecutorChooser(executors);
}
}

可以看到,正常情况下Netty会用轮询策略选择EventLoop。特别地,如果EventLoop的个数是2的倍数的话,选择的会快一些:

1        
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private          
static
final
class GenericEventExecutorChooser implements EventExecutorChooser {
// ...
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
private
static
final
class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
// ...
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length -
1];
}
}

我们可以在Embedded模式下测试一下Event Loop线程的分配:

1        
2
3
4
5
6
7
System.out.println(Thread.currentThread());        
Vertx vertx = Vertx.vertx();
for (
int i =
0; i <
20; i++) {
int index = i;
vertx.setTimer(
1, t -> {
System.out.println(index +
":" + Thread.currentThread());
});

运行结果(不同机器运行顺序、Event Loop线程数可能不同):

1        
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Thread[main,         
5,main]
0:Thread[vert.x-eventloop-thread-
0,
5,main]
1:Thread[vert.x-eventloop-thread-
1,
5,main]
2:Thread[vert.x-eventloop-thread-
2,
5,main]
3:Thread[vert.x-eventloop-thread-
3,
5,main]
5:Thread[vert.x-eventloop-thread-
5,
5,main]
6:Thread[vert.x-eventloop-thread-
6,
5,main]
8:Thread[vert.x-eventloop-thread-
8,
5,main]
7:Thread[vert.x-eventloop-thread-
7,
5,main]
10:Thread[vert.x-eventloop-thread-
10,
5,main]
9:Thread[vert.x-eventloop-thread-
9,
5,main]
4:Thread[vert.x-eventloop-thread-
4,
5,main]
11:Thread[vert.x-eventloop-thread-
11,
5,main]
12:Thread[vert.x-eventloop-thread-
12,
5,main]
13:Thread[vert.x-eventloop-thread-
13,
5,main]
14:Thread[vert.x-eventloop-thread-
14,
5,main]
16:Thread[vert.x-eventloop-thread-
0,
5,main]
17:Thread[vert.x-eventloop-thread-
1,
5,main]
15:Thread[vert.x-eventloop-thread-
15,
5,main]
18:Thread[vert.x-eventloop-thread-
2,
5,main]
19:Thread[vert.x-eventloop-thread-
3,
5,main]

可以看到尽管每个Context对应唯一的Event Loop线程,而每个Event Loop线程却可能对应多个Context

Event Loop Context会在对应的EventLoop中执行Handler进行事件的处理(IO事件,非阻塞)。Vert.x会保证同一个Handler会一直在同一个Event Loop线程中执行,这样可以简化线程模型,让开发者在写Handler的时候不需要考虑并发的问题,非常方便。

我们来粗略地看一下Handler是如何在EventLoop上执行的。EventLoopContext中实现了executeAsync方法用于包装Handler中事件处理的逻辑并将其提交至对应的EventLoop中进行执行:

1        
2
3
4
public void executeAsync(Handler
task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(
null, task,
true,
null));
}

这里Vert.x使用了wrapTask方法将Handler封装成了一个Runnable用于向EventLoop中提交。代码比较直观,大致就是检查当前线程是否为Vert.x线程,然后记录事件处理开始的时间,给当前的Vert.x线程设置Context,并且调用Handler里面的事件处理方法。具体请参考源码,这里就不贴出来了。

那么把封装好的task提交到EventLoop以后,EventLoop是怎么处理的呢?这就需要更多的Netty相关的知识了。根据Netty的模型,Event Loop线程需要处理IO事件,普通事件(即我们的Handler)以及定时事件(比如Vert.x的setTimer)。Vert.x会提供一个NETTY_IO_RATIO给Netty代表EventLoop处理IO事件时间占用的百分比(默认为50,即IO事件时间占用:非IO事件时间占用=1:1)。当EventLoop启动的时候,它会不断轮询IO时间及其它事件并进行处理:

1        
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override        
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(
false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys =
0;
needsToSelectAgain =
false;
final
int ioRatio =
this.ioRatio;
if (ioRatio ==
100) {
processSelectedKeys();
runAllTasks();
}
else {
final
long ioStartTime = System.nanoTime();
processSelectedKeys();
final
long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (
100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
}
catch (Throwable t) {
// process the error
// ...
}
}
}

这里面Netty会调用processSelectedKeys方法进行IO事件的处理,并且会计算出处理IO时间所用的事件然后计算出给非IO事件处理分配的时间,然后调用runAllTasks方法执行所有的非IO任务(这里面就有我们的各个Handler)。

runAllTasks会按顺序从内部的任务队列中取出任务(Runnable)然后进行安全执行。而我们刚才调用的NioEventLoopexecute方法其实就是将包装好的Handler置入NioEventLoop内部的任务队列中等待执行。

Worker context

顾名思义,Worker Context用于跑阻塞任务。与Event Loop Context相似,每一个Handler都只会跑在固定的Worker线程下。

Vert.x还提供一种Multi-threaded worker context可以在多个Worker线程下并发执行任务,这样就会出现并发问题,需要开发者自行解决并发问题。因此一般情况下我们用不到Multi-threaded worker context。

Verticle

我们再来讨论一下Verticle中的Context。在部署Verticle的时候,Vert.x会根据配置来创建Context并绑定到Verticle上,此后此Verticle上所有绑定的Handler都会在此Context上执行。相关实现位于doDeploy方法,这里摘取核心部分:

1        
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
for (Verticle verticle: verticles) {        
WorkerExecutorImpl workerExec = poolName !=
null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) :
null;
WorkerPool pool = workerExec !=
null ? workerExec.getPool() :
null;
// 根据配置创建Context
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
if (workerExec !=
null) {
context.addCloseHook(workerExec);
}
context.setDeployment(deployment);
deployment.addVerticle(
new VerticleHolder(verticle, context));
// 此Verticle上的Handler都会在创建的context作用域内执行
context.runOnContext(v -> {
try {
verticle.init(vertx, context);
Future
startFuture = Future.future();
// 大家熟悉的start方法的执行点
verticle.start(startFuture);
startFuture.setHandler(ar -> {
if (ar.succeeded()) {
if (parent !=
null) {
parent.addChild(deployment);
deployment.child =
true;
}
vertx.metricsSPI().verticleDeployed(verticle);
deployments.put(deploymentID, deployment);
if (deployCount.incrementAndGet() == verticles.length) {
reportSuccess(deploymentID, callingContext, completionHandler);
}
}
else
if (!failureReported.get()) {
reportFailure(ar.cause(), callingContext, completionHandler);
}
});
}
catch (Throwable t) {
reportFailure(t, callingContext, completionHandler);
}
});
}

通过这样一种方式,Vert.x保证了Verticle的线程安全 —— 即某个Verticle上的所有Handler都会在同一个Vert.x线程上执行,这样也保证了Verticle内部成员的安全(没有race condition问题)。比如下面Verticle中处理IO及事件的处理都一直是在同一个Vert.x线程下执行的,每次打印出的线程名称应该是一样的:

1        
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public          
class TcpClientVerticle extends AbstractVerticle {
int i =
0;
@Override
public void start() throws Exception {
vertx.createNetClient().connect(
6666,
"localhost", ar -> {
if (ar.succeeded()) {
NetSocket socket = ar.result();
System.out.println(Thread.currentThread().getName());
socket.handler(buffer -> {
i++;
System.out.println(Thread.currentThread().getName());
System.out.println(
"Net client receiving: " + buffer.toString(
"UTF-8"));
});
socket.write(
"+1s\n");
}
else {
ar.cause().printStackTrace();
}
});
}
}

线程池

Event Loop线程池

之前我们已经提到过,Event Loop线程池的类型为Netty中的NioEventLoopGroup,里面的线程通过Vert.x自己的线程工厂VertxThreadFactory进行创建:

1        
2
3
eventLoopThreadFactory =          
new VertxThreadFactory(
"vert.x-eventloop-thread-", checker,
false, options.getMaxEventLoopExecuteTime());
eventLoopGroup =
new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

其中Event Loop线程的数目可以在配置中指定。

Worker线程池

在之前讲executeBlocking底层实现的文章中我们已经提到过Worker线程池,它其实就是一种Fixed Thread Pool:

1        
2
3
4
5
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),        
new VertxThreadFactory(
"vert.x-worker-thread-", checker,
true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec,
"worker",
"vert.x-worker-thread", options.getWorkerPoolSize()) :
null;
workerPool =
new WorkerPool(workerExec, workerPoolMetrics);

Worker线程同样由VertxThreadFactory构造,类型为VertxThread,用于执行阻塞任务。我们同样可以在配置中指定其数目。

内部阻塞线程池

1        
2
3
4
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),        
new VertxThreadFactory(
"vert.x-internal-blocking-", checker,
true, options.getMaxWorkerExecuteTime()));
PoolMetrics internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(internalBlockingExec,
"worker",
"vert.x-internal-blocking", options.getInternalBlockingPoolSize()) :
null;
internalBlockingPool =
new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);

Internal Blocking Pool可能设计用于内部使用,在executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler)这个版本的方法中就使用了它。

Acceptor Event Loop线程池

大家可能会发现VertxImpl类中还有一个acceptorEventLoopGroup。顾名思义,它是Netty中的Acceptor线程池,负责处理客户端的连接请求:

1        
2
acceptorEventLoopGroup =          
new NioEventLoopGroup(
1, acceptorEventLoopThreadFactory);
acceptorEventLoopGroup.setIoRatio(
100);

由于系统只有一个服务端端口需要监听,因此这里只需要一个线程。

Vert.x中的HttpServer就利用了acceptorEventLoopGroup处理客户端的连接请求,具体的实现后边会另起一篇介绍。

你可能感兴趣的文章
vsftp 配置具有匿名登录也有系统用户登录,系统用户有管理权限,匿名只有下载权限。
查看>>
linux安装usb wifi接收器
查看>>
补充自动屏蔽攻击ip
查看>>
谷歌走了
查看>>
多线程使用随机函数需要注意的一点
查看>>
getpeername,getsockname
查看>>
让我做你的下一行Code
查看>>
浅析:setsockopt()改善程序的健壮性
查看>>
关于对象赋值及返回临时对象过程中的构造与析构
查看>>
VS 2005 CRT函数的安全性增强版本
查看>>
SQL 多表联合查询
查看>>
Visual Studio 2010:C++0x新特性
查看>>
drwtsn32.exe和adplus.vbs进行dump文件抓取
查看>>
cppcheck c++静态代码检查
查看>>
在C++中使用Lua
查看>>
C++中使用Mongo执行count和distinct运算
查看>>
一些socket的编程经验
查看>>
socket编程中select的使用
查看>>
关于AIS编码解码的两个小问题
查看>>
GitHub 万星推荐:黑客成长技术清单
查看>>