After several months on this project, the endless manual operations, troubleshooting, and issue tracking had become genuinely painful. Summarizing and reflecting on the project's existing problems, I designed the following technical solution.
The project spans two separate local networks connected through a DMZ-like zone. Communication between them requires opening specific network policies and protocols, so the restrictions are considerable.
Before the refactoring, troubleshooting was extremely difficult: after sending a message, the business side often had no idea whether it had been processed. The requirements break down into scenarios like these:
- The business wants requests to be able to queue up to some degree, so that services keep running under bursts.
- The business wants real-time or asynchronous feedback from the application on the other network.
- Messages should be queryable to some extent, to make troubleshooting easier.
Advantages of choosing Kafka as the communication channel between the two networks:
- Kafka is a distributed message queue offering high performance, persistence, and multi-replica backup, so it can smooth out traffic peaks. Its high throughput and low latency decouple communication between the two networks.
- In practice, its latency can be as low as a few milliseconds.
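The peak-shaving idea can be illustrated independently of Kafka with a bounded in-memory queue: a bursty producer blocks once the buffer fills, while a slower consumer drains at its own pace. This is only a sketch of the principle; the buffer size and message names are arbitrary:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PeakShavingDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded buffer: bursts queue up instead of overwhelming the consumer,
        // which is the role the Kafka topic plays between the two networks.
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(100);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    buffer.put("msg-" + i); // blocks when the buffer is full (back-pressure)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            int handled = 0;
            while (handled < 10) {
                try {
                    String msg = buffer.poll(1, TimeUnit.SECONDS);
                    if (msg == null) break; // producer finished and buffer drained
                    handled++;              // simulate slower downstream processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            System.out.println("consumed " + handled + " messages");
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```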
A gateway typically provides high concurrency, request authentication, load balancing, and route forwarding.
The gateway is the entry point for all requests, so it must sustain very high throughput, which is usually achieved by handling requests asynchronously. Two common options:
- Tomcat
- Jetty+NIO+Servlet3
Netty was built for high concurrency. Since Spring 5, Spring WebFlux (backed by Netty) performs much better than traditional Spring MVC: under comparable conditions Netty reaches roughly 300k requests per second versus Tomcat's roughly 130k, a clear gap. There are plenty of benchmarks online, so I won't elaborate further.
Under a microservice architecture, services are deployed as multiple instances for high availability. When a request reaches the gateway, it must resolve all available instances from the URL, which requires service registration and discovery, i.e. a registry.
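A minimal sketch of what the registry-lookup-plus-load-balancing step amounts to. The project itself uses Nacos for discovery and a GrayLoadBalancer for instance selection; the registry map, service name, and addresses below are hypothetical, and round-robin stands in for whatever algorithm the real balancer uses:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinDemo {
    // Registry snapshot: service name -> live instance addresses (hypothetical data).
    static final Map<String, List<String>> REGISTRY = Map.of(
            "igferry-server", List.of("10.0.0.1:4001", "10.0.0.2:4002"));

    static final AtomicInteger COUNTER = new AtomicInteger();

    // Pick the next instance in round-robin order, as a gateway would
    // after resolving the service name from the request URL.
    static String choose(String serviceName) {
        List<String> instances = REGISTRY.get(serviceName);
        if (instances == null || instances.isEmpty()) {
            throw new IllegalStateException("no instance for " + serviceName);
        }
        int idx = Math.floorMod(COUNTER.getAndIncrement(), instances.size());
        return instances.get(idx);
    }

    public static void main(String[] args) {
        System.out.println(choose("igferry-server")); // 10.0.0.1:4001
        System.out.println(choose("igferry-server")); // 10.0.0.2:4002
        System.out.println(choose("igferry-server")); // wraps around to 10.0.0.1:4001
    }
}
```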
To generate distributed unique IDs, create a persistent sequential node; the sequence number returned by the create operation ensures each deliver node gets its own independent feedback queue.
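ZooKeeper appends a zero-padded, monotonically increasing sequence number to the name of a persistent sequential node, and the worker id can be recovered from the returned name. A small sketch of that parsing step; the `ip:port-sequence` format follows the project's convention, while the concrete values are hypothetical:

```java
public class WorkerIdDemo {
    // A persistent sequential node created under the project's "forever" path;
    // ZooKeeper appends the zero-padded sequence number after the last '-'.
    static int parseWorkerId(String nodeName) {
        int sep = nodeName.lastIndexOf('-');
        if (sep < 0) {
            throw new IllegalArgumentException("not a sequential node: " + nodeName);
        }
        return Integer.parseInt(nodeName.substring(sep + 1));
    }

    public static void main(String[] args) {
        // Hypothetical node name in the ip:port-sequence format.
        System.out.println(parseWorkerId("10.0.0.1:9999-0000000003"));
    }
}
```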
The project is divided into the following modules:
| Name | Description |
|---|---|
| igferry-common | Shared code: constants, exception classes, etc. |
| igferry-server | Test application module |
| igferry-gateway | Gateway application: consumes MQ messages, authenticates them, load-balances, and routes/forwards |
| igferry-deliver | Converts requests into MQ messages; effectively the dispatch service |
- Through ZooKeeper, generate a distributed ID for each igferry-deliver node and use it to create an independent feedback queue.
- Using the reactive programming model, generate a unique ID and send the MQ message; cache the ID together with its Mono object while listening on the feedback queue. When the feedback listener sees that ID, the Mono is completed as success.
The core code for generating the worker ID:
```java
try {
    CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(10000, 4), 10000, 6000);
    curator.start();
    Stat stat = curator.checkExists().forPath(PATH_FOREVER);
    if (stat == null) {
        // Root node does not exist: first start on this machine.
        // Create /ferry/ip:port-000000000 and upload the data.
        zk_AddressNode = createNode(curator);
        // Worker id defaults to 0.
        updateLocalWorkerID(workerID);
        // Periodically report local time to the forever node.
        ScheduledUploadData(curator, zk_AddressNode);
        return true;
    } else {
        Map<String, Integer> nodeMap = Maps.newHashMap();  // ip:port -> 00001
        Map<String, String> realNode = Maps.newHashMap();  // ip:port -> (ipport-000001)
        // Root node exists: check whether this machine already has its own node.
        List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
        for (String key : keys) {
            String[] nodeKey = key.split("-");
            realNode.put(nodeKey[0], key);
            nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
        }
        Integer workerid = nodeMap.get(listenAddress);
        if (workerid != null) {
            // This machine already has a node; zk_AddressNode = ip:port
            zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
            workerID = workerid; // used when the worker starts
            if (!checkInitTimeStamp(curator, zk_AddressNode)) {
                throw new IGFerryException("2001", "init timestamp check error, forever node timestamp gt this node time");
            }
            // Create the ephemeral node.
            doService(curator);
            updateLocalWorkerID(workerID);
            log.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID);
        } else {
            // Newly started node: create a persistent node, no timestamp check needed.
            String newNode = createNode(curator);
            zk_AddressNode = newNode;
            String[] nodeKey = newNode.split("-");
            workerID = Integer.parseInt(nodeKey[1]);
            doService(curator);
            updateLocalWorkerID(workerID);
            log.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID);
        }
    }
} catch (Exception e) {
    log.error("Start node ERROR {}", e);
    try {
        // Fall back to the locally persisted worker id.
        Properties properties = new Properties();
        properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
        workerID = Integer.valueOf(properties.getProperty("workerID"));
        log.warn("START FAILED ,use local node file properties workerID-{}", workerID);
    } catch (Exception e1) {
        log.error("Read file error ", e1);
        return false;
    }
}
```
Send and listen for the MQ message, returning a Mono:
```java
final KafkaConnect connect = KafkaOperation.getKafkaConnect(kafkaConfig);
final Function callback = new Function() {
    @Override
    public Object apply(final Object t) {
        connect.startListenerMQReceiveMsg(kafkaConfig, connect);
        connect.sendKafkaMsg(kafkaConfig, request, key);
        return null;
    }
};
final Mono<Response> resultMono = KafkaOperation.invokeInternal(requestContext, key, kafkaConfig.getTimeout(), callback);
return resultMono;
```
The methods that send the MQ message and handle the feedback response:
```java
public static Mono<Response> invokeInternal(final RequestContext context, final String key, final int timeout, final Function callback) {
    return Mono.defer(() -> {
        return getResultMsg(context, key, timeout, callback).flatMap(result -> Mono.just(context.getResponse()));
    });
}

public static Mono<Boolean> getResultMsg(final RequestContext context, final String key, final int timeout, final Function callback) {
    return Mono.create(sink -> {
        // Register the pending sink under the message key, then fire the callback
        // that starts the listener and sends the Kafka message.
        MQCache.addMonoSink(key, sink);
        callback.apply(null);
    }).timeout(Duration.ofSeconds(timeout)).onErrorResume(ex -> {
        if (ex instanceof TimeoutException) {
            throw new IGFerryException("2001", "Kafka message timeout");
        } else {
            throw new IGFerryException("2002", "Kafka message error");
        }
    }).map(data -> {
        log.debug("Receive kafka key:[{}] callback msg:[{}]", key, data);
        Response response = JacksonUtil.fromJson((String) data, Response.class);
        context.setResponse(response);
        return true;
    }).doFinally(onFinally -> {
        MQCache.KEY_MONO_SINK.remove(key);
    });
}
```
Handling a received MQ message:
```java
final String key = record.key();
try {
    // Look up the pending sink registered when the message was sent.
    final MonoSink<Object> monoSink = MQCache.getMonoSink(key);
    if (monoSink != null) {
        monoSink.success(value);
    } else {
        log.error("Key not found for kafka message, key: [{}], value: [{}]", key, value);
    }
} catch (Exception e) {
    log.error("Handle kafka record error, key: [{}], value: [{}]", key, value, e);
}
```
- Listen on the shared consumption queue; after messages are pulled, forward them to the target service via the routing API.
- Implement load balancing for MQ messages.
- Hand each message to a chain for sequential processing, implementing message authentication and route dispatch.
Unified handling of consumed MQ messages:
```java
MASTER_POOL.submit(() -> {
    try {
        Request request = JacksonUtil.fromJson(value, Request.class);
        SpringBeanUtils.getBean(KafkaMessageDynamicLoad.class).loadMQMsg(request).block();
    } catch (Exception e) {
        log.error("handle msg error topic: {}, value: {}, errormsg: {}", consumerTopic, value, e.getMessage(), e);
    }
});
```
Resolve the target service name from the message's API and hand the message to the chain for processing:
```java
public Mono<Void> loadMQMsg(Request<?> request) {
    String appName = parseAppName(request);
    MQInvokerChain mqInvokerChain = new MQInvokerChain(serverConfigProperties, gatewayKafkaConfig, grayLoadBalancer,
            gatewayKafkaConnect, appName);
    mqInvokerChain.addPlugin(new MsgAuthMQInvoker(serverConfigProperties, gatewayKafkaConfig, grayLoadBalancer, gatewayKafkaConnect));
    mqInvokerChain.addPlugin(new MsgDynamicRouteMQInvoker(serverConfigProperties, gatewayKafkaConfig, grayLoadBalancer, gatewayKafkaConnect));
    return mqInvokerChain.execute(request, mqInvokerChain);
}
```
The invoker-chain implementation class:
```java
public class MQInvokerChain extends AbstractMQInvokerImpl {
    /**
     * Service id
     */
    private String appName;
    /**
     * Index of the plugin currently being executed
     */
    private int pos;
    /**
     * Plugins that make up the chain
     */
    private List<MQInvoker> mqInvokers;

    public MQInvokerChain(ServerConfigProperties serverConfigProperties, GatewayKafkaConfig gatewayKafkaConfig,
                          GrayLoadBalancer grayLoadBalancer,
                          GatewayKafkaConnect gatewayKafkaConnect, String appName) {
        super(serverConfigProperties, gatewayKafkaConfig, grayLoadBalancer, gatewayKafkaConnect);
        this.appName = appName;
    }

    /**
     * Add an enabled plugin to the chain.
     *
     * @param mqInvoker the plugin to add
     */
    public void addPlugin(MQInvoker mqInvoker) {
        if (mqInvokers == null) {
            mqInvokers = new ArrayList<>();
        }
        mqInvokers.add(mqInvoker);
        // Keep plugins sorted by their order value
        mqInvokers.sort(Comparator.comparing(MQInvoker::order));
    }

    @Override
    public Integer order() {
        return 0;
    }

    @Override
    public String name() {
        return null;
    }

    /**
     * Execute the invoker chain.
     */
    @Override
    public Mono<Void> execute(Request<?> request, MQInvokerChain mqInvokerChain) {
        if (pos == mqInvokers.size()) {
            return Mono.empty();
        }
        return mqInvokerChain.mqInvokers.get(pos++).execute(request, mqInvokerChain);
    }

    public String getAppName() {
        return appName;
    }
}
```
The dynamic route-forwarding and load-balancing implementation class:
```java
public class MsgDynamicRouteMQInvoker extends AbstractMQInvokerImpl {
    private static WebClient webClient;

    public MsgDynamicRouteMQInvoker(ServerConfigProperties serverConfigProperties, GatewayKafkaConfig gatewayKafkaConfig,
                                    GrayLoadBalancer grayLoadBalancer,
                                    GatewayKafkaConnect gatewayKafkaConnect) {
        super(serverConfigProperties, gatewayKafkaConfig, grayLoadBalancer, gatewayKafkaConnect);
    }

    static {
        HttpClient httpClient = HttpClient.create()
                .tcpConfiguration(client ->
                        client.doOnConnected(conn ->
                                conn.addHandlerLast(new ReadTimeoutHandler(180))
                                        .addHandlerLast(new WriteTimeoutHandler(180)))
                                .option(ChannelOption.TCP_NODELAY, true)
                );
        webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();
    }

    @Override
    public Integer order() {
        return MQInvokerEnum.DYNAMIC_FERRY_ROUTE.getOrder();
    }

    @Override
    public String name() {
        return MQInvokerEnum.DYNAMIC_FERRY_ROUTE.name();
    }

    @Override
    public Mono<Void> execute(Request<?> request, MQInvokerChain pluginChain) {
        log.info("Routing and forwarding MQ message");
        String appName = pluginChain.getAppName();
        ServiceInstance serviceInstance = chooseInstance(appName);
        String url = buildUrl(serviceInstance, request.getApiUrl());
        return forward(request, url);
    }

    private Mono<Void> forward(Request<?> request, String url) {
        HttpMethod httpMethod = HttpMethod.valueOf(request.getHttpMethod());
        WebClient.RequestBodySpec requestBodySpec = webClient.method(httpMethod).uri(url);
        WebClient.RequestHeadersSpec<?> reqHeadersSpec;
        if (requireHttpBody(httpMethod)) {
            reqHeadersSpec = requestBodySpec.contentType(MediaType.APPLICATION_JSON).body(BodyInserters.fromValue(request.getBody()));
        } else {
            reqHeadersSpec = requestBodySpec;
        }
        // Message callback
        return getResultMsg(request, reqHeadersSpec).flatMap(response -> {
            gatewayKafkaConnect.sendCoustomKafka(response.getTopic(), request.getKey(), JacksonUtil.toJson(response));
            return Mono.empty();
        });
    }

    // nio->callback->nio
    private Mono<Response> getResultMsg(Request<?> request, WebClient.RequestHeadersSpec<?> reqHeadersSpec) {
        Mono<Response> responseMono = reqHeadersSpec.exchangeToMono(clientResponse -> {
            return clientResponse.bodyToMono(String.class).flatMap(responseBody -> {
                Response response = JacksonUtil.fromJson(responseBody, Response.class);
                response.setTopic(request.getResponseTopic());
                return Mono.just(response);
            });
        });
        if (request.isAsync()) {
            return responseMono.timeout(Duration.ofMillis(serverConfigProperties.getTimeOutMillis()))
                    .onErrorResume(ex -> {
                        // Build an error response matching the failure type and return it
                        // directly (rather than discarding it for a generic 5000).
                        Response response = new Response();
                        response.setTopic(request.getResponseTopic());
                        if (ex instanceof TimeoutException) {
                            response.setErrCode("5001");
                            response.setErrMsg("network timeout");
                        } else {
                            response.setErrCode("5000");
                            response.setErrMsg("system error");
                        }
                        return Mono.just(response);
                    });
        }
        return responseMono;
    }

    private String buildUrl(ServiceInstance serviceInstance, String apiUrl) {
        String path = apiUrl.replaceFirst("/" + serviceInstance.getServiceId(), "");
        String url = "http://" + serviceInstance.getHost() + ":" + serviceInstance.getPort() + path;
        return url;
    }

    /**
     * Check whether the HTTP method carries a request body.
     *
     * @param method the HTTP method
     * @return true if a body is required
     */
    private boolean requireHttpBody(HttpMethod method) {
        return method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) || method.equals(HttpMethod.PATCH);
    }

    /**
     * Choose a service instance according to the routing rules and the load-balancing algorithm.
     *
     * @param appName the service name
     * @return the chosen instance
     */
    private ServiceInstance chooseInstance(String appName) {
        return grayLoadBalancer.choose(appName);
    }
}
```
- This backend service implements the actual business logic; here it is just a simple controller endpoint.
Download the Nacos source from GitHub:
```shell
git clone https://github.com/alibaba/nacos.git
cd nacos/
mvn -Prelease-nacos -Dmaven.test.skip=true clean install -U
ls -al distribution/target/
# change the $version to your actual path
cd distribution/target/nacos-server-$version/nacos/bin
# start nacos in standalone mode
startup.cmd -m standalone
```
Download the ZooKeeper archive, extract it to the D: drive, copy D:\apache-zookeeper-3.6.2-bin\conf\zoo_sample.cfg and rename the copy zoo.cfg, then start it from a cmd window: D:\apache-zookeeper-3.6.2-bin\bin\zkServer.cmd
Download the Kafka archive, extract it to the D: drive, then start it from a cmd window:
```shell
D:\kafka_2.12-2.6.0\bin\windows\kafka-server-start.bat D:\kafka_2.12-2.6.0\config\server.properties
```
After Kafka is up, configure 6 partitions for the topic:
```shell
D:\kafka_2.12-2.6.0\bin\windows\kafka-topics.bat --alter --zookeeper localhost:2181 --partitions 6 --topic igferry-gateway-deliver
```
- Instance 1: add `-Dserver.port=9999` to the VM options
- Instance 2: add `-Dserver.port=9998` to the VM options
- Instance 1: add `-Dserver.port=4001` to the VM options
- Instance 2: add `-Dserver.port=4002` to the VM options
Load-test environment:
- Lenovo Xiaoxin 15 (2021 AMD edition)
- CPU: 1.8 GHz eight-core AMD Ryzen 7 4800U
- RAM: 16 GB
- Two instances of the gateway and backend applications, one deliver service

Load-test tool: JMeter
Result: 1000 threads, looped 10 times, for a throughput of roughly 1657 requests per second.
This design sat in my head for a long time without being put into practice. It felt daunting at first, but once I actually started it turned out not to be that hard; taking the first step is what matters.
Along the way I ran into plenty of difficulties and drew on some excellent articles to solve them. It pays to organize these into your own knowledge base:
https://cnblogs.com/2YSP/p/14223892.html
https://docs.spring.io/spring-framework/docs/5.3.x/reference/html/web-reactive.html#webflux
https://blog.csdn.net/bskfnvjtlyzmv867/article/details/90247036




