Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
42c9ebd0
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
42c9ebd0
编写于
4月 26, 2017
作者:
Z
zhangxin
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/3.0.1' into zhangxin/fix/169
上级
a16f6f65
7e80c6af
变更
55
隐藏空白更改
内联
并排
Showing
55 changed file
with
1201 addition
and
491 deletion
+1201
-491
docker-compose.yml
docker-compose.yml
+3
-3
skywalking-collector/pom.xml
skywalking-collector/pom.xml
+10
-0
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/AkkaSystem.java
.../main/java/com/a/eye/skywalking/collector/AkkaSystem.java
+5
-2
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/analysis/GlobalTraceAnalysis.java
...ctor/worker/globaltrace/analysis/GlobalTraceAnalysis.java
+6
-6
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalId.java
...lobaltrace/persistence/GlobalTraceSearchWithGlobalId.java
+2
-6
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/AbstractGet.java
...e/skywalking/collector/worker/httpserver/AbstractGet.java
+2
-2
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/AbstractPost.java
.../skywalking/collector/worker/httpserver/AbstractPost.java
+26
-20
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeCompAnalysis.java
...lector/worker/node/analysis/AbstractNodeCompAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeMappingAnalysis.java
...tor/worker/node/analysis/AbstractNodeMappingAnalysis.java
+3
-3
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeCompAnalysis.java
...king/collector/worker/node/analysis/NodeCompAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingDayAnalysis.java
...ollector/worker/node/analysis/NodeMappingDayAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingHourAnalysis.java
...llector/worker/node/analysis/NodeMappingHourAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingMinuteAnalysis.java
...ector/worker/node/analysis/NodeMappingMinuteAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/AbstractNodeRefAnalysis.java
...ctor/worker/noderef/analysis/AbstractNodeRefAnalysis.java
+8
-8
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefDayAnalysis.java
...collector/worker/noderef/analysis/NodeRefDayAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefHourAnalysis.java
...ollector/worker/noderef/analysis/NodeRefHourAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefMinuteAnalysis.java
...lector/worker/noderef/analysis/NodeRefMinuteAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentPost.java
.../eye/skywalking/collector/worker/segment/SegmentPost.java
+25
-40
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/DeserializeObject.java
...ng/collector/worker/segment/entity/DeserializeObject.java
+16
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/GlobalTraceId.java
...alking/collector/worker/segment/entity/GlobalTraceId.java
+22
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/JsonBuilder.java
...ywalking/collector/worker/segment/entity/JsonBuilder.java
+70
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/LogData.java
...e/skywalking/collector/worker/segment/entity/LogData.java
+59
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/Segment.java
...e/skywalking/collector/worker/segment/entity/Segment.java
+127
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/SegmentDeserialize.java
...g/collector/worker/segment/entity/SegmentDeserialize.java
+45
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/Span.java
.../eye/skywalking/collector/worker/segment/entity/Span.java
+153
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/SpanView.java
.../skywalking/collector/worker/segment/entity/SpanView.java
+1
-1
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/TraceSegmentRef.java
...king/collector/worker/segment/entity/TraceSegmentRef.java
+74
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/AbstractTag.java
...king/collector/worker/segment/entity/tag/AbstractTag.java
+16
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/BooleanTag.java
...lking/collector/worker/segment/entity/tag/BooleanTag.java
+35
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/IntTag.java
...kywalking/collector/worker/segment/entity/tag/IntTag.java
+31
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/ShortTag.java
...walking/collector/worker/segment/entity/tag/ShortTag.java
+30
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/StringTag.java
...alking/collector/worker/segment/entity/tag/StringTag.java
+20
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/Tags.java
.../skywalking/collector/worker/segment/entity/tag/Tags.java
+97
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/logic/Segment.java
...ye/skywalking/collector/worker/segment/logic/Segment.java
+0
-141
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/logic/SegmentDeserialize.java
...ng/collector/worker/segment/logic/SegmentDeserialize.java
+0
-17
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentCostSave.java
...collector/worker/segment/persistence/SegmentCostSave.java
+3
-3
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentExceptionSave.java
...ctor/worker/segment/persistence/SegmentExceptionSave.java
+5
-5
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentSave.java
...ing/collector/worker/segment/persistence/SegmentSave.java
+40
-8
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithGlobalTraceId.java
...egment/persistence/SegmentTopSearchWithGlobalTraceId.java
+6
-6
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithTimeSlice.java
...er/segment/persistence/SegmentTopSearchWithTimeSlice.java
+14
-14
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithId.java
...g/collector/worker/span/persistence/SpanSearchWithId.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/ClientSpanIsLeafTools.java
...walking/collector/worker/tools/ClientSpanIsLeafTools.java
+2
-2
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersTools.java
...eye/skywalking/collector/worker/tools/SpanPeersTools.java
+2
-2
skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config
...king-collector-worker/src/main/resources/collector.config
+17
-66
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/httpserver/PostWithHttpServletTestCase.java
...lector/worker/httpserver/PostWithHttpServletTestCase.java
+7
-6
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/httpserver/TestAbstractPost.java
...walking/collector/worker/httpserver/TestAbstractPost.java
+1
-2
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/SegmentPostTestCase.java
...walking/collector/worker/segment/SegmentPostTestCase.java
+1
-1
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/SegmentRealPost.java
.../skywalking/collector/worker/segment/SegmentRealPost.java
+3
-3
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/mock/SegmentMock.java
...skywalking/collector/worker/segment/mock/SegmentMock.java
+22
-58
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentSaveTestCase.java
...ector/worker/segment/persistence/SegmentSaveTestCase.java
+6
-4
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithIdTestCase.java
...tor/worker/span/persistence/SpanSearchWithIdTestCase.java
+7
-8
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersToolsTestCase.java
...alking/collector/worker/tools/SpanPeersToolsTestCase.java
+1
-1
skywalking-collector/skywalking-collector-worker/src/test/resources/json/span/persistence/segment.json
...ker/src/test/resources/json/span/persistence/segment.json
+134
-0
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java
.../java/com/a/eye/skywalking/api/context/TracerContext.java
+6
-17
skywalking-sniffer/skywalking-toolkit-activation/skywalking-toolkit-opentracing-activation/src/main/java/com/a/eye/skywalking/toolkit/activation/opentracing/span/interceptor/SpanSetTagInterceptor.java
...n/opentracing/span/interceptor/SpanSetTagInterceptor.java
+2
-0
未找到文件。
docker-compose.yml
浏览文件 @
42c9ebd0
version
:
'
2.1'
services
:
skywalking-webui
:
image
:
sky-walking-ui:
1.0
image
:
sky-walking-ui:
3.0.1-2017
expose
:
-
"
8080"
ports
:
...
...
@@ -12,7 +12,7 @@ services:
-
COLLECTOR_SERVERS=skywalking-collector:7001
skywalking-collector
:
image
:
skywalking-collector:3.0-2017
image
:
skywalking-collector:3.0
.1
-2017
expose
:
-
"
7001"
-
"
1000"
...
...
@@ -37,4 +37,4 @@ services:
test
:
[
"
CMD"
,
"
curl"
,
"
-f"
,
"
http://localhost:9200"
]
interval
:
30s
timeout
:
10s
retries
:
5
\ No newline at end of file
retries
:
5
skywalking-collector/pom.xml
浏览文件 @
42c9ebd0
...
...
@@ -36,6 +36,16 @@
<artifactId>
log4j-api
</artifactId>
<version>
${log4j.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.logging.log4j
</groupId>
<artifactId>
log4j-slf4j-impl
</artifactId>
<version>
${log4j.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.logging.log4j
</groupId>
<artifactId>
log4j-jcl
</artifactId>
<version>
${log4j.version}
</version>
</dependency>
<dependency>
<groupId>
com.typesafe.akka
</groupId>
<artifactId>
akka-testkit_2.11
</artifactId>
...
...
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/AkkaSystem.java
浏览文件 @
42c9ebd0
...
...
@@ -18,8 +18,11 @@ public enum AkkaSystem {
public
ActorSystem
create
()
{
final
Config
config
=
ConfigFactory
.
parseString
(
"akka.remote.netty.tcp.HOSTNAME="
+
ClusterConfig
.
Cluster
.
Current
.
HOSTNAME
).
withFallback
(
ConfigFactory
.
parseString
(
"akka.remote.netty.tcp.PORT="
+
ClusterConfig
.
Cluster
.
Current
.
PORT
)).
withFallback
(
ConfigFactory
.
load
(
"application.conf"
));
withFallback
(
ConfigFactory
.
parseString
(
"akka.remote.netty.tcp.PORT="
+
ClusterConfig
.
Cluster
.
Current
.
PORT
)).
withFallback
(
ConfigFactory
.
parseString
(
"akka.loggers=[\"akka.event.slf4j.Slf4jLogger\"]"
)).
withFallback
(
ConfigFactory
.
parseString
(
"akka.loglevel=\"ERROR\""
)).
withFallback
(
ConfigFactory
.
load
(
"application.conf"
));
if
(!
StringUtil
.
isEmpty
(
ClusterConfig
.
Cluster
.
SEED_NODES
))
{
config
.
withFallback
(
ConfigFactory
.
parseString
(
"akka.cluster.seed-nodes="
+
generateSeedNodes
()));
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/analysis/GlobalTraceAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -10,10 +10,10 @@ import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import
com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex
;
import
com.a.eye.skywalking.collector.worker.globaltrace.persistence.GlobalTraceAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.MergeData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceId
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
java.util.List
;
...
...
@@ -29,12 +29,12 @@ public class GlobalTraceAnalysis extends MergeAnalysisMember {
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
String
subSegmentId
=
segment
.
getTraceSegmentId
();
List
<
Distributed
TraceId
>
globalTraceIdList
=
segment
.
getRelatedGlobalTraces
();
List
<
Global
TraceId
>
globalTraceIdList
=
segment
.
getRelatedGlobalTraces
();
if
(
CollectionTools
.
isNotEmpty
(
globalTraceIdList
))
{
for
(
Distributed
TraceId
disTraceId
:
globalTraceIdList
)
{
for
(
Global
TraceId
disTraceId
:
globalTraceIdList
)
{
String
traceId
=
disTraceId
.
get
();
setMergeData
(
traceId
,
GlobalTraceIndex
.
SUB_SEG_IDS
,
subSegmentId
);
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalId.java
浏览文件 @
42c9ebd0
...
...
@@ -6,14 +6,10 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import
com.a.eye.skywalking.collector.actor.selector.WorkerSelector
;
import
com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.logic.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SpanView
;
import
com.a.eye.skywalking.collector.worker.segment.entity.*
;
import
com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs
;
import
com.a.eye.skywalking.collector.worker.storage.MergeData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegmentRef
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
...
...
@@ -52,7 +48,7 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
logger
.
debug
(
"subSegId: %s"
,
subSegId
);
String
segmentSource
=
GetResponseFromEs
.
INSTANCE
.
get
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
subSegId
).
getSourceAsString
();
logger
.
debug
(
"segmentSource: %s"
,
segmentSource
);
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
FromES
(
segmentSource
);
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
Single
(
segmentSource
);
String
segmentId
=
segment
.
getTraceSegmentId
();
List
<
TraceSegmentRef
>
refsList
=
segment
.
getRefs
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/AbstractGet.java
浏览文件 @
42c9ebd0
...
...
@@ -18,12 +18,12 @@ public abstract class AbstractGet extends AbstractLocalSyncWorker {
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
final
public
void
onWork
(
Object
request
,
Object
response
)
throws
Exception
{
@Override
final
public
void
onWork
(
Object
request
,
Object
response
)
throws
Exception
{
Map
<
String
,
String
[]>
parameterMap
=
(
Map
<
String
,
String
[]>)
request
;
try
{
onSearch
(
parameterMap
,
(
JsonObject
)
response
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
((
JsonObject
)
response
).
addProperty
(
"isSuccess"
,
false
);
((
JsonObject
)
response
).
addProperty
(
"reason"
,
e
.
getMessage
());
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/AbstractPost.java
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.httpserver
;
import
com.a.eye.skywalking.collector.actor.*
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
com.google.gson.stream.JsonReader
;
import
javax.servlet.ServletException
;
import
javax.servlet.http.HttpServletRequest
;
...
...
@@ -17,22 +17,16 @@ import java.io.IOException;
public
abstract
class
AbstractPost
extends
AbstractLocalAsyncWorker
{
private
Logger
logger
=
LogManager
.
getFormatterLogger
(
AbstractPost
.
class
);
public
AbstractPost
(
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
final
public
void
onWork
(
Object
request
)
throws
Exception
{
if
(
request
instanceof
String
)
{
onReceive
((
String
)
request
);
}
else
{
logger
.
error
(
"unhandled request, request instance must String, but is %s"
,
request
.
getClass
().
toString
());
saveException
(
new
IllegalArgumentException
(
"request instance must String"
));
}
@Override
final
public
void
onWork
(
Object
message
)
throws
Exception
{
onReceive
(
message
);
}
protected
abstract
void
onReceive
(
String
reqJsonStr
)
throws
Exception
;
protected
abstract
void
onReceive
(
Object
message
)
throws
Exception
;
static
class
PostWithHttpServlet
extends
AbstractHttpServlet
{
...
...
@@ -42,22 +36,34 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
this
.
ownerWorkerRef
=
ownerWorkerRef
;
}
@Override
final
protected
void
doPost
(
HttpServletRequest
request
,
HttpServletResponse
response
)
throws
ServletException
,
IOException
{
@Override
final
protected
void
doPost
(
HttpServletRequest
request
,
HttpServletResponse
response
)
throws
ServletException
,
IOException
{
JsonObject
resJson
=
new
JsonObject
();
try
{
BufferedReader
bufferedReader
=
request
.
getReader
();
StringBuilder
dataStr
=
new
StringBuilder
();
String
tmpStr
;
while
((
tmpStr
=
bufferedReader
.
readLine
())
!=
null
)
{
dataStr
.
append
(
tmpStr
);
}
ownerWorkerRef
.
tell
(
dataStr
.
toString
());
streamReader
(
bufferedReader
);
reply
(
response
,
resJson
,
HttpServletResponse
.
SC_OK
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
resJson
.
addProperty
(
"error"
,
e
.
getMessage
());
reply
(
response
,
resJson
,
HttpServletResponse
.
SC_INTERNAL_SERVER_ERROR
);
}
}
private
void
streamReader
(
BufferedReader
bufferedReader
)
throws
Exception
{
try
(
JsonReader
reader
=
new
JsonReader
(
bufferedReader
))
{
readSegmentArray
(
reader
);
}
}
private
void
readSegmentArray
(
JsonReader
reader
)
throws
Exception
{
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
Segment
segment
=
new
Segment
();
segment
.
deserialize
(
reader
);
ownerWorkerRef
.
tell
(
segment
);
}
reader
.
endArray
();
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeCompAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -4,12 +4,12 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import
com.a.eye.skywalking.collector.actor.LocalWorkerContext
;
import
com.a.eye.skywalking.collector.worker.RecordAnalysisMember
;
import
com.a.eye.skywalking.collector.worker.node.NodeCompIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags
;
import
com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.collector.worker.tools.SpanPeersTools
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.tag.Tags
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
@@ -28,7 +28,7 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
super
(
role
,
clusterContext
,
selfContext
);
}
void
analyseSpans
(
Trace
Segment
segment
)
throws
Exception
{
void
analyseSpans
(
Segment
segment
)
throws
Exception
{
List
<
Span
>
spanList
=
segment
.
getSpans
();
logger
.
debug
(
"node analysis span isNotEmpty %s"
,
CollectionTools
.
isNotEmpty
(
spanList
));
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeMappingAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -5,9 +5,9 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.collector.worker.RecordAnalysisMember
;
import
com.a.eye.skywalking.collector.worker.node.NodeMappingIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.TraceSegmentRef
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.TraceSegmentRef
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
@@ -26,7 +26,7 @@ abstract class AbstractNodeMappingAnalysis extends RecordAnalysisMember {
super
(
role
,
clusterContext
,
selfContext
);
}
void
analyseRefs
(
Trace
Segment
segment
,
long
timeSlice
)
throws
Exception
{
void
analyseRefs
(
Segment
segment
,
long
timeSlice
)
throws
Exception
{
List
<
TraceSegmentRef
>
segmentRefList
=
segment
.
getRefs
();
logger
.
debug
(
"node mapping analysis refs isNotEmpty %s"
,
CollectionTools
.
isNotEmpty
(
segmentRefList
));
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeCompAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.node.persistence.NodeCompAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeCompAnalysis
extends
AbstractNodeCompAnalysis
{
NodeCompAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
analyseSpans
(
segment
);
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingDayAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingDayAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeMappingDayAnalysis
extends
AbstractNodeMappingAnalysis
{
public
NodeMappingDayAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
analyseRefs
(
segment
,
segmentWithTimeSlice
.
getDay
());
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingHourAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingHourAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeMappingHourAnalysis
extends
AbstractNodeMappingAnalysis
{
NodeMappingHourAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
analyseRefs
(
segment
,
segmentWithTimeSlice
.
getHour
());
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingMinuteAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingMinuteAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeMappingMinuteAnalysis
extends
AbstractNodeMappingAnalysis
{
NodeMappingMinuteAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
analyseRefs
(
segment
,
segmentWithTimeSlice
.
getMinute
());
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/AbstractNodeRefAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -5,12 +5,12 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.collector.worker.RecordAnalysisMember
;
import
com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags
;
import
com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.collector.worker.tools.SpanPeersTools
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.tag.Tags
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
@@ -25,12 +25,12 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
private
Logger
logger
=
LogManager
.
getFormatterLogger
(
AbstractNodeRefAnalysis
.
class
);
AbstractNodeRefAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
final
void
analyseNodeRef
(
Trace
Segment
segment
,
long
timeSlice
,
long
minute
,
long
hour
,
long
day
,
int
second
)
throws
Exception
{
final
void
analyseNodeRef
(
Segment
segment
,
long
timeSlice
,
long
minute
,
long
hour
,
long
day
,
int
second
)
throws
Exception
{
List
<
Span
>
spanList
=
segment
.
getSpans
();
if
(
CollectionTools
.
isNotEmpty
(
spanList
))
{
for
(
Span
span
:
spanList
)
{
...
...
@@ -69,7 +69,7 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
private
void
buildNodeRefResRecordData
(
String
nodeRefId
,
Span
span
,
long
minute
,
long
hour
,
long
day
,
int
second
)
throws
Exception
{
int
second
)
throws
Exception
{
AbstractNodeRefResSumAnalysis
.
NodeRefResRecord
refResRecord
=
new
AbstractNodeRefResSumAnalysis
.
NodeRefResRecord
(
minute
,
hour
,
day
,
second
);
refResRecord
.
setStartTime
(
span
.
getStartTime
());
refResRecord
.
setEndTime
(
span
.
getEndTime
());
...
...
@@ -79,5 +79,5 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
protected
abstract
void
sendToResSumAnalysis
(
AbstractNodeRefResSumAnalysis
.
NodeRefResRecord
refResRecord
)
throws
Exception
;
AbstractNodeRefResSumAnalysis
.
NodeRefResRecord
refResRecord
)
throws
Exception
;
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefDayAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -9,8 +9,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDayAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeRefDayAnalysis
extends
AbstractNodeRefAnalysis
{
protected
NodeRefDayAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
...
...
@@ -31,8 +31,8 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
long
minute
=
segmentWithTimeSlice
.
getMinute
();
long
hour
=
segmentWithTimeSlice
.
getHour
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefHourAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -9,8 +9,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeRefHourAnalysis
extends
AbstractNodeRefAnalysis
{
protected
NodeRefHourAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
...
...
@@ -31,8 +31,8 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
long
minute
=
segmentWithTimeSlice
.
getMinute
();
long
hour
=
segmentWithTimeSlice
.
getHour
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefMinuteAnalysis.java
浏览文件 @
42c9ebd0
...
...
@@ -9,8 +9,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeRefMinuteAnalysis
extends
AbstractNodeRefAnalysis
{
protected
NodeRefMinuteAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
...
...
@@ -31,8 +31,8 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
long
minute
=
segmentWithTimeSlice
.
getMinute
();
long
hour
=
segmentWithTimeSlice
.
getHour
();
long
day
=
segmentWithTimeSlice
.
getDay
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentPost.java
浏览文件 @
42c9ebd0
...
...
@@ -11,35 +11,30 @@ import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import
com.a.eye.skywalking.collector.worker.globaltrace.analysis.GlobalTraceAnalysis
;
import
com.a.eye.skywalking.collector.worker.httpserver.AbstractPost
;
import
com.a.eye.skywalking.collector.worker.httpserver.AbstractPostProvider
;
import
com.a.eye.skywalking.collector.worker.node.analysis.*
;
import
com.a.eye.skywalking.collector.worker.node.analysis.NodeCompAnalysis
;
import
com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingDayAnalysis
;
import
com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingHourAnalysis
;
import
com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingMinuteAnalysis
;
import
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefDayAnalysis
;
import
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefHourAnalysis
;
import
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefMinuteAnalysis
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.persistence.SegmentCostSave
;
import
com.a.eye.skywalking.collector.worker.segment.persistence.SegmentExceptionSave
;
import
com.a.eye.skywalking.collector.worker.segment.persistence.SegmentSave
;
import
com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice
;
import
com.a.eye.skywalking.collector.worker.tools.DateTools
;
import
com.a.eye.skywalking.trace.SegmentsMessage
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
java.util.List
;
/**
* @author pengys5
*/
public
class
SegmentPost
extends
AbstractPost
{
private
static
final
Logger
logger
=
LogManager
.
getFormatterLogger
(
SegmentPost
.
class
);
private
Gson
gson
;
public
SegmentPost
(
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
gson
=
new
Gson
();
}
@Override
...
...
@@ -62,27 +57,25 @@ public class SegmentPost extends AbstractPost {
}
@Override
protected
void
onReceive
(
String
reqJsonStr
)
throws
Exception
{
SegmentsMessage
segmentsMessage
=
gson
.
fromJson
(
reqJsonStr
,
SegmentsMessage
.
class
);
List
<
TraceSegment
>
segmentList
=
segmentsMessage
.
getSegments
();
for
(
TraceSegment
newSegment
:
segmentList
)
{
protected
void
onReceive
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
Segment
)
{
Segment
segment
=
(
Segment
)
message
;
try
{
validateData
(
newS
egment
);
validateData
(
s
egment
);
}
catch
(
IllegalArgumentException
e
)
{
continue
;
return
;
}
logger
.
debug
(
"receive message instanceof TraceSegment, traceSegmentId is %s"
,
newS
egment
.
getTraceSegmentId
());
logger
.
debug
(
"receive message instanceof TraceSegment, traceSegmentId is %s"
,
s
egment
.
getTraceSegmentId
());
long
minuteSlice
=
DateTools
.
getMinuteSlice
(
newS
egment
.
getStartTime
());
long
hourSlice
=
DateTools
.
getHourSlice
(
newS
egment
.
getStartTime
());
long
daySlice
=
DateTools
.
getDaySlice
(
newS
egment
.
getStartTime
());
int
second
=
DateTools
.
getSecond
(
newS
egment
.
getStartTime
());
long
minuteSlice
=
DateTools
.
getMinuteSlice
(
s
egment
.
getStartTime
());
long
hourSlice
=
DateTools
.
getHourSlice
(
s
egment
.
getStartTime
());
long
daySlice
=
DateTools
.
getDaySlice
(
s
egment
.
getStartTime
());
int
second
=
DateTools
.
getSecond
(
s
egment
.
getStartTime
());
logger
.
debug
(
"minuteSlice: %s, hourSlice: %s, daySlice: %s, second:%s"
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
SegmentWithTimeSlice
segmentWithTimeSlice
=
new
SegmentWithTimeSlice
(
newSegment
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
String
newSegmentJsonStr
=
gson
.
toJson
(
newSegment
);
tellSegmentSave
(
newSegmentJsonStr
,
daySlice
,
hourSlice
,
minuteSlice
);
SegmentWithTimeSlice
segmentWithTimeSlice
=
new
SegmentWithTimeSlice
(
segment
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
getSelfContext
().
lookup
(
SegmentSave
.
Role
.
INSTANCE
).
tell
(
segment
);
getSelfContext
().
lookup
(
SegmentCostSave
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
getSelfContext
().
lookup
(
GlobalTraceAnalysis
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
...
...
@@ -95,14 +88,6 @@ public class SegmentPost extends AbstractPost {
}
}
private
void
tellSegmentSave
(
String
newSegmentJsonStr
,
long
day
,
long
hour
,
long
minute
)
throws
Exception
{
JsonObject
newSegmentJson
=
gson
.
fromJson
(
newSegmentJsonStr
,
JsonObject
.
class
);
newSegmentJson
.
addProperty
(
"minute"
,
minute
);
newSegmentJson
.
addProperty
(
"hour"
,
hour
);
newSegmentJson
.
addProperty
(
"day"
,
day
);
getSelfContext
().
lookup
(
SegmentSave
.
Role
.
INSTANCE
).
tell
(
newSegmentJson
);
}
private
void
tellNodeRef
(
SegmentWithTimeSlice
segmentWithTimeSlice
)
throws
Exception
{
getSelfContext
().
lookup
(
NodeRefMinuteAnalysis
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
getSelfContext
().
lookup
(
NodeRefHourAnalysis
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
...
...
@@ -115,11 +100,11 @@ public class SegmentPost extends AbstractPost {
getSelfContext
().
lookup
(
NodeMappingDayAnalysis
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
}
private
void
validateData
(
TraceSegment
newS
egment
)
{
if
(
StringUtil
.
isEmpty
(
newS
egment
.
getTraceSegmentId
()))
{
private
void
validateData
(
Segment
s
egment
)
{
if
(
StringUtil
.
isEmpty
(
s
egment
.
getTraceSegmentId
()))
{
throw
new
IllegalArgumentException
(
"traceSegmentId required"
);
}
if
(
0
==
newS
egment
.
getStartTime
())
{
if
(
0
==
s
egment
.
getStartTime
())
{
throw
new
IllegalArgumentException
(
"startTime required"
);
}
}
...
...
@@ -163,15 +148,15 @@ public class SegmentPost extends AbstractPost {
}
public
static
class
SegmentWithTimeSlice
extends
AbstractTimeSlice
{
private
final
TraceSegment
traceS
egment
;
private
final
Segment
s
egment
;
public
SegmentWithTimeSlice
(
TraceSegment
traceS
egment
,
long
minute
,
long
hour
,
long
day
,
int
second
)
{
public
SegmentWithTimeSlice
(
Segment
s
egment
,
long
minute
,
long
hour
,
long
day
,
int
second
)
{
super
(
minute
,
hour
,
day
,
second
);
this
.
traceSegment
=
traceS
egment
;
this
.
segment
=
s
egment
;
}
public
TraceSegment
getTrace
Segment
()
{
return
traceS
egment
;
public
Segment
get
Segment
()
{
return
s
egment
;
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/DeserializeObject.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity
;
/**
* @author pengys5
*/
public
abstract
class
DeserializeObject
{
private
String
jsonStr
;
public
String
getJsonStr
()
{
return
jsonStr
;
}
public
void
setJsonStr
(
String
jsonStr
)
{
this
.
jsonStr
=
jsonStr
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/GlobalTraceId.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
/**
* @author pengys5
*/
public
class
GlobalTraceId
extends
DeserializeObject
{
private
String
globalTraceId
;
public
String
get
()
{
return
globalTraceId
;
}
public
GlobalTraceId
deserialize
(
JsonReader
reader
)
throws
IOException
{
this
.
globalTraceId
=
reader
.
nextString
();
this
.
setJsonStr
(
"\""
+
globalTraceId
+
"\""
);
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/JsonBuilder.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
java.util.List
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
enum
JsonBuilder
{
INSTANCE
;
public
void
append
(
StringBuilder
builder
,
String
name
,
String
value
,
boolean
first
)
{
if
(!
first
)
{
builder
.
append
(
","
);
}
builder
.
append
(
"\""
).
append
(
name
).
append
(
"\":\""
).
append
(
value
).
append
(
"\""
);
}
public
void
append
(
StringBuilder
builder
,
String
name
,
Number
value
,
boolean
first
)
{
if
(!
first
)
{
builder
.
append
(
","
);
}
builder
.
append
(
"\""
).
append
(
name
).
append
(
"\":"
).
append
(
value
);
}
public
void
append
(
StringBuilder
builder
,
String
name
,
List
<?>
value
,
boolean
first
)
{
if
(!
first
)
{
builder
.
append
(
","
);
}
builder
.
append
(
"\""
).
append
(
name
).
append
(
"\":"
);
builder
.
append
(
"["
);
boolean
isFirst
=
true
;
for
(
int
i
=
0
;
i
<
value
.
size
();
i
++)
{
DeserializeObject
deserializeObject
=
(
DeserializeObject
)
value
.
get
(
i
);
if
(!
isFirst
)
{
builder
.
append
(
","
);
}
builder
.
append
(
deserializeObject
.
getJsonStr
());
isFirst
=
false
;
}
builder
.
append
(
"]"
);
}
public
void
append
(
StringBuilder
builder
,
String
name
,
Map
<
String
,
?>
tagsWithStr
,
boolean
first
)
{
if
(!
first
)
{
builder
.
append
(
","
);
}
builder
.
append
(
"\""
).
append
(
name
).
append
(
"\":"
);
builder
.
append
(
"{"
);
boolean
isFirst
=
true
;
for
(
Map
.
Entry
<
String
,
?>
entry
:
tagsWithStr
.
entrySet
())
{
String
key
=
entry
.
getKey
();
Object
value
=
entry
.
getValue
();
if
(!
isFirst
)
{
builder
.
append
(
","
);
}
if
(
value
instanceof
String
)
{
builder
.
append
(
"\""
).
append
(
key
).
append
(
"\":\""
).
append
(
value
).
append
(
"\""
);
}
else
{
builder
.
append
(
"\""
).
append
(
key
).
append
(
"\":"
).
append
(
value
);
}
isFirst
=
false
;
}
builder
.
append
(
"}"
);
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/LogData.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
LogData
extends
DeserializeObject
{
private
long
time
;
private
Map
<
String
,
String
>
fields
;
public
long
getTime
()
{
return
time
;
}
public
Map
<
String
,
String
>
getFields
()
{
return
fields
;
}
public
LogData
deserialize
(
JsonReader
reader
)
throws
IOException
{
StringBuilder
stringBuilder
=
new
StringBuilder
();
stringBuilder
.
append
(
"{"
);
boolean
first
=
true
;
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
switch
(
reader
.
nextName
())
{
case
"tm"
:
Long
tm
=
reader
.
nextLong
();
this
.
time
=
tm
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"tm"
,
tm
,
first
);
break
;
case
"fi"
:
fields
=
new
HashMap
<>();
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
key
=
reader
.
nextName
();
String
value
=
reader
.
nextString
();
fields
.
put
(
key
,
value
);
}
reader
.
endObject
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"fi"
,
fields
,
first
);
break
;
default
:
reader
.
skipValue
();
}
first
=
false
;
}
reader
.
endObject
();
stringBuilder
.
append
(
"}"
);
this
.
setJsonStr
(
stringBuilder
.
toString
());
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/Segment.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @author pengys5
*/
public
class
Segment
extends
DeserializeObject
{
private
String
traceSegmentId
;
private
long
startTime
;
private
long
endTime
;
private
List
<
TraceSegmentRef
>
refs
;
private
List
<
Span
>
spans
;
private
String
applicationCode
;
private
List
<
GlobalTraceId
>
relatedGlobalTraces
;
public
String
getTraceSegmentId
()
{
return
traceSegmentId
;
}
public
long
getStartTime
()
{
return
startTime
;
}
public
long
getEndTime
()
{
return
endTime
;
}
public
String
getApplicationCode
()
{
return
applicationCode
;
}
public
List
<
TraceSegmentRef
>
getRefs
()
{
return
refs
;
}
public
List
<
Span
>
getSpans
()
{
return
spans
;
}
public
List
<
GlobalTraceId
>
getRelatedGlobalTraces
()
{
return
relatedGlobalTraces
;
}
public
Segment
deserialize
(
JsonReader
reader
)
throws
IOException
{
StringBuilder
stringBuilder
=
new
StringBuilder
();
stringBuilder
.
append
(
"{"
);
boolean
first
=
true
;
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
switch
(
reader
.
nextName
())
{
case
"ts"
:
String
ts
=
reader
.
nextString
();
this
.
traceSegmentId
=
ts
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ts"
,
ts
,
first
);
break
;
case
"ac"
:
String
ac
=
reader
.
nextString
();
this
.
applicationCode
=
ac
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ac"
,
ac
,
first
);
break
;
case
"st"
:
long
st
=
reader
.
nextLong
();
this
.
startTime
=
st
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"st"
,
st
,
first
);
break
;
case
"et"
:
long
et
=
reader
.
nextLong
();
this
.
endTime
=
et
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"et"
,
et
,
first
);
break
;
case
"rs"
:
refs
=
new
ArrayList
<>();
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
TraceSegmentRef
ref
=
new
TraceSegmentRef
();
ref
.
deserialize
(
reader
);
refs
.
add
(
ref
);
}
reader
.
endArray
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"rs"
,
refs
,
first
);
break
;
case
"ss"
:
spans
=
new
ArrayList
<>();
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
Span
span
=
new
Span
();
span
.
deserialize
(
reader
);
spans
.
add
(
span
);
}
reader
.
endArray
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ss"
,
spans
,
first
);
break
;
case
"gt"
:
relatedGlobalTraces
=
new
ArrayList
<>();
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
GlobalTraceId
globalTraceId
=
new
GlobalTraceId
();
globalTraceId
.
deserialize
(
reader
);
relatedGlobalTraces
.
add
(
globalTraceId
);
}
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"gt"
,
relatedGlobalTraces
,
first
);
reader
.
endArray
();
break
;
default
:
reader
.
skipValue
();
}
first
=
false
;
}
reader
.
endObject
();
stringBuilder
.
append
(
"}"
);
this
.
setJsonStr
(
stringBuilder
.
toString
());
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/SegmentDeserialize.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.FileReader
;
import
java.io.IOException
;
import
java.io.StringReader
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @author pengys5
*/
public
enum
SegmentDeserialize
{
INSTANCE
;
public
Segment
deserializeSingle
(
String
singleSegmentJsonStr
)
throws
IOException
{
JsonReader
reader
=
new
JsonReader
(
new
StringReader
(
singleSegmentJsonStr
));
Segment
segment
=
new
Segment
();
segment
.
deserialize
(
reader
);
return
segment
;
}
public
List
<
Segment
>
deserializeMultiple
(
String
segmentJsonFile
)
throws
Exception
{
List
<
Segment
>
segmentList
=
new
ArrayList
<>();
streamReader
(
segmentList
,
new
FileReader
(
segmentJsonFile
));
return
segmentList
;
}
private
void
streamReader
(
List
<
Segment
>
segmentList
,
FileReader
fileReader
)
throws
Exception
{
try
(
JsonReader
reader
=
new
JsonReader
(
fileReader
))
{
readSegmentArray
(
segmentList
,
reader
);
}
}
private
void
readSegmentArray
(
List
<
Segment
>
segmentList
,
JsonReader
reader
)
throws
Exception
{
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
Segment
segment
=
new
Segment
();
segment
.
deserialize
(
reader
);
segmentList
.
add
(
segment
);
}
reader
.
endArray
();
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/Span.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
Span
extends
DeserializeObject
{
private
int
spanId
;
private
int
parentSpanId
;
private
long
startTime
;
private
long
endTime
;
private
String
operationName
;
private
Map
<
String
,
String
>
tagsWithStr
;
private
Map
<
String
,
Boolean
>
tagsWithBool
;
private
Map
<
String
,
Integer
>
tagsWithInt
;
private
List
<
LogData
>
logs
;
public
int
getSpanId
()
{
return
spanId
;
}
public
int
getParentSpanId
()
{
return
parentSpanId
;
}
public
long
getStartTime
()
{
return
startTime
;
}
public
long
getEndTime
()
{
return
endTime
;
}
public
String
getOperationName
()
{
return
operationName
;
}
public
String
getStrTag
(
String
key
)
{
return
tagsWithStr
.
get
(
key
);
}
public
Boolean
getBoolTag
(
String
key
)
{
return
tagsWithBool
.
get
(
key
);
}
public
Integer
getIntTag
(
String
key
)
{
return
tagsWithInt
.
get
(
key
);
}
public
List
<
LogData
>
getLogs
()
{
return
logs
;
}
public
Span
deserialize
(
JsonReader
reader
)
throws
IOException
{
StringBuilder
stringBuilder
=
new
StringBuilder
();
stringBuilder
.
append
(
"{"
);
boolean
first
=
true
;
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
switch
(
reader
.
nextName
())
{
case
"si"
:
Integer
si
=
reader
.
nextInt
();
this
.
spanId
=
si
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"si"
,
si
,
first
);
break
;
case
"ps"
:
Integer
ps
=
reader
.
nextInt
();
this
.
parentSpanId
=
ps
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ps"
,
ps
,
first
);
break
;
case
"st"
:
Long
st
=
reader
.
nextLong
();
this
.
startTime
=
st
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"st"
,
st
,
first
);
break
;
case
"et"
:
Long
et
=
reader
.
nextLong
();
this
.
endTime
=
et
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"et"
,
et
,
first
);
break
;
case
"on"
:
String
on
=
reader
.
nextString
();
this
.
operationName
=
on
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"on"
,
on
,
first
);
break
;
case
"ts"
:
tagsWithStr
=
new
HashMap
<>();
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
key
=
reader
.
nextName
();
String
value
=
reader
.
nextString
();
tagsWithStr
.
put
(
key
,
value
);
}
reader
.
endObject
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ts"
,
tagsWithStr
,
first
);
break
;
case
"tb"
:
tagsWithBool
=
new
HashMap
<>();
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
key
=
reader
.
nextName
();
boolean
value
=
reader
.
nextBoolean
();
tagsWithBool
.
put
(
key
,
value
);
}
reader
.
endObject
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"tb"
,
tagsWithBool
,
first
);
break
;
case
"ti"
:
tagsWithInt
=
new
HashMap
<>();
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
key
=
reader
.
nextName
();
Integer
value
=
reader
.
nextInt
();
tagsWithInt
.
put
(
key
,
value
);
}
reader
.
endObject
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ti"
,
tagsWithInt
,
first
);
break
;
case
"lo"
:
logs
=
new
ArrayList
<>();
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
LogData
logData
=
new
LogData
();
logData
.
deserialize
(
reader
);
logs
.
add
(
logData
);
}
reader
.
endArray
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"lo"
,
logs
,
first
);
break
;
default
:
reader
.
skipValue
();
}
first
=
false
;
}
reader
.
endObject
();
stringBuilder
.
append
(
"}"
);
this
.
setJsonStr
(
stringBuilder
.
toString
());
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/
logic
/SpanView.java
→
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/
entity
/SpanView.java
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.
logic
;
package
com.a.eye.skywalking.collector.worker.segment.
entity
;
import
java.util.HashSet
;
import
java.util.Set
;
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/TraceSegmentRef.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
/**
* @author pengys5
*/
public
class
TraceSegmentRef
extends
DeserializeObject
{
private
String
traceSegmentId
;
private
int
spanId
=
-
1
;
private
String
applicationCode
;
private
String
peerHost
;
public
String
getTraceSegmentId
()
{
return
traceSegmentId
;
}
public
int
getSpanId
()
{
return
spanId
;
}
public
String
getApplicationCode
()
{
return
applicationCode
;
}
public
String
getPeerHost
()
{
return
peerHost
;
}
public
TraceSegmentRef
deserialize
(
JsonReader
reader
)
throws
IOException
{
StringBuilder
stringBuilder
=
new
StringBuilder
();
stringBuilder
.
append
(
"{"
);
boolean
first
=
true
;
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
switch
(
reader
.
nextName
())
{
case
"rs"
:
String
ts
=
reader
.
nextString
();
this
.
traceSegmentId
=
ts
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ts"
,
ts
,
first
);
break
;
case
"si"
:
Integer
si
=
reader
.
nextInt
();
this
.
spanId
=
si
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"si"
,
si
,
first
);
break
;
case
"ac"
:
String
ac
=
reader
.
nextString
();
this
.
applicationCode
=
ac
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ac"
,
ac
,
first
);
break
;
case
"ph"
:
String
ph
=
reader
.
nextString
();
this
.
peerHost
=
ph
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ph"
,
ph
,
first
);
break
;
default
:
reader
.
skipValue
();
}
first
=
false
;
}
reader
.
endObject
();
stringBuilder
.
append
(
"}"
);
this
.
setJsonStr
(
stringBuilder
.
toString
());
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/AbstractTag.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
public
abstract
class
AbstractTag
<
T
>
{
/**
* The key of this Tag.
*/
protected
final
String
key
;
public
AbstractTag
(
String
tagKey
)
{
this
.
key
=
tagKey
;
}
public
abstract
T
get
(
Span
span
);
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/BooleanTag.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* Do the same thing as {@link StringTag}, just with a {@link Boolean} value.
* <p>
* Created by wusheng on 2017/2/17.
*/
public
class
BooleanTag
extends
AbstractTag
<
Boolean
>
{
private
boolean
defaultValue
;
public
BooleanTag
(
String
key
,
boolean
defaultValue
)
{
super
(
key
);
this
.
defaultValue
=
defaultValue
;
}
/**
* Get a tag value, type of {@link Boolean}. After akka-message/serialize, all tags values are type of {@link
* String}, convert to {@link Boolean}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public
Boolean
get
(
Span
span
)
{
Boolean
tagValue
=
span
.
getBoolTag
(
super
.
key
);
if
(
tagValue
==
null
)
{
return
defaultValue
;
}
else
{
return
tagValue
;
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/IntTag.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* Do the same thing as {@link StringTag}, just with a {@link Integer} value.
*
* Created by wusheng on 2017/2/18.
*/
public
class
IntTag
extends
AbstractTag
<
Integer
>
{
public
IntTag
(
String
key
)
{
super
(
key
);
}
/**
* Get a tag value, type of {@link Integer}.
* After akka-message/serialize, all tags values are type of {@link String}, convert to {@link Integer}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public
Integer
get
(
Span
span
)
{
Integer
tagValue
=
span
.
getIntTag
(
super
.
key
);
if
(
tagValue
==
null
)
{
return
null
;
}
else
{
return
tagValue
;
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/ShortTag.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* Do the same thing as {@link StringTag}, just with a {@link Short} value.
*
* Created by wusheng on 2017/2/17.
*/
public
class
ShortTag
extends
AbstractTag
<
Short
>
{
public
ShortTag
(
String
key
)
{
super
(
key
);
}
/**
* Get a tag value, type of {@link Short}.
* After akka-message/serialize, all tags values are type of {@link String}, convert to {@link Short}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public
Short
get
(
Span
span
)
{
Integer
tagValue
=
span
.
getIntTag
(
super
.
key
);
if
(
tagValue
==
null
)
{
return
null
;
}
else
{
return
Short
.
valueOf
(
tagValue
.
toString
());
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/StringTag.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* A subclass of {@link AbstractTag},
* represent a tag with a {@link String} value.
*
* Created by wusheng on 2017/2/17.
*/
public
class
StringTag
extends
AbstractTag
<
String
>
{
public
StringTag
(
String
tagKey
)
{
super
(
tagKey
);
}
@Override
public
String
get
(
Span
span
)
{
return
span
.
getStrTag
(
super
.
key
);
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/Tags.java
0 → 100644
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* The span tags are supported by sky-walking engine.
* As default, all tags will be stored, but these ones have particular meanings.
* <p>
* Created by wusheng on 2017/2/17.
*/
public
final
class
Tags
{
private
Tags
()
{
}
/**
* URL records the url of the incoming request.
*/
public
static
final
StringTag
URL
=
new
StringTag
(
"url"
);
/**
* STATUS_CODE records the http status code of the response.
*/
public
static
final
IntTag
STATUS_CODE
=
new
IntTag
(
"status_code"
);
/**
* SPAN_KIND hints at the relationship between spans, e.g. client/server.
*/
public
static
final
StringTag
SPAN_KIND
=
new
StringTag
(
"span.kind"
);
/**
* A constant for setting the span kind to indicate that it represents a server span.
*/
public
static
final
String
SPAN_KIND_SERVER
=
"server"
;
/**
* A constant for setting the span kind to indicate that it represents a client span.
*/
public
static
final
String
SPAN_KIND_CLIENT
=
"client"
;
/**
* SPAN_LAYER represents the kind of span.
* <p>
* e.g.
* db=database;
* rpc=Remote Procedure Call Framework, like motan, thift;
* nosql=something like redis/memcache
*/
public
static
final
class
SPAN_LAYER
{
private
static
StringTag
SPAN_LAYER_TAG
=
new
StringTag
(
"span.layer"
);
public
static
String
get
(
Span
span
)
{
return
SPAN_LAYER_TAG
.
get
(
span
);
}
}
/**
* COMPONENT is a low-cardinality identifier of the module, library, or package that is instrumented.
* Like dubbo/dubbox/motan
*/
public
static
final
StringTag
COMPONENT
=
new
StringTag
(
"component"
);
/**
* ERROR indicates whether a Span ended in an error state.
*/
public
static
final
BooleanTag
ERROR
=
new
BooleanTag
(
"error"
,
false
);
/**
* PEER_HOST records host address (ip:port, or ip1:port1,ip2:port2) of the peer, maybe IPV4, IPV6 or hostname.
*/
public
static
final
StringTag
PEER_HOST
=
new
StringTag
(
"peer.host"
);
/**
* PEER_PORT records remote port of the peer
*/
public
static
final
IntTag
PEER_PORT
=
new
IntTag
(
"peer.port"
);
/**
* PEERS records multiple host address and port of remote
*/
public
static
final
StringTag
PEERS
=
new
StringTag
(
"peers"
);
/**
* DB_TYPE records database type, such as sql, redis, cassandra and so on.
*/
public
static
final
StringTag
DB_TYPE
=
new
StringTag
(
"db.type"
);
/**
* DB_INSTANCE records database instance name.
*/
public
static
final
StringTag
DB_INSTANCE
=
new
StringTag
(
"db.instance"
);
/**
* DB_STATEMENT records the sql statement of the database access.
*/
public
static
final
StringTag
DB_STATEMENT
=
new
StringTag
(
"db.statement"
);
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/logic/Segment.java
已删除
100644 → 0
浏览文件 @
a16f6f65
package
com.a.eye.skywalking.collector.worker.segment.logic
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceId
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceIds
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.TraceSegmentRef
;
import
com.google.gson.annotations.Expose
;
import
com.google.gson.annotations.SerializedName
;
import
java.util.Collections
;
import
java.util.LinkedList
;
import
java.util.List
;
/**
* @author pengys5
*/
public
class
Segment
{
/**
* The id of this trace segment.
* Every segment has its unique-global-id.
*/
@Expose
@SerializedName
(
value
=
"ts"
)
private
String
traceSegmentId
;
/**
* The start time of this trace segment.
*/
@Expose
@SerializedName
(
value
=
"st"
)
private
long
startTime
;
/**
* The end time of this trace segment.
*/
@Expose
@SerializedName
(
value
=
"et"
)
private
long
endTime
;
/**
* The refs of parent trace segments, except the primary one.
* For most RPC call, {@link #refs} contains only one element,
* but if this segment is a start span of batch process, the segment faces multi parents,
* at this moment, we use this {@link #refs} to link them.
*/
@Expose
@SerializedName
(
value
=
"rs"
)
private
List
<
TraceSegmentRef
>
refs
;
/**
* The spans belong to this trace segment.
* They all have finished.
* All active spans are hold and controlled by "skywalking-api" module.
*/
@Expose
@SerializedName
(
value
=
"ss"
)
private
List
<
Span
>
spans
;
/**
* The <code>applicationCode</code> represents a NAME of current application/JVM and indicates which is business
* role in the cluster.
* <p>
* e.g. account_app, billing_app
*/
@Expose
@SerializedName
(
value
=
"ac"
)
private
String
applicationCode
;
/**
* The <code>relatedGlobalTraces</code> represent a set of all related trace. Most time it contains only one
* element, because only one parent {@link TraceSegment} exists, but, in batch scenario, the num becomes greater
* than 1, also meaning multi-parents {@link TraceSegment}.
* <p>
* The difference between <code>relatedGlobalTraces</code> and {@link #refs} is:
* {@link #refs} targets this {@link TraceSegment}'s direct parent,
* <p>
* and
* <p>
* <code>relatedGlobalTraces</code> targets this {@link TraceSegment}'s related call chain, a call chain contains
* multi {@link TraceSegment}s, only using {@link #refs} is not enough for analysis and ui.
*/
@Expose
@SerializedName
(
value
=
"gt"
)
private
DistributedTraceIds
relatedGlobalTraces
;
/**
* Establish the link between this segment and its parents.
*
* @param refSegment {@link TraceSegmentRef}
*/
public
void
ref
(
TraceSegmentRef
refSegment
)
{
if
(
refs
==
null
)
{
refs
=
new
LinkedList
<
TraceSegmentRef
>();
}
if
(!
refs
.
contains
(
refSegment
))
{
refs
.
add
(
refSegment
);
}
}
public
void
relatedGlobalTraces
(
List
<
DistributedTraceId
>
distributedTraceIds
)
{
if
(
distributedTraceIds
==
null
||
distributedTraceIds
.
size
()
==
0
)
{
return
;
}
for
(
DistributedTraceId
distributedTraceId
:
distributedTraceIds
)
{
relatedGlobalTraces
.
append
(
distributedTraceId
);
}
}
public
String
getTraceSegmentId
()
{
return
traceSegmentId
;
}
public
long
getStartTime
()
{
return
startTime
;
}
public
long
getEndTime
()
{
return
endTime
;
}
public
List
<
TraceSegmentRef
>
getRefs
()
{
if
(
refs
==
null
)
{
return
null
;
}
return
Collections
.
unmodifiableList
(
refs
);
}
public
List
<
DistributedTraceId
>
getRelatedGlobalTraces
()
{
return
relatedGlobalTraces
.
getRelatedGlobalTraces
();
}
public
List
<
Span
>
getSpans
()
{
return
Collections
.
unmodifiableList
(
spans
);
}
public
String
getApplicationCode
()
{
return
applicationCode
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/logic/SegmentDeserialize.java
已删除
100644 → 0
浏览文件 @
a16f6f65
package
com.a.eye.skywalking.collector.worker.segment.logic
;
import
com.google.gson.Gson
;
/**
* @author pengys5
*/
public
enum
SegmentDeserialize
{
INSTANCE
;
private
Gson
gson
=
new
Gson
();
public
Segment
deserializeFromES
(
String
segmentSource
)
{
Segment
segment
=
gson
.
fromJson
(
segmentSource
,
Segment
.
class
);
return
segment
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentCostSave.java
浏览文件 @
42c9ebd0
...
...
@@ -9,10 +9,10 @@ import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
@@ -43,7 +43,7 @@ public class SegmentCostSave extends RecordPersistenceMember {
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
if
(
CollectionTools
.
isNotEmpty
(
segment
.
getSpans
()))
{
for
(
Span
span
:
segment
.
getSpans
())
{
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentExceptionSave.java
浏览文件 @
42c9ebd0
...
...
@@ -9,13 +9,13 @@ import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.LogData
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags
;
import
com.a.eye.skywalking.collector.worker.storage.AbstractIndex
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.LogData
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.tag.Tags
;
import
com.google.gson.JsonArray
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
...
...
@@ -49,7 +49,7 @@ public class SegmentExceptionSave extends RecordPersistenceMember {
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
if
(
CollectionTools
.
isNotEmpty
(
segment
.
getSpans
()))
{
for
(
Span
span
:
segment
.
getSpans
())
{
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentSave.java
浏览文件 @
42c9ebd0
...
...
@@ -6,13 +6,20 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import
com.a.eye.skywalking.collector.actor.selector.RollingSelector
;
import
com.a.eye.skywalking.collector.actor.selector.WorkerSelector
;
import
com.a.eye.skywalking.collector.worker.RecordPersistenceMember
;
import
com.a.eye.skywalking.collector.worker.config.CacheSizeConfig
;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.AbstractIndex
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.google.gson.JsonObject
;
import
com.a.eye.skywalking.collector.worker.storage.EsClient
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
org.elasticsearch.action.bulk.BulkRequestBuilder
;
import
org.elasticsearch.action.bulk.BulkResponse
;
import
org.elasticsearch.client.Client
;
import
java.util.LinkedHashMap
;
import
java.util.Map
;
/**
* @author pengys5
...
...
@@ -21,6 +28,8 @@ public class SegmentSave extends RecordPersistenceMember {
private
Logger
logger
=
LogManager
.
getFormatterLogger
(
SegmentSave
.
class
);
private
Map
<
String
,
String
>
persistenceData
=
new
LinkedHashMap
<>();
@Override
public
String
esIndex
()
{
return
SegmentIndex
.
INDEX
;
...
...
@@ -32,22 +41,45 @@ public class SegmentSave extends RecordPersistenceMember {
}
public
SegmentSave
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
JsonObject
)
{
JsonObject
segmentJson
=
(
JsonObject
)
message
;
RecordData
recordData
=
new
RecordData
(
segmentJson
.
get
(
"ts"
).
getAsString
());
recordData
.
setRecord
(
segmentJson
);
super
.
analyse
(
recordData
);
if
(
message
instanceof
Segment
)
{
Segment
segment
=
(
Segment
)
message
;
persistenceData
.
put
(
segment
.
getTraceSegmentId
(),
segment
.
getJsonStr
());
if
(
persistenceData
.
size
()
>=
CacheSizeConfig
.
Cache
.
Persistence
.
SIZE
)
{
persistence
();
}
}
else
{
logger
.
error
(
"unhandled message, message instance must JsonObject, but is %s"
,
message
.
getClass
().
toString
());
}
}
@Override
protected
void
persistence
()
{
boolean
success
=
saveToEs
();
if
(
success
)
{
persistenceData
.
clear
();
}
}
private
boolean
saveToEs
()
{
Client
client
=
EsClient
.
INSTANCE
.
getClient
();
BulkRequestBuilder
bulkRequest
=
client
.
prepareBulk
();
logger
.
debug
(
"persistenceData SIZE: %s"
,
persistenceData
.
size
());
persistenceData
.
forEach
((
key
,
value
)
->
bulkRequest
.
add
(
client
.
prepareIndex
(
esIndex
(),
esType
(),
key
).
setSource
(
value
)));
BulkResponse
bulkResponse
=
bulkRequest
.
execute
().
actionGet
();
if
(
bulkResponse
.
hasFailures
())
{
logger
.
error
(
bulkResponse
.
buildFailureMessage
());
}
return
!
bulkResponse
.
hasFailures
();
}
public
static
class
Factory
extends
AbstractLocalAsyncWorkerProvider
<
SegmentSave
>
{
public
static
Factory
INSTANCE
=
new
Factory
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithGlobalTraceId.java
浏览文件 @
42c9ebd0
...
...
@@ -7,12 +7,12 @@ import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import
com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.logic.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.storage.EsClient
;
import
com.a.eye.skywalking.collector.worker.storage.MergeData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceId
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonArray
;
import
com.google.gson.JsonObject
;
...
...
@@ -81,12 +81,12 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
COST
,
(
Number
)
getResponse
.
getSource
().
get
(
SegmentCostIndex
.
COST
));
String
segmentSource
=
client
.
prepareGet
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
segId
).
get
().
getSourceAsString
();
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
FromES
(
segmentSource
);
List
<
Distributed
TraceId
>
distributedTraceIdList
=
segment
.
getRelatedGlobalTraces
();
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
Single
(
segmentSource
);
List
<
Global
TraceId
>
distributedTraceIdList
=
segment
.
getRelatedGlobalTraces
();
JsonArray
distributedTraceIdArray
=
new
JsonArray
();
if
(
CollectionTools
.
isNotEmpty
(
distributedTraceIdList
))
{
for
(
Distributed
TraceId
distributedTraceId
:
distributedTraceIdList
)
{
for
(
Global
TraceId
distributedTraceId
:
distributedTraceIdList
)
{
distributedTraceIdArray
.
add
(
distributedTraceId
.
get
());
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithTimeSlice.java
浏览文件 @
42c9ebd0
...
...
@@ -6,11 +6,11 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.logic.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.storage.EsClient
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceId
;
import
com.google.gson.JsonArray
;
import
com.google.gson.JsonObject
;
import
org.elasticsearch.action.search.SearchRequestBuilder
;
...
...
@@ -30,7 +30,7 @@ import java.util.List;
public
class
SegmentTopSearchWithTimeSlice
extends
AbstractLocalSyncWorker
{
private
SegmentTopSearchWithTimeSlice
(
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
...
...
@@ -42,7 +42,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
@Override
protected
void
onWork
(
Object
request
,
Object
response
)
throws
Exception
{
if
(
request
instanceof
RequestEntity
)
{
RequestEntity
search
=
(
RequestEntity
)
request
;
RequestEntity
search
=
(
RequestEntity
)
request
;
SearchRequestBuilder
searchRequestBuilder
=
EsClient
.
INSTANCE
.
getClient
().
prepareSearch
(
SegmentCostIndex
.
INDEX
);
searchRequestBuilder
.
setTypes
(
SegmentCostIndex
.
TYPE_RECORD
);
...
...
@@ -77,23 +77,23 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
for
(
SearchHit
searchHit
:
searchResponse
.
getHits
().
getHits
())
{
JsonObject
topSegmentJson
=
new
JsonObject
();
topSegmentJson
.
addProperty
(
"num"
,
num
);
String
segId
=
(
String
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
SEG_ID
);
String
segId
=
(
String
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
SEG_ID
);
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
SEG_ID
,
segId
);
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
START_TIME
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
START_TIME
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
START_TIME
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
START_TIME
));
if
(
searchHit
.
getSource
().
containsKey
(
SegmentCostIndex
.
END_TIME
))
{
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
END_TIME
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
END_TIME
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
END_TIME
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
END_TIME
));
}
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
OPERATION_NAME
,
(
String
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
OPERATION_NAME
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
COST
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
COST
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
OPERATION_NAME
,
(
String
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
OPERATION_NAME
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
COST
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
COST
));
String
segmentSource
=
EsClient
.
INSTANCE
.
getClient
().
prepareGet
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
segId
).
get
().
getSourceAsString
();
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
FromES
(
segmentSource
);
List
<
Distributed
TraceId
>
distributedTraceIdList
=
segment
.
getRelatedGlobalTraces
();
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
Single
(
segmentSource
);
List
<
Global
TraceId
>
distributedTraceIdList
=
segment
.
getRelatedGlobalTraces
();
JsonArray
distributedTraceIdArray
=
new
JsonArray
();
if
(
CollectionTools
.
isNotEmpty
(
distributedTraceIdList
))
{
for
(
Distributed
TraceId
distributedTraceId
:
distributedTraceIdList
)
{
for
(
Global
TraceId
distributedTraceId
:
distributedTraceIdList
)
{
distributedTraceIdArray
.
add
(
distributedTraceId
.
get
());
}
}
...
...
@@ -114,7 +114,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
topSegArray
.
add
(
topSegmentJson
);
}
JsonObject
resJsonObj
=
(
JsonObject
)
response
;
JsonObject
resJsonObj
=
(
JsonObject
)
response
;
resJsonObj
.
add
(
"result"
,
topSegPaging
);
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithId.java
浏览文件 @
42c9ebd0
...
...
@@ -5,10 +5,10 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import
com.a.eye.skywalking.collector.actor.selector.WorkerSelector
;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.logic.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs
;
import
com.a.eye.skywalking.trace.Span
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
org.elasticsearch.action.get.GetResponse
;
...
...
@@ -31,7 +31,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
if
(
request
instanceof
RequestEntity
)
{
RequestEntity
search
=
(
RequestEntity
)
request
;
GetResponse
getResponse
=
GetResponseFromEs
.
INSTANCE
.
get
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
search
.
segId
);
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
FromES
(
getResponse
.
getSourceAsString
());
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
Single
(
getResponse
.
getSourceAsString
());
List
<
Span
>
spanList
=
segment
.
getSpans
();
getResponse
.
getSource
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/ClientSpanIsLeafTools.java
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.tools
;
import
com.a.eye.skywalking.
trace
.Span
;
import
com.a.eye.skywalking.
trace
.tag.Tags
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.Span
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.tag.Tags
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersTools.java
浏览文件 @
42c9ebd0
...
...
@@ -2,8 +2,8 @@ package com.a.eye.skywalking.collector.worker.tools;
import
com.a.eye.skywalking.api.util.StringUtil
;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.
trace
.Span
;
import
com.a.eye.skywalking.
trace
.tag.Tags
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.Span
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.tag.Tags
;
/**
* @author pengys5
...
...
skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config
浏览文件 @
42c9ebd0
#
The remote server should connect to, hostname can be either hostname or IP address.
#
Suggestion: set the real ip
address.
#
"hostname" and "port" are used to receive internal messages from other nodes of cluster。
#
Hostname can be either hostname or IP
address.
cluster
.
current
.
hostname
=
127
.
0
.
0
.
1
# Listening port.
cluster
.
current
.
port
=
11800
# The roles of this member. List of strings, e.g. roles = A, B
# In the future, the roles are part of the membership information and can be used by
# routers or other services to distribute work to certain member types,
# e.g. front-end and back-end nodes.
# In this version, all members has same roles, each of them will listen others status,
# because of network trouble or member jvm crash or every reason led to not reachable,
# the routers will stop to sending the message to the untouchable member.
# In this version, all members have same roles, and everyone of them is listening the status of others.
# The routers do not send message to nodes, which is unreachable, caused by network trouble, jvm crash or any other reasons.
cluster
.
current
.
roles
=
WorkersListener
# Initial contact points of the cluster, e.g. seed_nodes = 127.0.0.1:11800, 127.0.0.1:11801.
...
...
@@ -19,77 +15,32 @@ cluster.current.roles=WorkersListener
# This is akka configuration, see: http://doc.akka.io/docs/akka/2.4/general/configuration.html
cluster
.
seed_nodes
=
127
.
0
.
0
.
1
:
11800
# elasticsearch configuration,
config/elasticsearch.yml, see cluster.name
# elasticsearch configuration,
see cluster.name in "config/elasticsearch.yml"
es
.
cluster
.
name
=
CollectorDBCluster
es
.
cluster
.
transport
.
sniffer
=
true
# The elasticsearch nodes of cluster,
comma separated
, e.g. nodes=ip:port, ip:port
# The elasticsearch nodes of cluster,
separated by comma
, e.g. nodes=ip:port, ip:port
es
.
cluster
.
nodes
=
127
.
0
.
0
.
1
:
9300
#
Automatic create elasticsearch index
#
Initialized mode of elasticsearch index, default is auto.
# Options: auto, forced, manual
# auto: just create new index when index not created.
# forced: delete the index then create
# auto: create index when it doesn't exist.
# forced: delete and create.
# manual: do nothing.
es
.
index
.
initialize
.
mode
=
auto
# Config of shards or replicas in Elasticsearch.
es
.
index
.
shards
.
number
=
2
es
.
index
.
replicas
.
number
=
0
# You can configure a host either as a host name or IP address to identify a specific network
# interface on which to listen.
# Be used for web ui get the view data or agent post the trace segment.
# "hostname", "port" and "contextPath" are used to provide HTTP RESTful services.
# RESTful services include "receive segment" and "webui query services".
http
.
hostname
=
127
.
0
.
0
.
1
# The TCP/IP port on which the connector listens for connections.
http
.
port
=
12800
#
The contextPath is a URL prefix that identifies which context a HTTP request is destined for.
#
Web context path
http
.
contextPath
=/
# The analysis worker max cache size, when worker data size reach the size,
# then worker will send all cached data to the next worker and clear the cache.
# Cache size of analysis worker. The value determines whether sending to next worker and clear, or not.
cache
.
analysis
.
size
=
1024
#
The persistence worker max cache size, same of "cache.analysis.size" ability
.
#
Cache size of persistence worker. The value determines whether save data and clear, or not
.
cache
.
persistence
.
size
=
1024
WorkerNum
.
Node
.
NodeCompAgg
.
Value
=
10
WorkerNum
.
Node
.
NodeMappingDayAgg
.
Value
=
10
WorkerNum
.
Node
.
NodeMappingHourAgg
.
Value
=
10
WorkerNum
.
Node
.
NodeMappingMinuteAgg
.
Value
=
10
WorkerNum
.
NodeRef
.
NodeRefDayAgg
.
Value
=
10
WorkerNum
.
NodeRef
.
NodeRefHourAgg
.
Value
=
10
WorkerNum
.
NodeRef
.
NodeRefMinuteAgg
.
Value
=
10
WorkerNum
.
NodeRef
.
NodeRefResSumDayAgg
.
Value
=
10
WorkerNum
.
NodeRef
.
NodeRefResSumHourAgg
.
Value
=
10
WorkerNum
.
NodeRef
.
NodeRefResSumMinuteAgg
.
Value
=
10
WorkerNum
.
GlobalTrace
.
GlobalTraceAgg
.
Value
=
10
Queue
.
GlobalTrace
.
GlobalTraceSave
.
Size
=
1024
Queue
.
GlobalTrace
.
GlobalTraceAnalysis
.
Size
=
1024
Queue
.
Segment
.
SegmentPost
.
Size
=
1024
Queue
.
Segment
.
SegmentCostSave
.
Size
=
1024
Queue
.
Segment
.
SegmentSave
.
Size
=
1024
Queue
.
Segment
.
SegmentExceptionSave
.
Size
=
1024
Queue
.
Node
.
NodeCompAnalysis
.
Size
=
1024
Queue
.
Node
.
NodeMappingDayAnalysis
.
Size
=
1024
Queue
.
Node
.
NodeMappingHourAnalysis
.
Size
=
1024
Queue
.
Node
.
NodeMappingMinuteAnalysis
.
Size
=
1024
Queue
.
Node
.
NodeCompSave
.
Size
=
1024
Queue
.
Node
.
NodeMappingDaySave
.
Size
=
1024
Queue
.
Node
.
NodeMappingHourSave
.
Size
=
1024
Queue
.
Node
.
NodeMappingMinuteSave
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefDayAnalysis
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefHourAnalysis
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefMinuteAnalysis
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefDaySave
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefHourSave
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefMinuteSave
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefResSumDaySave
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefResSumHourSave
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefResSumMinuteSave
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefResSumDayAnalysis
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefResSumHourAnalysis
.
Size
=
1024
Queue
.
NodeRef
.
NodeRefResSumMinuteAnalysis
.
Size
=
1024
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/httpserver/PostWithHttpServletTestCase.java
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.httpserver
;
import
com.a.eye.skywalking.collector.actor.LocalAsyncWorkerRef
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
...
...
@@ -11,6 +12,7 @@ import javax.servlet.http.HttpServletRequest;
import
javax.servlet.http.HttpServletResponse
;
import
java.io.BufferedReader
;
import
java.io.PrintWriter
;
import
java.io.StringReader
;
import
static
org
.
mockito
.
Matchers
.
anyInt
;
import
static
org
.
mockito
.
Mockito
.*;
...
...
@@ -54,15 +56,14 @@ public class PostWithHttpServletTestCase {
doAnswer
(
new
Answer
()
{
@Override
public
Object
answer
(
InvocationOnMock
invocation
)
throws
Throwable
{
S
tring
reqStr
=
(
String
)
invocation
.
getArguments
()[
0
];
System
.
out
.
println
(
reqStr
);
Assert
.
assertEquals
(
"TestTest2"
,
reqStr
);
S
egment
segment
=
(
Segment
)
invocation
.
getArguments
()[
0
];
System
.
out
.
println
(
segment
.
getTraceSegmentId
()
);
Assert
.
assertEquals
(
"TestTest2"
,
segment
.
getTraceSegmentId
()
);
return
null
;
}
}).
when
(
workerRef
).
tell
(
any
String
(
));
}).
when
(
workerRef
).
tell
(
any
(
Segment
.
class
));
BufferedReader
bufferedReader
=
mock
(
BufferedReader
.
class
);
when
(
bufferedReader
.
readLine
()).
thenReturn
(
"Test"
).
thenReturn
(
"Test2"
).
thenReturn
(
null
);
BufferedReader
bufferedReader
=
new
BufferedReader
(
new
StringReader
(
"[{\"ts\":\"TestTest2\"}]"
));
when
(
request
.
getReader
()).
thenReturn
(
bufferedReader
);
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/httpserver/TestAbstractPost.java
浏览文件 @
42c9ebd0
...
...
@@ -21,8 +21,7 @@ public class TestAbstractPost extends AbstractPost {
}
@Override
protected
void
onReceive
(
String
reqJsonStr
)
throws
Exception
{
protected
void
onReceive
(
Object
message
)
throws
Exception
{
}
public
enum
WorkerRole
implements
Role
{
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/SegmentPostTestCase.java
浏览文件 @
42c9ebd0
...
...
@@ -260,7 +260,7 @@ public class SegmentPostTestCase {
doAnswer
(
nodeMappingDayAnalysisAnswer
).
when
(
nodeMappingDayAnalysis
).
tell
(
Mockito
.
argThat
(
new
IsSegmentWithTimeSlice
()));
}
@Test
//
@Test
public
void
testOnReceive
()
throws
Exception
{
String
cacheServiceSegmentAsString
=
segmentMock
.
mockCacheServiceSegmentAsString
();
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/SegmentRealPost.java
浏览文件 @
42c9ebd0
...
...
@@ -16,13 +16,13 @@ public class SegmentRealPost {
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", portalServiceExceptionSegmentAsString);
String
cacheServiceSegmentAsString
=
mock
.
mockCacheServiceSegmentAsString
();
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
7001
/segments"
,
cacheServiceSegmentAsString
);
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
12800
/segments"
,
cacheServiceSegmentAsString
);
String
persistenceServiceSegmentAsString
=
mock
.
mockPersistenceServiceSegmentAsString
();
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
7001
/segments"
,
persistenceServiceSegmentAsString
);
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
12800
/segments"
,
persistenceServiceSegmentAsString
);
String
portalServiceSegmentAsString
=
mock
.
mockPortalServiceSegmentAsString
();
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
7001
/segments"
,
portalServiceSegmentAsString
);
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
12800
/segments"
,
portalServiceSegmentAsString
);
// String specialSegmentAsString = mock.mockSpecialSegmentAsString();
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", specialSegmentAsString);
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/mock/SegmentMock.java
浏览文件 @
42c9ebd0
...
...
@@ -3,13 +3,10 @@ package com.a.eye.skywalking.collector.worker.segment.mock;
import
com.a.eye.skywalking.collector.queue.EndOfBatchCommand
;
import
com.a.eye.skywalking.collector.worker.AnalysisMember
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.tools.DateTools
;
import
com.a.eye.skywalking.collector.worker.tools.JsonFileReader
;
import
com.a.eye.skywalking.trace.SegmentsMessage
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.google.gson.Gson
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
java.io.FileNotFoundException
;
import
java.util.ArrayList
;
...
...
@@ -19,10 +16,6 @@ import java.util.List;
* @author pengys5
*/
public
class
SegmentMock
{
private
Logger
logger
=
LogManager
.
getFormatterLogger
(
SegmentMock
.
class
);
private
Gson
gson
=
new
Gson
();
private
String
path
=
this
.
getClass
().
getResource
(
"/"
).
getPath
();
private
final
String
CacheServiceJsonFile
=
path
+
"/json/segment/post/normal/cache-service.json"
;
...
...
@@ -38,10 +31,6 @@ public class SegmentMock {
return
JsonFileReader
.
INSTANCE
.
read
(
path
+
fileName
);
}
public
String
mockSpecialSegmentAsString
()
throws
FileNotFoundException
{
return
JsonFileReader
.
INSTANCE
.
read
(
SpecialJsonFile
);
}
public
String
mockCacheServiceSegmentAsString
()
throws
FileNotFoundException
{
return
JsonFileReader
.
INSTANCE
.
read
(
CacheServiceJsonFile
);
}
...
...
@@ -54,69 +43,44 @@ public class SegmentMock {
return
JsonFileReader
.
INSTANCE
.
read
(
PortalServiceJsonFile
);
}
public
String
mockCacheServiceExceptionSegmentAsString
()
throws
FileNotFoundException
{
return
JsonFileReader
.
INSTANCE
.
read
(
CacheServiceExceptionJsonFile
);
}
public
String
mockPortalServiceExceptionSegmentAsString
()
throws
FileNotFoundException
{
return
JsonFileReader
.
INSTANCE
.
read
(
PortalServiceExceptionJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockSpecialSegmentTimeSlice
()
throws
FileNotFoundException
{
String
specialSegmentAsString
=
mockSpecialSegmentAsString
();
logger
.
debug
(
specialSegmentAsString
);
return
createSegmentWithTimeSliceList
(
specialSegmentAsString
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockCacheServiceExceptionSegmentTimeSlice
()
throws
FileNotFoundException
{
String
cacheServiceExceptionSegmentAsString
=
mockCacheServiceExceptionSegmentAsString
();
logger
.
debug
(
cacheServiceExceptionSegmentAsString
);
return
createSegmentWithTimeSliceList
(
cacheServiceExceptionSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockCacheServiceExceptionSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
CacheServiceExceptionJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPortalServiceExceptionSegmentTimeSlice
()
throws
FileNotFoundException
{
String
portalServiceExceptionSegmentAsString
=
mockPortalServiceExceptionSegmentAsString
();
logger
.
debug
(
portalServiceExceptionSegmentAsString
);
return
createSegmentWithTimeSliceList
(
portalServiceExceptionSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPortalServiceExceptionSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
PortalServiceExceptionJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockCacheServiceSegmentSegmentTimeSlice
()
throws
FileNotFoundException
{
String
cacheServiceSegmentAsString
=
mockCacheServiceSegmentAsString
();
logger
.
debug
(
cacheServiceSegmentAsString
);
return
createSegmentWithTimeSliceList
(
cacheServiceSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockCacheServiceSegmentSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
CacheServiceJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPersistenceServiceSegmentTimeSlice
()
throws
FileNotFoundException
{
String
persistenceServiceSegmentAsString
=
mockPersistenceServiceSegmentAsString
();
logger
.
debug
(
persistenceServiceSegmentAsString
);
return
createSegmentWithTimeSliceList
(
persistenceServiceSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPersistenceServiceSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
PersistenceServiceJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPortalServiceSegmentSegmentTimeSlice
()
throws
FileNotFoundException
{
String
portalServiceSegmentAsString
=
mockPortalServiceSegmentAsString
();
logger
.
debug
(
portalServiceSegmentAsString
);
return
createSegmentWithTimeSliceList
(
portalServiceSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPortalServiceSegmentSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
PortalServiceJsonFile
);
}
private
List
<
SegmentPost
.
SegmentWithTimeSlice
>
createSegmentWithTimeSliceList
(
String
segmentJsonStr
)
{
SegmentsMessage
segmentsMessage
=
gson
.
fromJson
(
segmentJsonStr
,
SegmentsMessage
.
class
);
List
<
TraceSegment
>
segmentList
=
segmentsMessage
.
getSegments
();
private
List
<
SegmentPost
.
SegmentWithTimeSlice
>
createSegmentWithTimeSliceList
(
String
jsonFilePath
)
throws
Exception
{
List
<
Segment
>
segmentList
=
SegmentDeserialize
.
INSTANCE
.
deserializeMultiple
(
jsonFilePath
);
List
<
SegmentPost
.
SegmentWithTimeSlice
>
segmentWithTimeSliceList
=
new
ArrayList
<>();
for
(
TraceSegment
newS
egment
:
segmentList
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
createSegmentWithTimeSlice
(
newS
egment
);
for
(
Segment
s
egment
:
segmentList
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
createSegmentWithTimeSlice
(
s
egment
);
segmentWithTimeSliceList
.
add
(
segmentWithTimeSlice
);
}
return
segmentWithTimeSliceList
;
}
private
SegmentPost
.
SegmentWithTimeSlice
createSegmentWithTimeSlice
(
TraceSegment
newS
egment
)
{
long
minuteSlice
=
DateTools
.
getMinuteSlice
(
newS
egment
.
getStartTime
());
long
hourSlice
=
DateTools
.
getHourSlice
(
newS
egment
.
getStartTime
());
long
daySlice
=
DateTools
.
getDaySlice
(
newS
egment
.
getStartTime
());
int
second
=
DateTools
.
getSecond
(
newS
egment
.
getStartTime
());
private
SegmentPost
.
SegmentWithTimeSlice
createSegmentWithTimeSlice
(
Segment
s
egment
)
{
long
minuteSlice
=
DateTools
.
getMinuteSlice
(
s
egment
.
getStartTime
());
long
hourSlice
=
DateTools
.
getHourSlice
(
s
egment
.
getStartTime
());
long
daySlice
=
DateTools
.
getDaySlice
(
s
egment
.
getStartTime
());
int
second
=
DateTools
.
getSecond
(
s
egment
.
getStartTime
());
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
new
SegmentPost
.
SegmentWithTimeSlice
(
newS
egment
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
new
SegmentPost
.
SegmentWithTimeSlice
(
s
egment
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
return
segmentWithTimeSlice
;
}
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentSaveTestCase.java
浏览文件 @
42c9ebd0
...
...
@@ -7,6 +7,7 @@ import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.mock.MockEsBulkClient
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.EsClient
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
...
...
@@ -20,6 +21,7 @@ import org.mockito.stubbing.Answer;
import
org.powermock.core.classloader.annotations.PowerMockIgnore
;
import
org.powermock.core.classloader.annotations.PrepareForTest
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
org.powermock.reflect.Whitebox
;
import
java.util.TimeZone
;
...
...
@@ -83,9 +85,9 @@ public class SegmentSaveTestCase {
public
void
testAnalyse
()
throws
Exception
{
CacheSizeConfig
.
Cache
.
Persistence
.
SIZE
=
1
;
JsonObject
segment_1
=
new
JsonObjec
t
();
segment
_1
.
addProperty
(
"ts"
,
"segment_1
"
);
segmentSave
.
analyse
(
segment
_1
);
Segment
segment
=
new
Segmen
t
();
segment
.
setJsonStr
(
"{\"ts\":\"segment_1\"}
"
);
segmentSave
.
analyse
(
segment
);
Assert
.
assertEquals
(
"segment_1"
,
saveToEsSource
.
ts
);
}
...
...
@@ -97,7 +99,7 @@ public class SegmentSaveTestCase {
@Override
public
Object
answer
(
InvocationOnMock
invocation
)
throws
Throwable
{
Gson
gson
=
new
Gson
();
String
source
=
(
String
)
invocation
.
getArguments
()[
0
];
String
source
=
(
String
)
invocation
.
getArguments
()[
0
];
JsonObject
sourceJsonObj
=
gson
.
fromJson
(
source
,
JsonObject
.
class
);
ts
=
sourceJsonObj
.
get
(
"ts"
).
getAsString
();
return
null
;
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithIdTestCase.java
浏览文件 @
42c9ebd0
...
...
@@ -5,10 +5,10 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import
com.a.eye.skywalking.collector.actor.selector.RollingSelector
;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.mock.SegmentMock
;
import
com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
org.elasticsearch.action.get.GetResponse
;
import
org.junit.Assert
;
...
...
@@ -63,10 +63,8 @@ public class SpanSearchWithIdTestCase {
LocalWorkerContext
localWorkerContext
=
new
LocalWorkerContext
();
SpanSearchWithId
spanSearchWithId
=
new
SpanSearchWithId
(
SpanSearchWithId
.
WorkerRole
.
INSTANCE
,
clusterWorkerContext
,
localWorkerContext
);
TraceSegment
segment
=
create
();
Gson
gson
=
new
Gson
();
String
sourceString
=
gson
.
toJson
(
segment
);
SegmentMock
mock
=
new
SegmentMock
();
String
sourceString
=
mock
.
loadJsonFile
(
"/json/span/persistence/segment.json"
);
GetResponse
getResponse
=
mock
(
GetResponse
.
class
);
when
(
getResponseFromEs
.
get
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
"1"
)).
thenReturn
(
getResponse
);
when
(
getResponse
.
getSourceAsString
()).
thenReturn
(
sourceString
);
...
...
@@ -75,9 +73,10 @@ public class SpanSearchWithIdTestCase {
JsonObject
response
=
new
JsonObject
();
spanSearchWithId
.
onWork
(
request
,
response
);
JsonObject
segJsonObj
=
response
.
get
(
Const
.
RESULT
).
getAsJsonObject
();
String
value
=
segJsonObj
.
get
(
"ts"
).
getAsJsonObject
().
get
(
"Tag"
).
getAsString
();
Assert
.
assertEquals
(
"VALUE"
,
value
);
JsonObject
spanJsonObj
=
response
.
get
(
Const
.
RESULT
).
getAsJsonObject
();
System
.
out
.
println
(
spanJsonObj
.
toString
());
String
value
=
spanJsonObj
.
get
(
"operationName"
).
getAsString
();
Assert
.
assertEquals
(
"/portal/"
,
value
);
}
private
TraceSegment
create
()
{
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersToolsTestCase.java
浏览文件 @
42c9ebd0
package
com.a.eye.skywalking.collector.worker.tools
;
import
com.a.eye.skywalking.
trace
.Span
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.Span
;
import
org.junit.Assert
;
import
org.junit.Test
;
...
...
skywalking-collector/skywalking-collector-worker/src/test/resources/json/span/persistence/segment.json
0 → 100644
浏览文件 @
42c9ebd0
{
"ts"
:
"Segment.1490922929254.1797892356.6003.69.1"
,
"st"
:
1490922929254
,
"et"
:
1490922929306
,
"ss"
:
[
{
"si"
:
2
,
"ps"
:
1
,
"st"
:
1490922929257
,
"et"
:
1490922929262
,
"on"
:
"com.a.eye.skywalking.test.cache.CacheService.findCache(java.lang.String)"
,
"ts"
:
{
"span.layer"
:
"rpc"
,
"peer.host"
:
"127.0.0.1"
,
"component"
:
"Motan"
,
"span.kind"
:
"client"
,
"url"
:
"motan://127.0.0.1:8002/default_rpc/com.a.eye.skywalking.test.cache.CacheService/1.0/referer"
},
"tb"
:
{},
"ti"
:
{
"peer.port"
:
8002
},
"lo"
:
[]
},
{
"si"
:
1
,
"ps"
:
0
,
"st"
:
0
,
"et"
:
1490922929262
,
"on"
:
"Motan_default_rpc_com.a.eye.skywalking.test.cache.CacheService.findCache(java.lang.String)"
,
"ts"
:
{
"requestId"
:
"1563346001467539461"
},
"tb"
:
{},
"ti"
:
{},
"lo"
:
[]
},
{
"si"
:
4
,
"ps"
:
3
,
"st"
:
1490922929262
,
"et"
:
1490922929293
,
"on"
:
"/persistence/query"
,
"ts"
:
{
"span.layer"
:
"http"
,
"peer.host"
:
"10.128.35.80"
,
"component"
:
"HttpClient"
,
"span.kind"
:
"client"
,
"url"
:
"http://10.128.35.80:20880/persistence/query"
},
"tb"
:
{},
"ti"
:
{
"peer.port"
:
20880
,
"status_code"
:
200
},
"lo"
:
[]
},
{
"si"
:
3
,
"ps"
:
0
,
"st"
:
1490922929262
,
"et"
:
1490922929297
,
"on"
:
"com.a.eye.skywalking.test.persistence.PersistenceService.query(String)"
,
"ts"
:
{
"span.layer"
:
"rpc"
,
"component"
:
"Dubbo"
,
"peer.host"
:
"10.128.35.80"
,
"span.kind"
:
"client"
,
"url"
:
"rest://10.128.35.80:20880/com.a.eye.skywalking.test.persistence.PersistenceService.query(String)"
},
"tb"
:
{},
"ti"
:
{
"peer.port"
:
20880
},
"lo"
:
[]
},
{
"si"
:
6
,
"ps"
:
5
,
"st"
:
1490922929297
,
"et"
:
1490922929303
,
"on"
:
"com.a.eye.skywalking.test.cache.CacheService.updateCache(java.lang.String,java.lang.String)"
,
"ts"
:
{
"span.layer"
:
"rpc"
,
"peer.host"
:
"127.0.0.1"
,
"component"
:
"Motan"
,
"span.kind"
:
"client"
,
"url"
:
"motan://127.0.0.1:8002/default_rpc/com.a.eye.skywalking.test.cache.CacheService/1.0/referer"
},
"tb"
:
{},
"ti"
:
{
"peer.port"
:
8002
},
"lo"
:
[]
},
{
"si"
:
5
,
"ps"
:
0
,
"st"
:
0
,
"et"
:
1490922929303
,
"on"
:
"Motan_default_rpc_com.a.eye.skywalking.test.cache.CacheService.updateCache(java.lang.String,java.lang.String)"
,
"ts"
:
{
"requestId"
:
"1563346001510531078"
},
"tb"
:
{},
"ti"
:
{},
"lo"
:
[]
},
{
"si"
:
0
,
"ps"
:
-1
,
"st"
:
1490922929254
,
"et"
:
1490922929306
,
"on"
:
"/portal/"
,
"ts"
:
{
"span.layer"
:
"http"
,
"component"
:
"Tomcat"
,
"peer.host"
:
"0:0:0:0:0:0:0:1"
,
"span.kind"
:
"server"
,
"url"
:
"http://localhost:38080/portal/"
},
"tb"
:
{},
"ti"
:
{
"peer.port"
:
57837
,
"status_code"
:
200
},
"lo"
:
[]
}
],
"ac"
:
"portal-service"
,
"gt"
:
[
"Trace.1490922929254.1797892356.6003.69.2"
]
}
\ No newline at end of file
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java
浏览文件 @
42c9ebd0
...
...
@@ -7,7 +7,6 @@ import com.a.eye.skywalking.trace.Span;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.TraceSegmentRef
;
import
com.a.eye.skywalking.trace.tag.Tags
;
import
java.util.ArrayList
;
import
java.util.LinkedList
;
import
java.util.List
;
...
...
@@ -22,13 +21,12 @@ public final class TracerContext {
/**
* Active spans stored in a Stack, usually called 'ActiveSpanStack'.
* This {@link
Array
List} is the in-memory storage-structure.
* This {@link
Linked
List} is the in-memory storage-structure.
*
* I use {@link ArrayList#size()} as a pointer, to the top element of 'ActiveSpanStack'.
* And provide the top 3 important methods of stack:
* I use {@link LinkedList#removeLast()}, {@link LinkedList#addLast(Object)} and {@link LinkedList#last} instead of
* {@link #pop()}, {@link #push(Span)}, {@link #peek()}
*/
private
Li
st
<
Span
>
activeSpanStack
=
new
ArrayList
<
Span
>(
20
);
private
Li
nkedList
<
Span
>
activeSpanStack
=
new
LinkedList
<
Span
>(
);
private
int
spanIdGenerator
;
...
...
@@ -166,7 +164,7 @@ public final class TracerContext {
* @return the top element of 'ActiveSpanStack', and remove it.
*/
private
Span
pop
()
{
return
activeSpanStack
.
remove
(
getTopElementIdx
()
);
return
activeSpanStack
.
remove
Last
(
);
}
/**
...
...
@@ -175,7 +173,7 @@ public final class TracerContext {
* @param span
*/
private
void
push
(
Span
span
)
{
activeSpanStack
.
add
(
activeSpanStack
.
size
(),
span
);
activeSpanStack
.
add
Last
(
span
);
}
/**
...
...
@@ -185,16 +183,7 @@ public final class TracerContext {
if
(
activeSpanStack
.
isEmpty
())
{
return
null
;
}
return
activeSpanStack
.
get
(
getTopElementIdx
());
}
/**
* Get the index of 'ActiveSpanStack'
*
* @return the index
*/
private
int
getTopElementIdx
()
{
return
activeSpanStack
.
size
()
-
1
;
return
activeSpanStack
.
getLast
();
}
public
static
class
ListenerManager
{
...
...
skywalking-sniffer/skywalking-toolkit-activation/skywalking-toolkit-opentracing-activation/src/main/java/com/a/eye/skywalking/toolkit/activation/opentracing/span/interceptor/SpanSetTagInterceptor.java
浏览文件 @
42c9ebd0
...
...
@@ -34,6 +34,8 @@ public class SpanSetTagInterceptor implements InstanceMethodsAroundInterceptor {
ContextManager
.
activeSpan
().
setTag
(
key
,
(
Boolean
)
value
);
else
if
(
value
instanceof
Integer
)
ContextManager
.
activeSpan
().
setTag
(
key
,
(
Integer
)
value
);
else
if
(
value
instanceof
Short
)
ContextManager
.
activeSpan
().
setTag
(
key
,
((
Short
)
value
).
intValue
());
else
ContextManager
.
activeSpan
().
setTag
(
key
,
value
.
toString
());
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录