提交 1494ab2d 编写于 作者: M Mike Roth

Revert "gpstart: fix OOM issue"

This reverts commit a0fcfc37.
上级 59abec44
......@@ -359,7 +359,7 @@ class SegmentStart(Command):
all of the segments on a specified GpHost.
"""
def __init__(self, name, gpdb, numContentsInCluster, era, mirrormode, master_heap_checksum_value,
def __init__(self, name, gpdb, numContentsInCluster, era, mirrormode,
utilityMode=False, ctxt=LOCAL, remoteHost=None,
noWait=False, timeout=SEGMENT_TIMEOUT_DEFAULT,
specialMode=None, wrapper=None, wrapper_args=None):
......@@ -706,8 +706,6 @@ SEGSTART_ERROR_SERVER_DID_NOT_RESPOND = 7
SEGSTART_ERROR_PG_CTL_FAILED = 8
SEGSTART_ERROR_CHECKING_CONNECTION_AND_LOCALE_FAILED = 9
SEGSTART_ERROR_PING_FAILED = 10 # not actually done inside GpSegStartCmd, done instead by caller
SEGSTART_ERROR_PG_CONTROLDATA_FAILED = 11
SEGSTART_ERROR_CHECKSUM_MISMATCH = 12
SEGSTART_ERROR_OTHER = 1000
......@@ -720,7 +718,7 @@ class GpSegStartArgs(CmdArgs):
"$GPHOME/sbin/gpsegstart.py -M mirrorless -V 'gpversion' -n 1 --era 123 -t 600"
"""
def __init__(self, mirrormode, gpversion, num_cids, era, master_checksum_value, timeout):
def __init__(self, mirrormode, gpversion, num_cids, era, timeout):
"""
@param mirrormode - mirror start mode (START_AS_PRIMARY_OR_MIRROR or START_AS_MIRRORLESS)
@param gpversion - version (from postgres --gp-version)
......@@ -728,19 +726,14 @@ class GpSegStartArgs(CmdArgs):
@param era - master era
@param timeout - seconds to wait before giving up
"""
default_args = [
CmdArgs.__init__(self, [
"$GPHOME/sbin/gpsegstart.py",
"-M", str(mirrormode),
"-V '%s'" % gpversion,
"-n", str(num_cids),
"--era", str(era),
"-t", str(timeout)
]
if master_checksum_value != None:
default_args.append("--master-checksum-version")
default_args.append(str(master_checksum_value))
CmdArgs.__init__(self, default_args)
])
def set_special(self, special):
"""
......@@ -765,7 +758,7 @@ class GpSegStartArgs(CmdArgs):
class GpSegStartCmd(Command):
def __init__(self, name, gphome, segments, gpversion,
mirrormode, numContentsInCluster, era, master_checksum_value,
mirrormode, numContentsInCluster, era,
timeout=SEGMENT_TIMEOUT_DEFAULT, verbose=False,
ctxt=LOCAL, remoteHost=None, pickledTransitionData=None,
specialMode=None, wrapper=None, wrapper_args=None,
......@@ -775,7 +768,7 @@ class GpSegStartCmd(Command):
self.dblist = [x for x in segments]
# build gpsegstart command string
c = GpSegStartArgs(mirrormode, gpversion, numContentsInCluster, era, master_checksum_value,timeout)
c = GpSegStartArgs(mirrormode, gpversion, numContentsInCluster, era, timeout)
c.set_verbose(verbose)
c.set_special(specialMode)
c.set_transition(pickledTransitionData)
......
......@@ -262,6 +262,3 @@ class PgControlData(Command):
(n,v) = l.split(':', 1)
self.data[n.strip()] = v.strip()
return self.data[name]
def get_datadir(self):
return self.datadir
......@@ -72,7 +72,7 @@ class StartSegmentsOperation:
"""
def __init__(self, workerPool, quiet, gpVersion,
gpHome, masterDataDirectory, master_checksum_value, timeout=SEGMENT_TIMEOUT_DEFAULT,
gpHome, masterDataDirectory, timeout=SEGMENT_TIMEOUT_DEFAULT,
specialMode=None, wrapper=None, wrapper_args=None,
logfileDirectory=False):
checkNotNone("workerPool", workerPool)
......@@ -86,7 +86,6 @@ class StartSegmentsOperation:
self.__specialMode = specialMode
self.__wrapper = wrapper
self.__wrapper_args = wrapper_args
self.master_checksum_value = master_checksum_value
self.logfileDirectory = logfileDirectory
def startSegments(self, gpArray, segments, startMethod, era):
......@@ -205,7 +204,6 @@ class StartSegmentsOperation:
mirroringModePreTransition,
numContentsInCluster,
era,
self.master_checksum_value,
self.__timeout,
verbose=logging_is_verbose(),
ctxt=base.REMOTE,
......
import sys, os
import gpsegstart
from mock import Mock, patch
from gppylib.test.unit.gp_unittest import GpTestCase
from gppylib.commands.base import CommandResult
from gppylib.commands.pg import PgControlData
from gppylib.commands import gp
class GpSegStart(GpTestCase):
def setUp(self):
self.subject = gpsegstart
self.subject.logger = Mock(
spec=['log', 'warn', 'info', 'debug', 'error', 'warning', 'fatal', 'warning_to_file_only'])
local_version = gp.GpVersion.local('local GP software version check', gp.get_gphome())
self.failing_data_dir = "/doesnt/exist/datadirs/dbfast_mirror1/demoDataDir0"
self.args_list = ["gpsegstart.py", "-V", local_version,
"-n", "3", "--era", "5c105ee373d42194_180105120208", "-t", "600", "-p", "pickled transition data",
"-D", "2|0|p|p|s|u|aspen|aspen|25432|25438|/doesnt/exist/datadirs/dbfast1/demoDataDir0||",
"-D", "5|0|m|m|s|u|aspen|aspen|25435|25441|" + self.failing_data_dir + "||",
"-D", "3|1|p|p|s|u|aspen|aspen|25433|25439|/doesnt/exist/datadirs/dbfast2/demoDataDir1||",
"-D", "6|1|m|m|s|u|aspen|aspen|25436|25442|/doesnt/exist/datadirs/dbfast_mirror2/demoDataDir1||",
"-D", "4|2|p|p|s|u|aspen|aspen|25434|25440|/doesnt/exist/datadirs/dbfast3/demoDataDir2||",
"-D", "7|2|m|m|s|u|aspen|aspen|25437|25443|/doesnt/exist/datadirs/dbfast_mirror3/demoDataDir2||",
]
self.apply_patches([
patch('os.path.isdir'),
patch('os.path.exists'),
patch('os.kill'),
patch('gpsegstart.gp.recovery_startup', return_value=None),
patch('gpsegstart.base64.urlsafe_b64decode'),
patch('gpsegstart.pickle.loads', return_value="random string"),
patch('gpsegstart.gp.SendFilerepTransitionMessage.buildTransitionMessageCommand'),
patch('gpsegstart.base.WorkerPool'),
patch('gpsegstart.gp.read_postmaster_pidfile', return_value=111),
patch('gpsegstart.PgControlData.run')
])
self.mock_workerpool = self.get_mock_from_apply_patch('WorkerPool')
def tearDown(self):
super(GpSegStart, self).tearDown()
def test_startSegments(self):
sys.argv = self.args_list
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 0)
for result in gpsegstart.overall_status.results:
self.assertTrue(result.reasoncode == gp.SEGSTART_SUCCESS)
@patch.object(PgControlData, "get_results", return_value=CommandResult(0, '/tmp/f1', '', True, False))
@patch.object(PgControlData, "get_value", return_value="1")
def test_startSegments_when_checksums_match(self, mock1, mock2):
self.args_list.append("--master-checksum-version")
self.args_list.append("1")
sys.argv = self.args_list
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 0)
for result in gpsegstart.overall_status.results:
self.assertTrue(result.reasoncode == gp.SEGSTART_SUCCESS)
@patch.object(PgControlData, "get_results", return_value=CommandResult(0, '/tmp/f1', '', True, False))
@patch.object(PgControlData, "get_value", return_value="1")
def test_startSegments_when_checksums_mismatch(self, mock1, mock2):
self.args_list.append("--master-checksum-version")
self.args_list.append("0")
sys.argv = self.args_list
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 1)
for result in gpsegstart.overall_status.results:
self.assertTrue(result.reasoncode == gp.SEGSTART_ERROR_CHECKSUM_MISMATCH)
@patch.object(PgControlData, "get_results", return_value=CommandResult(1, '/tmp/f1', '', True, False))
def test_startSegments_when_pg_controldata_failed(self, mock1):
self.args_list.append("--master-checksum-version")
self.args_list.append("1")
sys.argv = self.args_list
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 1)
for result in gpsegstart.overall_status.results:
self.assertTrue(result.reasoncode == gp.SEGSTART_ERROR_PG_CONTROLDATA_FAILED)
@patch.object(PgControlData, "get_results", return_value=CommandResult(0, '/tmp/f1', '', True, False))
@patch.object(PgControlData, "get_value", return_value="1")
@patch.object(gp.SegmentStart, "get_results", return_value=CommandResult(1, '/tmp/f1', '', True, False))
def test_startSegments_when_pg_ctl_failed(self, mock1, mock2, mock_get_results):
self.args_list.append("--master-checksum-version")
self.args_list.append("1")
sys.argv = self.args_list
mock_get_results.segment = Mock()
mock_get_results.segment.getSegmentDataDirectory.return_value = self.failing_data_dir
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
with patch('gpsegstart.base.WorkerPool.return_value.getCompletedItems', return_value=[mock_get_results]):
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 1)
for result in gpsegstart.overall_status.results:
if result.datadir == self.failing_data_dir:
self.assertTrue(result.reasoncode == gp.SEGSTART_ERROR_PG_CTL_FAILED)
else:
self.assertTrue(result.reasoncode == gp.SEGSTART_SUCCESS)
......@@ -8,7 +8,6 @@ from mock import Mock, patch
from gparray import Segment, GpArray
from gppylib.operations.startSegments import StartSegmentsResult
from gppylib.test.unit.gp_unittest import GpTestCase, run_tests
from gppylib.commands import gp
class GpStart(GpTestCase):
......@@ -74,7 +73,6 @@ class GpStart(GpTestCase):
self.mock_heap_checksum.return_value.check_segment_consistency.return_value = ([], [], None)
self.mock_pgconf.readfile.return_value = Mock()
self.mock_gplog_log_to_file_only = self.get_mock_from_apply_patch("log_to_file_only")
self.mock_filespace_is_filespace_configured = self.get_mock_from_apply_patch("is_filespace_configured")
self.mock_gp.get_masterdatadir.return_value = 'masterdatadir'
self.mock_gp.GpCatVersion.local.return_value = 1
......@@ -140,6 +138,62 @@ class GpStart(GpTestCase):
self.assertEquals(self.mock_gplog_log_to_file_only.call_count, 0)
def test_output_to_stdout_and_log_differs_for_heap_checksum(self):
sys.argv = ["gpstart", "-a"]
self.mock_heap_checksum.return_value.are_segments_consistent.return_value = False
self.subject.unix.PgPortIsActive.local.return_value = False
self.mock_os_path_exists.side_effect = os_exists_check
self.primary1.heap_checksum = 0
self.master.heap_checksum = '1'
self.mock_heap_checksum.return_value.check_segment_consistency.return_value = (
[self.primary0], [self.primary1], self.master.heap_checksum)
parser = self.subject.GpStart.createParser()
options, args = parser.parse_args()
gpstart = self.subject.GpStart.createProgram(options, args)
return_code = gpstart.run()
self.assertEqual(return_code, 1)
self.subject.logger.fatal.assert_any_call('Cluster heap checksum setting differences reported.')
self.mock_gplog_log_to_file_only.assert_any_call('Failed checksum consistency validation:', logging.WARN)
self.mock_gplog_log_to_file_only.assert_any_call('dbid: %s '
'checksum set to %s differs from '
'master checksum set to %s' %
(self.primary1.getSegmentDbId(), 0, 1), logging.WARN)
self.subject.logger.fatal.assert_any_call("Shutting down master")
self.assertEquals(self.mock_gp.GpStop.call_count, 1)
def test_failed_to_contact_segments_causes_logging_and_failure(self):
sys.argv = ["gpstart", "-a"]
self.mock_heap_checksum.return_value.get_segments_checksum_settings.return_value = ([], [1])
self.subject.unix.PgPortIsActive.local.return_value = False
self.mock_os_path_exists.side_effect = os_exists_check
parser = self.subject.GpStart.createParser()
options, args = parser.parse_args()
gpstart = self.subject.GpStart.createProgram(options, args)
return_code = gpstart.run()
self.assertEqual(return_code, 1)
self.subject.logger.fatal.assert_any_call(
'No segments responded to ssh query for heap checksum. Not starting the array.')
def test_checksum_consistent(self):
sys.argv = ["gpstart", "-a"]
self.mock_heap_checksum.return_value.get_segments_checksum_settings.return_value = ([1], [1])
self.subject.unix.PgPortIsActive.local.return_value = False
self.mock_os_path_exists.side_effect = os_exists_check
parser = self.subject.GpStart.createParser()
options, args = parser.parse_args()
gpstart = self.subject.GpStart.createProgram(options, args)
return_code = gpstart.run()
self.assertEqual(return_code, 0)
self.subject.logger.info.assert_any_call('Heap checksum setting is consistent across the cluster')
def test_skip_checksum_validation_succeeds(self):
sys.argv = ["gpstart", "-a", "--skip-heap-checksum-validation"]
self.mock_heap_checksum.return_value.get_segments_checksum_settings.return_value = ([1], [1])
......@@ -158,23 +212,24 @@ class GpStart(GpTestCase):
'the GUC for data_checksums '
'will not be checked between master and segments')
def test_log_when_heap_checksum_validation_fails(self):
sys.argv = ["gpstart", "-a", "-S"]
def test_gpstart_fails_if_standby_heap_checksum_doesnt_match_master(self):
sys.argv = ["gpstart", "-a"]
self.gparray = GpArray([self.master, self.primary0, self.primary1, self.mirror0, self.mirror1, self.standby])
self.segments_by_content_id = GpArray.getSegmentsByContentId(self.gparray.getSegDbList())
self.mock_os_path_exists.side_effect = os_exists_check
self.subject.unix.PgPortIsActive.local.return_value = False
self.mock_heap_checksum.return_value.get_master_value.return_value = 1
self.mock_filespace_is_filespace_configured.return_value = False
start_failure = StartSegmentsResult()
start_failure.addFailure(self.mirror1, "fictitious reason", gp.SEGSTART_ERROR_CHECKSUM_MISMATCH)
self.mock_start_result.return_value.startSegments.return_value.getFailedSegmentObjs.return_value = start_failure.getFailedSegmentObjs()
self.mock_heap_checksum.return_value.get_standby_value.return_value = 0
parser = self.subject.GpStart.createParser()
options, args = parser.parse_args()
gpstart = self.subject.GpStart.createProgram(options, args)
return_code = gpstart.run()
with patch("gpstart.GpArray.initFromCatalog", return_value=self.gparray):
return_code = gpstart.run()
self.assertEqual(return_code, 1)
messages = [msg[0][0] for msg in self.subject.logger.info.call_args_list]
self.assertIn("DBID:5 FAILED host:'sdw1' datadir:'/data/mirror1' with reason:'fictitious reason'", messages)
self.subject.logger.warning.assert_any_call("Heap checksum settings on standby master do not match master <<<<<<<<")
self.subject.logger.error.assert_any_call("gpstart error: Heap checksum settings are not consistent across the cluster.")
def _createGpArrayWith2Primary2Mirrors(self):
self.master = Segment.initFromString(
......@@ -201,7 +256,7 @@ def os_exists_check(arg):
# Skip file related checks
if 'pg_log' in arg:
return True
elif 'postmaster.pid' in arg or '.s.PGSQL' in arg or 'filespace':
elif 'postmaster.pid' in arg or '.s.PGSQL' in arg:
return False
return False
......
......@@ -121,13 +121,48 @@ class GpStart:
return 0
if self.skip_heap_checksum_validation:
self.master_checksum_value = None
logger.warning("Because of --skip-heap-checksum-validation, the GUC for data_checksums "
"will not be checked between master and segments")
"will not be checked between master and segments")
else:
# check that checksum settings are consistent across the cluster
num_workers = min(len(self.gparray.get_hostlist()), self.parallel)
self.master_checksum_value = HeapChecksum(gparray=self.gparray, num_workers=num_workers,
logger=logger).get_master_value()
heap_checksum = HeapChecksum(gparray=self.gparray, num_workers=num_workers, logger=logger)
if self.gparray.hasStandbyMaster():
if heap_checksum.get_master_value() != heap_checksum.get_standby_value():
logger.warning("Heap checksum settings on standby master do not match master <<<<<<<<")
logger.error("gpstart error: Heap checksum settings are not consistent across the cluster.")
return 1
successes, failures = heap_checksum.get_segments_checksum_settings()
if len(successes) == 0:
logger.fatal("No segments responded to ssh query for heap checksum. Not starting the array.")
return 1
consistent, inconsistent, master_checksum_value = heap_checksum.check_segment_consistency(successes)
if not heap_checksum.are_segments_consistent(consistent, inconsistent):
logger.fatal("Cluster heap checksum setting differences reported.")
logger.fatal("Shutting down master")
cmd = gp.GpStop("Shutting down master", masterOnly=True,
fast=True, quiet=logging_is_quiet(),
verbose=logging_is_verbose(),
datadir=self.master_datadir,
logfileDirectory=self.logfileDirectory)
cmd.run()
logger.fatal("Heap checksum settings on %d of %d segment instances do not match master <<<<<<<<"
% (len(inconsistent), len(inconsistent) + len(consistent)))
logger.fatal("Review %s for details" % get_logfile())
log_to_file_only("Failed checksum consistency validation:", logging.WARN)
for gpdb in inconsistent:
segment_id = gpdb.getSegmentDbId()
checksum = gpdb.heap_checksum
log_to_file_only("dbid: %s "
"checksum set to %s differs from master checksum set to %s" %
(segment_id, checksum, master_checksum_value), logging.WARN)
return 1
logger.info("Heap checksum setting is consistent across the cluster")
# Do we have an even-number of recovered segments ?
if len(self.gparray.recoveredSegmentDbids) > 0 and len(self.gparray.recoveredSegmentDbids) % 2 == 0:
......@@ -443,8 +478,8 @@ class GpStart:
# this will eventually start gpsegstart.py
segmentStartOp = StartSegmentsOperation(self.pool, self.quiet, self.gpversion,
self.gphome, self.master_datadir, self.master_checksum_value,
self.timeout, self.specialMode, self.wrapper, self.wrapper_args,
self.gphome, self.master_datadir, self.timeout,
self.specialMode, self.wrapper, self.wrapper_args,
logfileDirectory=self.logfileDirectory)
segmentStartResult = segmentStartOp.startSegments(self.gparray, segmentsToStart, startMode, self.era)
......
......@@ -18,7 +18,6 @@ from gppylib.gpparseopts import OptParser, OptChecker
from gppylib import gparray, gplog
from gppylib.commands import base, gp
from gppylib.utils import parseKeyColonValueLines
from gppylib.commands.pg import PgControlData
logger = gplog.get_default_logger()
......@@ -132,7 +131,7 @@ class GpSegStart:
def __init__(self, dblist, gpversion, mirroringMode, num_cids, era,
timeout, pickledTransitionData, specialMode, wrapper, wrapper_args,
master_checksum_version, logfileDirectory=False):
logfileDirectory=False):
# validate/store arguments
#
......@@ -165,7 +164,6 @@ class GpSegStart:
self.overall_status = None
self.logfileDirectory = logfileDirectory
self.master_checksum_version = master_checksum_version
def getOverallStatusKeys(self):
return self.overall_status.dirmap.keys()
......@@ -240,32 +238,11 @@ class GpSegStart:
self.logger.info("Starting segments... (mirroringMode %s)" % self.mirroringMode)
for datadir, seg in self.overall_status.dirmap.items():
if self.master_checksum_version != None:
cmd = PgControlData(name='run pg_controldata', datadir=datadir)
cmd.run(validateAfter=True)
res = cmd.get_results()
if res.rc != 0:
msg = "pg_controldata failed.\nstdout:%s\nstderr:%s\n" % (res.stdout, res.stderr)
reasoncode = gp.SEGSTART_ERROR_PG_CONTROLDATA_FAILED
self.overall_status.mark_failed(datadir, msg, reasoncode)
continue
segment_heap_checksum_version = cmd.get_value('Data page checksum version')
if segment_heap_checksum_version != self.master_checksum_version:
msg = "Segment checksum %s does not match master checksum %s.\n" % (segment_heap_checksum_version,
self.master_checksum_version)
reasoncode = gp.SEGSTART_ERROR_CHECKSUM_MISMATCH
self.overall_status.mark_failed(datadir, msg, reasoncode)
continue
cmd = gp.SegmentStart("Starting seg at dir %s" % datadir,
seg,
self.num_cids,
self.era,
self.mirroringMode,
self.master_checksum_version,
timeout=self.timeout,
specialMode=self.specialMode,
wrapper=self.wrapper,
......@@ -471,7 +448,6 @@ class GpSegStart:
help='start the instance in upgrade or maintenance mode')
parser.add_option('', '--wrapper', dest="wrapper", default=None, type='string')
parser.add_option('', '--wrapper-args', dest="wrapper_args", default=None, type='string')
parser.add_option('', '--master-checksum-version', dest="master_checksum_version", default=None, type='string', action="store")
return parser
......@@ -491,7 +467,6 @@ class GpSegStart:
options.specialMode,
options.wrapper,
options.wrapper_args,
options.master_checksum_version,
logfileDirectory=logfileDirectory)
#-------------------------------------------------------------------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册