本文共 32001 字,大约阅读时间需要 106 分钟。
Vert.x的线程模型设计的非常巧妙。总的来说,Vert.x中主要有两种线程:Event Loop线程 和 Worker线程。其中,Event Loop线程结合了Netty的EventLoop
,用于处理事件。每一个EventLoop
都与唯一的线程相绑定,这个线程就叫Event Loop线程。Event Loop线程不能被阻塞,否则事件将无法被处理。
Worker线程用于执行阻塞任务,这样既可以执行阻塞任务而又不阻塞Event Loop线程。
如果像Node.js一样只有单个Event Loop的话就不能充分利用多核CPU的性能了。为了充分利用多核CPU的性能,Vert.x中提供了一组Event Loop线程。每个Event Loop线程都可以处理事件。为了保证线程安全,防止资源争用,Vert.x保证了某一个Handler
总是被同一个Event Loop线程执行,这样不仅可以保证线程安全,而且还可以在底层对锁进行优化提升性能。所以,只要开发者遵循Vert.x的线程模型,开发者就不需要再担心线程安全的问题,这是非常方便的。
本篇文章将底层的角度来解析Vert.x的线程模型。对应的Vert.x版本为3.3.3。
首先回顾一下Event Loop线程,它会不断地轮询获取事件,并将获取到的事件分发到对应的事件处理器中进行处理:
Vert.x线程模型中最重要的一点就是:永远不要阻塞Event Loop线程。因为一旦处理事件的线程被阻塞了,事件就会一直积压着不能被处理,整个应用也就不能正常工作了。
Vert.x中内置一种用于检测Event Loop是否阻塞的线程:vertx-blocked-thread-checker
。一旦Event Loop处理某个事件的时间超过一定阈值(默认为2000ms)就会警告,如果阻塞的时间过长就会抛出异常。Block Checker的实现原理比较简单,底层借助了JUC的TimerTask
,定时计算每个Event Loop线程的处理事件消耗的时间,如果超时就进行相应的警告。
Vert.x中的Event Loop线程及Worker线程都用VertxThread
类表示,并通过VertxThreadFactory
线程工厂来创建。VertxThreadFactory
创建Vert.x线程的过程非常简单:
1 2 3 4 5 6 7 8 9 10 11 12 | @Override public Thread newThread(Runnable runnable) { VertxThread t = new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime); if (checker != null) { checker.registerThread(t); } addToMap(t); t.setDaemon( false); return t; } |
除了创建VertxThread
线程之外,VertxThreadFactory
还会将此线程注册至Block Checker线程中以监视线程的阻塞情况,并且将此线程添加至内部的weakMap
中。这个weakMap
作用只有一个,就是在注销对应的Verticle的时候可以将每个VertxThread
中的Context
实例清除(unset)。为了保证资源不被一直占用,这里使用了WeakHashMap
来存储每一个VertxThread
。当里面的VertxThread
的引用不被其他实例持有的时候,它就会被标记为可清除的对象,等待GC。
至于VertxThread
,它其实就是在普通线程的基础上存储了额外的数据(如对应的Vert.x Context,最大执行时长,当前执行时间,是否为Worker线程等),这里就不多讲了。
Vert.x底层中一个重要的概念就是Context
,每个Context
都会绑定着一个Event Loop线程(而一个Event Loop线程可以对应多个Context
)。我们可以把Context
看作是控制一系列的Handler
的执行作用域及顺序的上下文对象。
每当Vert.x底层将事件分发至Handler
的时候,Vert.x都会给此Handler
钦点一个Context
用于处理任务:
VertxThread
),那么Vert.x就会复用此线程上绑定的Context
;如果没有对应的Context
就创建新的Context
Vert.x中存在三种Context
,与之前的线程种类相对应:
EventLoopContext
WorkerContext
MultiThreadedWorkerContext
每个Event Loop Context都会对应着唯一的一个EventLoop
,即一个Event Loop Context只会在同一个Event Loop线程上执行任务。在创建Context
的时候,Vert.x会自动根据轮询策略选择对应的EventLoop
:
1 2 3 4 5 6 7 8 9 10 11 | protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config, ClassLoader tccl) { // ... EventLoopGroup group = vertx.getEventLoopGroup(); if (group != null) { this.eventLoop = group.next(); } else { this.eventLoop = null; } // ... } |
在Netty中,EventLoopGroup
代表一组EventLoop
,而从中获取EventLoop
的方法则是next
方法。EventLoopGroup
中EventLoop
的数量由CPU内核数目所确定。Vert.x这里使用了Netty NIO对应的NioEventLoop
:
1 2 | eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory); eventLoopGroup.setIoRatio(NETTY_IO_RATIO); |
对应的轮询算法:
1 2 3 4 5 6 7 8 | @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTowEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } |
可以看到,正常情况下Netty会用轮询策略选择EventLoop
。特别地,如果EventLoop
的个数是2的倍数的话,选择的会快一些:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | private static final class GenericEventExecutorChooser implements EventExecutorChooser { // ... @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser { // ... @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } |
我们可以在Embedded模式下测试一下Event Loop线程的分配:
1 2 3 4 5 6 7 | System.out.println(Thread.currentThread()); Vertx vertx = Vertx.vertx(); for ( int i = 0; i < 20; i++) { int index = i; vertx.setTimer( 1, t -> { System.out.println(index + ":" + Thread.currentThread()); }); |
运行结果(不同机器运行顺序、Event Loop线程数可能不同):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | Thread[main, 5,main] 0:Thread[vert.x-eventloop-thread- 0, 5,main] 1:Thread[vert.x-eventloop-thread- 1, 5,main] 2:Thread[vert.x-eventloop-thread- 2, 5,main] 3:Thread[vert.x-eventloop-thread- 3, 5,main] 5:Thread[vert.x-eventloop-thread- 5, 5,main] 6:Thread[vert.x-eventloop-thread- 6, 5,main] 8:Thread[vert.x-eventloop-thread- 8, 5,main] 7:Thread[vert.x-eventloop-thread- 7, 5,main] 10:Thread[vert.x-eventloop-thread- 10, 5,main] 9:Thread[vert.x-eventloop-thread- 9, 5,main] 4:Thread[vert.x-eventloop-thread- 4, 5,main] 11:Thread[vert.x-eventloop-thread- 11, 5,main] 12:Thread[vert.x-eventloop-thread- 12, 5,main] 13:Thread[vert.x-eventloop-thread- 13, 5,main] 14:Thread[vert.x-eventloop-thread- 14, 5,main] 16:Thread[vert.x-eventloop-thread- 0, 5,main] 17:Thread[vert.x-eventloop-thread- 1, 5,main] 15:Thread[vert.x-eventloop-thread- 15, 5,main] 18:Thread[vert.x-eventloop-thread- 2, 5,main] 19:Thread[vert.x-eventloop-thread- 3, 5,main] |
可以看到尽管每个Context
对应唯一的Event Loop线程,而每个Event Loop线程却可能对应多个Context
。
Event Loop Context会在对应的EventLoop
中执行Handler
进行事件的处理(IO事件,非阻塞)。Vert.x会保证同一个Handler
会一直在同一个Event Loop线程中执行,这样可以简化线程模型,让开发者在写Handler
的时候不需要考虑并发的问题,非常方便。
我们来粗略地看一下Handler
是如何在EventLoop
上执行的。EventLoopContext
中实现了executeAsync
方法用于包装Handler
中事件处理的逻辑并将其提交至对应的EventLoop
中进行执行:
1 2 3 4 | public void executeAsync(Handler |
这里Vert.x使用了wrapTask
方法将Handler
封装成了一个Runnable
用于向EventLoop
中提交。代码比较直观,大致就是检查当前线程是否为Vert.x线程,然后记录事件处理开始的时间,给当前的Vert.x线程设置Context
,并且调用Handler
里面的事件处理方法。具体请参考源码,这里就不贴出来了。
那么把封装好的task提交到EventLoop
以后,EventLoop
是怎么处理的呢?这就需要更多的Netty相关的知识了。根据Netty的模型,Event Loop线程需要处理IO事件,普通事件(即我们的Handler
)以及定时事件(比如Vert.x的setTimer
)。Vert.x会提供一个NETTY_IO_RATIO
给Netty代表EventLoop
处理IO事件时间占用的百分比(默认为50,即IO事件时间占用:非IO事件时间占用=1:1)。当EventLoop
启动的时候,它会不断轮询IO时间及其它事件并进行处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | @Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet( false)); if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * ( 100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { // process the error // ... } } } |
这里面Netty会调用processSelectedKeys
方法进行IO事件的处理,并且会计算出处理IO时间所用的事件然后计算出给非IO事件处理分配的时间,然后调用runAllTasks
方法执行所有的非IO任务(这里面就有我们的各个Handler
)。
runAllTasks
会按顺序从内部的任务队列中取出任务(Runnable
)然后进行安全执行。而我们刚才调用的NioEventLoop
的execute
方法其实就是将包装好的Handler
置入NioEventLoop
内部的任务队列中等待执行。
顾名思义,Worker Context用于跑阻塞任务。与Event Loop Context相似,每一个Handler
都只会跑在固定的Worker线程下。
Vert.x还提供一种Multi-threaded worker context可以在多个Worker线程下并发执行任务,这样就会出现并发问题,需要开发者自行解决并发问题。因此一般情况下我们用不到Multi-threaded worker context。
我们再来讨论一下Verticle
中的Context
。在部署Verticle
的时候,Vert.x会根据配置来创建Context
并绑定到Verticle上,此后此Verticle上所有绑定的Handler
都会在此Context
上执行。相关实现位于doDeploy
方法,这里摘取核心部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | for (Verticle verticle: verticles) { WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null; WorkerPool pool = workerExec != null ? workerExec.getPool() : null; // 根据配置创建Context ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) : vertx.createEventLoopContext(deploymentID, pool, conf, tccl); if (workerExec != null) { context.addCloseHook(workerExec); } context.setDeployment(deployment); deployment.addVerticle( new VerticleHolder(verticle, context)); // 此Verticle上的Handler都会在创建的context作用域内执行 context.runOnContext(v -> { try { verticle.init(vertx, context); Future |
通过这样一种方式,Vert.x保证了Verticle
的线程安全 —— 即某个Verticle
上的所有Handler
都会在同一个Vert.x线程上执行,这样也保证了Verticle
内部成员的安全(没有race condition问题)。比如下面Verticle中处理IO及事件的处理都一直是在同一个Vert.x线程下执行的,每次打印出的线程名称应该是一样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | public class TcpClientVerticle extends AbstractVerticle { int i = 0; @Override public void start() throws Exception { vertx.createNetClient().connect( 6666, "localhost", ar -> { if (ar.succeeded()) { NetSocket socket = ar.result(); System.out.println(Thread.currentThread().getName()); socket.handler(buffer -> { i++; System.out.println(Thread.currentThread().getName()); System.out.println( "Net client receiving: " + buffer.toString( "UTF-8")); }); socket.write( "+1s\n"); } else { ar.cause().printStackTrace(); } }); } } |
之前我们已经提到过,Event Loop线程池的类型为Netty中的NioEventLoopGroup
,里面的线程通过Vert.x自己的线程工厂VertxThreadFactory
进行创建:
1 2 3 | eventLoopThreadFactory = new VertxThreadFactory( "vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime()); eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory); eventLoopGroup.setIoRatio(NETTY_IO_RATIO); |
其中Event Loop线程的数目可以在配置中指定。
在之前讲executeBlocking
底层实现的文章中我们已经提到过Worker线程池,它其实就是一种Fixed Thread Pool:
1 2 3 4 5 | ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(), new VertxThreadFactory( "vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime())); PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null; workerPool = new WorkerPool(workerExec, workerPoolMetrics); |
Worker线程同样由VertxThreadFactory
构造,类型为VertxThread
,用于执行阻塞任务。我们同样可以在配置中指定其数目。
1 2 3 4 | ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), new VertxThreadFactory( "vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime())); PoolMetrics internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(internalBlockingExec, "worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null; internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics); |
Internal Blocking Pool可能设计用于内部使用,在executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler)
这个版本的方法中就使用了它。
大家可能会发现VertxImpl
类中还有一个acceptorEventLoopGroup
。顾名思义,它是Netty中的Acceptor线程池,负责处理客户端的连接请求:
1 2 | acceptorEventLoopGroup = new NioEventLoopGroup( 1, acceptorEventLoopThreadFactory); acceptorEventLoopGroup.setIoRatio( 100); |
由于系统只有一个服务端端口需要监听,因此这里只需要一个线程。
Vert.x中的HttpServer
就利用了acceptorEventLoopGroup
处理客户端的连接请求,具体的实现后边会另起一篇介绍。
本文标题:
文章作者:
发布时间:2016年09月25日
原始链接:
许可协议: 转载请保留原文链接及作者。