提交 67cb71e0 编写于 作者: P pengys5

fixed pull request suggestion

上级 eea42183
......@@ -22,7 +22,7 @@ public abstract class MergeAnalysisMember extends AnalysisMember {
}
final protected void setMergeData(String id, String column, String value) throws Exception {
getMergeAnalysisData().getElseCreate(id).setMergeData(column, value);
getMergeAnalysisData().getOrCreate(id).setMergeData(column, value);
}
@Override
......
......@@ -35,9 +35,9 @@ public abstract class MergePersistenceMember extends PersistenceMember<MergePers
if (message instanceof MergeData) {
MergeData mergeData = (MergeData) message;
MergePersistenceData data = getPersistenceData();
data.holdData();
data.getElseCreate(mergeData.getId()).merge(mergeData);
data.releaseData();
data.hold();
data.getOrCreate(mergeData.getId()).merge(mergeData);
data.release();
} else {
logger.error("unhandled message, message instance must MergeData, but is %s", message.getClass().toString());
}
......
......@@ -17,7 +17,7 @@ public abstract class MetricAnalysisMember extends AnalysisMember {
}
final protected void setMetric(String id, String column, Long value) throws Exception {
getMetricAnalysisData().getElseCreate(id).setMetric(column, value);
getMetricAnalysisData().getOrCreate(id).setMetric(column, value);
}
private MetricAnalysisData getMetricAnalysisData() {
......
......@@ -34,9 +34,9 @@ public abstract class MetricPersistenceMember extends PersistenceMember<MetricPe
if (message instanceof MetricData) {
MetricData metricData = (MetricData) message;
MetricPersistenceData data = getPersistenceData();
data.holdData();
data.getElseCreate(metricData.getId()).merge(metricData);
data.releaseData();
data.hold();
data.getOrCreate(metricData.getId()).merge(metricData);
data.release();
} else {
logger.error("unhandled message, message instance must MetricData, but is %s", message.getClass().toString());
}
......
......@@ -19,7 +19,7 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
}
final public void setRecord(String id, JsonObject record) throws Exception {
getRecordAnalysisData().getElseCreate(id).setRecord(record);
getRecordAnalysisData().getOrCreate(id).setRecord(record);
}
private RecordAnalysisData getRecordAnalysisData() {
......
......@@ -35,9 +35,9 @@ public abstract class RecordPersistenceMember extends PersistenceMember<RecordPe
RecordData recordData = (RecordData) message;
logger.debug("setRecord: id: %s, data: %s", recordData.getId(), recordData.getRecord());
RecordPersistenceData data = getPersistenceData();
data.holdData();
data.getElseCreate(recordData.getId()).setRecord(recordData.getRecord());
data.releaseData();
data.hold();
data.getOrCreate(recordData.getId()).setRecord(recordData.getRecord());
data.release();
} else {
logger.error("message unhandled");
}
......
......@@ -53,12 +53,12 @@ public class SegmentSave extends PersistenceMember<SegmentPersistenceData, Segme
if (message instanceof Segment) {
Segment segment = (Segment) message;
SegmentPersistenceData data = getPersistenceData();
data.holdData();
data.getElseCreate(segment.getTraceSegmentId() + i).setSegmentStr(segment.getJsonStr());
data.hold();
data.getOrCreate(segment.getTraceSegmentId() + i).setSegmentStr(segment.getJsonStr());
if (data.size() >= CacheSizeConfig.Cache.Persistence.SIZE) {
persistence(data.asMap());
}
data.releaseData();
data.release();
i++;
} else {
logger.error("unhandled message, message instance must Segment, but is %s", message.getClass().toString());
......
......@@ -10,7 +10,7 @@ public class MergeAnalysisData {
private WindowData<MergeData> windowData = new WindowData(new LinkedHashMap<String, MergeData>());
public MergeData getElseCreate(String id) {
public MergeData getOrCreate(String id) {
if (!windowData.containsKey(id)) {
windowData.put(id, new MergeData(id));
}
......
......@@ -7,18 +7,18 @@ public class MergePersistenceData extends Window<MergeData> implements Persisten
private WindowData<MergeData> lockedWindowData;
public MergeData getElseCreate(String id) {
public MergeData getOrCreate(String id) {
if (!lockedWindowData.containsKey(id)) {
lockedWindowData.put(id, new MergeData(id));
}
return lockedWindowData.get(id);
}
public void holdData() {
public void hold() {
lockedWindowData = getCurrentAndHold();
}
public void releaseData() {
public void release() {
lockedWindowData.release();
lockedWindowData = null;
}
......
......@@ -10,7 +10,7 @@ public class MetricAnalysisData {
private WindowData<MetricData> windowData = new WindowData(new LinkedHashMap<String, MetricData>());
public MetricData getElseCreate(String id) {
public MetricData getOrCreate(String id) {
if (!windowData.containsKey(id)) {
windowData.put(id, new MetricData(id));
}
......
......@@ -7,18 +7,18 @@ public class MetricPersistenceData extends Window<MetricData> implements Persist
private WindowData<MetricData> lockedWindowData;
public MetricData getElseCreate(String id) {
public MetricData getOrCreate(String id) {
if (!lockedWindowData.containsKey(id)) {
lockedWindowData.put(id, new MetricData(id));
}
return lockedWindowData.get(id);
}
public void holdData() {
public void hold() {
lockedWindowData = getCurrentAndHold();
}
public void releaseData() {
public void release() {
lockedWindowData.release();
lockedWindowData = null;
}
......
......@@ -5,9 +5,9 @@ package com.a.eye.skywalking.collector.worker.storage;
*/
public interface PersistenceData<T extends Data> {
T getElseCreate(String id);
T getOrCreate(String id);
void releaseData();
void release();
void holdData();
void hold();
}
......@@ -4,6 +4,7 @@ import com.a.eye.skywalking.collector.actor.AbstractLocalSyncWorker;
import com.a.eye.skywalking.collector.worker.config.EsConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.StringFormattedMessage;
import org.elasticsearch.action.index.IndexRequestBuilder;
import java.util.LinkedList;
......@@ -26,13 +27,14 @@ public enum PersistenceTimer {
try {
extractDataAndSave();
Thread.sleep(timeInterval);
} catch (Exception e) {
e.printStackTrace();
} catch (Throwable e) {
logger.error(e);
}
}
};
Thread thread = new Thread(runnable);
thread.start();
Thread persistenceThread = new Thread(runnable);
persistenceThread.setName("timerPersistence");
persistenceThread.start();
}
private void extractDataAndSave() {
......@@ -44,8 +46,7 @@ public enum PersistenceTimer {
try {
worker.allocateJob(new FlushAndSwitch(), dataList);
} catch (Exception e) {
logger.error("flush persistence worker data error, worker role name: %s", worker.getRole().roleName());
e.printStackTrace();
logger.error(new StringFormattedMessage("flush persistence worker data error, worker role name: %s", worker.getRole().roleName()), e);
}
}
EsClient.INSTANCE.bulk(dataList);
......
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.AbstractLocalSyncWorker;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
/**
......@@ -11,7 +10,7 @@ import java.util.List;
public enum PersistenceWorkerListener {
INSTANCE;
private List<AbstractLocalSyncWorker> workers = new ArrayList<>();
private List<AbstractLocalSyncWorker> workers = new LinkedList<>();
public void register(AbstractLocalSyncWorker worker) {
workers.add(worker);
......
......@@ -10,7 +10,7 @@ public class RecordAnalysisData {
private WindowData<RecordData> windowData = new WindowData(new LinkedHashMap<String, RecordData>());
public RecordData getElseCreate(String id) {
public RecordData getOrCreate(String id) {
if (!windowData.containsKey(id)) {
windowData.put(id, new RecordData(id));
}
......
......@@ -7,18 +7,18 @@ public class RecordPersistenceData extends Window<RecordData> implements Persist
private WindowData<RecordData> lockedWindowData;
public RecordData getElseCreate(String id) {
public RecordData getOrCreate(String id) {
if (!lockedWindowData.containsKey(id)) {
lockedWindowData.put(id, new RecordData(id));
}
return lockedWindowData.get(id);
}
public void holdData() {
public void hold() {
lockedWindowData = getCurrentAndHold();
}
public void releaseData() {
public void release() {
lockedWindowData.release();
lockedWindowData = null;
}
......
......@@ -9,18 +9,18 @@ public class SegmentPersistenceData extends Window<SegmentData> implements Persi
private WindowData<SegmentData> lockedWindowData;
public SegmentData getElseCreate(String id) {
public SegmentData getOrCreate(String id) {
if (!lockedWindowData.containsKey(id)) {
lockedWindowData.put(id, new SegmentData(id));
}
return lockedWindowData.get(id);
}
public void holdData() {
public void hold() {
lockedWindowData = getCurrentAndHold();
}
public void releaseData() {
public void release() {
lockedWindowData.release();
lockedWindowData = null;
}
......
......@@ -7,7 +7,7 @@ import java.util.HashMap;
*/
public abstract class Window<T extends Data> {
private Pointer current;
private WindowData<T> pointer;
private WindowData<T> windowDataA;
private WindowData<T> windowDataB;
......@@ -15,19 +15,19 @@ public abstract class Window<T extends Data> {
public Window() {
windowDataA = new WindowData(new HashMap<>());
windowDataB = new WindowData(new HashMap<>());
current = Pointer.A;
pointer = windowDataA;
}
public void switchPointer() {
if (current.equals(Pointer.A)) {
current = Pointer.B;
if (pointer == windowDataA) {
pointer = windowDataB;
} else {
current = Pointer.A;
pointer = windowDataA;
}
}
protected WindowData<T> getCurrentAndHold() {
if (Pointer.A.equals(current)) {
if (pointer == windowDataA) {
windowDataA.hold();
return windowDataA;
} else {
......@@ -37,14 +37,10 @@ public abstract class Window<T extends Data> {
}
public WindowData<T> getLast() {
if (Pointer.A.equals(current)) {
if (pointer == windowDataA) {
return windowDataB;
} else {
return windowDataA;
}
}
enum Pointer {
A, B
}
}
......@@ -7,7 +7,7 @@ import java.util.Map;
*/
public class WindowData<T extends Data> {
private Map<String, T> data;
private boolean isHold;
private volatile boolean isHold;
WindowData(Map<String, T> data) {
this.data = data;
......
......@@ -41,7 +41,7 @@ public class MergePersistenceMemberTestCase {
MergeData mergeData = mock(MergeData.class);
when(mergePersistenceMember, "getPersistenceData").thenReturn(persistenceData);
when(persistenceData.getElseCreate(Mockito.anyString())).thenReturn(mergeData);
when(persistenceData.getOrCreate(Mockito.anyString())).thenReturn(mergeData);
doCallRealMethod().when(mergePersistenceMember).analyse(Mockito.any(MergeData.class));
}
......
......@@ -11,28 +11,28 @@ public class MergePersistenceWindowDataTestCase {
@Test
public void testGetElseCreate() {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.holdData();
MergeData mergeData = persistenceData.getElseCreate("test");
persistenceData.hold();
MergeData mergeData = persistenceData.getOrCreate("test");
Assert.assertEquals("test", mergeData.getId());
}
@Test
public void testSize() {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.holdData();
persistenceData.getElseCreate("test_1");
persistenceData.hold();
persistenceData.getOrCreate("test_1");
Assert.assertEquals(1, persistenceData.getCurrentAndHold().size());
persistenceData.getElseCreate("test_1");
persistenceData.getOrCreate("test_1");
Assert.assertEquals(1, persistenceData.getCurrentAndHold().size());
persistenceData.getElseCreate("test_2");
persistenceData.getOrCreate("test_2");
Assert.assertEquals(2, persistenceData.getCurrentAndHold().size());
}
@Test
public void testClear() {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.holdData();
persistenceData.getElseCreate("test_1");
persistenceData.hold();
persistenceData.getOrCreate("test_1");
Assert.assertEquals(1, persistenceData.getCurrentAndHold().size());
persistenceData.getCurrentAndHold().clear();
Assert.assertEquals(0, persistenceData.getCurrentAndHold().size());
......
......@@ -14,12 +14,12 @@ public class MetricPersistenceWindowDataTestCase {
String id = "2016" + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B";
MetricPersistenceData metricPersistenceData = new MetricPersistenceData();
metricPersistenceData.holdData();
MetricData metricData = metricPersistenceData.getElseCreate(id);
metricPersistenceData.hold();
MetricData metricData = metricPersistenceData.getOrCreate(id);
metricData.setMetric("Column_1", 10L);
Assert.assertEquals(id, metricData.getId());
MetricData metricData1 = metricPersistenceData.getElseCreate(id);
MetricData metricData1 = metricPersistenceData.getOrCreate(id);
Assert.assertEquals(10L, metricData1.asMap().get("Column_1"));
}
......@@ -28,12 +28,12 @@ public class MetricPersistenceWindowDataTestCase {
String id = "2016" + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B";
MetricPersistenceData metricPersistenceData = new MetricPersistenceData();
metricPersistenceData.holdData();
metricPersistenceData.getElseCreate(id);
metricPersistenceData.hold();
metricPersistenceData.getOrCreate(id);
Assert.assertEquals(1, metricPersistenceData.getCurrentAndHold().size());
String id_1 = "2016" + Const.ID_SPLIT + "B" + Const.ID_SPLIT + "C";
metricPersistenceData.getElseCreate(id_1);
metricPersistenceData.getOrCreate(id_1);
Assert.assertEquals(2, metricPersistenceData.getCurrentAndHold().size());
metricPersistenceData.getCurrentAndHold().clear();
......
......@@ -17,14 +17,14 @@ public class RecordPersistenceWindowDataTestCase {
JsonObject record = new JsonObject();
record.addProperty("Column_1", "Value_1");
RecordPersistenceData recordPersistenceData = new RecordPersistenceData();
recordPersistenceData.holdData();
recordPersistenceData.hold();
RecordData recordData = recordPersistenceData.getElseCreate(id);
RecordData recordData = recordPersistenceData.getOrCreate(id);
recordData.setRecord(record);
Assert.assertEquals(id, recordData.getId());
RecordData recordData1 = recordPersistenceData.getElseCreate(id);
RecordData recordData1 = recordPersistenceData.getOrCreate(id);
Assert.assertEquals("Value_1", recordData1.getRecord().get("Column_1").getAsString());
}
......@@ -33,11 +33,11 @@ public class RecordPersistenceWindowDataTestCase {
String id_1 = "2016" + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B";
String id_2 = "2016" + Const.ID_SPLIT + "B" + Const.ID_SPLIT + "C";
RecordPersistenceData recordPersistenceData = new RecordPersistenceData();
recordPersistenceData.holdData();
recordPersistenceData.hold();
recordPersistenceData.getElseCreate(id_1);
recordPersistenceData.getOrCreate(id_1);
Assert.assertEquals(1, recordPersistenceData.getCurrentAndHold().size());
recordPersistenceData.getElseCreate(id_2);
recordPersistenceData.getOrCreate(id_2);
Assert.assertEquals(2, recordPersistenceData.getCurrentAndHold().size());
recordPersistenceData.getCurrentAndHold().clear();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册