Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
05764660
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
05764660
编写于
9月 21, 2018
作者:
彭
彭勇升 pengys
提交者:
wu-sheng
9月 21, 2018
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Implementation of metadata query. (#1686)
上级
99fdf02b
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
282 addition
and
40 deletion
+282
-40
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
...ava/org/apache/skywalking/oap/server/core/CoreModule.java
+1
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
...apache/skywalking/oap/server/core/CoreModuleProvider.java
+1
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/DurationUtils.java
...pache/skywalking/oap/server/core/query/DurationUtils.java
+16
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
...kywalking/oap/server/core/query/MetadataQueryService.java
+15
-7
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
...kywalking/oap/server/core/register/EndpointInventory.java
+1
-1
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
...ing/oap/server/core/register/NetworkAddressInventory.java
+12
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
...e/skywalking/oap/server/core/register/RegisterSource.java
+2
-2
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
...skywalking/oap/server/core/register/ServiceInventory.java
+2
-2
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/INetworkAddressInventoryRegister.java
...re/register/service/INetworkAddressInventoryRegister.java
+2
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
...ore/register/service/NetworkAddressInventoryRegister.java
+21
-1
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
...che/skywalking/oap/server/core/storage/StorageModule.java
+1
-1
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
...king/oap/server/core/storage/query/IMetadataQueryDAO.java
+11
-0
oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQuery.java
.../skywalking/oap/query/graphql/resolver/MetadataQuery.java
+25
-23
oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
...race/provider/parser/standardization/SpanIdExchanger.java
+3
-3
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
...gin/elasticsearch/StorageModuleElasticsearchProvider.java
+1
-0
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
...torage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+168
-0
未找到文件。
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
浏览文件 @
05764660
...
...
@@ -60,6 +60,7 @@ public class CoreModule extends ModuleDefine {
classes
.
add
(
TopologyQueryService
.
class
);
classes
.
add
(
MetricQueryService
.
class
);
classes
.
add
(
TraceQueryService
.
class
);
classes
.
add
(
MetadataQueryService
.
class
);
}
private
void
addServerInterface
(
List
<
Class
>
classes
)
{
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
浏览文件 @
05764660
...
...
@@ -113,6 +113,7 @@ public class CoreModuleProvider extends ModuleProvider {
this
.
registerServiceImplementation
(
TopologyQueryService
.
class
,
new
TopologyQueryService
(
getManager
()));
this
.
registerServiceImplementation
(
MetricQueryService
.
class
,
new
MetricQueryService
(
getManager
()));
this
.
registerServiceImplementation
(
TraceQueryService
.
class
,
new
TraceQueryService
(
getManager
()));
this
.
registerServiceImplementation
(
MetadataQueryService
.
class
,
new
MetadataQueryService
(
getManager
()));
annotationScan
.
registerListener
(
storageAnnotationListener
);
annotationScan
.
registerListener
(
streamAnnotationListener
);
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/DurationUtils.java
浏览文件 @
05764660
...
...
@@ -224,4 +224,20 @@ public enum DurationUtils {
return
durations
;
}
public
long
toTimestamp
(
Step
step
,
String
dateStr
)
throws
ParseException
{
switch
(
step
)
{
case
MONTH:
return
new
SimpleDateFormat
(
"yyyy-MM"
).
parse
(
dateStr
).
getTime
();
case
DAY:
return
new
SimpleDateFormat
(
"yyyy-MM-dd"
).
parse
(
dateStr
).
getTime
();
case
HOUR:
return
new
SimpleDateFormat
(
"yyyy-MM-dd HH"
).
parse
(
dateStr
).
getTime
();
case
MINUTE:
return
new
SimpleDateFormat
(
"yyyy-MM-dd HHmm"
).
parse
(
dateStr
).
getTime
();
case
SECOND:
return
new
SimpleDateFormat
(
"yyyy-MM-dd HHmmss"
).
parse
(
dateStr
).
getTime
();
}
throw
new
UnexpectedException
(
"Unsupported step "
+
step
.
name
());
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
浏览文件 @
05764660
...
...
@@ -18,7 +18,9 @@
package
org.apache.skywalking.oap.server.core.query
;
import
java.io.IOException
;
import
java.util.*
;
import
org.apache.skywalking.apm.network.language.agent.SpanLayer
;
import
org.apache.skywalking.oap.server.core.query.entity.*
;
import
org.apache.skywalking.oap.server.core.storage.StorageModule
;
import
org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO
;
...
...
@@ -43,19 +45,25 @@ public class MetadataQueryService implements org.apache.skywalking.oap.server.li
return
metadataQueryDAO
;
}
public
ClusterBrief
getGlobalBrief
(
final
Step
step
,
final
long
startTB
,
final
long
endTB
)
{
return
new
ClusterBrief
();
public
ClusterBrief
getGlobalBrief
(
final
long
startTimestamp
,
final
long
endTimestamp
)
throws
IOException
{
ClusterBrief
clusterBrief
=
new
ClusterBrief
();
clusterBrief
.
setNumOfService
(
getMetadataQueryDAO
().
numOfService
(
startTimestamp
,
endTimestamp
));
clusterBrief
.
setNumOfEndpoint
(
getMetadataQueryDAO
().
numOfEndpoint
(
startTimestamp
,
endTimestamp
));
clusterBrief
.
setNumOfDatabase
(
getMetadataQueryDAO
().
numOfConjectural
(
startTimestamp
,
endTimestamp
,
SpanLayer
.
Database_VALUE
));
clusterBrief
.
setNumOfCache
(
getMetadataQueryDAO
().
numOfConjectural
(
startTimestamp
,
endTimestamp
,
SpanLayer
.
Cache_VALUE
));
clusterBrief
.
setNumOfMQ
(
getMetadataQueryDAO
().
numOfConjectural
(
startTimestamp
,
endTimestamp
,
SpanLayer
.
MQ_VALUE
));
return
clusterBrief
;
}
public
List
<
Service
>
getAllServices
(
final
Step
step
,
final
long
startTB
,
final
long
endTB
)
{
return
Collections
.
emptyList
(
);
public
List
<
Service
>
getAllServices
(
final
long
startTimestamp
,
final
long
endTimestamp
)
throws
IOException
{
return
getMetadataQueryDAO
().
getAllServices
(
startTimestamp
,
endTimestamp
);
}
public
List
<
Service
>
searchServices
(
final
Step
step
,
final
long
startTB
,
final
long
endTB
,
final
String
keyword
)
{
public
List
<
Service
>
searchServices
(
final
long
startTimestamp
,
final
long
endTimestamp
,
final
String
keyword
)
{
return
Collections
.
emptyList
();
}
public
List
<
ServiceInstance
>
getServiceInstances
(
final
Step
step
,
final
long
startTB
,
final
long
endTB
,
public
List
<
ServiceInstance
>
getServiceInstances
(
final
long
startTimestamp
,
final
long
endTimestamp
,
final
String
id
)
{
return
Collections
.
emptyList
();
}
...
...
@@ -64,7 +72,7 @@ public class MetadataQueryService implements org.apache.skywalking.oap.server.li
return
Collections
.
emptyList
();
}
public
Service
searchService
(
final
Step
step
,
final
long
startTB
,
final
long
endTB
,
final
String
serviceCode
)
{
public
Service
searchService
(
final
long
startTimestamp
,
final
long
endTimestamp
,
final
String
serviceCode
)
{
return
new
Service
();
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
浏览文件 @
05764660
...
...
@@ -40,7 +40,7 @@ public class EndpointInventory extends RegisterSource {
private
static
final
String
SERVICE_ID
=
"service_id"
;
private
static
final
String
NAME
=
"name"
;
p
rivate
static
final
String
DETECT_POINT
=
"detect_point"
;
p
ublic
static
final
String
DETECT_POINT
=
"detect_point"
;
@Setter
@Getter
@Column
(
columnName
=
SERVICE_ID
)
private
int
serviceId
;
@Setter
@Getter
@Column
(
columnName
=
NAME
,
matchQuery
=
true
)
private
String
name
=
Const
.
EMPTY_STRING
;
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
浏览文件 @
05764660
...
...
@@ -39,8 +39,12 @@ public class NetworkAddressInventory extends RegisterSource {
public
static
final
String
MODEL_NAME
=
"network_address_inventory"
;
private
static
final
String
NAME
=
"name"
;
public
static
final
String
SRC_LAYER
=
"src_layer"
;
private
static
final
String
SERVER_TYPE
=
"server_type"
;
@Setter
@Getter
@Column
(
columnName
=
NAME
,
matchQuery
=
true
)
private
String
name
=
Const
.
EMPTY_STRING
;
@Setter
@Getter
@Column
(
columnName
=
SRC_LAYER
)
private
int
srcLayer
;
@Setter
@Getter
@Column
(
columnName
=
SERVER_TYPE
)
private
int
serverType
;
public
static
String
buildId
(
String
networkAddress
)
{
return
networkAddress
;
...
...
@@ -74,6 +78,8 @@ public class NetworkAddressInventory extends RegisterSource {
@Override
public
RemoteData
.
Builder
serialize
()
{
RemoteData
.
Builder
remoteBuilder
=
RemoteData
.
newBuilder
();
remoteBuilder
.
setDataIntegers
(
0
,
getSequence
());
remoteBuilder
.
setDataIntegers
(
1
,
getSrcLayer
());
remoteBuilder
.
setDataIntegers
(
2
,
getServerType
());
remoteBuilder
.
setDataLongs
(
0
,
getRegisterTime
());
remoteBuilder
.
setDataLongs
(
1
,
getHeartbeatTime
());
...
...
@@ -84,6 +90,8 @@ public class NetworkAddressInventory extends RegisterSource {
@Override
public
void
deserialize
(
RemoteData
remoteData
)
{
setSequence
(
remoteData
.
getDataIntegers
(
0
));
setSrcLayer
(
remoteData
.
getDataIntegers
(
1
));
setServerType
(
remoteData
.
getDataIntegers
(
2
));
setRegisterTime
(
remoteData
.
getDataLongs
(
0
));
setHeartbeatTime
(
remoteData
.
getDataLongs
(
1
));
...
...
@@ -101,6 +109,8 @@ public class NetworkAddressInventory extends RegisterSource {
NetworkAddressInventory
inventory
=
new
NetworkAddressInventory
();
inventory
.
setSequence
((
Integer
)
dbMap
.
get
(
SEQUENCE
));
inventory
.
setName
((
String
)
dbMap
.
get
(
NAME
));
inventory
.
setSrcLayer
((
Integer
)
dbMap
.
get
(
SRC_LAYER
));
inventory
.
setServerType
((
Integer
)
dbMap
.
get
(
SERVER_TYPE
));
inventory
.
setRegisterTime
((
Long
)
dbMap
.
get
(
REGISTER_TIME
));
inventory
.
setHeartbeatTime
((
Long
)
dbMap
.
get
(
HEARTBEAT_TIME
));
return
inventory
;
...
...
@@ -110,6 +120,8 @@ public class NetworkAddressInventory extends RegisterSource {
Map
<
String
,
Object
>
map
=
new
HashMap
<>();
map
.
put
(
SEQUENCE
,
storageData
.
getSequence
());
map
.
put
(
NAME
,
storageData
.
getName
());
map
.
put
(
SRC_LAYER
,
storageData
.
getSrcLayer
());
map
.
put
(
SERVER_TYPE
,
storageData
.
getServerType
());
map
.
put
(
REGISTER_TIME
,
storageData
.
getRegisterTime
());
map
.
put
(
HEARTBEAT_TIME
,
storageData
.
getHeartbeatTime
());
return
map
;
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
浏览文件 @
05764660
...
...
@@ -29,8 +29,8 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
public
abstract
class
RegisterSource
extends
StreamData
implements
StorageData
{
public
static
final
String
SEQUENCE
=
"sequence"
;
p
rotected
static
final
String
REGISTER_TIME
=
"register_time"
;
p
rotected
static
final
String
HEARTBEAT_TIME
=
"heartbeat_time"
;
p
ublic
static
final
String
REGISTER_TIME
=
"register_time"
;
p
ublic
static
final
String
HEARTBEAT_TIME
=
"heartbeat_time"
;
@Getter
@Setter
@Column
(
columnName
=
SEQUENCE
)
private
int
sequence
;
@Getter
@Setter
@Column
(
columnName
=
REGISTER_TIME
)
private
long
registerTime
;
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
浏览文件 @
05764660
...
...
@@ -39,8 +39,8 @@ public class ServiceInventory extends RegisterSource {
public
static
final
String
MODEL_NAME
=
"service_inventory"
;
p
rivate
static
final
String
NAME
=
"name"
;
p
rivate
static
final
String
IS_ADDRESS
=
"is_address"
;
p
ublic
static
final
String
NAME
=
"name"
;
p
ublic
static
final
String
IS_ADDRESS
=
"is_address"
;
private
static
final
String
ADDRESS_ID
=
"address_id"
;
@Setter
@Getter
@Column
(
columnName
=
NAME
,
matchQuery
=
true
)
private
String
name
=
Const
.
EMPTY_STRING
;
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/INetworkAddressInventoryRegister.java
浏览文件 @
05764660
...
...
@@ -27,4 +27,6 @@ public interface INetworkAddressInventoryRegister extends Service {
int
getOrCreate
(
String
networkAddress
);
int
get
(
String
networkAddress
);
void
update
(
int
addressId
,
int
srcLayer
,
int
serverType
);
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
浏览文件 @
05764660
...
...
@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import
org.apache.skywalking.oap.server.core.register.worker.InventoryProcess
;
import
org.apache.skywalking.oap.server.library.module.ModuleManager
;
import
static
java
.
util
.
Objects
.
isNull
;
import
static
java
.
util
.
Objects
.
*
;
/**
* @author peng-yongsheng
...
...
@@ -91,4 +91,24 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
@Override
public
int
get
(
String
networkAddress
)
{
return
getNetworkAddressInventoryCache
().
getAddressId
(
networkAddress
);
}
@Override
public
void
update
(
int
addressId
,
int
srcLayer
,
int
serverType
)
{
if
(!
this
.
compare
(
addressId
,
srcLayer
,
serverType
))
{
NetworkAddressInventory
newNetworkAddress
=
getNetworkAddressInventoryCache
().
get
(
addressId
);
newNetworkAddress
.
setSrcLayer
(
srcLayer
);
newNetworkAddress
.
setServerType
(
serverType
);
newNetworkAddress
.
setHeartbeatTime
(
System
.
currentTimeMillis
());
InventoryProcess
.
INSTANCE
.
in
(
newNetworkAddress
);
}
}
private
boolean
compare
(
int
addressId
,
int
srcLayer
,
int
serverType
)
{
NetworkAddressInventory
networkAddress
=
getNetworkAddressInventoryCache
().
get
(
addressId
);
if
(
nonNull
(
networkAddress
))
{
return
srcLayer
==
networkAddress
.
getSrcLayer
()
&&
serverType
==
networkAddress
.
getServerType
();
}
return
true
;
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
浏览文件 @
05764660
...
...
@@ -38,6 +38,6 @@ public class StorageModule extends ModuleDefine {
IBatchDAO
.
class
,
StorageDAO
.
class
,
IRegisterLockDAO
.
class
,
IServiceInventoryCacheDAO
.
class
,
IServiceInstanceInventoryCacheDAO
.
class
,
IEndpointInventoryCacheDAO
.
class
,
INetworkAddressInventoryCacheDAO
.
class
,
ITopologyQueryDAO
.
class
,
IMetricQueryDAO
.
class
,
ITraceQueryDAO
.
class
};
ITopologyQueryDAO
.
class
,
IMetricQueryDAO
.
class
,
ITraceQueryDAO
.
class
,
IMetadataQueryDAO
.
class
};
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
浏览文件 @
05764660
...
...
@@ -18,10 +18,21 @@
package
org.apache.skywalking.oap.server.core.storage.query
;
import
java.io.IOException
;
import
java.util.List
;
import
org.apache.skywalking.oap.server.core.query.entity.Service
;
import
org.apache.skywalking.oap.server.core.storage.DAO
;
/**
* @author peng-yongsheng
*/
public
interface
IMetadataQueryDAO
extends
DAO
{
int
numOfService
(
final
long
startTimestamp
,
final
long
endTimestamp
)
throws
IOException
;
int
numOfEndpoint
(
final
long
startTimestamp
,
final
long
endTimestamp
)
throws
IOException
;
int
numOfConjectural
(
final
long
startTimestamp
,
final
long
endTimestamp
,
final
int
srcLayer
)
throws
IOException
;
List
<
Service
>
getAllServices
(
final
long
startTimestamp
,
final
long
endTimestamp
)
throws
IOException
;
}
oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQuery.java
浏览文件 @
05764660
...
...
@@ -19,6 +19,8 @@
package
org.apache.skywalking.oap.query.graphql.resolver
;
import
com.coxautodev.graphql.tools.GraphQLQueryResolver
;
import
java.io.IOException
;
import
java.text.ParseException
;
import
java.util.List
;
import
org.apache.skywalking.oap.query.graphql.type.Duration
;
import
org.apache.skywalking.oap.server.core.CoreModule
;
...
...
@@ -42,42 +44,42 @@ public class MetadataQuery implements GraphQLQueryResolver {
return
metadataQueryService
;
}
public
ClusterBrief
getGlobalBrief
(
final
Duration
duration
)
{
long
startTime
Bucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getStart
());
long
endTime
Bucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getEnd
());
public
ClusterBrief
getGlobalBrief
(
final
Duration
duration
)
throws
IOException
,
ParseException
{
long
startTime
stamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getStart
());
long
endTime
stamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getEnd
());
return
getMetadataQueryService
().
getGlobalBrief
(
duration
.
getStep
(),
startTimeBucket
,
endTimeBucket
);
return
getMetadataQueryService
().
getGlobalBrief
(
startTimestamp
,
endTimestamp
);
}
public
List
<
Service
>
getAllServices
(
final
Duration
duration
)
{
long
startTime
Bucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getStart
());
long
endTime
Bucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getEnd
());
public
List
<
Service
>
getAllServices
(
final
Duration
duration
)
throws
IOException
,
ParseException
{
long
startTime
stamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getStart
());
long
endTime
stamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getEnd
());
return
getMetadataQueryService
().
getAllServices
(
duration
.
getStep
(),
startTimeBucket
,
endTimeBucket
);
return
getMetadataQueryService
().
getAllServices
(
startTimestamp
,
endTimestamp
);
}
public
List
<
Service
>
searchServices
(
final
Duration
duration
,
final
String
keyword
)
{
long
startTime
Bucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getStart
());
long
endTime
Bucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getEnd
());
public
List
<
Service
>
searchServices
(
final
Duration
duration
,
final
String
keyword
)
throws
ParseException
{
long
startTime
stamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getStart
());
long
endTime
stamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getEnd
());
return
getMetadataQueryService
().
searchServices
(
duration
.
getStep
(),
startTimeBucket
,
endTimeBucket
,
keyword
);
return
getMetadataQueryService
().
searchServices
(
startTimestamp
,
endTimestamp
,
keyword
);
}
public
List
<
ServiceInstance
>
getServiceInstances
(
final
Duration
duration
,
final
String
id
)
{
long
startTime
Bucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getStart
());
long
endTime
Bucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getEnd
());
public
Service
searchService
(
final
Duration
duration
,
final
String
serviceCode
)
throws
ParseException
{
long
startTime
stamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getStart
());
long
endTime
stamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getEnd
());
return
getMetadataQueryService
().
getServiceInstances
(
duration
.
getStep
(),
startTimeBucket
,
endTimeBucket
,
id
);
return
getMetadataQueryService
().
searchService
(
startTimestamp
,
endTimestamp
,
serviceCode
);
}
public
List
<
Endpoint
>
searchEndpoint
(
final
String
keyword
,
final
String
serviceId
,
final
int
limit
)
{
return
getMetadataQueryService
().
searchEndpoint
(
keyword
,
serviceId
,
limit
);
}
public
List
<
ServiceInstance
>
getServiceInstances
(
final
Duration
duration
,
final
String
id
)
throws
ParseException
{
long
startTimestamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getStart
()
);
long
endTimestamp
=
DurationUtils
.
INSTANCE
.
toTimestamp
(
duration
.
getStep
(),
duration
.
getEnd
());
public
Service
searchService
(
final
Duration
duration
,
final
String
serviceCode
)
{
long
startTimeBucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getStart
());
long
endTimeBucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getEnd
());
return
getMetadataQueryService
().
getServiceInstances
(
startTimestamp
,
endTimestamp
,
id
);
}
return
getMetadataQueryService
().
searchService
(
duration
.
getStep
(),
startTimeBucket
,
endTimeBucket
,
serviceCode
);
public
List
<
Endpoint
>
searchEndpoint
(
final
String
keyword
,
final
String
serviceId
,
final
int
limit
)
{
return
getMetadataQueryService
().
searchEndpoint
(
keyword
,
serviceId
,
limit
);
}
}
oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
浏览文件 @
05764660
...
...
@@ -80,9 +80,9 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
standardBuilder
.
setPeerId
(
peerId
);
standardBuilder
.
setPeer
(
Const
.
EMPTY_STRING
);
//
int spanLayer = standardBuilder.getSpanLayerValue();
//
int serverType = componentLibraryCatalogService.getServerIdBasedOnComponent(standardBuilder.getComponentId());
//
networkAddressInventoryRegister.update(peerId, spanLayer, serverType);
int
spanLayer
=
standardBuilder
.
getSpanLayerValue
();
int
serverType
=
componentLibraryCatalogService
.
getServerIdBasedOnComponent
(
standardBuilder
.
getComponentId
());
networkAddressInventoryRegister
.
update
(
peerId
,
spanLayer
,
serverType
);
}
}
...
...
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
浏览文件 @
05764660
...
...
@@ -78,6 +78,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this
.
registerServiceImplementation
(
ITopologyQueryDAO
.
class
,
new
TopologyQueryEsDAO
(
elasticSearchClient
));
this
.
registerServiceImplementation
(
IMetricQueryDAO
.
class
,
new
MetricQueryEsDAO
(
elasticSearchClient
));
this
.
registerServiceImplementation
(
ITraceQueryDAO
.
class
,
new
TraceQueryEsDAO
(
elasticSearchClient
));
this
.
registerServiceImplementation
(
IMetadataQueryDAO
.
class
,
new
MetadataQueryEsDAO
(
elasticSearchClient
));
}
@Override
...
...
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
0 → 100644
浏览文件 @
05764660
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query
;
import
java.io.IOException
;
import
java.util.*
;
import
org.apache.skywalking.oap.server.core.query.entity.Service
;
import
org.apache.skywalking.oap.server.core.register.*
;
import
org.apache.skywalking.oap.server.core.source.DetectPoint
;
import
org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO
;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient
;
import
org.apache.skywalking.oap.server.library.util.BooleanUtils
;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO
;
import
org.elasticsearch.action.search.SearchResponse
;
import
org.elasticsearch.index.query.*
;
import
org.elasticsearch.search.SearchHit
;
import
org.elasticsearch.search.builder.SearchSourceBuilder
;
/**
* @author peng-yongsheng
*/
public
class
MetadataQueryEsDAO
extends
EsDAO
implements
IMetadataQueryDAO
{
public
MetadataQueryEsDAO
(
ElasticSearchClient
client
)
{
super
(
client
);
}
@Override
public
int
numOfService
(
long
startTimestamp
,
long
endTimestamp
)
throws
IOException
{
SearchSourceBuilder
sourceBuilder
=
SearchSourceBuilder
.
searchSource
();
BoolQueryBuilder
boolQueryBuilder
=
QueryBuilders
.
boolQuery
();
BoolQueryBuilder
boolQuery1
=
QueryBuilders
.
boolQuery
();
boolQuery1
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceInventory
.
HEARTBEAT_TIME
).
gte
(
endTimestamp
));
boolQuery1
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceInventory
.
REGISTER_TIME
).
lte
(
endTimestamp
));
BoolQueryBuilder
boolQuery2
=
QueryBuilders
.
boolQuery
();
boolQuery2
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceInventory
.
REGISTER_TIME
).
lte
(
endTimestamp
));
boolQuery2
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceInventory
.
HEARTBEAT_TIME
).
gte
(
startTimestamp
));
BoolQueryBuilder
timeBoolQuery
=
QueryBuilders
.
boolQuery
();
timeBoolQuery
.
should
().
add
(
boolQuery1
);
timeBoolQuery
.
should
().
add
(
boolQuery2
);
boolQueryBuilder
.
must
().
add
(
timeBoolQuery
);
boolQueryBuilder
.
must
().
add
(
QueryBuilders
.
termQuery
(
ServiceInventory
.
IS_ADDRESS
,
BooleanUtils
.
FALSE
));
sourceBuilder
.
query
(
boolQueryBuilder
);
sourceBuilder
.
size
(
0
);
SearchResponse
response
=
getClient
().
search
(
ServiceInventory
.
MODEL_NAME
,
sourceBuilder
);
return
(
int
)
response
.
getHits
().
getTotalHits
();
}
@Override
public
int
numOfEndpoint
(
long
startTimestamp
,
long
endTimestamp
)
throws
IOException
{
SearchSourceBuilder
sourceBuilder
=
SearchSourceBuilder
.
searchSource
();
BoolQueryBuilder
boolQueryBuilder
=
QueryBuilders
.
boolQuery
();
BoolQueryBuilder
boolQuery1
=
QueryBuilders
.
boolQuery
();
boolQuery1
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
EndpointInventory
.
HEARTBEAT_TIME
).
gte
(
endTimestamp
));
boolQuery1
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
EndpointInventory
.
REGISTER_TIME
).
lte
(
endTimestamp
));
BoolQueryBuilder
boolQuery2
=
QueryBuilders
.
boolQuery
();
boolQuery2
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
EndpointInventory
.
REGISTER_TIME
).
lte
(
endTimestamp
));
boolQuery2
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
EndpointInventory
.
HEARTBEAT_TIME
).
gte
(
startTimestamp
));
BoolQueryBuilder
timeBoolQuery
=
QueryBuilders
.
boolQuery
();
timeBoolQuery
.
should
().
add
(
boolQuery1
);
timeBoolQuery
.
should
().
add
(
boolQuery2
);
boolQueryBuilder
.
must
().
add
(
timeBoolQuery
);
boolQueryBuilder
.
must
().
add
(
QueryBuilders
.
termQuery
(
EndpointInventory
.
DETECT_POINT
,
DetectPoint
.
SERVER
.
ordinal
()));
sourceBuilder
.
query
(
boolQueryBuilder
);
sourceBuilder
.
size
(
0
);
SearchResponse
response
=
getClient
().
search
(
EndpointInventory
.
MODEL_NAME
,
sourceBuilder
);
return
(
int
)
response
.
getHits
().
getTotalHits
();
}
@Override
public
int
numOfConjectural
(
long
startTimestamp
,
long
endTimestamp
,
int
srcLayer
)
throws
IOException
{
SearchSourceBuilder
sourceBuilder
=
SearchSourceBuilder
.
searchSource
();
BoolQueryBuilder
boolQueryBuilder
=
QueryBuilders
.
boolQuery
();
BoolQueryBuilder
boolQuery1
=
QueryBuilders
.
boolQuery
();
boolQuery1
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
NetworkAddressInventory
.
HEARTBEAT_TIME
).
gte
(
endTimestamp
));
boolQuery1
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
NetworkAddressInventory
.
REGISTER_TIME
).
lte
(
endTimestamp
));
BoolQueryBuilder
boolQuery2
=
QueryBuilders
.
boolQuery
();
boolQuery2
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
NetworkAddressInventory
.
REGISTER_TIME
).
lte
(
endTimestamp
));
boolQuery2
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
NetworkAddressInventory
.
HEARTBEAT_TIME
).
gte
(
startTimestamp
));
BoolQueryBuilder
timeBoolQuery
=
QueryBuilders
.
boolQuery
();
timeBoolQuery
.
should
().
add
(
boolQuery1
);
timeBoolQuery
.
should
().
add
(
boolQuery2
);
boolQueryBuilder
.
must
().
add
(
timeBoolQuery
);
boolQueryBuilder
.
must
().
add
(
QueryBuilders
.
termQuery
(
NetworkAddressInventory
.
SRC_LAYER
,
srcLayer
));
sourceBuilder
.
query
(
boolQueryBuilder
);
sourceBuilder
.
size
(
0
);
SearchResponse
response
=
getClient
().
search
(
NetworkAddressInventory
.
MODEL_NAME
,
sourceBuilder
);
return
(
int
)
response
.
getHits
().
getTotalHits
();
}
@Override
public
List
<
Service
>
getAllServices
(
long
startTimestamp
,
long
endTimestamp
)
throws
IOException
{
SearchSourceBuilder
sourceBuilder
=
SearchSourceBuilder
.
searchSource
();
BoolQueryBuilder
boolQueryBuilder
=
QueryBuilders
.
boolQuery
();
BoolQueryBuilder
boolQuery1
=
QueryBuilders
.
boolQuery
();
boolQuery1
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceInventory
.
HEARTBEAT_TIME
).
gte
(
endTimestamp
));
boolQuery1
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceInventory
.
REGISTER_TIME
).
lte
(
endTimestamp
));
BoolQueryBuilder
boolQuery2
=
QueryBuilders
.
boolQuery
();
boolQuery2
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceInventory
.
REGISTER_TIME
).
lte
(
endTimestamp
));
boolQuery2
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceInventory
.
HEARTBEAT_TIME
).
gte
(
startTimestamp
));
BoolQueryBuilder
timeBoolQuery
=
QueryBuilders
.
boolQuery
();
timeBoolQuery
.
should
().
add
(
boolQuery1
);
timeBoolQuery
.
should
().
add
(
boolQuery2
);
boolQueryBuilder
.
must
().
add
(
timeBoolQuery
);
boolQueryBuilder
.
must
().
add
(
QueryBuilders
.
termQuery
(
ServiceInventory
.
IS_ADDRESS
,
BooleanUtils
.
FALSE
));
sourceBuilder
.
query
(
boolQueryBuilder
);
sourceBuilder
.
size
(
100
);
SearchResponse
response
=
getClient
().
search
(
ServiceInventory
.
MODEL_NAME
,
sourceBuilder
);
List
<
Service
>
services
=
new
ArrayList
<>();
for
(
SearchHit
searchHit
:
response
.
getHits
())
{
Map
<
String
,
Object
>
sourceAsMap
=
searchHit
.
getSourceAsMap
();
Service
service
=
new
Service
();
service
.
setId
(
String
.
valueOf
(
sourceAsMap
.
get
(
ServiceInventory
.
SEQUENCE
)));
service
.
setName
((
String
)
sourceAsMap
.
get
(
ServiceInventory
.
NAME
));
services
.
add
(
service
);
}
return
services
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录