这是一篇为了参加活动写得文章,不知道为什么,写得时候网络巨差,我是顶着闹心发上来的,最后大家女神节快乐。
最近难得忙了一下,领导让我从Vert.x和lagom中技术选型,因为lagom是scala写得,我虽然会一点,但是不多,于是选择了vert.x,我记得之前用vert.x,就用的一知半解的,于是这一次我想的是,一定要研究明白,一下子我就想到了,在研究powerjob源码的时候,server启动的时候有这么一行代码:
VertXStarter.init();
于是我就从这一行代码作为入手点,开始研究,powerjob是怎么运用vert.x的,进入init方法内部,发现是这个样子的:
public static Vertx vertx;
@Getter
private static String address;
public static void init() {
Stopwatch stopwatch = Stopwatch.createStarted();
Properties properties = PropertyUtils.getProperties();
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT)));
String portFromJVM = System.getProperty(PowerJobServerConfigKey.HTTP_PORT);
if (StringUtils.isNotEmpty(portFromJVM)) {
port = Integer.parseInt(portFromJVM);
}
String localIP = NetUtils.getLocalHost();
address = localIP + ":" + port;
vertx = Vertx.vertx();
}
其实这里面的代码,除了之前讲过的获取地址等内容,就只有一行代码是有用的,就是:
vertx = Vertx.vertx();
这个就是Vert.x的初始化,然后从这里代码就直接断开了,因为已经初始化结束了,接下来就是找哪里用到了这个vertx了,通过编辑器寻找到,是在tech.powerjob.server.core.handler.impl包下的Initializer类里面。
VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
这一行代码的意思也很简单,就是部署一个Verticle,什么是Verticle?Verticle就是Vert.x最小的代码执行单元,类似于akka里面的Actor,所以就有疑问了,为什么不用actor呢?前面都用了,这里又要多用一个Verticle呢?这个我也不是很清楚,那就来看看这个Verticle吧,上面那一行代码指出,这个WorkerRequestHttpHandler,就是那个Verticle,源码如下所示:
public class WorkerRequestHttpHandler extends AbstractVerticle {
@Override
public void start() throws Exception {
Properties properties = PropertyUtils.getProperties();
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT)));
HttpServerOptions options = new HttpServerOptions();
HttpServer server = vertx.createHttpServer(options);
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT)
.handler(ctx -> {
WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class);
fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat);
success(ctx);
});
router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT)
.blockingHandler(ctx -> {
TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class);
try {
fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
out(ctx, AskResponse.succeed(null));
} catch (Exception e) {
out(ctx, AskResponse.failed(ExceptionUtils.getMessage(e)));
}
});
router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT)
.blockingHandler(ctx -> {
WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class);
fetchWorkerRequestHandler().processWorkerLogReport(req);
success(ctx);
});
server.requestHandler(router).listen(port);
}
private static void out(RoutingContext ctx, Object msg) {
ctx.response()
.putHeader(OmsConstant.HTTP_HEADER_CONTENT_TYPE, OmsConstant.JSON_MEDIA_TYPE)
.end(JsonObject.mapFrom(msg).encode());
}
private static void success(RoutingContext ctx) {
out(ctx, ResultDTO.success(null));
}
}
一句话总结一下上面代码,就是利用vert.x设置了一个HTTP的服务端,分别接受worker的心跳,任务实例的执行状态和日志,这个Router router = Router.router(vertx);里面的Router就类似于Spring里面的Controller,router.post就相当于设置了一个post方法,地址就是ProtocolConstant.SERVER_PATH_HEARTBEAT这个常量对应的字符串,所以疑问我就又来了,我记得在Actor里面也有接收心跳,状态和日志的功能,这又设置了一个,当然了,这个是用了和actor不同的方式,这个是用过http的方式得到的,估计可以连接openapi。
所以总结来说,vert.x在powerjob里面的应用,就类似于spring,有些大材小用的感觉,反正我是没有时间验证我的猜想是否正确,就拿源码来说,我认为Vert.x其实可以不用出现,因为既然选择了spring和akka,这俩框架完全拥有Vert.x在该框架的作用,感觉就像是在故意炫技一般。