Skip to content

Commit

Permalink
grpc invoke source refactor (#12716)
Browse files Browse the repository at this point in the history
* grpc invoke source control

* grpc invoke source control

* metrics controller param check enhancement

* check style and pmd
  • Loading branch information
shiyiyue1102 authored Oct 11, 2024
1 parent ec34377 commit d712ee5
Show file tree
Hide file tree
Showing 13 changed files with 222 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.nacos.api.config.remote.request.ClientConfigMetricRequest;
import com.alibaba.nacos.api.config.remote.response.ClientConfigMetricResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.auth.annotation.Secured;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.http.HttpClientBeanHolder;
Expand All @@ -30,6 +31,7 @@
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.paramcheck.ConfigDefaultHttpParamExtractor;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.ParamUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.paramcheck.ExtractorManager;
Expand Down Expand Up @@ -86,7 +88,11 @@ public ClientMetricsController(ServerMemberManager serverMemberManager, Connecti
public ResponseEntity metric(@RequestParam("ip") String ip,
@RequestParam(value = "dataId", required = false) String dataId,
@RequestParam(value = "group", required = false) String group,
@RequestParam(value = "tenant", required = false) String tenant) {
@RequestParam(value = "tenant", required = false) String tenant) throws NacosException {

ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "default", "default");

Loggers.CORE.info("Get cluster config metrics received, ip={},dataId={},group={},tenant={}", ip, dataId, group,
tenant);
Map<String, Object> responseMap = new HashMap<>(3);
Expand Down Expand Up @@ -168,7 +174,11 @@ public void onCancel() {
public Map<String, Object> getClientMetrics(@RequestParam("ip") String ip,
@RequestParam(value = "dataId", required = false) String dataId,
@RequestParam(value = "group", required = false) String group,
@RequestParam(value = "tenant", required = false) String tenant) {
@RequestParam(value = "tenant", required = false) String tenant) throws NacosException {

ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "default", "default");

Map<String, Object> metrics = new HashMap<>(16);
List<Connection> connectionsByIp = connectionManager.getConnectionByIp(ip);
for (Connection connectionByIp : connectionsByIp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest;
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.config.server.service.dump.DumpRequest;
import com.alibaba.nacos.config.server.service.dump.DumpService;
Expand All @@ -27,6 +28,7 @@
import com.alibaba.nacos.core.paramcheck.impl.ConfigRequestParamExtractor;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.core.control.TpsControl;
import com.alibaba.nacos.core.remote.grpc.InvokeSource;
import org.springframework.stereotype.Component;

/**
Expand All @@ -36,6 +38,7 @@
* @version $Id: ConfigChangeClusterSyncRequestHandler.java, v 0.1 2020年08月11日 4:35 PM liuzunfei Exp $
*/
@Component
@InvokeSource(source = {RemoteConstants.LABEL_SOURCE_CLUSTER})
public class ConfigChangeClusterSyncRequestHandler
extends RequestHandler<ConfigChangeClusterSyncRequest, ConfigChangeClusterSyncResponse> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.alibaba.nacos.core.cluster.remote;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.core.cluster.Member;
Expand All @@ -27,6 +28,7 @@
import com.alibaba.nacos.core.cluster.remote.request.MemberReportRequest;
import com.alibaba.nacos.core.cluster.remote.response.MemberReportResponse;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.core.remote.grpc.InvokeSource;
import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.stereotype.Component;

Expand All @@ -36,6 +38,7 @@
* @author : huangtianhui
*/
@Component
@InvokeSource(source = {RemoteConstants.LABEL_SOURCE_CLUSTER})
public class MemberReportHandler extends RequestHandler<MemberReportRequest, MemberReportResponse> {

private final ServerMemberManager memberManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.core.control.TpsControl;
import com.alibaba.nacos.core.control.TpsControlConfig;
import com.alibaba.nacos.core.remote.grpc.InvokeSource;
import com.alibaba.nacos.plugin.control.ControlManagerCenter;
import com.google.common.collect.Sets;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
Expand All @@ -30,6 +32,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* RequestHandlerRegistry.
Expand All @@ -43,6 +46,8 @@ public class RequestHandlerRegistry implements ApplicationListener<ContextRefres

Map<String, RequestHandler> registryHandlers = new HashMap<>();

Map<String, Set<String>> sourceRegistry = new HashMap<>();

/**
* Get Request Handler By request Type.
*
Expand All @@ -53,6 +58,20 @@ public RequestHandler getByRequestType(String requestType) {
return registryHandlers.get(requestType);
}

/**
* check source invoke allowed.
*
* @param type type.
* @param source source.
* @return
*/
public boolean checkSourceInvokeAllowed(String type, String source) {
if (sourceRegistry.containsKey(type) && !sourceRegistry.get(type).contains(source)) {
return false;
}
return true;
}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
Expand All @@ -71,7 +90,7 @@ public void onApplicationEvent(ContextRefreshedEvent event) {
if (skip) {
continue;
}

//register tps control.
try {
Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
Expand All @@ -82,7 +101,22 @@ public void onApplicationEvent(ContextRefreshedEvent event) {
} catch (Exception e) {
//ignore.
}

Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];

//register invoke source.
try {
if (clazz.isAnnotationPresent(InvokeSource.class)) {
InvokeSource tpsControl = clazz.getAnnotation(InvokeSource.class);
String[] sources = tpsControl.source();
if (sources != null && sources.length > 0) {
sourceRegistry.put(tClass.getSimpleName(), Sets.newHashSet(sources));
}
}
} catch (Exception e) {
//ignore.
}

registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.nacos.api.remote.response.ServerLoaderInfoResponse;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.core.remote.grpc.InvokeSource;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -37,6 +38,7 @@
* @version $Id: ServerLoaderInfoRequestHandler.java, v 0.1 2020年09月03日 2:51 PM liuzunfei Exp $
*/
@Component
@InvokeSource(source = {RemoteConstants.LABEL_SOURCE_CLUSTER})
public class ServerLoaderInfoRequestHandler extends RequestHandler<ServerLoaderInfoRequest, ServerLoaderInfoResponse> {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.nacos.api.remote.response.ServerReloadResponse;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.core.remote.grpc.InvokeSource;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.core.utils.RemoteUtils;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -37,6 +38,7 @@
* @author liuzunfei
* @version $Id: ServerReloaderRequestHandler.java, v 0.1 2020年11月09日 4:38 PM liuzunfei Exp $
*/
@InvokeSource(source = {RemoteConstants.LABEL_SOURCE_CLUSTER})
@Component
public class ServerReloaderRequestHandler extends RequestHandler<ServerReloadRequest, ServerReloadResponse> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@

package com.alibaba.nacos.core.remote.grpc;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils;
import com.alibaba.nacos.core.monitor.MetricsMonitor;
import com.alibaba.nacos.core.remote.BaseRpcServer;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.remote.grpc.negotiator.NacosGrpcProtocolNegotiator;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;
Expand All @@ -36,6 +41,7 @@
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import org.springframework.beans.factory.annotation.Autowired;

Expand Down Expand Up @@ -70,6 +76,9 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
@Autowired
private ConnectionManager connectionManager;

@Autowired
private RequestHandlerRegistry requestHandlerRegistry;

@Override
public ConnectionType getConnectionType() {
return ConnectionType.GRPC;
Expand Down Expand Up @@ -163,6 +172,32 @@ protected List<ServerTransportFilter> getServerTransportFilters() {
return Collections.singletonList(new AddressTransportFilter(connectionManager));
}

/**
* get source for the request.
*
* @return
*/
protected abstract String getSource();

private boolean invokeSourceAllowCheck(Payload grpcRequest) {
return requestHandlerRegistry.checkSourceInvokeAllowed(grpcRequest.getMetadata().getType(), getSource());
}

protected void handleCommonRequest(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
if (!invokeSourceAllowCheck(grpcRequest)) {
Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(NacosException.BAD_GATEWAY,
String.format(" invoke %s from %s is forbidden", grpcRequest.getMetadata().getType(),
this.getSource())));
responseObserver.onNext(payloadResponse);

responseObserver.onCompleted();
MetricsMonitor.recordGrpcRequestEvent(grpcRequest.getMetadata().getType(), false,
NacosException.BAD_GATEWAY, null, null, 0);
} else {
grpcCommonRequestAcceptor.request(grpcRequest, responseObserver);
}
}

private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {

// unary common call register.
Expand All @@ -174,7 +209,9 @@ private void addServices(MutableHandlerRegistry handlerRegistry, ServerIntercept
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();

final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall(
(request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
(request, responseObserver) -> {
handleCommonRequest(request, responseObserver);
});

final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(
GrpcServerConstants.REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.nacos.core.remote.grpc;

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.core.remote.grpc.filter.NacosGrpcServerTransportFilter;
import com.alibaba.nacos.core.remote.grpc.filter.NacosGrpcServerTransportFilterServiceLoader;
import com.alibaba.nacos.core.remote.grpc.interceptor.NacosGrpcServerInterceptor;
Expand Down Expand Up @@ -126,4 +127,9 @@ protected List<ServerTransportFilter> getServerTransportFilters() {
NacosGrpcServerTransportFilter.CLUSTER_FILTER));
return result;
}

@Override
protected String getSource() {
return RemoteConstants.LABEL_SOURCE_CLUSTER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.nacos.core.remote.grpc;

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.core.remote.grpc.filter.NacosGrpcServerTransportFilter;
import com.alibaba.nacos.core.remote.grpc.filter.NacosGrpcServerTransportFilterServiceLoader;
import com.alibaba.nacos.core.remote.grpc.interceptor.NacosGrpcServerInterceptor;
Expand Down Expand Up @@ -125,4 +126,8 @@ protected List<ServerTransportFilter> getServerTransportFilters() {
return result;
}

@Override
protected String getSource() {
return RemoteConstants.LABEL_SOURCE_SDK;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.core.remote.grpc;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

/**
* annotation for invoke source.
*/
@Retention(RetentionPolicy.RUNTIME)
public @interface InvokeSource {


/**
* allowed sources.
* @return
*/
String[] source();
}
Loading

0 comments on commit d712ee5

Please sign in to comment.