Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
0c17906c
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
0c17906c
编写于
11月 10, 2017
作者:
wu-sheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix compile issues.
上级
1db1a65f
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
95 addition
and
69 deletion
+95
-69
apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ApplicationCacheGuavaService.java
...tor/cache/guava/service/ApplicationCacheGuavaService.java
+6
-6
apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/InstanceCacheGuavaService.java
...lector/cache/guava/service/InstanceCacheGuavaService.java
+3
-3
apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java
...ector/cache/guava/service/ServiceIdCacheGuavaService.java
+3
-3
apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java
...tor/cache/guava/service/ServiceNameCacheGuavaService.java
+3
-3
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Graph.java
...n/java/org/skywalking/apm/collector/core/graph/Graph.java
+3
-3
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/GraphManager.java
...org/skywalking/apm/collector/core/graph/GraphManager.java
+1
-1
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/GraphNodeFinder.java
.../skywalking/apm/collector/core/graph/GraphNodeFinder.java
+2
-2
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Next.java
...in/java/org/skywalking/apm/collector/core/graph/Next.java
+3
-3
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Node.java
...in/java/org/skywalking/apm/collector/core/graph/Node.java
+2
-2
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/NodeProcessor.java
...rg/skywalking/apm/collector/core/graph/NodeProcessor.java
+2
-2
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/WayToNode.java
...va/org/skywalking/apm/collector/core/graph/WayToNode.java
+3
-3
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/GraphManagerTest.java
...skywalking/apm/collector/core/graph/GraphManagerTest.java
+10
-10
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/Node1Processor.java
...g/skywalking/apm/collector/core/graph/Node1Processor.java
+2
-2
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/Node2Processor.java
...g/skywalking/apm/collector/core/graph/Node2Processor.java
+2
-2
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/Node3Processor.java
...g/skywalking/apm/collector/core/graph/Node3Processor.java
+2
-2
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/Node4Processor.java
...g/skywalking/apm/collector/core/graph/Node4Processor.java
+2
-2
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/OutputProcessor.java
.../skywalking/apm/collector/core/graph/OutputProcessor.java
+30
-0
apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml
...-collector-remote/collector-remote-kafka-provider/pom.xml
+0
-4
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java
.../stream/worker/base/AbstractLocalAsyncWorkerProvider.java
+2
-2
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java
...ctor/stream/worker/base/AbstractRemoteWorkerProvider.java
+2
-2
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java
...king/apm/collector/stream/worker/base/AbstractWorker.java
+2
-2
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java
.../collector/stream/worker/base/AbstractWorkerProvider.java
+2
-2
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java
...apm/collector/stream/worker/base/LocalAsyncWorkerRef.java
+4
-4
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerRef.java
...ing/apm/collector/stream/worker/base/RemoteWorkerRef.java
+2
-2
checkStyle.xml
checkStyle.xml
+2
-2
未找到文件。
apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ApplicationCacheGuavaService.java
浏览文件 @
0c17906c
...
...
@@ -35,7 +35,7 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService {
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ApplicationCacheGuavaService
.
class
);
private
final
Cache
<
String
,
Integer
>
CODE_CACHE
=
CacheBuilder
.
newBuilder
().
initialCapacity
(
100
).
maximumSize
(
1000
).
build
();
private
final
Cache
<
String
,
Integer
>
codeCache
=
CacheBuilder
.
newBuilder
().
initialCapacity
(
100
).
maximumSize
(
1000
).
build
();
private
final
DAOService
daoService
;
...
...
@@ -48,7 +48,7 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService {
int
applicationId
=
0
;
try
{
applicationId
=
CODE_CACHE
.
get
(
applicationCode
,
()
->
dao
.
getApplicationId
(
applicationCode
));
applicationId
=
codeCache
.
get
(
applicationCode
,
()
->
dao
.
getApplicationId
(
applicationCode
));
}
catch
(
Throwable
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
...
...
@@ -56,20 +56,20 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService {
if
(
applicationId
==
0
)
{
applicationId
=
dao
.
getApplicationId
(
applicationCode
);
if
(
applicationId
!=
0
)
{
CODE_CACHE
.
put
(
applicationCode
,
applicationId
);
codeCache
.
put
(
applicationCode
,
applicationId
);
}
}
return
applicationId
;
}
private
final
Cache
<
Integer
,
String
>
ID_CACHE
=
CacheBuilder
.
newBuilder
().
maximumSize
(
1000
).
build
();
private
final
Cache
<
Integer
,
String
>
idCache
=
CacheBuilder
.
newBuilder
().
maximumSize
(
1000
).
build
();
public
String
get
(
int
applicationId
)
{
IApplicationCacheDAO
dao
=
(
IApplicationCacheDAO
)
daoService
.
get
(
IApplicationCacheDAO
.
class
);
String
applicationCode
=
Const
.
EMPTY_STRING
;
try
{
applicationCode
=
ID_CACHE
.
get
(
applicationId
,
()
->
dao
.
getApplicationCode
(
applicationId
));
applicationCode
=
idCache
.
get
(
applicationId
,
()
->
dao
.
getApplicationCode
(
applicationId
));
}
catch
(
Throwable
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
...
...
@@ -77,7 +77,7 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService {
if
(
StringUtils
.
isEmpty
(
applicationCode
))
{
applicationCode
=
dao
.
getApplicationCode
(
applicationId
);
if
(
StringUtils
.
isNotEmpty
(
applicationCode
))
{
CODE_CACHE
.
put
(
applicationCode
,
applicationId
);
codeCache
.
put
(
applicationCode
,
applicationId
);
}
}
return
applicationCode
;
...
...
apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/InstanceCacheGuavaService.java
浏览文件 @
0c17906c
...
...
@@ -33,7 +33,7 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
InstanceCacheGuavaService
.
class
);
private
final
Cache
<
Integer
,
Integer
>
INSTANCE_CACHE
=
CacheBuilder
.
newBuilder
().
initialCapacity
(
100
).
maximumSize
(
5000
).
build
();
private
final
Cache
<
Integer
,
Integer
>
integerCache
=
CacheBuilder
.
newBuilder
().
initialCapacity
(
100
).
maximumSize
(
5000
).
build
();
private
final
DAOService
daoService
;
...
...
@@ -46,7 +46,7 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
int
applicationId
=
0
;
try
{
applicationId
=
INSTANCE_CACHE
.
get
(
applicationInstanceId
,
()
->
dao
.
getApplicationId
(
applicationInstanceId
));
applicationId
=
integerCache
.
get
(
applicationInstanceId
,
()
->
dao
.
getApplicationId
(
applicationInstanceId
));
}
catch
(
Throwable
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
...
...
@@ -54,7 +54,7 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
if
(
applicationId
==
0
)
{
applicationId
=
dao
.
getApplicationId
(
applicationInstanceId
);
if
(
applicationId
!=
0
)
{
INSTANCE_CACHE
.
put
(
applicationInstanceId
,
applicationId
);
integerCache
.
put
(
applicationInstanceId
,
applicationId
);
}
}
return
applicationId
;
...
...
apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java
浏览文件 @
0c17906c
...
...
@@ -34,7 +34,7 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService {
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ServiceIdCacheGuavaService
.
class
);
private
final
Cache
<
String
,
Integer
>
SERVICE_CACHE
=
CacheBuilder
.
newBuilder
().
maximumSize
(
1000
).
build
();
private
final
Cache
<
String
,
Integer
>
serviceIdCache
=
CacheBuilder
.
newBuilder
().
maximumSize
(
1000
).
build
();
private
final
DAOService
daoService
;
...
...
@@ -47,7 +47,7 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService {
int
serviceId
=
0
;
try
{
serviceId
=
SERVICE_CACHE
.
get
(
applicationId
+
Const
.
ID_SPLIT
+
serviceName
,
()
->
dao
.
getServiceId
(
applicationId
,
serviceName
));
serviceId
=
serviceIdCache
.
get
(
applicationId
+
Const
.
ID_SPLIT
+
serviceName
,
()
->
dao
.
getServiceId
(
applicationId
,
serviceName
));
}
catch
(
Throwable
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
...
...
@@ -55,7 +55,7 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService {
if
(
serviceId
==
0
)
{
serviceId
=
dao
.
getServiceId
(
applicationId
,
serviceName
);
if
(
serviceId
!=
0
)
{
SERVICE_CACHE
.
put
(
applicationId
+
Const
.
ID_SPLIT
+
serviceName
,
serviceId
);
serviceIdCache
.
put
(
applicationId
+
Const
.
ID_SPLIT
+
serviceName
,
serviceId
);
}
}
return
serviceId
;
...
...
apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java
浏览文件 @
0c17906c
...
...
@@ -35,7 +35,7 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ServiceNameCacheGuavaService
.
class
);
private
final
Cache
<
Integer
,
String
>
CACHE
=
CacheBuilder
.
newBuilder
().
maximumSize
(
10000
).
build
();
private
final
Cache
<
Integer
,
String
>
serviceNameCache
=
CacheBuilder
.
newBuilder
().
maximumSize
(
10000
).
build
();
private
final
DAOService
daoService
;
...
...
@@ -48,7 +48,7 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
String
serviceName
=
Const
.
EMPTY_STRING
;
try
{
serviceName
=
CACHE
.
get
(
serviceId
,
()
->
dao
.
getServiceName
(
serviceId
));
serviceName
=
serviceNameCache
.
get
(
serviceId
,
()
->
dao
.
getServiceName
(
serviceId
));
}
catch
(
Throwable
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
...
...
@@ -56,7 +56,7 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
if
(
StringUtils
.
isEmpty
(
serviceName
))
{
serviceName
=
dao
.
getServiceName
(
serviceId
);
if
(
StringUtils
.
isNotEmpty
(
serviceName
))
{
CACHE
.
put
(
serviceId
,
serviceName
);
serviceNameCache
.
put
(
serviceId
,
serviceName
);
}
}
...
...
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Graph.java
浏览文件 @
0c17906c
...
...
@@ -32,8 +32,8 @@ public final class Graph<INPUT> {
this
.
id
=
id
;
}
public
void
start
(
INPUT
INPUT
)
{
entryWay
.
in
(
INPUT
);
public
void
start
(
INPUT
input
)
{
entryWay
.
in
(
input
);
}
public
<
OUTPUT
>
Node
<
INPUT
,
OUTPUT
>
addNode
(
NodeProcessor
<
INPUT
,
OUTPUT
>
nodeProcessor
)
{
...
...
@@ -53,7 +53,7 @@ public final class Graph<INPUT> {
if
(
nodeIndex
.
containsKey
(
nodeId
))
{
throw
new
PotentialCyclicGraphException
(
"handler="
+
node
.
getHandler
().
getClass
().
getName
()
+
" already exists in graph["
+
id
+
"
】
"
);
+
" already exists in graph["
+
id
+
"
]
"
);
}
nodeIndex
.
put
(
nodeId
,
node
);
}
...
...
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/GraphManager.java
浏览文件 @
0c17906c
...
...
@@ -35,7 +35,7 @@ public enum GraphManager {
* @param graphId represents a graph, which is used for finding it.
* @return
*/
public
synchronized
<
I
nput
>
Graph
<
Input
>
createIfAbsent
(
int
graphId
,
Class
<
Input
>
input
)
{
public
synchronized
<
I
NPUT
>
Graph
<
INPUT
>
createIfAbsent
(
int
graphId
,
Class
<
INPUT
>
input
)
{
if
(!
allGraphs
.
containsKey
(
graphId
))
{
Graph
graph
=
new
Graph
(
graphId
);
allGraphs
.
put
(
graphId
,
graph
);
...
...
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/GraphNodeFinder.java
浏览文件 @
0c17906c
...
...
@@ -35,10 +35,10 @@ public class GraphNodeFinder {
*
* @param handlerId of specific node in graph.
* @param outputClass of the found node
* @param <NODE
_
OUTPUT> type of given output class
* @param <NODEOUTPUT> type of given output class
* @return Node instance.
*/
public
<
NODE
_OUTPUT
>
Node
<?,
NODE_OUTPUT
>
findNode
(
int
handlerId
,
Class
<
NODE_
OUTPUT
>
outputClass
)
{
public
<
NODE
OUTPUT
>
Node
<?,
NODEOUTPUT
>
findNode
(
int
handlerId
,
Class
<
NODE
OUTPUT
>
outputClass
)
{
ConcurrentHashMap
<
Integer
,
Node
>
graphNodeIndex
=
graph
.
getNodeIndex
();
Node
node
=
graphNodeIndex
.
get
(
handlerId
);
if
(
node
==
null
)
{
...
...
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Next.java
浏览文件 @
0c17906c
...
...
@@ -42,9 +42,9 @@ public class Next<INPUT> implements Executor<INPUT> {
/**
* Drive to the next nodes
*
* @param
INPUT
* @param
input
*/
@Override
public
void
execute
(
INPUT
INPUT
)
{
ways
.
forEach
(
way
->
way
.
in
(
INPUT
));
@Override
public
void
execute
(
INPUT
input
)
{
ways
.
forEach
(
way
->
way
.
in
(
input
));
}
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Node.java
浏览文件 @
0c17906c
...
...
@@ -47,8 +47,8 @@ public final class Node<INPUT, OUTPUT> {
}
}
final
void
execute
(
INPUT
INPUT
)
{
nodeProcessor
.
process
(
INPUT
,
next
);
final
void
execute
(
INPUT
input
)
{
nodeProcessor
.
process
(
input
,
next
);
}
NodeProcessor
getHandler
()
{
...
...
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/NodeProcessor.java
浏览文件 @
0c17906c
...
...
@@ -21,7 +21,7 @@ package org.skywalking.apm.collector.core.graph;
/**
* @author peng-yongsheng, wu-sheng
*/
public
interface
NodeProcessor
<
INPUT
,
OUTPUT
>
{
public
interface
NodeProcessor
<
input
,
output
>
{
/**
* The unique id in the certain graph.
*
...
...
@@ -29,5 +29,5 @@ public interface NodeProcessor<INPUT, OUTPUT> {
*/
int
id
();
void
process
(
INPUT
INPUT
,
Next
<
OUTPUT
>
next
);
void
process
(
input
input
,
Next
<
output
>
next
);
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/WayToNode.java
浏览文件 @
0c17906c
...
...
@@ -33,10 +33,10 @@ public abstract class WayToNode<INPUT, OUTPUT> {
destination
=
new
Node
(
graph
,
destinationHandler
);
}
protected
abstract
void
in
(
INPUT
INPUT
);
protected
abstract
void
in
(
INPUT
input
);
protected
void
out
(
INPUT
INPUT
)
{
destination
.
execute
(
INPUT
);
protected
void
out
(
INPUT
input
)
{
destination
.
execute
(
input
);
}
Node
getDestination
()
{
...
...
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/GraphManagerTest.java
浏览文件 @
0c17906c
...
...
@@ -31,7 +31,7 @@ import org.junit.Test;
public
class
GraphManagerTest
{
private
static
PrintStream
OUT_REF
;
private
ByteArrayOutputStream
outputStream
;
private
static
String
lineSeparator
=
System
.
lineSeparator
();
private
static
String
LINE_SEPARATE
=
System
.
lineSeparator
();
@Before
public
void
initAndHoldOut
()
{
...
...
@@ -54,8 +54,8 @@ public class GraphManagerTest {
testGraph
.
start
(
"Input String"
);
String
output
=
outputStream
.
toString
();
String
expected
=
"Node1 process: s=Input String"
+
lineSeparator
+
"Node2 process: s=Input String"
+
lineSeparator
;
String
expected
=
"Node1 process: s=Input String"
+
LINE_SEPARATE
+
"Node2 process: s=Input String"
+
LINE_SEPARATE
;
Assert
.
assertEquals
(
expected
,
output
);
}
...
...
@@ -68,9 +68,9 @@ public class GraphManagerTest {
graph
.
start
(
"Input String"
);
String
output
=
outputStream
.
toString
();
String
expected
=
"Node1 process: s=Input String"
+
lineSeparator
+
"Node2 process: s=Input String"
+
lineSeparator
+
"Node4 process: int=123"
+
lineSeparator
;
String
expected
=
"Node1 process: s=Input String"
+
LINE_SEPARATE
+
"Node2 process: s=Input String"
+
LINE_SEPARATE
+
"Node4 process: int=123"
+
LINE_SEPARATE
;
Assert
.
assertEquals
(
expected
,
output
);
}
...
...
@@ -92,7 +92,7 @@ public class GraphManagerTest {
next
.
execute
(
123
);
String
output
=
outputStream
.
toString
();
String
expected
=
"Node4 process: int=123"
+
lineSeparator
;
"Node4 process: int=123"
+
LINE_SEPARATE
;
Assert
.
assertEquals
(
expected
,
output
);
}
...
...
@@ -118,14 +118,14 @@ public class GraphManagerTest {
public
void
testDeadEndWay
()
{
Graph
<
String
>
graph
=
GraphManager
.
INSTANCE
.
createIfAbsent
(
7
,
String
.
class
);
graph
.
addNode
(
new
Node1Processor
()).
addNext
(
new
WayToNode
<
String
,
Integer
>(
new
Node2Processor
())
{
@Override
protected
void
in
(
String
INPUT
)
{
@Override
protected
void
in
(
String
input
)
{
//don't call `out(intput)`;
}
});
graph
.
start
(
"Input String"
);
String
output
=
outputStream
.
toString
();
String
expected
=
"Node1 process: s=Input String"
+
lineSeparator
;
String
expected
=
"Node1 process: s=Input String"
+
LINE_SEPARATE
;
Assert
.
assertEquals
(
expected
,
output
);
}
...
...
@@ -134,7 +134,7 @@ public class GraphManagerTest {
public
void
testEntryWay
()
{
Graph
<
String
>
graph
=
GraphManager
.
INSTANCE
.
createIfAbsent
(
8
,
String
.
class
);
graph
.
addNode
(
new
WayToNode
<
String
,
String
>(
new
Node1Processor
())
{
@Override
protected
void
in
(
String
INPUT
)
{
@Override
protected
void
in
(
String
input
)
{
//don't call `out(intput)`;
}
}).
addNext
(
new
Node2Processor
());
...
...
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/Node1Processor.java
浏览文件 @
0c17906c
...
...
@@ -21,13 +21,13 @@ package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public
class
Node1Processor
implements
NodeProcessor
<
String
,
String
>
{
public
class
Node1Processor
extends
OutputProcessor
implements
NodeProcessor
<
String
,
String
>
{
@Override
public
int
id
()
{
return
1
;
}
@Override
public
void
process
(
String
s
,
Next
<
String
>
next
)
{
System
.
out
.
println
(
"Node1 process: s="
+
s
);
outstream
()
.
println
(
"Node1 process: s="
+
s
);
next
.
execute
(
s
);
}
}
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/Node2Processor.java
浏览文件 @
0c17906c
...
...
@@ -21,13 +21,13 @@ package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public
class
Node2Processor
implements
NodeProcessor
<
String
,
Integer
>
{
public
class
Node2Processor
extends
OutputProcessor
implements
NodeProcessor
<
String
,
Integer
>
{
@Override
public
int
id
()
{
return
2
;
}
@Override
public
void
process
(
String
s
,
Next
<
Integer
>
next
)
{
System
.
out
.
println
(
"Node2 process: s="
+
s
);
outstream
()
.
println
(
"Node2 process: s="
+
s
);
next
.
execute
(
123
);
}
}
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/Node3Processor.java
浏览文件 @
0c17906c
...
...
@@ -21,14 +21,14 @@ package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public
class
Node3Processor
implements
NodeProcessor
<
Long
,
Long
>
{
public
class
Node3Processor
extends
OutputProcessor
implements
NodeProcessor
<
Long
,
Long
>
{
@Override
public
int
id
()
{
return
3
;
}
@Override
public
void
process
(
Long
aLong
,
Next
<
Long
>
next
)
{
System
.
out
.
println
(
"Node3 process: long="
+
aLong
);
outstream
()
.
println
(
"Node3 process: long="
+
aLong
);
next
.
execute
(
aLong
);
}
}
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/Node4Processor.java
浏览文件 @
0c17906c
...
...
@@ -21,14 +21,14 @@ package org.skywalking.apm.collector.core.graph;
/**
* @author wusheng
*/
public
class
Node4Processor
implements
NodeProcessor
<
Integer
,
Long
>
{
public
class
Node4Processor
extends
OutputProcessor
implements
NodeProcessor
<
Integer
,
Long
>
{
@Override
public
int
id
()
{
return
4
;
}
@Override
public
void
process
(
Integer
in
,
Next
<
Long
>
next
)
{
System
.
out
.
println
(
"Node4 process: int="
+
in
);
outstream
()
.
println
(
"Node4 process: int="
+
in
);
next
.
execute
(
new
Long
(
in
.
intValue
()));
}
}
apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/graph/OutputProcessor.java
0 → 100644
浏览文件 @
0c17906c
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.collector.core.graph
;
import
java.io.PrintStream
;
/**
* @author wu-sheng
*/
public
class
OutputProcessor
{
protected
PrintStream
outstream
()
{
return
System
.
out
;
}
}
apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml
浏览文件 @
0c17906c
...
...
@@ -30,7 +30,6 @@
<artifactId>
collector-remote-kafka-provider
</artifactId>
<packaging>
jar
</packaging>
<<<<<<
< HEAD
<dependencies>
<dependency>
<groupId>
org.skywalking
</groupId>
...
...
@@ -39,6 +38,3 @@
</dependency>
</dependencies>
</project>
=======
</project>
>>>>>>> 406c4f52d8251eb81aa868ec38f17789e18e0dc0
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java
浏览文件 @
0c17906c
...
...
@@ -27,7 +27,7 @@ import org.skywalking.apm.collector.storage.service.DAOService;
/**
* @author peng-yongsheng
*/
public
abstract
class
AbstractLocalAsyncWorkerProvider
<
INPUT
extends
Data
,
OUTPUT
extends
Data
,
W
orkerType
extends
AbstractLocalAsyncWorker
<
INPUT
,
OUTPUT
>
&
QueueExecutor
<
INPUT
>>
extends
AbstractWorkerProvider
<
INPUT
,
OUTPUT
,
WorkerType
>
{
public
abstract
class
AbstractLocalAsyncWorkerProvider
<
INPUT
extends
Data
,
OUTPUT
extends
Data
,
W
ORKER_TYPE
extends
AbstractLocalAsyncWorker
<
INPUT
,
OUTPUT
>
&
QueueExecutor
<
INPUT
>>
extends
AbstractWorkerProvider
<
INPUT
,
OUTPUT
,
WORKER_TYPE
>
{
public
abstract
int
queueSize
();
...
...
@@ -41,7 +41,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends Data, OUTPU
@Override
final
public
WorkerRef
create
(
WorkerCreateListener
workerCreateListener
)
throws
ProviderNotFoundException
{
W
orkerType
localAsyncWorker
=
workerInstance
(
daoService
);
W
ORKER_TYPE
localAsyncWorker
=
workerInstance
(
daoService
);
workerCreateListener
.
addWorker
(
localAsyncWorker
);
QueueEventHandler
<
INPUT
>
queueEventHandler
=
queueCreatorService
.
create
(
queueSize
(),
localAsyncWorker
);
return
new
LocalAsyncWorkerRef
<>(
localAsyncWorker
,
queueEventHandler
);
...
...
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java
浏览文件 @
0c17906c
...
...
@@ -30,7 +30,7 @@ import org.skywalking.apm.collector.storage.service.DAOService;
* @author peng-yongsheng
* @since v3.0-2017
*/
public
abstract
class
AbstractRemoteWorkerProvider
<
INPUT
extends
Data
,
OUTPUT
extends
Data
,
W
orkerType
extends
AbstractRemoteWorker
<
INPUT
,
OUTPUT
>>
extends
AbstractWorkerProvider
<
INPUT
,
OUTPUT
,
WorkerType
>
{
public
abstract
class
AbstractRemoteWorkerProvider
<
INPUT
extends
Data
,
OUTPUT
extends
Data
,
W
ORKER_TYPE
extends
AbstractRemoteWorker
<
INPUT
,
OUTPUT
>>
extends
AbstractWorkerProvider
<
INPUT
,
OUTPUT
,
WORKER_TYPE
>
{
private
final
DAOService
daoService
;
private
final
RemoteClientService
remoteClientService
;
...
...
@@ -48,7 +48,7 @@ public abstract class AbstractRemoteWorkerProvider<INPUT extends Data, OUTPUT ex
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override
final
public
WorkerRef
create
(
WorkerCreateListener
workerCreateListener
)
{
W
orkerType
remoteWorker
=
workerInstance
(
daoService
);
W
ORKER_TYPE
remoteWorker
=
workerInstance
(
daoService
);
workerCreateListener
.
addWorker
(
remoteWorker
);
RemoteWorkerRef
<
INPUT
,
OUTPUT
>
workerRef
=
new
RemoteWorkerRef
<>(
remoteWorker
);
return
workerRef
;
...
...
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java
浏览文件 @
0c17906c
...
...
@@ -41,10 +41,10 @@ public abstract class AbstractWorker<INPUT extends Data, OUTPUT extends Data> im
*/
protected
abstract
void
onWork
(
INPUT
message
)
throws
WorkerException
;
@Override
public
final
void
process
(
INPUT
INPUT
,
Next
<
OUTPUT
>
next
)
{
@Override
public
final
void
process
(
INPUT
input
,
Next
<
OUTPUT
>
next
)
{
this
.
next
=
next
;
try
{
onWork
(
INPUT
);
onWork
(
input
);
}
catch
(
WorkerException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
...
...
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java
浏览文件 @
0c17906c
...
...
@@ -24,6 +24,6 @@ import org.skywalking.apm.collector.storage.service.DAOService;
/**
* @author peng-yongsheng
*/
public
abstract
class
AbstractWorkerProvider
<
INPUT
extends
Data
,
OUTPUT
extends
Data
,
W
orkerType
extends
AbstractWorker
<
INPUT
,
OUTPUT
>>
implements
Provider
{
public
abstract
W
orkerType
workerInstance
(
DAOService
daoService
);
public
abstract
class
AbstractWorkerProvider
<
INPUT
extends
Data
,
OUTPUT
extends
Data
,
W
ORKER_TYPE
extends
AbstractWorker
<
INPUT
,
OUTPUT
>>
implements
Provider
{
public
abstract
W
ORKER_TYPE
workerInstance
(
DAOService
daoService
);
}
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java
浏览文件 @
0c17906c
...
...
@@ -35,11 +35,11 @@ public class LocalAsyncWorkerRef<INPUT extends Data, OUTPUT extends Data> extend
this
.
queueEventHandler
=
queueEventHandler
;
}
@Override
protected
void
in
(
INPUT
INPUT
)
{
queueEventHandler
.
tell
(
INPUT
);
@Override
protected
void
in
(
INPUT
input
)
{
queueEventHandler
.
tell
(
input
);
}
@Override
protected
void
out
(
INPUT
INPUT
)
{
super
.
out
(
INPUT
);
@Override
protected
void
out
(
INPUT
input
)
{
super
.
out
(
input
);
}
}
apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerRef.java
浏览文件 @
0c17906c
...
...
@@ -61,8 +61,8 @@ public class RemoteWorkerRef<INPUT extends Data, OUTPUT extends Data> extends Wo
}
}
@Override
protected
void
out
(
INPUT
INPUT
)
{
super
.
out
(
INPUT
);
@Override
protected
void
out
(
INPUT
input
)
{
super
.
out
(
input
);
}
private
Boolean
isAcrossJVM
()
{
...
...
checkStyle.xml
浏览文件 @
0c17906c
...
...
@@ -81,11 +81,11 @@
<module
name=
"MemberName"
/>
<!--Validates identifiers for class type parameters-->
<module
name=
"ClassTypeParameterName"
>
<property
name=
"format"
value=
"
^[A-Z0-9]*$
"
/>
<property
name=
"format"
value=
"
(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)
"
/>
</module>
<!--Validates identifiers for method type parameters-->
<module
name=
"MethodTypeParameterName"
>
<property
name=
"format"
value=
"
^[A-Z0-9]*$
"
/>
<property
name=
"format"
value=
"
(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)
"
/>
</module>
<module
name=
"PackageName"
/>
<module
name=
"ParameterName"
/>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录