未验证 提交 6070738a 编写于 作者: D dailidong 提交者: GitHub

Support worker server to run bat script (#2023)

* Support worker server to run bat script

1. Reimplement ProcessImpl.java, ProcessEnvironment.java and ProcessBuilder.java for Windows
2. Modify shell task code for windows
3. Add ASF License

* Add Unit Test
上级 1f92b4c4
......@@ -746,7 +746,7 @@ public final class Constants {
* application regex
*/
public static final String APPLICATION_REGEX = "application_\\d+_\\d+";
public static final String PID = "pid";
public static final String PID = OSUtils.isWindows() ? "handle" : "pid";
/**
* month_begin
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils.process;
import com.sun.jna.platform.win32.Kernel32Util;
import java.util.*;
final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
private static final long serialVersionUID = -8017839552603542824L;
private static String validateName(String name) {
// An initial `=' indicates a magic Windows variable name -- OK
if (name.indexOf('=', 1) != -1 ||
name.indexOf('\u0000') != -1)
throw new IllegalArgumentException
("Invalid environment variable name: \"" + name + "\"");
return name;
}
private static String validateValue(String value) {
if (value.indexOf('\u0000') != -1)
throw new IllegalArgumentException
("Invalid environment variable value: \"" + value + "\"");
return value;
}
private static String nonNullString(Object o) {
if (o == null)
throw new NullPointerException();
return (String) o;
}
public String put(String key, String value) {
return super.put(validateName(key), validateValue(value));
}
public String get(Object key) {
return super.get(nonNullString(key));
}
public boolean containsKey(Object key) {
return super.containsKey(nonNullString(key));
}
public boolean containsValue(Object value) {
return super.containsValue(nonNullString(value));
}
public String remove(Object key) {
return super.remove(nonNullString(key));
}
private static class CheckedEntry implements Entry<String,String> {
private final Entry<String,String> e;
public CheckedEntry(Entry<String,String> e) {this.e = e;}
public String getKey() { return e.getKey();}
public String getValue() { return e.getValue();}
public String setValue(String value) {
return e.setValue(validateValue(value));
}
public String toString() { return getKey() + "=" + getValue();}
public boolean equals(Object o) {return e.equals(o);}
public int hashCode() {return e.hashCode();}
}
private static class CheckedEntrySet extends AbstractSet<Entry<String,String>> {
private final Set<Entry<String,String>> s;
public CheckedEntrySet(Set<Entry<String,String>> s) {this.s = s;}
public int size() {return s.size();}
public boolean isEmpty() {return s.isEmpty();}
public void clear() { s.clear();}
public Iterator<Entry<String,String>> iterator() {
return new Iterator<Entry<String,String>>() {
Iterator<Entry<String,String>> i = s.iterator();
public boolean hasNext() { return i.hasNext();}
public Entry<String,String> next() {
return new CheckedEntry(i.next());
}
public void remove() { i.remove();}
};
}
private static Entry<String,String> checkedEntry(Object o) {
@SuppressWarnings("unchecked")
Entry<String,String> e = (Entry<String,String>) o;
nonNullString(e.getKey());
nonNullString(e.getValue());
return e;
}
public boolean contains(Object o) {return s.contains(checkedEntry(o));}
public boolean remove(Object o) {return s.remove(checkedEntry(o));}
}
private static class CheckedValues extends AbstractCollection<String> {
private final Collection<String> c;
public CheckedValues(Collection<String> c) {this.c = c;}
public int size() {return c.size();}
public boolean isEmpty() {return c.isEmpty();}
public void clear() { c.clear();}
public Iterator<String> iterator() {return c.iterator();}
public boolean contains(Object o) {return c.contains(nonNullString(o));}
public boolean remove(Object o) {return c.remove(nonNullString(o));}
}
private static class CheckedKeySet extends AbstractSet<String> {
private final Set<String> s;
public CheckedKeySet(Set<String> s) {this.s = s;}
public int size() {return s.size();}
public boolean isEmpty() {return s.isEmpty();}
public void clear() { s.clear();}
public Iterator<String> iterator() {return s.iterator();}
public boolean contains(Object o) {return s.contains(nonNullString(o));}
public boolean remove(Object o) {return s.remove(nonNullString(o));}
}
public Set<String> keySet() {
return new CheckedKeySet(super.keySet());
}
public Collection<String> values() {
return new CheckedValues(super.values());
}
public Set<Entry<String,String>> entrySet() {
return new CheckedEntrySet(super.entrySet());
}
private static final class NameComparator implements Comparator<String> {
public int compare(String s1, String s2) {
// We can't use String.compareToIgnoreCase since it
// canonicalizes to lower case, while Windows
// canonicalizes to upper case! For example, "_" should
// sort *after* "Z", not before.
int n1 = s1.length();
int n2 = s2.length();
int min = Math.min(n1, n2);
for (int i = 0; i < min; i++) {
char c1 = s1.charAt(i);
char c2 = s2.charAt(i);
if (c1 != c2) {
c1 = Character.toUpperCase(c1);
c2 = Character.toUpperCase(c2);
if (c1 != c2)
// No overflow because of numeric promotion
return c1 - c2;
}
}
return n1 - n2;
}
}
private static final class EntryComparator implements Comparator<Entry<String,String>> {
public int compare(Entry<String,String> e1,
Entry<String,String> e2) {
return nameComparator.compare(e1.getKey(), e2.getKey());
}
}
// Allow `=' as first char in name, e.g. =C:=C:\DIR
static final int MIN_NAME_LENGTH = 1;
private static final NameComparator nameComparator;
private static final EntryComparator entryComparator;
private static final ProcessEnvironmentForWin32 theEnvironment;
private static final Map<String,String> theUnmodifiableEnvironment;
private static final Map<String,String> theCaseInsensitiveEnvironment;
static {
nameComparator = new NameComparator();
entryComparator = new EntryComparator();
theEnvironment = new ProcessEnvironmentForWin32();
theUnmodifiableEnvironment = Collections.unmodifiableMap(theEnvironment);
theEnvironment.putAll(environmentBlock());
theCaseInsensitiveEnvironment = new TreeMap<>(nameComparator);
theCaseInsensitiveEnvironment.putAll(theEnvironment);
}
private ProcessEnvironmentForWin32() {
super();
}
private ProcessEnvironmentForWin32(int capacity) {
super(capacity);
}
// Only for use by System.getenv(String)
static String getenv(String name) {
// The original implementation used a native call to _wgetenv,
// but it turns out that _wgetenv is only consistent with
// GetEnvironmentStringsW (for non-ASCII) if `wmain' is used
// instead of `main', even in a process created using
// CREATE_UNICODE_ENVIRONMENT. Instead we perform the
// case-insensitive comparison ourselves. At least this
// guarantees that System.getenv().get(String) will be
// consistent with System.getenv(String).
return theCaseInsensitiveEnvironment.get(name);
}
// Only for use by System.getenv()
static Map<String,String> getenv() {
return theUnmodifiableEnvironment;
}
// Only for use by ProcessBuilder.environment()
@SuppressWarnings("unchecked")
static Map<String,String> environment() {
return (Map<String,String>) theEnvironment.clone();
}
// Only for use by ProcessBuilder.environment(String[] envp)
static Map<String,String> emptyEnvironment(int capacity) {
return new ProcessEnvironmentForWin32(capacity);
}
private static Map<String, String> environmentBlock() {
return Kernel32Util.getEnvironmentVariables();
}
// Only for use by ProcessImpl.start()
String toEnvironmentBlock() {
// Sort Unicode-case-insensitively by name
List<Entry<String,String>> list = new ArrayList<>(entrySet());
Collections.sort(list, entryComparator);
StringBuilder sb = new StringBuilder(size()*30);
int cmp = -1;
// Some versions of MSVCRT.DLL require SystemRoot to be set.
// So, we make sure that it is always set, even if not provided
// by the caller.
final String SYSTEMROOT = "SystemRoot";
for (Entry<String,String> e : list) {
String key = e.getKey();
String value = e.getValue();
if (cmp < 0 && (cmp = nameComparator.compare(key, SYSTEMROOT)) > 0) {
// Not set, so add it here
addToEnvIfSet(sb, SYSTEMROOT);
}
addToEnv(sb, key, value);
}
if (cmp < 0) {
// Got to end of list and still not found
addToEnvIfSet(sb, SYSTEMROOT);
}
if (sb.length() == 0) {
// Environment was empty and SystemRoot not set in parent
sb.append('\u0000');
}
// Block is double NUL terminated
sb.append('\u0000');
return sb.toString();
}
// add the environment variable to the child, if it exists in parent
private static void addToEnvIfSet(StringBuilder sb, String name) {
String s = getenv(name);
if (s != null)
addToEnv(sb, name, s);
}
private static void addToEnv(StringBuilder sb, String name, String val) {
sb.append(name).append('=').append(val).append('\u0000');
}
static String toEnvironmentBlock(Map<String,String> map) {
return map == null ? null : ((ProcessEnvironmentForWin32)map).toEnvironmentBlock();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.junit.Assert;
import org.junit.Test;
/**
* Constants Test
*/
public class ConstantsTest {
/**
* Test PID via env
*/
@Test
public void testPID() {
if (OSUtils.isWindows()) {
Assert.assertEquals(Constants.PID, "handle");
} else {
Assert.assertEquals(Constants.PID, "pid");
}
}
}
......@@ -39,16 +39,20 @@ public class OSUtilsTest {
@Test
public void testOSMetric(){
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
Assert.assertTrue(availablePhysicalMemorySize > 0.0f);
double totalMemorySize = OSUtils.totalMemorySize();
Assert.assertTrue(totalMemorySize > 0.0f);
double loadAverage = OSUtils.loadAverage();
logger.info("loadAverage {}", loadAverage);
double memoryUsage = OSUtils.memoryUsage();
Assert.assertTrue(memoryUsage > 0.0f);
double cpuUsage = OSUtils.cpuUsage();
Assert.assertTrue(cpuUsage > 0.0f);
if (!OSUtils.isWindows()) {
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
Assert.assertTrue(availablePhysicalMemorySize > 0.0f);
double totalMemorySize = OSUtils.totalMemorySize();
Assert.assertTrue(totalMemorySize > 0.0f);
double loadAverage = OSUtils.loadAverage();
logger.info("loadAverage {}", loadAverage);
double memoryUsage = OSUtils.memoryUsage();
Assert.assertTrue(memoryUsage > 0.0f);
double cpuUsage = OSUtils.cpuUsage();
Assert.assertTrue(cpuUsage > 0.0f);
} else {
// TODO window ut
}
}
@Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils.process;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@RunWith(PowerMockRunner.class)
@PrepareForTest(OSUtils.class)
public class ProcessBuilderForWin32Test {
private static final Logger logger = LoggerFactory.getLogger(ProcessBuilderForWin32Test.class);
@Before
public void before() {
PowerMockito.mockStatic(OSUtils.class);
PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
}
@Test
public void testCreateProcessBuilderForWin32() {
try {
ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
Assert.assertNotNull(builder);
builder = new ProcessBuilderForWin32("net");
Assert.assertNotNull(builder);
builder = new ProcessBuilderForWin32(Collections.singletonList("net"));
Assert.assertNotNull(builder);
builder = new ProcessBuilderForWin32((List<String>) null);
Assert.assertNotNull(builder);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testBuildUser() {
try {
ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
builder.user("test", StringUtils.EMPTY);
Assert.assertNotNull(builder);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testBuildCommand() {
try {
ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
builder.command(Collections.singletonList("net"));
Assert.assertNotEquals(0, builder.command().size());
builder = new ProcessBuilderForWin32();
builder.command("net");
Assert.assertNotEquals(0, builder.command().size());
builder = new ProcessBuilderForWin32();
builder.command((List<String>) null);
Assert.assertNotEquals(0, builder.command().size());
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testEnvironment() {
try {
ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
Assert.assertNotNull(builder.environment());
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
try {
ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
builder.environment(new String[]{ "a=123" });
Assert.assertNotEquals(0, builder.environment().size());
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testDirectory() {
try {
ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
builder.directory(new File("/tmp"));
Assert.assertNotNull(builder.directory());
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testStream() {
try {
InputStream in = ProcessBuilderForWin32.NullInputStream.INSTANCE;
Assert.assertNotNull(in);
Assert.assertEquals(-1, in.read());
Assert.assertEquals(0, in.available());
OutputStream out = ProcessBuilderForWin32.NullOutputStream.INSTANCE;
Assert.assertNotNull(out);
out.write(new byte[] {1});
} catch (Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testRedirect() {
try {
ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
builder.redirectInput(new File("/tmp"));
Assert.assertNotNull(builder.redirectInput());
Assert.assertNotNull(builder.redirectInput().file());
builder.redirectOutput(new File("/tmp"));
Assert.assertNotNull(builder.redirectOutput());
Assert.assertNotNull(builder.redirectOutput().file());
builder.redirectError(new File("/tmp"));
Assert.assertNotNull(builder.redirectError());
Assert.assertNotNull(builder.redirectError().file());
builder.redirectInput(builder.redirectOutput());
builder.redirectOutput(builder.redirectInput());
builder.redirectError(builder.redirectInput());
Assert.assertNotNull(ProcessBuilderForWin32.Redirect.PIPE.type());
Assert.assertNotNull(ProcessBuilderForWin32.Redirect.PIPE.toString());
Assert.assertNotNull(ProcessBuilderForWin32.Redirect.INHERIT.type());
Assert.assertNotNull(ProcessBuilderForWin32.Redirect.INHERIT.toString());
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testRedirectErrorStream() {
try {
ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
builder.redirectErrorStream(true);
Assert.assertTrue(builder.redirectErrorStream());
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void runCmdViaUser() {
try {
ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
builder.user("test123", StringUtils.EMPTY);
List<String> commands = new ArrayList<>();
commands.add("cmd.exe");
commands.add("/c");
commands.add("net user");
builder.command(commands);
Process process = builder.start();
BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("GBK")));
String line;
StringBuilder sb = new StringBuilder();
while ((line = inReader.readLine()) != null) {
sb.append(line);
}
logger.info("net user: {}", sb.toString());
Assert.assertNotEquals(StringUtils.EMPTY, sb.toString());
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils.process;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
@RunWith(PowerMockRunner.class)
@PrepareForTest({OSUtils.class, ProcessEnvironmentForWin32.class})
public class ProcessEnvironmentForWin32Test {
private static final Logger logger = LoggerFactory.getLogger(ProcessBuilderForWin32Test.class);
@Before
public void before() {
try {
PowerMockito.mockStatic(OSUtils.class);
PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testPutAndGet() {
try {
ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
processEnvironmentForWin32.put("a", "123");
Assert.assertEquals("123", processEnvironmentForWin32.get("a"));
Assert.assertTrue(processEnvironmentForWin32.containsKey("a"));
Assert.assertTrue(processEnvironmentForWin32.containsValue("123"));
Assert.assertEquals("123", processEnvironmentForWin32.remove("a"));
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
try {
ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
processEnvironmentForWin32.put("b=", "123");
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
try {
ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
processEnvironmentForWin32.put("b", "\u0000");
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
try {
ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
processEnvironmentForWin32.get(null);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testEntrySet() {
try {
ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
processEnvironmentForWin32.clear();
processEnvironmentForWin32.put("a", "123");
Assert.assertEquals(0, processEnvironmentForWin32.entrySet().size());
Assert.assertTrue(processEnvironmentForWin32.entrySet().isEmpty());
for (Map.Entry<String, String> entry : processEnvironmentForWin32.entrySet()) {
Assert.assertNotNull(entry);
Assert.assertNotNull(entry.getKey());
Assert.assertNotNull(entry.getValue());
Assert.assertNotNull(entry.setValue("123"));
}
processEnvironmentForWin32.clear();
Set<String> keys = processEnvironmentForWin32.keySet();
Assert.assertEquals(0, keys.size());
Assert.assertTrue(keys.isEmpty());
processEnvironmentForWin32.clear();
Collection<String> values = processEnvironmentForWin32.values();
Assert.assertEquals(0, keys.size());
Assert.assertTrue(keys.isEmpty());
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testToEnvironmentBlock() {
try {
ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
Assert.assertNotNull(processEnvironmentForWin32.toEnvironmentBlock());
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils.process;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.security.action.GetPropertyAction;
@RunWith(PowerMockRunner.class)
@PrepareForTest({OSUtils.class, GetPropertyAction.class})
public class ProcessImplForWin32Test {
private static final Logger logger = LoggerFactory.getLogger(ProcessBuilderForWin32Test.class);
@Before
public void before() {
PowerMockito.mockStatic(OSUtils.class);
PowerMockito.mockStatic(GetPropertyAction.class);
PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
}
@Test
public void testStart() {
try {
Process process = ProcessImplForWin32.start(
"test123", StringUtils.EMPTY, new String[]{"net"},
null, null, null, false);
Assert.assertNotNull(process);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
try {
Process process = ProcessImplForWin32.start(
"test123", StringUtils.EMPTY, new String[]{"net"},
null, null, new ProcessBuilderForWin32.Redirect[]{
ProcessBuilderForWin32.Redirect.PIPE,
ProcessBuilderForWin32.Redirect.PIPE,
ProcessBuilderForWin32.Redirect.PIPE
}, false);
Assert.assertNotNull(process);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
}
......@@ -16,21 +16,29 @@
*/
package org.apache.dolphinscheduler.server.worker.task;
import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinNT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.process.ProcessBuilderForWin32;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.io.*;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
......@@ -194,26 +202,49 @@ public abstract class AbstractCommandExecutor {
* @throws IOException IO Exception
*/
private void buildProcess(String commandFile) throws IOException {
// command list
List<String> command = new ArrayList<>();
//init process builder
ProcessBuilder processBuilder = new ProcessBuilder();
// setting up a working directory
processBuilder.directory(new File(taskDir));
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
// setting up user to run commands
List<String> command = new LinkedList<>();
command.add("sudo");
command.add("-u");
command.add(tenantCode);
command.add(commandInterpreter());
command.addAll(commandOptions());
command.add(commandFile);
processBuilder.command(command);
process = processBuilder.start();
if (OSUtils.isWindows()) {
ProcessBuilderForWin32 processBuilder = new ProcessBuilderForWin32();
// setting up a working directory
processBuilder.directory(new File(taskDir));
processBuilder.user(tenantCode, StringUtils.EMPTY);
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
// setting up user to run commands
command.add(commandInterpreter());
command.add("/c");
command.addAll(commandOptions());
command.add(commandFile);
// setting commands
processBuilder.command(command);
process = processBuilder.start();
} else {
ProcessBuilder processBuilder = new ProcessBuilder();
// setting up a working directory
processBuilder.directory(new File(taskDir));
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
// setting up user to run commands
command.add("sudo");
command.add("-u");
command.add(tenantCode);
command.add(commandInterpreter());
command.addAll(commandOptions());
command.add(commandFile);
// setting commands
processBuilder.command(command);
process = processBuilder.start();
}
// print command
printCommand(processBuilder);
printCommand(command);
}
/**
......@@ -320,13 +351,13 @@ public abstract class AbstractCommandExecutor {
/**
* print command
* @param processBuilder process builder
* @param command command
*/
private void printCommand(ProcessBuilder processBuilder) {
private void printCommand(List<String> command) {
String cmdStr;
try {
cmdStr = ProcessUtils.buildCommandStr(processBuilder.command());
cmdStr = ProcessUtils.buildCommandStr(command);
logger.info("task run command:\n{}", cmdStr);
} catch (IOException e) {
logger.error(e.getMessage(), e);
......@@ -358,7 +389,11 @@ public abstract class AbstractCommandExecutor {
BufferedReader inReader = null;
try {
inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
if (OSUtils.isWindows()) {
inReader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("GBK")));
} else {
inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
}
String line;
long lastFlushTime = System.currentTimeMillis();
......@@ -406,7 +441,7 @@ public abstract class AbstractCommandExecutor {
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}
} catch (Exception e) {
logger.error("yarn applications: {} status failed ", appIds,e);
result = false;
......@@ -510,12 +545,15 @@ public abstract class AbstractCommandExecutor {
*/
private int getProcessId(Process process) {
int processId = 0;
try {
Field f = process.getClass().getDeclaredField(Constants.PID);
f.setAccessible(true);
processId = f.getInt(process);
if (OSUtils.isWindows()) {
WinNT.HANDLE handle = (WinNT.HANDLE) f.get(process);
processId = Kernel32.INSTANCE.GetProcessId(handle);
} else {
processId = f.getInt(process);
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
......
......@@ -17,11 +17,12 @@
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;
......@@ -34,10 +35,15 @@ import java.util.function.Consumer;
public class ShellCommandExecutor extends AbstractCommandExecutor {
/**
* sh
* For Unix-like, using sh
*/
public static final String SH = "sh";
/**
* For Windows, using cmd.exe
*/
public static final String CMD = "cmd.exe";
/**
* constructor
* @param logHandler log handler
......@@ -66,7 +72,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
@Override
protected String buildCommandFilePath() {
// command file
return String.format("%s/%s.command", taskDir, taskAppId);
return String.format("%s/%s.%s", taskDir, taskAppId, OSUtils.isWindows() ? "bat" : "command");
}
/**
......@@ -75,7 +81,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
*/
@Override
protected String commandInterpreter() {
return SH;
return OSUtils.isWindows() ? CMD : SH;
}
/**
......@@ -103,21 +109,26 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
logger.info("create command file:{}", commandFile);
StringBuilder sb = new StringBuilder();
sb.append("#!/bin/sh\n");
sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
sb.append("cd $BASEDIR\n");
if (envFile != null) {
sb.append("source " + envFile + "\n");
if (OSUtils.isWindows()) {
sb.append("@echo off\n");
sb.append("cd /d %~dp0\n");
if (envFile != null) {
sb.append("call ").append(envFile).append("\n");
}
} else {
sb.append("#!/bin/sh\n");
sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
sb.append("cd $BASEDIR\n");
if (envFile != null) {
sb.append("source ").append(envFile).append("\n");
}
}
sb.append("\n\n");
sb.append(execCommand);
logger.info("command : {}",sb.toString());
logger.info("command : {}", sb.toString());
// write data to file
FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
Charset.forName("UTF-8"));
FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
}
}
......
......@@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
......@@ -338,7 +339,7 @@ public class DataxTask extends AbstractTask {
private String buildShellCommandFile(String jobConfigFilePath)
throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId());
String fileName = String.format("%s/%s_node.%s", taskDir, taskProps.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
......@@ -370,7 +371,13 @@ public class DataxTask extends AbstractTask {
// create shell command file
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
Files.createFile(path, attr);
if (OSUtils.isWindows()) {
Files.createFile(path);
} else {
Files.createFile(path, attr);
}
Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND);
return fileName;
......
......@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
......@@ -123,7 +124,7 @@ public class ShellTask extends AbstractTask {
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId());
String fileName = String.format("%s/%s_node.%s", taskDir, taskProps.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
......@@ -154,7 +155,11 @@ public class ShellTask extends AbstractTask {
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
Files.createFile(path, attr);
if (OSUtils.isWindows()) {
Files.createFile(path);
} else {
Files.createFile(path, attr);
}
Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
......
......@@ -21,7 +21,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ProcessUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(ProcessUtilsTest.class);
@Test
......@@ -30,4 +35,16 @@ public class ProcessUtilsTest {
Assert.assertNotEquals("The child process of process 1 should not be empty", pidList, "");
logger.info("Sub process list : {}", pidList);
}
@Test
public void testBuildCommandStr() {
List<String> commands = new ArrayList<>();
commands.add("sudo");
try {
Assert.assertEquals(ProcessUtils.buildCommandStr(commands), "sudo");
} catch (IOException e) {
Assert.fail(e.getMessage());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.shell;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.util.Date;
/**
* shell task test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(OSUtils.class)
public class ShellTaskTest {
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
private ShellTask shellTask;
private ProcessService processService;
private ShellCommandExecutor shellCommandExecutor;
private ApplicationContext applicationContext;
@Before
public void before() throws Exception {
PowerMockito.mockStatic(OSUtils.class);
processService = PowerMockito.mock(ProcessService.class);
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
applicationContext = PowerMockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams("{\"rawScript\": \" echo 'hello world!'\"}");
shellTask = new ShellTask(props, logger);
shellTask.init();
PowerMockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
PowerMockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
PowerMockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
String fileName = String.format("%s/%s_node.%s", props.getTaskDir(), props.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
PowerMockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0);
}
private DataSource getDataSource() {
DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL);
dataSource.setConnectionParams(
"{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}");
dataSource.setUserId(1);
return dataSource;
}
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setCommandType(CommandType.START_PROCESS);
processInstance.setScheduleTime(new Date());
return processInstance;
}
@After
public void after() {}
/**
* Method: ShellTask()
*/
@Test
public void testShellTask()
throws Exception {
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
ShellTask shellTaskTest = new ShellTask(props, logger);
Assert.assertNotNull(shellTaskTest);
}
/**
* Method: init for Unix-like
*/
@Test
public void testInitForUnix() {
try {
PowerMockito.when(OSUtils.isWindows()).thenReturn(false);
shellTask.init();
Assert.assertTrue(true);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
/**
* Method: init for Windows
*/
@Test
public void testInitForWindows() {
try {
PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
shellTask.init();
Assert.assertTrue(true);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
/**
* Method: handle() for Unix-like
*/
@Test
public void testHandleForUnix() throws Exception {
try {
PowerMockito.when(OSUtils.isWindows()).thenReturn(false);
shellTask.handle();
Assert.assertTrue(true);
} catch (Error | Exception e) {
if (!e.getMessage().contains("process error . exitCode is : -1")
&& !System.getProperty("os.name").startsWith("Windows")) {
logger.error(e.getMessage());
}
}
}
/**
* Method: handle() for Windows
*/
@Test
public void testHandleForWindows() throws Exception {
try {
PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
shellTask.handle();
Assert.assertTrue(true);
} catch (Error | Exception e) {
if (!e.getMessage().contains("process error . exitCode is : -1")) {
logger.error(e.getMessage());
}
}
}
/**
* Method: cancelApplication()
*/
@Test
public void testCancelApplication() throws Exception {
try {
shellTask.cancelApplication(true);
Assert.assertTrue(true);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
}
......@@ -684,6 +684,9 @@
<configuration>
<includes>
<include>**/common/utils/*.java</include>
<include>**/common/utils/process/ProcessBuilderForWin32Test.java</include>
<include>**/common/utils/process/ProcessEnvironmentForWin32Test.java</include>
<include>**/common/utils/process/ProcessImplForWin32Test.java</include>
<include>**/common/log/*.java</include>
<include>**/common/threadutils/*.java</include>
<include>**/common/graph/*.java</include>
......@@ -732,6 +735,7 @@
<include>**/alert/template/AlertTemplateFactoryTest.java</include>
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include>
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册