Unverified commit a9489b70, authored by wu-sheng, committed by GitHub

H2 storage (#1712)

* H2 storage code skeleton.

* Add batch dao for H2.

* Fix wrong refactor.

* Add code of the other daos. No metric dao; waiting for @peng-yongsheng's new PR.

* Change license of distribution

* Add topology query in H2.

* Add more daos.

* Add some daos.

* Commit all H2 daos. Waiting for @peng-yongsheng to submit the new IAggregationQueryDAO interface.

* Fix some CIs

* Fix wrong refactor, which causes h2 plugin failure.

* Fix all code of H2 storage. :) CI should pass. No tests.

* Fix CI and typo

* Fix connection closed.

* Fix register bug

* no message

* Fixed service inventory register bug.

* Fix a compile issue.

* Sync change

* Reset es as default storage.

* Fix rat

* Fix H2 compile issue.

* Fix bugs for H2 storage.

* Fix bug in mesh.

* 1. Fix Thermodynamic query bug
2. Fix service inventory query bug
3. Fix missing traceid in trace query bug

* Fix query bug.

* Submit revert caused by the previous PR.

* Fix duplicate spans.

* Use H2 as default storage implementor.

* Fix bug in topology query.

* Fix CI

* Fix rat

* Fix two queries in H2 storage.

* Fix H2 query

* Make the release package run in H2 mode and the develop mode run in ES mode by default
Parent b11d4793
......@@ -296,7 +296,7 @@ The text of each license is the standard Apache 2.0 license.
servo 0.10.1: https://github.com/Netflix/servo, Apache 2.0
validation-api 1.1.0.Final: http://beanvalidation.org/licensing/, Apache 2.0
zuul-core 1.3.0: https://github.com/Netflix/zuul, Apache 2.0
ben-manes caffeine 2.6.2: https://github.com/ben-manes/caffeine, Apache 2.0
HikariCP 3.1.0: https://github.com/brettwooldridge/HikariCP, Apache 2.0
zipkin 2.9.1: https://github.com/openzipkin/zipkin, Apache 2.0
sharding-jdbc-core 2.0.3: https://github.com/sharding-sphere/sharding-sphere, Apache 2.0
kubernetes-client 2.0.0: https://github.com/kubernetes-client/java, Apache 2.0
......
......@@ -68,7 +68,7 @@ public class ComponentsDefine {
public static final OfficialComponent JEDIS = new OfficialComponent(30, "Jedis");
public static final OfficialComponent H2_JDBC_DRIVER = new OfficialComponent(32, "h2-jdbc-driver");
public static final OfficialComponent MYSQL_JDBC_DRIVER = new OfficialComponent(33, "mysql-connector-java");
......
......@@ -30,7 +30,7 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
* @author zhangxin
*/
public class DriverInstrumentation extends AbstractDriverInstrumentation {
private static final String CLASS_OF_INTERCEPT_H2_DRIVER = "org.h2.Driver";
@Override
protected ClassMatch enhanceClass() {
......
......@@ -29,7 +29,7 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
* @author zhangxin
*/
public class JdbcConnectionInstrumentation extends AbstractConnectionInstrumentation {
public static final String ENHANCE_CLASS = "org.h2.jdbc.JdbcConnection";
public static final String ENHANCE_CLASS = "org.jdbc.jdbc.JdbcConnection";
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
......
......@@ -34,8 +34,8 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
*/
public class JdbcXAConnectionInstrumentation extends AbstractConnectionInstrumentation {
public static final String ENHANCE_CLASS = "org.h2.jdbcx.JdbcXAConnection";
public static final String CONSTRUCTOR_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.jdbc.h2.JdbcXAConnectionConstructorInterceptor";
public static final String ENHANCE_CLASS = "org.jdbc.jdbcx.JdbcXAConnection";
public static final String CONSTRUCTOR_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.jdbc.jdbc.JdbcXAConnectionConstructorInterceptor";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
......
......@@ -60,6 +60,7 @@
<elasticsearch.version>6.3.2</elasticsearch.version>
<joda-time.version>2.9.9</joda-time.version>
<kubernetes.version>2.0.0</kubernetes.version>
<hikaricp.version>3.1.0</hikaricp.version>
</properties>
<dependencies>
......@@ -261,6 +262,11 @@
<artifactId>client-java</artifactId>
<version>${kubernetes.version}</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file
......@@ -121,6 +121,7 @@ public class TraceQueryService implements Service {
}
}
trace.getSpans().clear();
trace.getSpans().addAll(sortedSpans);
return trace;
}
......
......@@ -18,17 +18,26 @@
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.*;
import java.util.LinkedList;
import java.util.List;
import lombok.Getter;
/**
* @author peng-yongsheng
*/
public class IntValues {
@Getter private final List<KVInt> values;
@Getter private List<KVInt> values = new LinkedList<>();
public IntValues() {
this.values = new LinkedList<>();
public void addKVInt(KVInt e) {
values.add(e);
}
public int findValue(String id, int defaultValue) {
for (KVInt value : values) {
if (value.getId().equals(id)) {
return value.getValue();
}
}
return defaultValue;
}
}
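Note: the reworked IntValues is mutable; points are appended with addKVInt and read back with findValue, which falls back to a caller-supplied default for unknown ids. A minimal usage sketch, assuming KVInt exposes setId/setValue setters (not shown in this hunk):

IntValues values = new IntValues();
KVInt point = new KVInt();
point.setId("service_1");        // illustrative id
point.setValue(42);
values.addKVInt(point);

int hit = values.findValue("service_1", 0);   // 42
int miss = values.findValue("service_2", 0);  // unknown id, returns the default 0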
......@@ -33,7 +33,28 @@ public class Thermodynamic {
this.nodes = new ArrayList<>();
}
public void setNodeValue(int columnNum, int rowNum, Long value) {
public void fromMatrixData(List<List<Long>> thermodynamicValueMatrix, int numOfSteps) {
thermodynamicValueMatrix.forEach(columnOfThermodynamic -> {
if (columnOfThermodynamic.size() == 0) {
if (numOfSteps > 0) {
for (int i = 0; i < numOfSteps; i++) {
columnOfThermodynamic.add(0L);
}
}
}
}
);
for (int colNum = 0; colNum < thermodynamicValueMatrix.size(); colNum++) {
List<Long> column = thermodynamicValueMatrix.get(colNum);
for (int rowNum = 0; rowNum < column.size(); rowNum++) {
Long value = column.get(rowNum);
this.setNodeValue(colNum, rowNum, value);
}
}
}
private void setNodeValue(int columnNum, int rowNum, Long value) {
List<Long> element = new ArrayList<>(3);
element.add((long)columnNum);
element.add((long)rowNum);
......
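Note: fromMatrixData treats the outer list as one column per time bucket and each inner list as the latency-step rows of that column; empty columns are padded with zeros up to numOfSteps, then every cell is flattened into a [column, row, value] triple via setNodeValue. A hedged sketch with two time buckets and three latency steps (the inner lists must be mutable, since padding appends to them):

List<List<Long>> matrix = new ArrayList<>();
matrix.add(new ArrayList<>(Arrays.asList(1L, 5L, 2L))); // column 0, rows 0..2
matrix.add(new ArrayList<>());                          // column 1, padded with three 0L rows

Thermodynamic thermodynamic = new Thermodynamic();
thermodynamic.fromMatrixData(matrix, 3);                // numOfSteps == 3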
......@@ -71,10 +71,10 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
try {
sources.values().forEach(source -> {
try {
RegisterSource newSource = registerDAO.get(modelName, source.id());
if (Objects.nonNull(newSource)) {
newSource.combine(source);
registerDAO.forceUpdate(modelName, newSource);
RegisterSource dbSource = registerDAO.get(modelName, source.id());
if (Objects.nonNull(dbSource)) {
dbSource.combine(source);
registerDAO.forceUpdate(modelName, dbSource);
} else {
int sequence = registerDAO.max(modelName);
source.setSequence(sequence + 1);
......
......@@ -38,6 +38,22 @@ public class ServiceRelation extends Source {
return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId) + Const.ID_SPLIT + String.valueOf(componentId);
}
/**
* @param entityId the composite id built by the method above
* @return 1. sourceServiceId 2. destServiceId 3. componentId
*/
public static Integer[] splitEntityId(String entityId) {
String[] parts = entityId.split(Const.ID_SPLIT);
if (parts.length != 3) {
throw new RuntimeException("Illegal ServiceRelation eneity id");
}
Integer[] ids = new Integer[3];
ids[0] = Integer.parseInt(parts[0]);
ids[1] = Integer.parseInt(parts[1]);
ids[2] = Integer.parseInt(parts[2]);
return ids;
}
@Getter @Setter private int sourceServiceId;
@Getter @Setter private String sourceServiceName;
@Getter @Setter private String sourceServiceInstanceName;
......
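Note: splitEntityId is the inverse of the id builder above, which joins the three ids with Const.ID_SPLIT. A small sketch, assuming Const.ID_SPLIT is "_" (its value is not shown in this hunk):

Integer[] ids = ServiceRelation.splitEntityId("12_34_9");
int sourceServiceId = ids[0]; // 12
int destServiceId = ids[1];   // 34
int componentId = ids[2];     // 9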
......@@ -24,9 +24,9 @@ import org.apache.skywalking.oap.server.core.query.entity.Step;
/**
* @author peng-yongsheng
*/
public class DownsampleingModelNameBuilder {
public class DownSamplingModelNameBuilder {
private DownsampleingModelNameBuilder() {
private DownSamplingModelNameBuilder() {
}
public static String build(Step step, String modelName) {
......
......@@ -26,4 +26,8 @@ public class StorageException extends Exception {
public StorageException(String message) {
super(message);
}
public StorageException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -64,7 +64,7 @@ public abstract class ModelInstaller {
for (Model model : downsamplingModels) {
if (!isExists(client, model)) {
logger.info("table: {} not exists", model.getName());
logger.info("table: {} does not exist", model.getName());
createTable(client, model);
} else if (debug) {
logger.info("table: {} exists", model.getName());
......
......@@ -36,8 +36,8 @@
<artifactId>grpc-core</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>io.shardingjdbc</groupId>
......
......@@ -16,20 +16,21 @@
*
*/
package org.apache.skywalking.oap.server.library.client.h2;
package org.apache.skywalking.oap.server.library.client.jdbc;
import org.apache.skywalking.oap.server.library.client.ClientException;
import java.io.IOException;
/**
* @author peng-yongsheng
* @author wusheng
*/
public class H2ClientException extends ClientException {
public class JDBCClientException extends IOException {
public H2ClientException(String message) {
public JDBCClientException(String message) {
super(message);
}
public H2ClientException(String message, Throwable cause) {
public JDBCClientException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -16,98 +16,113 @@
*
*/
package org.apache.skywalking.oap.server.library.client.h2;
package org.apache.skywalking.oap.server.library.client.jdbc.hikaricp;
import java.sql.*;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.skywalking.oap.server.library.client.Client;
import org.h2.util.IOUtils;
import org.slf4j.*;
import org.apache.skywalking.oap.server.library.client.ClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
* JDBC client uses the HikariCP connection management library to execute SQL.
*
* @author wusheng
*/
public class H2Client implements Client {
public class JDBCHikariCPClient implements Client {
private final Logger logger = LoggerFactory.getLogger(JDBCHikariCPClient.class);
private final Logger logger = LoggerFactory.getLogger(H2Client.class);
private HikariDataSource dataSource;
private HikariConfig hikariConfig;
private Connection conn;
private String url;
private String userName;
private String password;
public JDBCHikariCPClient(Properties properties) {
hikariConfig = new HikariConfig(properties);
}
public H2Client() {
this.url = "jdbc:h2:mem:collector";
this.userName = "";
this.password = "";
@Override public void initialize() throws ClientException {
dataSource = new HikariDataSource(hikariConfig);
}
public H2Client(String url, String userName, String password) {
this.url = url;
this.userName = userName;
this.password = password;
@Override public void shutdown() {
}
@Override public void initialize() throws H2ClientException {
public Connection getConnection() throws JDBCClientException {
try {
Class.forName("org.h2.Driver");
conn = DriverManager.
getConnection(this.url, this.userName, this.password);
} catch (Exception e) {
throw new H2ClientException(e.getMessage(), e);
Connection connection = dataSource.getConnection();
connection.setAutoCommit(true);
return connection;
} catch (SQLException e) {
throw new JDBCClientException(e.getMessage(), e);
}
}
@Override public void shutdown() {
IOUtils.closeSilently(conn);
public void close(Connection connection) {
if (connection != null) {
try {
connection.commit();
connection.close();
} catch (SQLException e) {
}
}
}
public Connection getConnection() {
return conn;
}
public void execute(Connection connection, String sql) throws JDBCClientException {
try {
connection.setReadOnly(true);
} catch (SQLException e) {
public void execute(String sql) throws H2ClientException {
try (Statement statement = getConnection().createStatement()) {
}
logger.debug("execute aql: {}", sql);
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
statement.closeOnCompletion();
} catch (SQLException e) {
throw new H2ClientException(e.getMessage(), e);
throw new JDBCClientException(e.getMessage(), e);
}
}
public ResultSet executeQuery(String sql, Object[] params) throws H2ClientException {
public ResultSet executeQuery(Connection connection, String sql, Object... params) throws JDBCClientException {
logger.debug("execute query with result: {}", sql);
ResultSet rs;
PreparedStatement statement;
PreparedStatement statement = null;
try {
statement = getConnection().prepareStatement(sql);
statement = connection.prepareStatement(sql);
if (params != null) {
for (int i = 0; i < params.length; i++) {
statement.setObject(i + 1, params[i]);
Object param = params[i];
if (param instanceof String) {
statement.setString(i + 1, (String)param);
} else if (param instanceof Integer) {
statement.setInt(i + 1, (int)param);
} else if (param instanceof Double) {
statement.setDouble(i + 1, (double)param);
} else if (param instanceof Long) {
statement.setLong(i + 1, (long)param);
} else {
throw new JDBCClientException("Unsupported data type, type=" + param.getClass().getName());
}
}
}
rs = statement.executeQuery();
statement.closeOnCompletion();
} catch (SQLException e) {
throw new H2ClientException(e.getMessage(), e);
}
return rs;
}
public boolean execute(String sql, Object[] params) throws H2ClientException {
logger.debug("execute insert/update/delete: {}", sql);
boolean flag;
Connection conn = getConnection();
try (PreparedStatement statement = conn.prepareStatement(sql)) {
conn.setAutoCommit(true);
if (params != null) {
for (int i = 0; i < params.length; i++) {
statement.setObject(i + 1, params[i]);
if (statement != null) {
try {
statement.close();
} catch (SQLException e1) {
}
}
flag = statement.execute();
} catch (SQLException e) {
throw new H2ClientException(e.getMessage(), e);
throw new JDBCClientException(e.getMessage(), e);
}
return flag;
return rs;
}
}
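Note: a hedged end-to-end sketch of the new client, mirroring how H2StorageProvider wires it up later in this commit; the demo table and query are illustrative only, and the checked exceptions (ClientException, JDBCClientException, SQLException) are left to the enclosing method:

Properties properties = new Properties();
properties.setProperty("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource");
properties.setProperty("dataSource.url", "jdbc:h2:mem:skywalking-oap-db");
properties.setProperty("dataSource.user", "sa");

JDBCHikariCPClient client = new JDBCHikariCPClient(properties);
client.initialize();

Connection connection = client.getConnection();
try {
    client.execute(connection, "create table if not exists demo (id int, name varchar(100))");
    ResultSet resultSet = client.executeQuery(connection, "select name from demo where id = ?", 1);
    while (resultSet.next()) {
        // read resultSet.getString("name")
    }
} finally {
    client.close(connection); // commits and returns the connection to the pool
}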
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.client.shardingjdbc;
import io.shardingjdbc.core.api.ShardingDataSourceFactory;
import io.shardingjdbc.core.api.config.ShardingRuleConfiguration;
import java.sql.*;
import java.util.*;
import javax.sql.DataSource;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.skywalking.oap.server.library.client.Client;
import org.slf4j.*;
/**
* @author linjiaqi
*/
public class ShardingjdbcClient implements Client {
private static final Logger logger = LoggerFactory.getLogger(ShardingjdbcClient.class);
private Map<String, ShardingjdbcClientConfig> shardingjdbcClientConfig;
private ShardingRuleConfiguration shardingRuleConfiguration;
private Map<String, DataSource> shardingDataSource = new HashMap<String, DataSource>();
private DataSource dataSource;
public ShardingjdbcClient(Map<String, ShardingjdbcClientConfig> shardingjdbcClientConfig,
ShardingRuleConfiguration shardingRuleConfiguration) {
this.shardingjdbcClientConfig = shardingjdbcClientConfig;
this.shardingRuleConfiguration = shardingRuleConfiguration;
}
@Override public void initialize() throws ShardingjdbcClientException {
try {
shardingjdbcClientConfig.forEach((key, value) -> {
BasicDataSource basicDataSource = new BasicDataSource();
basicDataSource.setDriverClassName(value.getDriverClass());
basicDataSource.setUrl(value.getUrl());
basicDataSource.setUsername(value.getUserName());
basicDataSource.setPassword(value.getPassword());
shardingDataSource.put(key, basicDataSource);
logger.info("add sharding datasource: {}, url: {}", key, value.getUrl());
});
dataSource = ShardingDataSourceFactory.createDataSource(shardingDataSource, shardingRuleConfiguration,
new HashMap<String, Object>(), new Properties());
} catch (Exception e) {
logger.error("case the exception is 'Cannot load JDBC driver class', please add the driver mysql-connector-java-5.1.36.jar to collector-libs manual");
throw new ShardingjdbcClientException(e.getMessage(), e);
}
}
@Override public void shutdown() {
}
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
public void execute(String sql) throws ShardingjdbcClientException {
Connection conn = null;
Statement statement = null;
try {
conn = getConnection();
statement = conn.createStatement();
statement.execute(sql);
} catch (SQLException e) {
throw new ShardingjdbcClientException(e.getMessage(), e);
} finally {
try {
if (statement != null) {
statement.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
throw new ShardingjdbcClientException(e.getMessage(), e);
}
}
}
public ResultSet executeQuery(String sql, Object[] params) throws ShardingjdbcClientException {
logger.debug("execute query with result: {}", sql);
ResultSet rs;
PreparedStatement statement;
try {
statement = getConnection().prepareStatement(sql);
if (params != null) {
for (int i = 0; i < params.length; i++) {
statement.setObject(i + 1, params[i]);
}
}
rs = statement.executeQuery();
} catch (SQLException e) {
throw new ShardingjdbcClientException(e.getMessage(), e);
}
return rs;
}
public boolean execute(String sql, Object[] params) throws ShardingjdbcClientException {
logger.debug("execute insert/update/delete: {}", sql);
boolean flag;
Connection conn = null;
PreparedStatement statement = null;
try {
conn = getConnection();
conn.setAutoCommit(true);
statement = conn.prepareStatement(sql);
if (params != null) {
for (int i = 0; i < params.length; i++) {
statement.setObject(i + 1, params[i]);
}
}
flag = statement.execute();
} catch (SQLException e) {
throw new ShardingjdbcClientException(e.getMessage(), e);
} finally {
try {
if (statement != null) {
statement.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
throw new ShardingjdbcClientException(e.getMessage(), e);
}
}
return flag;
}
}
Subproject commit 3a83be79a9c23aad6576ed2a4a04b82de6d7a829
Subproject commit 1122e97b5604ae96447bd58ecdb248d7e02952aa
......@@ -22,6 +22,10 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.register.module.RegisterModule;
import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc.ApplicationRegisterHandler;
import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc.InstanceDiscoveryServiceHandler;
import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc.NetworkAddressRegisterServiceHandler;
import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc.ServiceNameDiscoveryHandler;
import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.rest.*;
/**
......@@ -45,6 +49,13 @@ public class RegisterModuleProvider extends ModuleProvider {
}
@Override public void start() {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
grpcHandlerRegister.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
grpcHandlerRegister.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));
JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class);
jettyHandlerRegister.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyHandlerRegister.addHandler(new InstanceDiscoveryServletHandler(getManager()));
......
......@@ -85,7 +85,7 @@
<!-- storage module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-h2-plugin</artifactId>
<artifactId>storage-jdbc-hikaricp-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......
......@@ -44,15 +44,19 @@ core:
dayMetricsDataTTL: 45 # Unit is day
monthMetricsDataTTL: 18 # Unit is month
storage:
elasticsearch:
clusterNodes: localhost:9200
indexShardsNumber: 2
indexReplicasNumber: 0
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: 2000 # Execute the bulk every 2000 requests
bulkSize: 20 # flush the bulk every 20mb
flushInterval: 10 # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: 2 # the number of concurrent requests
h2:
driver: org.h2.jdbcx.JdbcDataSource
url: jdbc:h2:mem:skywalking-oap-db
user: sa
# elasticsearch:
# clusterNodes: localhost:9200
# indexShardsNumber: 2
# indexReplicasNumber: 0
# # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
# bulkActions: 2000 # Execute the bulk every 2000 requests
# bulkSize: 20 # flush the bulk every 20mb
# flushInterval: 10 # flush the bulk every 10 seconds whatever the number of requests
# concurrentRequests: 2 # the number of concurrent requests
receiver-register:
default:
receiver-trace:
......
......@@ -53,6 +53,10 @@ storage:
bulkSize: 20 # flush the bulk every 20mb
flushInterval: 10 # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: 2 # the number of concurrent requests
# h2:
# driver: org.h2.jdbcx.JdbcDataSource
# url: jdbc:h2:mem:skywalking-oap-db
# user: sa
receiver-register:
default:
receiver-trace:
......
......@@ -17,7 +17,7 @@
~
-->
<Configuration status="INFO">
<Configuration status="DEBUG">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c - %L [%t] %-5p %x - %m%n"/>
......@@ -33,8 +33,9 @@
<logger name="org.apache.http" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core.alarm.AlarmStandardPersistence" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core.remote" level="INFO"/>
<Root level="INFO">
<logger name="org.apache.skywalking.oap.server.core" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.library.buffer" level="INFO"/>
<Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
......
......@@ -30,7 +30,7 @@
<artifactId>server-storage-plugin</artifactId>
<packaging>pom</packaging>
<modules>
<module>storage-h2-plugin</module>
<module>storage-jdbc-hikaricp-plugin</module>
<module>storage-elasticsearch-plugin</module>
</modules>
......
......@@ -53,7 +53,7 @@ public class EndpointInventoryCacheEsDAO extends EsDAO implements IEndpointInven
return Const.NONE;
}
} catch (Throwable e) {
logger.error(e.getMessage());
logger.error(e.getMessage(), e);
return Const.NONE;
}
}
......@@ -72,7 +72,7 @@ public class EndpointInventoryCacheEsDAO extends EsDAO implements IEndpointInven
return null;
}
} catch (Throwable e) {
logger.error(e.getMessage());
logger.error(e.getMessage(), e);
return null;
}
}
......
......@@ -48,7 +48,7 @@ public class NetworkAddressInventoryCacheEsDAO extends EsDAO implements INetwork
String id = NetworkAddressInventory.buildId(networkAddress);
GetResponse response = getClient().get(NetworkAddressInventory.MODEL_NAME, id);
if (response.isExists()) {
return (int)response.getSource().getOrDefault(RegisterSource.SEQUENCE, 0);
return (int)response.getSource().getOrDefault(NetworkAddressInventory.SEQUENCE, 0);
} else {
return Const.NONE;
}
......
......@@ -23,7 +23,7 @@ import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.DownsampleingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.DownSamplingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
......@@ -46,7 +46,7 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
@Override
public List<TopNEntity> getServiceTopN(String indName, String valueCName, int topN, Step step, long startTB,
long endTB, Order order) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, indName);
String indexName = DownSamplingModelNameBuilder.build(step, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).lte(endTB).gte(startTB));
......@@ -55,7 +55,7 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
@Override public List<TopNEntity> getAllServiceInstanceTopN(String indName, String valueCName, int topN, Step step,
long startTB, long endTB, Order order) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, indName);
String indexName = DownSamplingModelNameBuilder.build(step, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).lte(endTB).gte(startTB));
......@@ -64,7 +64,7 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
@Override public List<TopNEntity> getServiceInstanceTopN(int serviceId, String indName, String valueCName, int topN,
Step step, long startTB, long endTB, Order order) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, indName);
String indexName = DownSamplingModelNameBuilder.build(step, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
......@@ -80,7 +80,7 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
@Override
public List<TopNEntity> getAllEndpointTopN(String indName, String valueCName, int topN, Step step, long startTB,
long endTB, Order order) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, indName);
String indexName = DownSamplingModelNameBuilder.build(step, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).lte(endTB).gte(startTB));
......@@ -90,7 +90,7 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
@Override
public List<TopNEntity> getEndpointTopN(int serviceId, String indName, String valueCName, int topN, Step step,
long startTB, long endTB, Order order) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, indName);
String indexName = DownSamplingModelNameBuilder.build(step, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
......
......@@ -23,7 +23,7 @@ import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.query.sql.*;
import org.apache.skywalking.oap.server.core.storage.DownsampleingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.DownSamplingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
......@@ -46,7 +46,7 @@ public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
public IntValues getValues(String indName, Step step, long startTB, long endTB, Where where, String valueCName,
Function function) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, indName);
String indexName = DownSamplingModelNameBuilder.build(step, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
queryBuild(sourceBuilder, where, startTB, endTB);
......@@ -101,7 +101,7 @@ public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
@Override public IntValues getLinearIntValues(String indName, Step step, List<String> ids,
String valueCName) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, indName);
String indexName = DownSamplingModelNameBuilder.build(step, indName);
MultiGetResponse response = getClient().multiGet(indexName, ids);
......@@ -122,7 +122,7 @@ public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
@Override public Thermodynamic getThermodynamic(String indName, Step step, List<String> ids,
String valueCName) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, indName);
String indexName = DownSamplingModelNameBuilder.build(step, indName);
MultiGetResponse response = getClient().multiGet(indexName, ids);
......@@ -138,7 +138,7 @@ public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
} else {
int axisYStep = ((Number)source.get(ThermodynamicIndicator.STEP)).intValue();
thermodynamic.setAxisYStep(axisYStep);
numOfSteps = ((Number)source.get(ThermodynamicIndicator.NUM_OF_STEPS)).intValue();
numOfSteps = ((Number)source.get(ThermodynamicIndicator.NUM_OF_STEPS)).intValue() + 1;
String value = (String)source.get(ThermodynamicIndicator.DETAIL_GROUP);
IntKeyLongValueArray intKeyLongValues = new IntKeyLongValueArray(5);
......@@ -157,26 +157,7 @@ public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
}
}
int defaultNumOfSteps = numOfSteps;
thermodynamicValueMatrix.forEach(columnOfThermodynamic -> {
if (columnOfThermodynamic.size() == 0) {
if (defaultNumOfSteps > 0) {
for (int i = 0; i < defaultNumOfSteps; i++) {
columnOfThermodynamic.add(0L);
}
}
}
}
);
for (int colNum = 0; colNum < thermodynamicValueMatrix.size(); colNum++) {
List<Long> column = thermodynamicValueMatrix.get(colNum);
for (int rowNum = 0; rowNum < column.size(); rowNum++) {
Long value = column.get(rowNum);
thermodynamic.setNodeValue(colNum, rowNum, value);
}
}
thermodynamic.fromMatrixData(thermodynamicValueMatrix, numOfSteps);
return thermodynamic;
}
......
......@@ -19,20 +19,25 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointRelationServerSideIndicator;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceRelationClientSideIndicator;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceRelationServerSideIndicator;
import org.apache.skywalking.oap.server.core.query.entity.Call;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.DownsampleingModelNameBuilder;
import org.apache.skywalking.oap.server.core.source.ServiceRelation;
import org.apache.skywalking.oap.server.core.storage.DownSamplingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
......@@ -57,7 +62,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
sourceBuilder.size(0);
setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
String indexName = DownsampleingModelNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
String indexName = DownSamplingModelNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
return load(sourceBuilder, indexName, DetectPoint.SERVER);
}
......@@ -72,7 +77,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
sourceBuilder.size(0);
setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
String indexName = DownsampleingModelNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
String indexName = DownSamplingModelNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
return load(sourceBuilder, indexName, DetectPoint.CLIENT);
}
......@@ -95,7 +100,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
}
@Override public List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
String indexName = DownSamplingModelNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
sourceBuilder.size(0);
......@@ -104,7 +109,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
}
@Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
String indexName = DownSamplingModelNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
sourceBuilder.size(0);
......@@ -115,7 +120,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
@Override
public List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
int destEndpointId) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, EndpointRelationServerSideIndicator.INDEX_NAME);
String indexName = DownSamplingModelNameBuilder.build(step, EndpointRelationServerSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.size(0);
......@@ -143,16 +148,15 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
Terms entityTerms = response.getAggregations().get(Indicator.ENTITY_ID);
for (Terms.Bucket entityBucket : entityTerms.getBuckets()) {
String entityId = entityBucket.getKeyAsString();
String[] ids = entityId.split(Const.ID_SPLIT);
if (ids.length == 3) {
Call call = new Call();
call.setId(entityBucket.getKeyAsString());
call.setSource(Integer.valueOf(ids[0]));
call.setTarget(Integer.valueOf(ids[1]));
call.setComponentId(Integer.valueOf(ids[2]));
call.setDetectPoint(detectPoint);
calls.add(call);
}
Integer[] entityIds = ServiceRelation.splitEntityId(entityId);
Call call = new Call();
call.setId(entityId);
call.setSource(entityIds[0]);
call.setTarget(entityIds[1]);
call.setComponentId(entityIds[2]);
call.setDetectPoint(detectPoint);
calls.add(call);
}
return calls;
}
......
......@@ -27,7 +27,24 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>storage-h2-plugin</artifactId>
<artifactId>storage-jdbc-hikaricp-plugin</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc;
/**
* SQLBuilder
*
* @author wusheng
*/
public class SQLBuilder {
private static String LINE_END = System.lineSeparator();
private StringBuilder text;
public SQLBuilder() {
this.text = new StringBuilder();
}
public SQLBuilder(String initLine) {
this();
this.appendLine(initLine);
}
public SQLBuilder append(String fragment) {
text.append(fragment);
return this;
}
public SQLBuilder appendLine(String line) {
text.append(line).append(LINE_END);
return this;
}
public String toStringInNewLine() {
return LINE_END + toString();
}
@Override public String toString() {
return text.toString();
}
}
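Note: SQLBuilder is a thin fluent wrapper around StringBuilder; appendLine terminates a fragment with the platform line separator so multi-line statements stay readable in logs. A usage sketch with an illustrative table name:

String sql = new SQLBuilder("select id, name ")
    .appendLine("from service_inventory")
    .append("where sequence = ?")
    .toString();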
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
/**
* A SQL executor.
*
* @author wusheng
*/
public class SQLExecutor {
private String sql;
private List<Object> param;
public SQLExecutor(String sql, List<Object> param) {
this.sql = sql;
this.param = param;
}
public void invoke(Connection connection) throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(sql);
for (int i = 0; i < param.size(); i++) {
preparedStatement.setObject(i + 1, param.get(i));
}
preparedStatement.execute();
}
}
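Note: SQLExecutor pairs one prepared statement with its ordered parameters, so a batch can be assembled ahead of time and replayed on a single connection (as H2BatchDAO does later in this commit). A hedged sketch, assuming an initialized JDBCHikariCPClient named h2Client and an illustrative table:

SQLExecutor insert = new SQLExecutor(
    "insert into segment(id, trace_id) values(?, ?)",
    Arrays.<Object>asList("seg-1", "trace-1"));

Connection connection = h2Client.getConnection();
try {
    insert.invoke(connection);
} finally {
    h2Client.close(connection); // commits; closing the connection also releases the statement
}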
......@@ -16,38 +16,23 @@
*
*/
package org.apache.skywalking.oap.server.library.client.h2;
package org.apache.skywalking.oap.server.storage.plugin.jdbc;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.storage.model.Model;
/**
* @author peng-yongsheng
* @author wusheng
*/
public abstract class H2ClientConfig {
private String url;
private String userName;
private String password;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public class TableMetaInfo {
private static Map<String, Model> TABLES = new HashMap<>();
public String getPassword() {
return password;
public static void addModel(Model model) {
TABLES.put(model.getName(), model);
}
public void setPassword(String password) {
this.password = password;
public static Model get(String moduleName) {
return TABLES.get(moduleName);
}
}
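Note: TableMetaInfo is a static registry that lets the JDBC DAOs recover a table's Model (name and columns) without re-reading storage metadata: the table installer registers each model once, and the DAOs look it up by name. A minimal sketch, reusing NetworkAddressInventory.MODEL_NAME as seen elsewhere in this diff:

TableMetaInfo.addModel(model); // typically called while installing tables

Model networkAddress = TableMetaInfo.get(NetworkAddressInventory.MODEL_NAME);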
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wusheng
*/
public class H2RegisterLockInstaller {
public static final String LOCK_TABLE_NAME = "register_lock";
private static final Logger logger = LoggerFactory.getLogger(H2RegisterLockInstaller.class);
/**
* For H2 storage there is no concurrency, so no lock table is required. If someone wants to implement a storage by
* referring to H2, please consider creating a LOCK table.
*
* @param client
* @throws StorageException
*/
public void install(Client client) throws StorageException {
}
}
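Note: for a storage that does need the lock table, install could create it through the JDBC client. A hedged sketch only; the DDL and column layout below are assumptions, not part of this commit:

public void install(Client client) throws StorageException {
    JDBCHikariCPClient h2Client = (JDBCHikariCPClient)client;
    try {
        Connection connection = h2Client.getConnection();
        try {
            h2Client.execute(connection, "create table if not exists " + LOCK_TABLE_NAME
                + " (id int primary key, sequence int)");
        } finally {
            h2Client.close(connection);
        }
    } catch (JDBCClientException e) {
        throw new StorageException(e.getMessage(), e);
    }
}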
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author wusheng
*/
@Setter
@Getter
public class H2StorageConfig extends ModuleConfig {
private String driver = "org.h2.jdbcx.JdbcDataSource";
private String url = "jdbc:h2:mem:collector";
private String user = "";
private String password = "";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.ClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AggregationQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AlarmQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The H2 storage provider is for demonstration and preview only. You will find that several
* interfaces haven't been implemented, because they are not necessary, and performance is
* not a major consideration.
*
* If someone wants to implement a SQL-style database as storage, please just refer to this logic.
*
* @author wusheng
*/
public class H2StorageProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(H2StorageProvider.class);
private H2StorageConfig config;
private JDBCHikariCPClient h2Client;
public H2StorageProvider() {
config = new H2StorageConfig();
}
@Override public String name() {
return "h2";
}
@Override public Class<? extends ModuleDefine> module() {
return StorageModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
Properties settings = new Properties();
settings.setProperty("dataSourceClassName", config.getDriver());
settings.setProperty("dataSource.url", config.getUrl());
settings.setProperty("dataSource.user", config.getUser());
settings.setProperty("dataSource.password", config.getPassword());
h2Client = new JDBCHikariCPClient(settings);
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(h2Client));
this.registerServiceImplementation(IRegisterLockDAO.class, new H2RegisterLockDAO());
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new H2ServiceInventoryCacheDAO(h2Client));
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new H2ServiceInstanceInventoryCacheDAO(h2Client));
this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new H2EndpointInventoryCacheDAO(h2Client));
this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new H2NetworkAddressInventoryCacheDAO(h2Client));
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client));
this.registerServiceImplementation(IMetricQueryDAO.class, new H2MetricQueryDAO(h2Client));
this.registerServiceImplementation(ITraceQueryDAO.class, new H2TraceQueryDAO(h2Client));
this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client));
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO());
this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO());
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
try {
h2Client.initialize();
H2TableInstaller installer = new H2TableInstaller(getManager());
installer.install(h2Client);
new H2RegisterLockInstaller().install(h2Client);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
} catch (ClientException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.query.entity.Order;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.query.entity.TopNEntity;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.storage.DownSamplingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
/**
* @author wusheng
*/
public class H2AggregationQueryDAO implements IAggregationQueryDAO {
private JDBCHikariCPClient h2Client;
public H2AggregationQueryDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override
public List<TopNEntity> getServiceTopN(String indName, String valueCName, int topN, Step step,
long startTB, long endTB, Order order) throws IOException {
return topNQuery(indName, valueCName, topN, step, startTB, endTB, order, null);
}
@Override public List<TopNEntity> getAllServiceInstanceTopN(String indName, String valueCName, int topN,
Step step, long startTB, long endTB, Order order) throws IOException {
return topNQuery(indName, valueCName, topN, step, startTB, endTB, order, null);
}
@Override
public List<TopNEntity> getServiceInstanceTopN(int serviceId, String indName, String valueCName,
int topN, Step step, long startTB, long endTB, Order order) throws IOException {
return topNQuery(indName, valueCName, topN, step, startTB, endTB, order, (sql, conditions) -> {
sql.append(" and ").append(ServiceInstanceInventory.SERVICE_ID).append("=?");
conditions.add(serviceId);
});
}
@Override
public List<TopNEntity> getAllEndpointTopN(String indName, String valueCName, int topN, Step step,
long startTB, long endTB, Order order) throws IOException {
return topNQuery(indName, valueCName, topN, step, startTB, endTB, order, null);
}
@Override public List<TopNEntity> getEndpointTopN(int serviceId, String indName, String valueCName,
int topN, Step step, long startTB, long endTB, Order order) throws IOException {
return topNQuery(indName, valueCName, topN, step, startTB, endTB, order, (sql, conditions) -> {
sql.append(" and ").append(EndpointInventory.SERVICE_ID).append("=?");
conditions.add(serviceId);
});
}
public List<TopNEntity> topNQuery(String indName, String valueCName, int topN, Step step,
long startTB, long endTB, Order order, AppendCondition appender) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, indName);
StringBuilder sql = new StringBuilder();
List<Object> conditions = new ArrayList<>(10);
sql.append("select * from (select avg(").append(valueCName).append(") value,").append(Indicator.ENTITY_ID).append(" from ")
.append(tableName).append(" where ");
this.setTimeRangeCondition(sql, conditions, startTB, endTB);
if (appender != null) {
appender.append(sql, conditions);
}
sql.append(" group by ").append(Indicator.ENTITY_ID);
sql.append(") order by value ").append(order.equals(Order.ASC) ? "asc" : "desc").append(" limit ").append(topN);
Connection connection = null;
List<TopNEntity> topNEntities = new ArrayList<>();
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), conditions.toArray(new Object[0]));
try {
while (resultSet.next()) {
TopNEntity topNEntity = new TopNEntity();
topNEntity.setId(resultSet.getString(Indicator.ENTITY_ID));
topNEntity.setValue(resultSet.getInt("value"));
topNEntities.add(topNEntity);
}
} catch (SQLException e) {
throw new IOException(e);
}
} finally {
h2Client.close(connection);
}
return topNEntities;
}
private void setTimeRangeCondition(StringBuilder sql, List<Object> conditions, long startTimestamp,
long endTimestamp) {
sql.append(Indicator.TIME_BUCKET).append(" >= ? and ").append(Indicator.TIME_BUCKET).append(" <= ?");
conditions.add(startTimestamp);
conditions.add(endTimestamp);
}
private interface AppendCondition {
void append(StringBuilder sql, List<Object> conditions);
}
}
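Note: for reference, a call such as getServiceTopN("service_p99", "value", 10, step, startTB, endTB, order) builds SQL of roughly this shape (the table name comes from DownSamplingModelNameBuilder, so the _day suffix and the column names below are illustrative):

select * from (select avg(value) value,entity_id from service_p99_day
    where time_bucket >= ? and time_bucket <= ? group by entity_id)
    order by value desc limit 10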
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.query.entity.Alarms;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
/**
 * As the H2 storage is meant as a demo/showcase environment, supporting alarm is
 * unnecessary; this DAO always returns an empty result.
*
* @author wusheng
*/
public class H2AlarmQueryDAO implements IAlarmQueryDAO {
@Override
public Alarms getAlarm(Scope scope, String keyword, int limit, int from, long startTB,
long endTB) throws IOException {
return new Alarms();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wusheng
*/
public class H2BatchDAO implements IBatchDAO {
private static final Logger logger = LoggerFactory.getLogger(H2BatchDAO.class);
private JDBCHikariCPClient h2Client;
public H2BatchDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override public void batchPersistence(List<?> batchCollection) {
if (batchCollection.size() == 0) {
return;
}
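        // Each element is expected to be a SQLExecutor prepared by H2SQLExecutor
        // (see getInsertExecutor/getUpdateExecutor); all statements share one connection.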
if (logger.isDebugEnabled()) {
logger.debug("batch sql statements execute, data size: {}", batchCollection.size());
}
try (Connection connection = h2Client.getConnection()) {
for (Object exe : batchCollection) {
SQLExecutor sqlExecutor = (SQLExecutor)exe;
sqlExecutor.invoke(connection);
}
} catch (SQLException e) {
logger.error(e.getMessage(), e);
} catch (JDBCClientException e) {
logger.error(e.getMessage(), e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wusheng
*/
public class H2EndpointInventoryCacheDAO extends H2SQLExecutor implements IEndpointInventoryCacheDAO {
private static final Logger logger = LoggerFactory.getLogger(H2EndpointInventoryCacheDAO.class);
private JDBCHikariCPClient h2Client;
public H2EndpointInventoryCacheDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override public int getEndpointId(int serviceId, String endpointName) {
String id = EndpointInventory.buildId(serviceId, endpointName);
return getEntityIDByID(h2Client, EndpointInventory.SEQUENCE, EndpointInventory.MODEL_NAME, id);
}
@Override public EndpointInventory get(int endpointId) {
try {
return (EndpointInventory)getByColumn(h2Client, EndpointInventory.MODEL_NAME, EndpointInventory.SEQUENCE, endpointId, new EndpointInventory.Builder());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
}
}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
/**
 * @author wusheng
 */
public class H2HistoryDeleteDAO implements IHistoryDeleteDAO {
    @Override
    public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException {
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
/**
* @author wusheng
*/
public class H2IndicatorDAO extends H2SQLExecutor implements IIndicatorDAO<SQLExecutor, SQLExecutor> {
private JDBCHikariCPClient h2Client;
private StorageBuilder<Indicator> storageBuilder;
public H2IndicatorDAO(JDBCHikariCPClient h2Client, StorageBuilder<Indicator> storageBuilder) {
this.h2Client = h2Client;
this.storageBuilder = storageBuilder;
}
@Override public Indicator get(String modelName, Indicator indicator) throws IOException {
return (Indicator)getByID(h2Client, modelName, indicator.id(), storageBuilder);
}
@Override public SQLExecutor prepareBatchInsert(String modelName, Indicator indicator) throws IOException {
return getInsertExecutor(modelName, indicator, storageBuilder);
}
@Override public SQLExecutor prepareBatchUpdate(String modelName, Indicator indicator) throws IOException {
return getUpdateExecutor(modelName, indicator, storageBuilder);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.query.entity.Attribute;
import org.apache.skywalking.oap.server.core.query.entity.Endpoint;
import org.apache.skywalking.oap.server.core.query.entity.LanguageTrans;
import org.apache.skywalking.oap.server.core.query.entity.Service;
import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.StringUtils;
/**
* @author wusheng
*/
public class H2MetadataQueryDAO implements IMetadataQueryDAO {
private JDBCHikariCPClient h2Client;
public H2MetadataQueryDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override public int numOfService(long startTimestamp, long endTimestamp) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select count(*) num from ").append(ServiceInventory.MODEL_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
Connection connection = null;
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
while (resultSet.next()) {
return resultSet.getInt("num");
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return 0;
}
@Override public int numOfEndpoint(long startTimestamp, long endTimestamp) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select count(*) num from ").append(EndpointInventory.MODEL_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(EndpointInventory.DETECT_POINT).append("=").append(DetectPoint.SERVER.ordinal());
Connection connection = null;
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
while (resultSet.next()) {
return resultSet.getInt("num");
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return 0;
}
@Override public int numOfConjectural(long startTimestamp, long endTimestamp,
int srcLayer) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select count(*) num from ").append(NetworkAddressInventory.MODEL_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(NetworkAddressInventory.SRC_LAYER).append("=?");
condition.add(srcLayer);
Connection connection = null;
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
while (resultSet.next()) {
return resultSet.getInt("num");
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return 0;
}
@Override
public List<Service> getAllServices(long startTimestamp, long endTimestamp) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(ServiceInventory.IS_ADDRESS).append("=? limit 100");
condition.add(BooleanUtils.FALSE);
Connection connection = null;
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
return buildServices(resultSet);
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
}
@Override public List<Service> searchServices(long startTimestamp, long endTimestamp,
String keyword) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(ServiceInventory.IS_ADDRESS).append("=?");
condition.add(BooleanUtils.FALSE);
if (StringUtils.isNotEmpty(keyword)) {
sql.append(" and ").append(ServiceInventory.NAME).append(" like \"%").append(keyword).append("%\"");
}
sql.append(" limit 100");
Connection connection = null;
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
return buildServices(resultSet);
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
}
@Override public Service searchService(String serviceCode) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
sql.append(ServiceInventory.IS_ADDRESS).append("=?");
condition.add(BooleanUtils.FALSE);
sql.append(" and ").append(ServiceInventory.NAME).append(" = ?");
condition.add(serviceCode);
Connection connection = null;
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
while (resultSet.next()) {
Service service = new Service();
service.setId(resultSet.getInt(ServiceInventory.SEQUENCE));
service.setName(resultSet.getString(ServiceInventory.NAME));
return service;
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return null;
}
@Override public List<Endpoint> searchEndpoint(String keyword, String serviceId,
int limit) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(EndpointInventory.MODEL_NAME).append(" where ");
sql.append(EndpointInventory.SERVICE_ID).append("=?");
condition.add(serviceId);
if (StringUtils.isNotEmpty(keyword)) {
sql.append(" and ").append(EndpointInventory.NAME).append(" like \"%").append(keyword).append("%\" ");
}
sql.append(" and ").append(EndpointInventory.DETECT_POINT).append(" = ?");
condition.add(DetectPoint.SERVER.ordinal());
sql.append(" limit ").append(limit);
List<Endpoint> endpoints = new ArrayList<>();
Connection connection = null;
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
while (resultSet.next()) {
Endpoint endpoint = new Endpoint();
endpoint.setId(resultSet.getInt(EndpointInventory.SEQUENCE));
endpoint.setName(resultSet.getString(EndpointInventory.NAME));
endpoints.add(endpoint);
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return endpoints;
}
@Override public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp,
String serviceId) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInstanceInventory.MODEL_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(ServiceInstanceInventory.SERVICE_ID).append("=?");
condition.add(serviceId);
Connection connection = null;
List<ServiceInstance> serviceInstances = new ArrayList<>();
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
while (resultSet.next()) {
ServiceInstance serviceInstance = new ServiceInstance();
serviceInstance.setId(resultSet.getString(ServiceInstanceInventory.SEQUENCE));
serviceInstance.setName(resultSet.getString(ServiceInstanceInventory.NAME));
int languageId = resultSet.getInt(ServiceInstanceInventory.LANGUAGE);
serviceInstance.setLanguage(LanguageTrans.INSTANCE.value(languageId));
String osName = resultSet.getString(ServiceInstanceInventory.OS_NAME);
if (StringUtils.isNotEmpty(osName)) {
serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.OS_NAME, osName));
}
String hostName = resultSet.getString(ServiceInstanceInventory.HOST_NAME);
if (StringUtils.isNotEmpty(hostName)) {
serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.HOST_NAME, hostName));
}
serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.PROCESS_NO, resultSet.getString(ServiceInstanceInventory.PROCESS_NO)));
List<String> ipv4s = ServiceInstanceInventory.AgentOsInfo.ipv4sDeserialize(resultSet.getString(ServiceInstanceInventory.IPV4S));
for (String ipv4 : ipv4s) {
serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.IPV4S, ipv4));
}
serviceInstances.add(serviceInstance);
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return serviceInstances;
}
private void setTimeRangeCondition(StringBuilder sql, List<Object> conditions, long startTimestamp,
long endTimestamp) {
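        // A register source counts as alive in the window when its
        // [register_time, heartbeat_time] range overlaps [startTimestamp, endTimestamp]:
        // it was still heartbeating at the window end, or it registered before the
        // window ended and heartbeated after the window started.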
sql.append(" ( (").append(RegisterSource.HEARTBEAT_TIME).append(" >= ? and ")
.append(RegisterSource.REGISTER_TIME).append(" <= ? )");
conditions.add(endTimestamp);
conditions.add(endTimestamp);
sql.append(" or (").append(RegisterSource.REGISTER_TIME).append(" <= ? and ")
.append(RegisterSource.HEARTBEAT_TIME).append(" >= ? ) ) ");
conditions.add(endTimestamp);
conditions.add(startTimestamp);
}
private List<Service> buildServices(ResultSet resultSet) throws SQLException {
List<Service> services = new ArrayList<>();
while (resultSet.next()) {
Service service = new Service();
service.setId(resultSet.getInt(ServiceInventory.SEQUENCE));
service.setName(resultSet.getString(ServiceInventory.NAME));
services.add(service);
}
return services;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValue;
import org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray;
import org.apache.skywalking.oap.server.core.analysis.indicator.ThermodynamicIndicator;
import org.apache.skywalking.oap.server.core.query.entity.IntValues;
import org.apache.skywalking.oap.server.core.query.entity.KVInt;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.query.entity.Thermodynamic;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.query.sql.KeyValues;
import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.DownSamplingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
/**
* @author wusheng
*/
public class H2MetricQueryDAO extends H2SQLExecutor implements IMetricQueryDAO {
private JDBCHikariCPClient h2Client;
public H2MetricQueryDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override
public IntValues getValues(String indName, Step step, long startTB, long endTB, Where where, String valueCName,
Function function) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, indName);
List<KeyValues> whereKeyValues = where.getKeyValues();
String op;
switch (function) {
case Avg:
op = "avg";
break;
default:
op = "sum";
}
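        // Only Avg maps to a dedicated aggregation here; every other Function value
        // (including the default) falls back to sum.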
List<String> ids = new ArrayList<>(20);
StringBuilder whereSql = new StringBuilder();
if (whereKeyValues.size() > 0) {
whereSql.append("(");
for (int i = 0; i < whereKeyValues.size(); i++) {
if (i != 0) {
whereSql.append(" or ");
}
KeyValues keyValues = whereKeyValues.get(i);
StringBuilder valueCollection = new StringBuilder();
List<String> values = keyValues.getValues();
for (int valueIdx = 0; valueIdx < values.size(); valueIdx++) {
if (valueIdx != 0) {
valueCollection.append(",");
}
String id = values.get(valueIdx);
ids.add(id);
valueCollection.append("'").append(id).append("'");
}
whereSql.append(keyValues.getKey()).append(" in (" + valueCollection + ")");
}
whereSql.append(") and ");
}
IntValues intValues = new IntValues();
Connection connection = null;
try {
connection = h2Client.getConnection();
try (ResultSet resultSet = h2Client.executeQuery(connection, "select " + Indicator.ENTITY_ID + " id, " + op + "(" + valueCName + ") value from " + tableName
+ " where " + whereSql
+ Indicator.TIME_BUCKET + ">= ? and " + Indicator.TIME_BUCKET + "<=?"
+ " group by " + Indicator.ENTITY_ID,
startTB, endTB)) {
while (resultSet.next()) {
KVInt kv = new KVInt();
kv.setId(resultSet.getString("id"));
kv.setValue(resultSet.getInt("value"));
intValues.getValues().add(kv);
}
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return orderWithDefault0(intValues, ids);
}
@Override public IntValues getLinearIntValues(String indName, Step step, List<String> ids,
String valueCName) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, indName);
StringBuilder idValues = new StringBuilder();
for (int valueIdx = 0; valueIdx < ids.size(); valueIdx++) {
if (valueIdx != 0) {
idValues.append(",");
}
idValues.append("'").append(ids.get(valueIdx)).append("'");
}
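        // The ids are inlined as quoted literals rather than bound parameters; they are
        // expected to be internally generated row ids, not user input.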
IntValues intValues = new IntValues();
Connection connection = null;
try {
connection = h2Client.getConnection();
try (ResultSet resultSet = h2Client.executeQuery(connection, "select id, " + valueCName + " from " + tableName + " where id in (" + idValues.toString() + ")")) {
while (resultSet.next()) {
KVInt kv = new KVInt();
kv.setId(resultSet.getString("id"));
kv.setValue(resultSet.getInt(valueCName));
intValues.getValues().add(kv);
}
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return orderWithDefault0(intValues, ids);
}
/**
     * Keep the values in the same order as the expected id list, using 0 as the
     * default value for any id missing from the query result.
     *
     * @param origin values as read from storage, in arbitrary order
     * @param expectedOrder ids in the order the caller expects
     * @return values reordered to match the expected order, with 0 defaults
*/
private IntValues orderWithDefault0(IntValues origin, List<String> expectedOrder) {
IntValues intValues = new IntValues();
expectedOrder.forEach(id -> {
KVInt e = new KVInt();
e.setId(id);
e.setValue(origin.findValue(id, 0));
intValues.addKVInt(e);
});
return intValues;
}
@Override public Thermodynamic getThermodynamic(String indName, Step step, List<String> ids,
String valueCName) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, indName);
StringBuilder idValues = new StringBuilder();
for (int valueIdx = 0; valueIdx < ids.size(); valueIdx++) {
if (valueIdx != 0) {
idValues.append(",");
}
idValues.append("'").append(ids.get(valueIdx)).append("'");
}
List<List<Long>> thermodynamicValueCollection = new ArrayList<>();
Map<String, List<Long>> thermodynamicValueMatrix = new HashMap<>();
Connection connection = null;
try {
connection = h2Client.getConnection();
Thermodynamic thermodynamic = new Thermodynamic();
int numOfSteps = 0;
int axisYStep = 0;
try (ResultSet resultSet = h2Client.executeQuery(connection, "select " + ThermodynamicIndicator.STEP + " step, "
+ ThermodynamicIndicator.NUM_OF_STEPS + " num_of_steps, "
+ ThermodynamicIndicator.DETAIL_GROUP + " detail_group, "
+ "id "
+ " from " + tableName + " where id in (" + idValues.toString() + ")")) {
while (resultSet.next()) {
axisYStep = resultSet.getInt("step");
String id = resultSet.getString("id");
numOfSteps = resultSet.getInt("num_of_steps") + 1;
String value = resultSet.getString("detail_group");
IntKeyLongValueArray intKeyLongValues = new IntKeyLongValueArray(5);
intKeyLongValues.toObject(value);
List<Long> axisYValues = new ArrayList<>();
for (int i = 0; i < numOfSteps; i++) {
axisYValues.add(0L);
}
for (IntKeyLongValue intKeyLongValue : intKeyLongValues) {
axisYValues.set(intKeyLongValue.getKey(), intKeyLongValue.getValue());
}
thermodynamicValueMatrix.put(id, axisYValues);
}
// try to add default values when there is no data in that time bucket.
ids.forEach(id -> {
if (thermodynamicValueMatrix.containsKey(id)) {
thermodynamicValueCollection.add(thermodynamicValueMatrix.get(id));
} else {
thermodynamicValueCollection.add(new ArrayList<>());
}
});
}
thermodynamic.fromMatrixData(thermodynamicValueCollection, numOfSteps);
thermodynamic.setAxisYStep(axisYStep);
return thermodynamic;
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wusheng
*/
public class H2NetworkAddressInventoryCacheDAO extends H2SQLExecutor implements INetworkAddressInventoryCacheDAO {
private static final Logger logger = LoggerFactory.getLogger(H2NetworkAddressInventoryCacheDAO.class);
private JDBCHikariCPClient h2Client;
public H2NetworkAddressInventoryCacheDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override public int getAddressId(String networkAddress) {
String id = NetworkAddressInventory.buildId(networkAddress);
return getEntityIDByID(h2Client, NetworkAddressInventory.SEQUENCE, NetworkAddressInventory.MODEL_NAME, id);
}
@Override public NetworkAddressInventory get(int addressId) {
try {
return (NetworkAddressInventory)getByColumn(h2Client, NetworkAddressInventory.MODEL_NAME, NetworkAddressInventory.SEQUENCE, addressId, new NetworkAddressInventory.Builder());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
/**
* @author wusheng
*/
public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO<SQLExecutor> {
private JDBCHikariCPClient h2Client;
private StorageBuilder<Record> storageBuilder;
public H2RecordDAO(JDBCHikariCPClient h2Client, StorageBuilder<Record> storageBuilder) {
this.h2Client = h2Client;
this.storageBuilder = storageBuilder;
}
@Override public SQLExecutor prepareBatchInsert(String modelName, Record record) throws IOException {
return getInsertExecutor(modelName, record, storageBuilder);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
/**
* @author wusheng
*/
public class H2RegisterDAO extends H2SQLExecutor implements IRegisterDAO {
private final JDBCHikariCPClient h2Client;
private final StorageBuilder<RegisterSource> storageBuilder;
public H2RegisterDAO(JDBCHikariCPClient h2Client,
StorageBuilder<RegisterSource> storageBuilder) {
this.h2Client = h2Client;
this.storageBuilder = storageBuilder;
}
@Override public int max(String modelName) throws IOException {
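        // Reads the current max register sequence so the register mechanism can assign
        // the next id. max() over an empty table yields SQL NULL, which getInt reads as
        // 0, so the sequence is bootstrapped at 1.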
Connection connection = null;
try {
connection = h2Client.getConnection();
try (ResultSet rs = h2Client.executeQuery(connection, "SELECT max(sequence) max_id FROM " + modelName)) {
while (rs.next()) {
int maxId = rs.getInt("max_id");
if (maxId == 0) {
return 1;
} else {
return maxId;
}
}
}
} catch (SQLException e) {
throw new IOException(e.getMessage(), e);
} catch (JDBCClientException e) {
throw new IOException(e.getMessage(), e);
} finally {
h2Client.close(connection);
}
return Const.NONE;
}
@Override public RegisterSource get(String modelName, String id) throws IOException {
return (RegisterSource)getByID(h2Client, modelName, id, storageBuilder);
}
@Override public void forceInsert(String modelName, RegisterSource source) throws IOException {
try (Connection connection = h2Client.getConnection()) {
getInsertExecutor(modelName, source, storageBuilder).invoke(connection);
} catch (SQLException e) {
throw new IOException(e.getMessage(), e);
} catch (JDBCClientException e) {
throw new IOException(e.getMessage(), e);
}
}
@Override public void forceUpdate(String modelName, RegisterSource source) throws IOException {
try (Connection connection = h2Client.getConnection()) {
getUpdateExecutor(modelName, source, storageBuilder).invoke(connection);
} catch (SQLException e) {
throw new IOException(e.getMessage(), e);
} catch (JDBCClientException e) {
throw new IOException(e.getMessage(), e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
/**
 * No lock table is needed: in a SQL-based database, a row-level {@code select ... for update}
 * could serve the same purpose as a dedicated lock table.
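 * <p>
 * A minimal sketch of that alternative (not what this class does), assuming a
 * hypothetical REGISTER_LOCK table with one row per scope:
 * <pre>
 *     select scope_id from register_lock where scope_id = ? for update
 * </pre>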
*
* @author wusheng
*/
public class H2RegisterLockDAO implements IRegisterLockDAO {
@Override public boolean tryLock(Scope scope) {
return true;
}
@Override public void releaseLock(Scope scope) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wusheng
*/
public class H2SQLExecutor {
private static final Logger logger = LoggerFactory.getLogger(H2SQLExecutor.class);
protected StorageData getByID(JDBCHikariCPClient h2Client, String modelName, String id,
StorageBuilder storageBuilder) throws IOException {
Connection connection = null;
try {
connection = h2Client.getConnection();
try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE id = ?", id)) {
return toStorageData(rs, modelName, storageBuilder);
}
} catch (SQLException e) {
throw new IOException(e.getMessage(), e);
} catch (JDBCClientException e) {
throw new IOException(e.getMessage(), e);
} finally {
h2Client.close(connection);
}
}
protected StorageData getByColumn(JDBCHikariCPClient h2Client, String modelName, String columnName, Object value,
StorageBuilder storageBuilder) throws IOException {
Connection connection = null;
try {
connection = h2Client.getConnection();
try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE " + columnName + " = ?", value)) {
return toStorageData(rs, modelName, storageBuilder);
}
} catch (SQLException e) {
throw new IOException(e.getMessage(), e);
} catch (JDBCClientException e) {
throw new IOException(e.getMessage(), e);
} finally {
h2Client.close(connection);
}
}
protected StorageData toStorageData(ResultSet rs, String modelName,
StorageBuilder storageBuilder) throws SQLException {
while (rs.next()) {
Map data = new HashMap();
List<ModelColumn> columns = TableMetaInfo.get(modelName).getColumns();
for (ModelColumn column : columns) {
data.put(column.getColumnName().getName(), rs.getObject(column.getColumnName().getName()));
}
return storageBuilder.map2Data(data);
}
return null;
}
protected int getEntityIDByID(JDBCHikariCPClient h2Client, String entityColumnName, String modelName, String id) {
Connection connection = null;
try {
connection = h2Client.getConnection();
try (ResultSet rs = h2Client.executeQuery(connection, "SELECT " + entityColumnName + " FROM " + modelName + " WHERE ID=?", id)) {
while (rs.next()) {
                    return rs.getInt(entityColumnName);
}
}
} catch (SQLException e) {
logger.error(e.getMessage(), e);
} catch (JDBCClientException e) {
logger.error(e.getMessage(), e);
} finally {
h2Client.close(connection);
}
return Const.NONE;
}
protected SQLExecutor getInsertExecutor(String modelName, StorageData indicator,
StorageBuilder storageBuilder) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(indicator);
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + modelName + " VALUES");
List<ModelColumn> columns = TableMetaInfo.get(modelName).getColumns();
List<Object> param = new ArrayList<>();
sqlBuilder.append("(?,");
param.add(indicator.id());
for (int i = 0; i < columns.size(); i++) {
ModelColumn column = columns.get(i);
sqlBuilder.append("?");
if (i != columns.size() - 1) {
sqlBuilder.append(",");
}
Object value = objectMap.get(column.getColumnName().getName());
if (value instanceof StorageDataType) {
param.add(((StorageDataType)value).toStorageData());
} else {
param.add(value);
}
}
sqlBuilder.append(")");
return new SQLExecutor(sqlBuilder.toString(), param);
}
protected SQLExecutor getUpdateExecutor(String modelName, StorageData indicator,
StorageBuilder storageBuilder) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(indicator);
SQLBuilder sqlBuilder = new SQLBuilder("UPDATE " + modelName + " SET ");
List<ModelColumn> columns = TableMetaInfo.get(modelName).getColumns();
List<Object> param = new ArrayList<>();
for (int i = 0; i < columns.size(); i++) {
ModelColumn column = columns.get(i);
sqlBuilder.append(column.getColumnName().getName() + "= ?");
if (i != columns.size() - 1) {
sqlBuilder.append(",");
}
Object value = objectMap.get(column.getColumnName().getName());
if (value instanceof StorageDataType) {
param.add(((StorageDataType)value).toStorageData());
} else {
param.add(value);
}
}
sqlBuilder.append(" WHERE id = ?");
param.add(indicator.id());
return new SQLExecutor(sqlBuilder.toString(), param);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wusheng
*/
public class H2ServiceInstanceInventoryCacheDAO extends H2SQLExecutor implements IServiceInstanceInventoryCacheDAO {
private static final Logger logger = LoggerFactory.getLogger(H2ServiceInstanceInventoryCacheDAO.class);
private JDBCHikariCPClient h2Client;
public H2ServiceInstanceInventoryCacheDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override public ServiceInstanceInventory get(int serviceInstanceId) {
try {
return (ServiceInstanceInventory)getByColumn(h2Client, ServiceInstanceInventory.MODEL_NAME, ServiceInstanceInventory.SEQUENCE, serviceInstanceId, new ServiceInstanceInventory.Builder());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
}
}
@Override public int getServiceInstanceId(int serviceId, String serviceInstanceName) {
String id = ServiceInstanceInventory.buildId(serviceId, serviceInstanceName);
return getByID(id);
}
@Override public int getServiceInstanceId(int serviceId, int addressId) {
String id = ServiceInstanceInventory.buildId(serviceId, addressId);
return getByID(id);
}
private int getByID(String id) {
return getEntityIDByID(h2Client, ServiceInstanceInventory.SEQUENCE, ServiceInstanceInventory.MODEL_NAME, id);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wusheng
*/
public class H2ServiceInventoryCacheDAO extends H2SQLExecutor implements IServiceInventoryCacheDAO {
private static final Logger logger = LoggerFactory.getLogger(H2ServiceInventoryCacheDAO.class);
private JDBCHikariCPClient h2Client;
public H2ServiceInventoryCacheDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override public int getServiceId(String serviceName) {
String id = ServiceInventory.buildId(serviceName);
return getEntityIDByID(h2Client, ServiceInventory.SEQUENCE, ServiceInventory.MODEL_NAME, id);
}
@Override public int getServiceId(int addressId) {
String id = ServiceInventory.buildId(addressId);
return getEntityIDByID(h2Client, ServiceInventory.SEQUENCE, ServiceInventory.MODEL_NAME, id);
}
@Override public ServiceInventory get(int serviceId) {
try {
return (ServiceInventory)getByColumn(h2Client, ServiceInventory.MODEL_NAME, ServiceInventory.SEQUENCE, serviceId, new ServiceInventory.Builder());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
}
}
@Override public List<ServiceInventory> loadLastMappingUpdate() {
List<ServiceInventory> serviceInventories = new ArrayList<>();
try {
StringBuilder sql = new StringBuilder("select * from ");
sql.append(ServiceInventory.MODEL_NAME);
sql.append(" where ").append(ServiceInventory.IS_ADDRESS).append("=? ");
sql.append(" and ").append(ServiceInventory.MAPPING_LAST_UPDATE_TIME).append(">?");
sql.append(" LIMIT 50 ");
Connection connection = null;
try {
connection = h2Client.getConnection();
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), BooleanUtils.TRUE, System.currentTimeMillis() - 10000)) {
while (resultSet.next()) {
ServiceInventory serviceInventory = (ServiceInventory)toStorageData(resultSet, ServiceInventory.MODEL_NAME, new ServiceInventory.Builder());
if (serviceInventory != null) {
serviceInventories.add(serviceInventory);
}
}
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
} catch (Throwable e) {
            logger.error(e.getMessage(), e);
}
return serviceInventories;
}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
/**
 * @author wusheng
 */
public class H2StorageDAO implements StorageDAO {
    private JDBCHikariCPClient h2Client;
    public H2StorageDAO(JDBCHikariCPClient h2Client) {
        this.h2Client = h2Client;
    }
    @Override public IIndicatorDAO newIndicatorDao(StorageBuilder<Indicator> storageBuilder) {
        return new H2IndicatorDAO(h2Client, storageBuilder);
    }
    @Override public IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder) {
        return new H2RegisterDAO(h2Client, storageBuilder);
    }
    @Override public IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder) {
        return new H2RecordDAO(h2Client, storageBuilder);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class H2TableInstaller extends ModelInstaller {
private static final Logger logger = LoggerFactory.getLogger(H2TableInstaller.class);
public H2TableInstaller(ModuleManager moduleManager) {
super(moduleManager);
}
@Override protected boolean isExists(Client client, Model model) throws StorageException {
JDBCHikariCPClient h2Client = (JDBCHikariCPClient)client;
try (Connection conn = h2Client.getConnection()) {
try (ResultSet rset = conn.getMetaData().getTables(null, null, model.getName(), null)) {
if (rset.next()) {
return true;
}
}
} catch (SQLException e) {
throw new StorageException(e.getMessage(), e);
} catch (JDBCClientException e) {
throw new StorageException(e.getMessage(), e);
}
return false;
}
@Override protected void columnCheck(Client client, Model model) throws StorageException {
}
@Override protected void deleteTable(Client client, Model model) throws StorageException {
}
@Override protected void createTable(Client client, Model model) throws StorageException {
TableMetaInfo.addModel(model);
JDBCHikariCPClient h2Client = (JDBCHikariCPClient)client;
SQLBuilder tableCreateSQL = new SQLBuilder("CREATE TABLE IF NOT EXISTS " + model.getName() + " (");
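        // Generates DDL of the shape:
        //   CREATE TABLE IF NOT EXISTS <model> (id VARCHAR2(300), <column> <mapped type>, ...)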
tableCreateSQL.appendLine("id VARCHAR2(300), ");
for (int i = 0; i < model.getColumns().size(); i++) {
ModelColumn column = model.getColumns().get(i);
ColumnName name = column.getColumnName();
tableCreateSQL.appendLine(name.getName() + " " + getColumnType(column.getType()) + (i != model.getColumns().size() - 1 ? "," : ""));
}
tableCreateSQL.appendLine(")");
if (logger.isDebugEnabled()) {
logger.debug("creating table: " + tableCreateSQL.toStringInNewLine());
}
Connection connection = null;
try {
connection = h2Client.getConnection();
h2Client.execute(connection, tableCreateSQL.toString());
} catch (JDBCClientException e) {
throw new StorageException(e.getMessage(), e);
} finally {
h2Client.close(connection);
}
}
private String getColumnType(Class<?> type) {
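        // Complex values (IntKeyLongValueArray, byte[]) are persisted as serialized
        // strings, hence the wide VARCHAR columns.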
if (Integer.class.equals(type) || int.class.equals(type)) {
return "INT";
} else if (Long.class.equals(type) || long.class.equals(type)) {
return "BIGINT";
} else if (Double.class.equals(type) || double.class.equals(type)) {
return "DOUBLE";
} else if (String.class.equals(type)) {
return "VARCHAR(2000)";
} else if (IntKeyLongValueArray.class.equals(type)) {
return "VARCHAR(20000)";
} else if (byte[].class.equals(type)) {
return "VARCHAR(20000)";
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointRelationServerSideIndicator;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceRelationClientSideIndicator;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceRelationServerSideIndicator;
import org.apache.skywalking.oap.server.core.query.entity.Call;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.source.ServiceRelation;
import org.apache.skywalking.oap.server.core.storage.DownSamplingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
/**
* @author wusheng
*/
public class H2TopologyQueryDAO implements ITopologyQueryDAO {
private JDBCHikariCPClient h2Client;
public H2TopologyQueryDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override public List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds, true);
}
@Override public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
// Client-side relations carry the CLIENT detect point, so isClientSide is true.
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, serviceIds, true);
}
@Override public List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, new ArrayList<>(0), false);
}
@Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, new ArrayList<>(0), true);
}
@Override public List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
int destEndpointId) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, EndpointRelationServerSideIndicator.INDEX_NAME);
return loadEndpointFromSide(tableName, startTB, endTB, EndpointRelationServerSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, destEndpointId, false);
}
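/**
 * Builds and runs a relation query. The conditions array is laid out as
 * [startTB, endTB, id0, id0, id1, id1, ...]: each service id is bound twice
 * because it may match either the source or the destination column.
 */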
private List<Call> loadServiceCalls(String tableName, long startTB, long endTB, String sourceCName,
String destCName, List<Integer> serviceIds, boolean isClientSide) throws IOException {
Object[] conditions = new Object[serviceIds.size() * 2 + 2];
conditions[0] = startTB;
conditions[1] = endTB;
StringBuilder serviceIdMatchSql = new StringBuilder();
if (serviceIds.size() > 0) {
serviceIdMatchSql.append("and (");
for (int i = 0; i < serviceIds.size(); i++) {
serviceIdMatchSql.append(sourceCName + "=? or " + destCName + "=? ");
conditions[i * 2 + 2] = serviceIds.get(i);
conditions[i * 2 + 1 + 2] = serviceIds.get(i);
if (i != serviceIds.size() - 1) {
serviceIdMatchSql.append("or ");
}
}
serviceIdMatchSql.append(")");
}
List<Call> calls = new ArrayList<>();
Connection connection = null;
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ Indicator.ENTITY_ID
+ " component_id from " + tableName + " where "
+ Indicator.TIME_BUCKET + ">= ? and " + Indicator.TIME_BUCKET + "<=? "
+ serviceIdMatchSql.toString()
+ " group by " + Indicator.ENTITY_ID,
conditions);
buildCalls(resultSet, calls, isClientSide);
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return calls;
}
private List<Call> loadEndpointFromSide(String tableName, long startTB, long endTB, String sourceCName,
String destCName, int id, boolean isSourceId) throws IOException {
Object[] conditions = new Object[3];
conditions[0] = startTB;
conditions[1] = endTB;
conditions[2] = id;
Connection connection = null;
List<Call> calls = new ArrayList<>();
try {
connection = h2Client.getConnection();
ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ Indicator.ENTITY_ID
+ " from " + tableName + " where "
+ Indicator.TIME_BUCKET + ">= ? and " + Indicator.TIME_BUCKET + "<=? and "
+ (isSourceId ? sourceCName : destCName) + "=?"
+ " group by " + Indicator.ENTITY_ID,
conditions);
buildCalls(resultSet, calls, isSourceId);
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return calls;
}
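/**
 * Turns grouped entity ids into Call objects. An entity id encodes source id,
 * target id, and component id; the flag decides whether the call is reported
 * from the client or the server detect point.
 */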
private void buildCalls(ResultSet resultSet, List<Call> calls, boolean isClientSide) throws SQLException {
while (resultSet.next()) {
Call call = new Call();
String entityId = resultSet.getString(Indicator.ENTITY_ID);
Integer[] entityIds = ServiceRelation.splitEntityId(entityId);
call.setSource(entityIds[0]);
call.setTarget(entityIds[1]);
call.setComponentId(entityIds[2]);
call.setDetectPoint(isClientSide ? DetectPoint.CLIENT : DetectPoint.SERVER);
call.setId(entityId);
calls.add(call);
}
}
}
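The standalone sketch below (not part of the plugin) reproduces the SQL shape and the parameter layout loadServiceCalls assembles for serviceIds = [3, 7]; the table and column names are written out literally for readability and stand in for the indicator constants.

import java.util.Arrays;

public class TopologyQuerySketch {
    public static void main(String[] args) {
        long startTB = 201810010000L, endTB = 201810020000L;
        int[] serviceIds = {3, 7};

        StringBuilder serviceIdMatchSql = new StringBuilder("and (");
        Object[] conditions = new Object[serviceIds.length * 2 + 2];
        conditions[0] = startTB;
        conditions[1] = endTB;
        for (int i = 0; i < serviceIds.length; i++) {
            // Each id is bound twice: once for the source column, once for the destination.
            serviceIdMatchSql.append("source_service_id=? or dest_service_id=? ");
            conditions[i * 2 + 2] = serviceIds[i];
            conditions[i * 2 + 3] = serviceIds[i];
            if (i != serviceIds.length - 1) {
                serviceIdMatchSql.append("or ");
            }
        }
        serviceIdMatchSql.append(")");

        System.out.println("select entity_id from service_relation_server_side where "
            + "time_bucket >= ? and time_bucket <= ? " + serviceIdMatchSql + " group by entity_id");
        System.out.println(Arrays.toString(conditions));
        // -> [201810010000, 201810020000, 3, 3, 7, 7]
    }
}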
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.entity.BasicTrace;
import org.apache.skywalking.oap.server.core.query.entity.QueryOrder;
import org.apache.skywalking.oap.server.core.query.entity.TraceBrief;
import org.apache.skywalking.oap.server.core.query.entity.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.StringUtils;
/**
* @author wusheng
*/
public class H2TraceQueryDAO implements ITraceQueryDAO {
private JDBCHikariCPClient h2Client;
public H2TraceQueryDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
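/**
 * Runs the same WHERE clause twice: wrapped in count(1) for the total, then
 * with LIMIT/OFFSET for the current page. Zero numeric arguments and empty
 * strings disable the corresponding filters.
 */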
@Override
public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
String endpointName, int serviceId, int endpointId, String traceId, int limit, int from, TraceState traceState,
QueryOrder queryOrder) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> parameters = new ArrayList<>(10);
sql.append("from ").append(SegmentRecord.INDEX_NAME).append(" where ");
sql.append(" 1=1 ");
if (startSecondTB != 0 && endSecondTB != 0) {
sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" >= ?");
parameters.add(startSecondTB);
sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" <= ?");
parameters.add(endSecondTB);
}
if (minDuration != 0 || maxDuration != 0) {
if (minDuration != 0) {
sql.append(" and ").append(SegmentRecord.LATENCY).append(" >= ?");
parameters.add(minDuration);
}
if (maxDuration != 0) {
sql.append(" and ").append(SegmentRecord.LATENCY).append(" <= ?");
parameters.add(maxDuration);
}
}
if (StringUtils.isNotEmpty(endpointName)) {
// Bind the pattern as a parameter instead of concatenating it into the SQL,
// so special characters in the endpoint name cannot break the statement.
sql.append(" and ").append(SegmentRecord.ENDPOINT_NAME).append(" like ?");
parameters.add("%" + endpointName + "%");
}
if (serviceId != 0) {
sql.append(" and ").append(SegmentRecord.SERVICE_ID).append(" = ?");
parameters.add(serviceId);
}
if (endpointId != 0) {
sql.append(" and ").append(SegmentRecord.ENDPOINT_ID).append(" = ?");
parameters.add(endpointId);
}
if (StringUtils.isNotEmpty(traceId)) {
sql.append(" and ").append(SegmentRecord.TRACE_ID).append(" = ?");
parameters.add(traceId);
}
switch (traceState) {
case ERROR:
sql.append(" and ").append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.TRUE);
break;
case SUCCESS:
sql.append(" and ").append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.FALSE);
break;
}
// Snapshot the filter clause for the count query before ordering and paging are
// appended, so the reported total is not capped by LIMIT.
String countSql = "select count(1) total from (select 1 " + sql.toString() + " )";
switch (queryOrder) {
case BY_START_TIME:
sql.append(" order by ").append(SegmentRecord.START_TIME).append(" desc");
break;
case BY_DURATION:
sql.append(" order by ").append(SegmentRecord.LATENCY).append(" desc");
break;
}
sql.append(" LIMIT ").append(limit);
sql.append(" OFFSET ").append(from);
TraceBrief traceBrief = new TraceBrief();
Connection connection = null;
try {
connection = h2Client.getConnection();
try (ResultSet resultSet = h2Client.executeQuery(connection, countSql, parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
traceBrief.setTotal(resultSet.getInt("total"));
}
}
try (ResultSet resultSet = h2Client.executeQuery(connection, "select * " + sql.toString(), parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(resultSet.getString(SegmentRecord.SEGMENT_ID));
basicTrace.setStart(resultSet.getString(SegmentRecord.START_TIME));
basicTrace.getEndpointNames().add(resultSet.getString(SegmentRecord.ENDPOINT_NAME));
basicTrace.setDuration(resultSet.getInt(SegmentRecord.LATENCY));
basicTrace.setError(BooleanUtils.valueToBoolean(resultSet.getInt(SegmentRecord.IS_ERROR)));
String traceIds = resultSet.getString(SegmentRecord.TRACE_ID);
basicTrace.getTraceIds().add(traceIds);
traceBrief.getTraces().add(basicTrace);
}
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return traceBrief;
}
@Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
List<SegmentRecord> segmentRecords = new ArrayList<>();
Connection connection = null;
try {
connection = h2Client.getConnection();
try (ResultSet resultSet = h2Client.executeQuery(connection, "select * from " + SegmentRecord.INDEX_NAME + " where " + SegmentRecord.TRACE_ID + " = ?", traceId)) {
while (resultSet.next()) {
SegmentRecord segmentRecord = new SegmentRecord();
segmentRecord.setSegmentId(resultSet.getString(SegmentRecord.SEGMENT_ID));
segmentRecord.setTraceId(resultSet.getString(SegmentRecord.TRACE_ID));
segmentRecord.setServiceId(resultSet.getInt(SegmentRecord.SERVICE_ID));
segmentRecord.setEndpointName(resultSet.getString(SegmentRecord.ENDPOINT_NAME));
segmentRecord.setStartTime(resultSet.getLong(SegmentRecord.START_TIME));
segmentRecord.setEndTime(resultSet.getLong(SegmentRecord.END_TIME));
segmentRecord.setLatency(resultSet.getInt(SegmentRecord.LATENCY));
segmentRecord.setIsError(resultSet.getInt(SegmentRecord.IS_ERROR));
String dataBinaryBase64 = resultSet.getString(SegmentRecord.DATA_BINARY);
if (StringUtils.isNotEmpty(dataBinaryBase64)) {
segmentRecord.setDataBinary(Base64.getDecoder().decode(dataBinaryBase64));
}
segmentRecords.add(segmentRecord);
}
}
} catch (SQLException e) {
throw new IOException(e);
} finally {
h2Client.close(connection);
}
return segmentRecords;
}
}
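A hedged usage sketch for the DAO above, assumed to sit in the same package. It presumes an initialized JDBCHikariCPClient and that TraceBrief exposes a getTotal() accessor matching its setter used earlier; the time bucket values are illustrative yyyyMMddHHmmss numbers.

import java.io.IOException;
import org.apache.skywalking.oap.server.core.query.entity.QueryOrder;
import org.apache.skywalking.oap.server.core.query.entity.TraceBrief;
import org.apache.skywalking.oap.server.core.query.entity.TraceState;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;

public class TraceQuerySketch {
    // Assumption: h2Client points at a schema created by the table installer above.
    public static void printSlowestErrors(JDBCHikariCPClient h2Client) throws IOException {
        H2TraceQueryDAO dao = new H2TraceQueryDAO(h2Client);
        TraceBrief brief = dao.queryBasicTraces(
            20181001000000L, 20181002000000L, // start/end second time buckets
            0, 0,                             // min/max duration: 0 disables the bound
            null, 0, 0, null,                 // endpoint name, service id, endpoint id, trace id: unset
            20, 0,                            // page size and offset
            TraceState.ERROR, QueryOrder.BY_DURATION);
        System.out.println(brief.getTotal() + " error segments matched; slowest 20 fetched");
    }
}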
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider
\ No newline at end of file
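The single line above is a standard META-INF/services entry: it registers H2StorageProvider so the OAP kernel can discover it through java.util.ServiceLoader. A minimal sketch of that discovery mechanism follows, using a stand-in SPI interface rather than the server's real ModuleProvider contract.

import java.util.ServiceLoader;

interface StorageProviderSpi { String name(); }

public class SpiLookupSketch {
    public static void main(String[] args) {
        // ServiceLoader reads META-INF/services/<interface-name> from the classpath
        // and instantiates each implementation listed there by its fully qualified name.
        for (StorageProviderSpi provider : ServiceLoader.load(StorageProviderSpi.class)) {
            System.out.println("discovered storage provider: " + provider.name());
        }
    }
}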
Subproject commit 0358e04573c1028a9abed82fbd0f73e9cb9de904
Subproject commit c528b35c8bb78fea8cb87c160abf422029d59b1b