From 5e3afe5ec28bdcfdbd2248cd78fc0b5fc4f8d36b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Fri, 31 Aug 2018 06:32:51 +0800 Subject: [PATCH] Buffer library performance test and functional test successfully. (#1612) * Buffer stream. * Buffer file reader. * Buffer library performance test and functional test successfully. * Fixed the code merge mistake. --- .../library/buffer/BufferFileUtils.java | 15 +++--- .../oap/server/library/buffer/DataStream.java | 2 +- .../library/buffer/DataStreamReader.java | 49 ++++++++++++------- .../library/buffer/DataStreamWriter.java | 32 ++++++------ .../oap/server/library/buffer/Offset.java | 11 ++++- .../server/library/buffer/OffsetStream.java | 4 +- .../library/buffer/BufferStreamTestCase.java | 25 ++++++---- skywalking-ui | 2 +- 8 files changed, 83 insertions(+), 57 deletions(-) diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java index 98570050e3..d39709bdc6 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java @@ -18,8 +18,7 @@ package org.apache.skywalking.oap.server.library.buffer; -import java.text.*; -import java.util.*; +import java.util.Arrays; /** * @author peng-yongsheng @@ -34,19 +33,17 @@ class BufferFileUtils { static final String OFFSET_FILE_PREFIX = "offset"; private static final String SEPARATOR = "-"; private static final String SUFFIX = ".sw"; - private static final String DATA_FORMAT_STR = "yyyyMMddHHmmss"; static void sort(String[] fileList) { Arrays.sort(fileList, (f1, f2) -> { - int fileId1 = Integer.parseInt(f1.split("_")[1]); - int fileId2 = Integer.parseInt(f2.split("_")[1]); + long t1 = Long.parseLong(f1.substring(0, f1.length() - 3).split(SEPARATOR)[1]); + long t2 = Long.parseLong(f2.substring(0, f2.length() - 3).split(SEPARATOR)[1]); - return fileId1 - fileId2; + return (int)(t1 - t2); }); } static String buildFileName(String prefix) { - DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT_STR); - return prefix + SEPARATOR + dateFormat.format(new Date()) + SUFFIX; + return prefix + SEPARATOR + System.currentTimeMillis() + SUFFIX; } -} +} \ No newline at end of file diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java index 1bb380d660..54394d714f 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java @@ -38,7 +38,7 @@ class DataStream { @Getter private final DataStreamWriter writer; private boolean initialized = false; - DataStream(File directory, int offsetFileMaxSize, int dataFileMaxSize, Parser parser, + DataStream(File directory, int dataFileMaxSize, int offsetFileMaxSize, Parser parser, DataStreamReader.CallBack callBack) { this.directory = directory; this.offsetStream = new OffsetStream(directory, offsetFileMaxSize); diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java index 16b0cd9f9d..f838020ff6 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.library.buffer; import com.google.protobuf.*; import java.io.*; import java.util.concurrent.*; +import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.PrefixFileFilter; import org.apache.skywalking.apm.util.*; import org.slf4j.*; @@ -36,6 +37,7 @@ class DataStreamReader { private final Offset.ReadOffset readOffset; private final Parser parser; private final CallBack callBack; + private File readingFile; private InputStream inputStream; DataStreamReader(File directory, Offset.ReadOffset readOffset, Parser parser, @@ -49,35 +51,40 @@ class DataStreamReader { void initialize() { preRead(); - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( + Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay( new RunnableWithExceptionProtection(this::read, - t -> logger.error("Segment buffer pre read failure.", t)), 3, 3, TimeUnit.SECONDS); + t -> logger.error("Buffer data pre read failure.", t)), 3, 1, TimeUnit.SECONDS); } private void preRead() { String fileName = readOffset.getFileName(); if (StringUtil.isEmpty(fileName)) { - openInputStream(readEarliestCreateDataFile()); + openInputStream(readEarliestDataFile()); } else { - File dataFile = new File(directory, fileName); - if (dataFile.exists()) { - openInputStream(dataFile); - read(); + File readingFile = new File(directory, fileName); + if (readingFile.exists()) { + openInputStream(readingFile); + try { + inputStream.skip(readOffset.getOffset()); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } } else { - openInputStream(readEarliestCreateDataFile()); + openInputStream(readEarliestDataFile()); } } } - private void openInputStream(File readFile) { + private void openInputStream(File readingFile) { try { - inputStream = new FileInputStream(readFile); + this.readingFile = readingFile; + inputStream = new FileInputStream(readingFile); } catch (FileNotFoundException e) { logger.error(e.getMessage(), e); } } - private File readEarliestCreateDataFile() { + private File readEarliestDataFile() { String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX)); if (fileNames != null && fileNames.length > 0) { @@ -92,12 +99,20 @@ class DataStreamReader { private void read() { try { - MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream); - if (messageType != null) { - callBack.call(messageType); - final int serialized = messageType.getSerializedSize(); - final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized; - readOffset.setOffset(readOffset.getOffset() + offset); + if (readOffset.getOffset() == readingFile.length() && !readOffset.isCurrentWriteFile()) { + FileUtils.forceDelete(readingFile); + openInputStream(readEarliestDataFile()); + } + + while (readOffset.getOffset() < readingFile.length()) { + + MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream); + if (messageType != null) { + callBack.call(messageType); + final int serialized = messageType.getSerializedSize(); + final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized; + readOffset.setOffset(readOffset.getOffset() + offset); + } } } catch (IOException e) { logger.error(e.getMessage(), e); diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java index 13a88b1a26..cb1a0c6065 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java @@ -49,46 +49,46 @@ class DataStreamWriter { if (!initialized) { String writeFileName = writeOffset.getFileName(); - File dataFile; + File writingFile; if (StringUtil.isEmpty(writeFileName)) { - dataFile = createNewFile(); + writingFile = createNewFile(); } else { - dataFile = new File(directory, writeFileName); - if (!dataFile.exists()) { - dataFile = createNewFile(); + writingFile = new File(directory, writeFileName); + if (!writingFile.exists()) { + writingFile = createNewFile(); } } - outputStream = FileUtils.openOutputStream(dataFile, true); + outputStream = FileUtils.openOutputStream(writingFile, true); initialized = true; } } private File createNewFile() throws IOException { String fileName = BufferFileUtils.buildFileName(BufferFileUtils.DATA_FILE_PREFIX); - File dataFile = new File(directory, fileName); + File writingFile = new File(directory, fileName); - boolean created = dataFile.createNewFile(); + boolean created = writingFile.createNewFile(); if (!created) { - logger.info("The file named {} already exists.", dataFile.getAbsolutePath()); + logger.info("The file named {} already exists.", writingFile.getAbsolutePath()); } else { - logger.info("Create a new buffer data file: {}", dataFile.getAbsolutePath()); + logger.info("Create a new buffer data file: {}", writingFile.getAbsolutePath()); } writeOffset.setOffset(0); - writeOffset.setFileName(dataFile.getName()); + writeOffset.setFileName(writingFile.getName()); - return dataFile; + return writingFile; } - void write(AbstractMessageLite messageLite) { + synchronized void write(AbstractMessageLite messageLite) { try { messageLite.writeDelimitedTo(outputStream); long position = outputStream.getChannel().position(); writeOffset.setOffset(position); - if (position > (FileUtils.ONE_MB * dataFileMaxSize)) { - File dataFile = createNewFile(); - outputStream = FileUtils.openOutputStream(dataFile, true); + if (position >= (FileUtils.ONE_MB * dataFileMaxSize)) { + File writingFile = createNewFile(); + outputStream = FileUtils.openOutputStream(writingFile, true); } } catch (IOException e) { logger.error(e.getMessage(), e); diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java index 09e5936e1e..5119bb9404 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java @@ -31,8 +31,8 @@ class Offset { @Getter private final WriteOffset writeOffset; Offset() { - readOffset = new ReadOffset(); writeOffset = new WriteOffset(); + readOffset = new ReadOffset(writeOffset); } String serialize() { @@ -55,6 +55,15 @@ class Offset { static class ReadOffset { @Getter @Setter private String fileName; @Getter @Setter private long offset = 0; + private final WriteOffset writeOffset; + + private ReadOffset(WriteOffset writeOffset) { + this.writeOffset = writeOffset; + } + + boolean isCurrentWriteFile() { + return fileName.equals(writeOffset.fileName); + } } static class WriteOffset { diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java index 7f46331411..9fb4ed5354 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java @@ -66,8 +66,8 @@ class OffsetStream { if (!initialized) { String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX)); if (fileNames != null && fileNames.length > 0) { - for (int i = 0; i < fileNames.length; i++) { - } + BufferFileUtils.sort(fileNames); + offsetFile = new File(directory, fileNames[0]); } else { offsetFile = newFile(); } diff --git a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java index cff910085c..53eca71f67 100644 --- a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java +++ b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java @@ -33,9 +33,9 @@ public class BufferStreamTestCase { public static void main(String[] args) throws IOException, InterruptedException { String directory = "/Users/pengys5/code/sky-walking/buffer-test"; BufferStream.Builder builder = new BufferStream.Builder<>(directory); - builder.cleanWhenRestart(true); - builder.dataFileMaxSize(1); - builder.offsetFileMaxSize(1); +// builder.cleanWhenRestart(true); + builder.dataFileMaxSize(50); + builder.offsetFileMaxSize(10); builder.parser(TraceSegmentObject.parser()); builder.callBack(new SegmentParse()); @@ -44,18 +44,23 @@ public class BufferStreamTestCase { TimeUnit.SECONDS.sleep(5); - String str = "2018-08-27 11:59:45,261 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" + - "main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" + - "main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" + - "main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28"; + StringBuilder str = new StringBuilder("2018-08-27 11:59:45,261 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28"); + for (int i = 0; i < 1000; i++) { + str.append("main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28"); + } - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 20000; i++) { TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder(); SpanObject.Builder span = SpanObject.newBuilder(); - span.setOperationName(String.valueOf(i) + " " + str); + span.setSpanId(i); + span.setOperationName(str.toString()); segment.addSpans(span); stream.write(segment.build()); + + if (i % 1000 == 0) { + TimeUnit.MILLISECONDS.sleep(50); + } } } @@ -63,7 +68,7 @@ public class BufferStreamTestCase { private static class SegmentParse implements DataStreamReader.CallBack { @Override public void call(TraceSegmentObject message) { - logger.info("segment parse: {}", message.getSpans(0).getOperationName()); + logger.info("segment parse: {}", message.getSpans(0).getSpanId()); } } } diff --git a/skywalking-ui b/skywalking-ui index ad3ee45dba..f9c602936a 160000 --- a/skywalking-ui +++ b/skywalking-ui @@ -1 +1 @@ -Subproject commit ad3ee45dbadfae35d77238bdd7a1df593158f109 +Subproject commit f9c602936ab4f386576bf16f203efac61962e424 -- GitLab