提交 7de57184 编写于 作者: P peng-yongsheng

1. Add the column named srcSpanType into service_name table.

2. Add column named service_name_keyword into service_name elastic search table by the type of keyword.
上级 9cd0f98b
......@@ -53,7 +53,8 @@ public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServ
for (ServiceNameElement serviceNameElement : serviceNameElementList) {
int applicationId = serviceNameElement.getApplicationId();
String serviceName = serviceNameElement.getServiceName();
int serviceId = serviceNameService.getOrCreate(applicationId, serviceName);
int srcSpanType = serviceNameElement.getSrcSpanTypeValue();
int serviceId = serviceNameService.getOrCreate(applicationId, srcSpanType, serviceName);
if (serviceId != 0) {
ServiceNameMappingElement.Builder mappingElement = ServiceNameMappingElement.newBuilder();
......
......@@ -34,6 +34,7 @@ import org.apache.skywalking.apm.network.proto.ServiceNameCollection;
import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.ServiceNameElement;
import org.apache.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.apache.skywalking.apm.network.proto.SpanType;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.joda.time.DateTime;
import org.slf4j.Logger;
......@@ -95,6 +96,7 @@ class RegisterMock {
ServiceNameElement.Builder serviceNameElement = ServiceNameElement.newBuilder();
serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue());
serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
serviceNameElement.setSrcSpanType(SpanType.Exit);
serviceNameCollection.addElements(serviceNameElement);
registerServiceName(serviceNameCollection);
......@@ -139,6 +141,7 @@ class RegisterMock {
ServiceNameElement.Builder serviceNameElement = ServiceNameElement.newBuilder();
serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue());
serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
serviceNameElement.setSrcSpanType(SpanType.Entry);
serviceNameCollection.addElements(serviceNameElement);
registerServiceName(serviceNameCollection);
......
......@@ -44,6 +44,7 @@ public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
private static final String APPLICATION_ID = "ai";
private static final String SERVICE_NAME = "sn";
private static final String SRC_SPAN_TYPE = "st";
private static final String SERVICE_ID = "si";
private static final String ELEMENT = "el";
......@@ -66,8 +67,9 @@ public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
for (JsonElement service : services) {
int applicationId = service.getAsJsonObject().get(APPLICATION_ID).getAsInt();
String serviceName = service.getAsJsonObject().get(SERVICE_NAME).getAsString();
int srcSpanType = service.getAsJsonObject().get(SRC_SPAN_TYPE).getAsInt();
int serviceId = serviceNameService.getOrCreate(applicationId, serviceName);
int serviceId = serviceNameService.getOrCreate(applicationId, srcSpanType, serviceName);
if (serviceId != 0) {
JsonObject responseJson = new JsonObject();
responseJson.addProperty(SERVICE_ID, serviceId);
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.register.define.service;
import org.apache.skywalking.apm.collector.core.module.Service;
......@@ -25,5 +24,5 @@ import org.apache.skywalking.apm.collector.core.module.Service;
* @author peng-yongsheng
*/
public interface IServiceNameService extends Service {
int getOrCreate(int applicationId, String serviceName);
int getOrCreate(int applicationId, int srcSpanType, String serviceName);
}
......@@ -42,7 +42,7 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
private final IServiceNameRegisterDAO serviceNameRegisterDAO;
private final ServiceIdCacheService serviceIdCacheService;
public ServiceNameRegisterSerialWorker(ModuleManager moduleManager) {
ServiceNameRegisterSerialWorker(ModuleManager moduleManager) {
super(moduleManager);
this.serviceNameRegisterDAO = getModuleManager().find(StorageModule.NAME).getService(IServiceNameRegisterDAO.class);
this.serviceIdCacheService = getModuleManager().find(CacheModule.NAME).getService(ServiceIdCacheService.class);
......@@ -54,7 +54,7 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
@Override protected void onWork(ServiceName serviceName) throws WorkerException {
logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
int serviceId = serviceIdCacheService.get(serviceName.getApplicationId(), serviceName.getServiceName());
int serviceId = serviceIdCacheService.get(serviceName.getApplicationId(), serviceName.getSrcSpanType(), serviceName.getServiceName());
if (serviceId == 0) {
ServiceName newServiceName;
......@@ -62,15 +62,17 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
if (min == 0) {
ServiceName noneServiceName = new ServiceName();
noneServiceName.setId("1");
noneServiceName.setApplicationId(0);
noneServiceName.setApplicationId(Const.NONE_APPLICATION_ID);
noneServiceName.setServiceId(Const.NONE_SERVICE_ID);
noneServiceName.setServiceName(Const.NONE_SERVICE_NAME);
noneServiceName.setSrcSpanType(Const.SPAN_TYPE_VIRTUAL);
serviceNameRegisterDAO.save(noneServiceName);
newServiceName = new ServiceName();
newServiceName.setId("-1");
newServiceName.setApplicationId(serviceName.getApplicationId());
newServiceName.setServiceId(-1);
newServiceName.setSrcSpanType(serviceName.getSrcSpanType());
newServiceName.setServiceName(serviceName.getServiceName());
} else {
int max = serviceNameRegisterDAO.getMaxServiceId();
......@@ -80,6 +82,7 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
newServiceName.setId(String.valueOf(serviceId));
newServiceName.setApplicationId(serviceName.getApplicationId());
newServiceName.setServiceId(serviceId);
newServiceName.setSrcSpanType(serviceName.getSrcSpanType());
newServiceName.setServiceName(serviceName.getServiceName());
}
serviceNameRegisterDAO.save(newServiceName);
......
......@@ -59,14 +59,15 @@ public class ServiceNameService implements IServiceNameService {
return serviceNameRegisterGraph;
}
public int getOrCreate(int applicationId, String serviceName) {
int serviceId = getServiceIdCacheService().get(applicationId, serviceName);
@Override public int getOrCreate(int applicationId, int srcSpanType, String serviceName) {
int serviceId = getServiceIdCacheService().get(applicationId, srcSpanType, serviceName);
if (serviceId == 0) {
ServiceName service = new ServiceName();
service.setId("0");
service.setApplicationId(applicationId);
service.setServiceName(serviceName);
service.setSrcSpanType(srcSpanType);
service.setServiceId(0);
getServiceNameRegisterGraph().start(service);
......
......@@ -27,6 +27,7 @@ import org.apache.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.network.proto.SpanType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -57,7 +58,7 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
@Override public boolean exchange(ReferenceDecorator standardBuilder, int applicationId) {
if (standardBuilder.getEntryServiceId() == 0 && StringUtils.isNotEmpty(standardBuilder.getEntryServiceName())) {
int entryServiceId = serviceNameService.getOrCreate(instanceCacheService.getApplicationId(standardBuilder.getEntryApplicationInstanceId()), standardBuilder.getEntryServiceName());
int entryServiceId = serviceNameService.getOrCreate(instanceCacheService.getApplicationId(standardBuilder.getEntryApplicationInstanceId()), SpanType.Entry_VALUE, standardBuilder.getEntryServiceName());
if (entryServiceId == 0) {
if (logger.isDebugEnabled()) {
......@@ -73,7 +74,7 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
if (standardBuilder.getParentServiceId() == 0 && StringUtils.isNotEmpty(standardBuilder.getParentServiceName())) {
int parentServiceId = serviceNameService.getOrCreate(instanceCacheService.getApplicationId(standardBuilder.getParentApplicationInstanceId()), standardBuilder.getParentServiceName());
int parentServiceId = serviceNameService.getOrCreate(instanceCacheService.getApplicationId(standardBuilder.getParentApplicationInstanceId()), SpanType.Entry_VALUE, standardBuilder.getParentServiceName());
if (parentServiceId == 0) {
if (logger.isDebugEnabled()) {
......
......@@ -71,7 +71,7 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
}
if (standardBuilder.getOperationNameId() == 0 && StringUtils.isNotEmpty(standardBuilder.getOperationName())) {
int operationNameId = serviceNameService.getOrCreate(applicationId, standardBuilder.getOperationName());
int operationNameId = serviceNameService.getOrCreate(applicationId, standardBuilder.getSpanTypeValue(), standardBuilder.getOperationName());
if (operationNameId == 0) {
logger.debug("service name: {} from application id: {} exchange failed", standardBuilder.getOperationName(), applicationId);
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.cache.service;
import org.apache.skywalking.apm.collector.core.module.Service;
......@@ -25,5 +24,5 @@ import org.apache.skywalking.apm.collector.core.module.Service;
* @author peng-yongsheng
*/
public interface ServiceIdCacheService extends Service {
int get(int applicationId, String serviceName);
int get(int applicationId, int srcSpanType, String serviceName);
}
......@@ -16,15 +16,14 @@
*
*/
package org.apache.skywalking.apm.collector.cache.guava.service;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.ObjectUtils;
import org.apache.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.ObjectUtils;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.cache.IServiceNameCacheDAO;
import org.slf4j.Logger;
......@@ -53,18 +52,19 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService {
return this.serviceNameCacheDAO;
}
public int get(int applicationId, String serviceName) {
@Override public int get(int applicationId, int srcSpanType, String serviceName) {
int serviceId = 0;
String id = applicationId + Const.ID_SPLIT + srcSpanType + Const.ID_SPLIT + serviceName;
try {
serviceId = serviceIdCache.get(applicationId + Const.ID_SPLIT + serviceName, () -> getServiceNameCacheDAO().getServiceId(applicationId, serviceName));
serviceId = serviceIdCache.get(id, () -> getServiceNameCacheDAO().getServiceId(applicationId, srcSpanType, serviceName));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceId == 0) {
serviceId = getServiceNameCacheDAO().getServiceId(applicationId, serviceName);
serviceId = getServiceNameCacheDAO().getServiceId(applicationId, srcSpanType, serviceName);
if (serviceId != 0) {
serviceIdCache.put(applicationId + Const.ID_SPLIT + serviceName, serviceId);
serviceIdCache.put(id, serviceId);
}
}
return serviceId;
......
......@@ -34,4 +34,5 @@ public class Const {
public static final String EXCEPTION = "Exception";
public static final String EMPTY_STRING = "";
public static final String FILE_SUFFIX = "sw";
public static final int SPAN_TYPE_VIRTUAL = 9;
}
......@@ -27,5 +27,5 @@ import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
public interface IServiceNameCacheDAO extends DAO {
ServiceName get(int serviceId);
int getServiceId(int applicationId, String serviceName);
int getServiceId(int applicationId, int srcSpanType, String serviceName);
}
......@@ -36,10 +36,11 @@ public class ServiceName extends StreamData {
private static final Column[] LONG_COLUMNS = {};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(ServiceNameTable.COLUMN_APPLICATION_ID, new CoverOperation()),
new Column(ServiceNameTable.COLUMN_SERVICE_ID, new CoverOperation()),
new Column(ServiceNameTable.COLUMN_SRC_SPAN_TYPE, new CoverOperation()),
};
private static final Column[] BYTE_COLUMNS = {};
......@@ -87,4 +88,12 @@ public class ServiceName extends StreamData {
public void setServiceId(int serviceId) {
setDataInteger(1, serviceId);
}
public int getSrcSpanType() {
return getDataInteger(2);
}
public void setSrcSpanType(int srcSpanType) {
setDataInteger(2, srcSpanType);
}
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.storage.table.register;
import org.apache.skywalking.apm.collector.core.data.CommonTable;
......@@ -27,6 +26,8 @@ import org.apache.skywalking.apm.collector.core.data.CommonTable;
public class ServiceNameTable extends CommonTable {
public static final String TABLE = "service_name";
public static final String COLUMN_SERVICE_NAME = "service_name";
public static final String COLUMN_SERVICE_NAME_KEYWORD = "service_name_keyword";
public static final String COLUMN_SRC_SPAN_TYPE = "src_span_type";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_SERVICE_ID = "service_id";
}
......@@ -56,14 +56,15 @@ public class ServiceNameEsCacheDAO extends EsDAO implements IServiceNameCacheDAO
return null;
}
@Override public int getServiceId(int applicationId, String serviceName) {
@Override public int getServiceId(int applicationId, int srcSpanType, String serviceName) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(ServiceNameTable.TABLE);
searchRequestBuilder.setTypes(ServiceNameTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.matchQuery(ServiceNameTable.COLUMN_APPLICATION_ID, applicationId));
boolQuery.must().add(QueryBuilders.matchQuery(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName));
boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_APPLICATION_ID, applicationId));
boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_SRC_SPAN_TYPE, srcSpanType));
boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_SERVICE_NAME_KEYWORD, serviceName));
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(1);
......
......@@ -56,6 +56,8 @@ public class ServiceNameRegisterEsDAO extends EsDAO implements IServiceNameRegis
source.put(ServiceNameTable.COLUMN_SERVICE_ID, serviceName.getServiceId());
source.put(ServiceNameTable.COLUMN_APPLICATION_ID, serviceName.getApplicationId());
source.put(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName.getServiceName());
source.put(ServiceNameTable.COLUMN_SERVICE_NAME_KEYWORD, serviceName.getServiceName());
source.put(ServiceNameTable.COLUMN_SRC_SPAN_TYPE, serviceName.getSrcSpanType());
IndexResponse response = client.prepareIndex(ServiceNameTable.TABLE, serviceName.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save service name register info, application getApplicationId: {}, service name: {}, status: {}", serviceName.getId(), serviceName.getServiceName(), response.status().name());
......
......@@ -38,6 +38,8 @@ public class ServiceNameEsTableDefine extends ElasticSearchTableDefine {
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Text.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME_KEYWORD, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SRC_SPAN_TYPE, ElasticSearchColumnDefine.Type.Integer.name()));
}
}
......@@ -38,7 +38,7 @@ public class ServiceNameH2CacheDAO extends H2DAO implements IServiceNameCacheDAO
private final Logger logger = LoggerFactory.getLogger(ServiceNameH2CacheDAO.class);
private static final String GET_SERVICE_NAME_SQL = "select {0},{1} from {2} where {3} = ?";
private static final String GET_SERVICE_ID_SQL = "select {0} from {1} where {2} = ? and {3} = ? limit 1";
private static final String GET_SERVICE_ID_SQL = "select {0} from {1} where {2} = ? and {3} = ? and {4} = ? limit 1";
public ServiceNameH2CacheDAO(H2Client client) {
super(client);
......@@ -63,11 +63,12 @@ public class ServiceNameH2CacheDAO extends H2DAO implements IServiceNameCacheDAO
return null;
}
@Override public int getServiceId(int applicationId, String serviceName) {
@Override public int getServiceId(int applicationId, int srcSpanType, String serviceName) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SERVICE_ID_SQL, ServiceNameTable.COLUMN_SERVICE_ID,
ServiceNameTable.TABLE, ServiceNameTable.COLUMN_APPLICATION_ID, ServiceNameTable.COLUMN_SERVICE_NAME);
Object[] params = new Object[] {applicationId, serviceName};
String sql = SqlBuilder.buildSql(GET_SERVICE_ID_SQL, ServiceNameTable.COLUMN_SERVICE_ID, ServiceNameTable.TABLE,
ServiceNameTable.COLUMN_APPLICATION_ID, ServiceNameTable.COLUMN_SRC_SPAN_TYPE, ServiceNameTable.COLUMN_SERVICE_NAME);
Object[] params = new Object[] {applicationId, srcSpanType, serviceName};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
return rs.getInt(ServiceNameTable.COLUMN_SERVICE_ID);
......
......@@ -16,18 +16,17 @@
*
*/
package org.apache.skywalking.apm.collector.storage.h2.dao.register;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -61,6 +60,7 @@ public class ServiceNameRegisterH2DAO extends H2DAO implements IServiceNameRegis
source.put(ServiceNameTable.COLUMN_SERVICE_ID, serviceName.getServiceId());
source.put(ServiceNameTable.COLUMN_APPLICATION_ID, serviceName.getApplicationId());
source.put(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName.getServiceName());
source.put(ServiceNameTable.COLUMN_SRC_SPAN_TYPE, serviceName.getSrcSpanType());
String sql = SqlBuilder.buildBatchInsertSql(ServiceNameTable.TABLE, source.keySet());
Object[] params = source.values().toArray(new Object[0]);
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.storage.h2.define.register;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2ColumnDefine;
......@@ -37,5 +36,6 @@ public class ServiceNameH2TableDefine extends H2TableDefine {
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_SRC_SPAN_TYPE, H2ColumnDefine.Type.Int.name()));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册