diff --git a/test/jdk/jfr/api/consumer/streaming/TestCrossProcessStreaming.java b/test/jdk/jfr/api/consumer/streaming/TestCrossProcessStreaming.java new file mode 100644 index 0000000000000000000000000000000000000000..ee0f2c16f017be17971f9fe49da16ed4369d4e4a --- /dev/null +++ b/test/jdk/jfr/api/consumer/streaming/TestCrossProcessStreaming.java @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.jfr.api.consumer.streaming; + +import static jdk.test.lib.Asserts.assertTrue; + +import java.io.IOException; +import java.io.InputStream; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.sun.tools.attach.VirtualMachine; +import jdk.jfr.Event; +import jdk.jfr.Recording; +import jdk.jfr.consumer.EventStream; +import jdk.test.lib.Asserts; +import jdk.test.lib.process.ProcessTools; + +/** + * @test + * @summary Test scenario where JFR event producer is in a different process + * with respect to the JFR event stream consumer. + * @key jfr + * @library /lib / + * @modules jdk.attach + * jdk.jfr + * @run main jdk.jfr.api.consumer.streaming.TestCrossProcessStreaming + */ + +public class TestCrossProcessStreaming { + static String MAIN_STARTED_TOKEN = "MAIN_STARTED"; + + public static class TestEvent extends Event { + } + + public static class ResultEvent extends Event { + int nrOfEventsProduced; + } + + public static class EventProducer { + public static void main(String... args) throws Exception { + Path pidPath = Paths.get(args[1]); + writeString(pidPath, ProcessTools.getProcessId() + "@"); + + CrossProcessSynchronizer sync = new CrossProcessSynchronizer(); + log(MAIN_STARTED_TOKEN); + + long pid = ProcessTools.getProcessId(); + int nrOfEvents = 0; + boolean exitRequested = false; + while (!exitRequested) { + TestEvent e = new TestEvent(); + e.commit(); + nrOfEvents++; + if (nrOfEvents % 1000 == 0) { + Thread.sleep(100); + exitRequested = CrossProcessSynchronizer.exitRequested(pid); + } + } + + ResultEvent re = new ResultEvent(); + re.nrOfEventsProduced = nrOfEvents; + re.commit(); + + log("Number of TestEvents generated: " + nrOfEvents); + } + } + + + static class CrossProcessSynchronizer { + static void requestExit(long pid) throws Exception { + Files.createFile(file(pid)); + } + + static boolean exitRequested(long pid) throws Exception { + return Files.exists(file(pid)); + } + + static Path file(long pid) { + return Paths.get(".", "exit-requested-" + pid); + } + } + + + static class ConsumedEvents { + AtomicInteger total = new AtomicInteger(0); + AtomicInteger whileProducerAlive = new AtomicInteger(0); + AtomicInteger produced = new AtomicInteger(-1); + } + + private static String readString(Path path) throws IOException { + try (InputStream in = new FileInputStream(path.toFile())) { + byte[] bytes = new byte[32]; + int length = in.read(bytes); + assertTrue(length < bytes.length, "bytes array to small"); + if (length == -1) { + return null; + } + return new String(bytes, 0, length); + } + } + + private static void writeString(Path path, String content) throws IOException { + try (OutputStream out = new FileOutputStream(path.toFile())) { + out.write(content.getBytes()); + } + } + + public static void main(String... args) throws Exception { + Process p = startProducerProcess("normal"); + String repo = getJfrRepository(p); + + ConsumedEvents ce = consumeEvents(p, repo); + + p.waitFor(); + Asserts.assertEquals(p.exitValue(), 0, + "Process exited abnormally, exitValue = " + p.exitValue()); + + Asserts.assertEquals(ce.total.get(), ce.produced.get(), "Some events were lost"); + + // Expected that some portion of events emitted by the producer are delivered + // to the consumer while producer is still alive, at least one event for certain. + Asserts.assertLTE(1, ce.whileProducerAlive.get(), + "Too few events are delivered while producer is alive"); + } + + private static long pid; + + static Process startProducerProcess(String extraParam) throws Exception { + Path pidPath = Paths.get("pid-" + System.currentTimeMillis()).toAbsolutePath(); + ProcessBuilder pb = + ProcessTools.createJavaProcessBuilder(false, + "-XX:StartFlightRecording", + EventProducer.class.getName(), + extraParam, + pidPath.toString()); + Process p = ProcessTools.startProcess("Event-Producer", pb, + line -> line.equals(MAIN_STARTED_TOKEN), + 0, TimeUnit.SECONDS); + + do { + Thread.sleep(10); + } while (!pidPath.toFile().exists()); + + String pidStr; + do { + pidStr = readString(pidPath); + } while (pidStr == null || !pidStr.endsWith("@")); + + pid = Long.valueOf(pidStr.substring(0, pidStr.length()-1)); + return p; + } + + static String getJfrRepository(Process p) throws Exception { + String repo = null; + + // It may take little bit of time for the observed process to set the property after + // the process starts, therefore read the property in a loop. + while (repo == null) { + VirtualMachine vm = VirtualMachine.attach(String.valueOf(pid)); + repo = vm.getSystemProperties().getProperty("jdk.jfr.repository"); + vm.detach(); + } + + log("JFR repository = " + repo); + return repo; + } + + static ConsumedEvents consumeEvents(Process p, String repo) throws Exception { + ConsumedEvents result = new ConsumedEvents(); + + // wait for couple of JFR stream flushes before concluding the test + CountDownLatch flushed = new CountDownLatch(2); + + // consume events produced by another process via event stream + try (EventStream es = EventStream.openRepository(Paths.get(repo))) { + es.onEvent(TestEvent.class.getName(), + e -> { + result.total.incrementAndGet(); + if (p.isAlive()) { + result.whileProducerAlive.incrementAndGet(); + } + }); + + es.onEvent(ResultEvent.class.getName(), + e -> result.produced.set(e.getInt("nrOfEventsProduced"))); + + es.onFlush( () -> flushed.countDown() ); + + // Setting start time to the beginning of the Epoch is a good way to start + // reading the stream from the very beginning. + es.setStartTime(Instant.EPOCH); + es.startAsync(); + + // await for certain number of flush events before concluding the test case + flushed.await(); + CrossProcessSynchronizer.requestExit(pid); + + es.awaitTermination(); + } + + return result; + } + + private static final void log(String msg) { + System.out.println(msg); + } +}