提交 a993ef03 编写于 作者: S Shoaib Lari

gpstart: fix OOM issue

gpstart did a cluster-wide check of heap_checksum settings and refused
to start the cluster if this setting was inconsistent. This meant a
round of ssh'ing across the cluster which was causing OOM errors with
large clusters.

This commit moves the heap_checksum validation to gpsegstart.py, and
changes the logic so that only those segments which have the same
heap_checksum setting as master are started.

Author: Jim Doty <jdoty@pivotal.io>
Author: Nadeem Ghani <nghani@pivotal.io>
Author: Shoaib Lari <slari@pivotal.io>
上级 c9842d0c
......@@ -801,8 +801,9 @@ class SegmentTemplate:
self.logger.info('Configuring new segments (primary)')
new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(self.gparray.getExpansionSegDbList(),
primaryMirror='primary')
self.logger.info(new_segment_info)
for host in iter(new_segment_info):
segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', new_segment_info[host],
segCfgCmd = ConfigureNewSegment(name='gpexpand configure new segments', confinfo=new_segment_info[host],
tarFile=self.segTarFile, newSegments=True,
verbose=gplog.logging_is_verbose(), batchSize=self.batch_size,
ctxt=REMOTE, remoteHost=host)
......@@ -828,7 +829,7 @@ class SegmentTemplate:
new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(mirrorsList, primaryMirror='mirror')
for host in iter(new_segment_info):
segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', new_segment_info[host],
segCfgCmd = ConfigureNewSegment(name='gpexpand configure new segments', confinfo=new_segment_info[host],
tarFile=self.schema_tar_file, newSegments=True,
verbose=gplog.logging_is_verbose(), batchSize=self.batch_size,
ctxt=REMOTE, remoteHost=host, validationOnly=False)
......@@ -1496,7 +1497,7 @@ Set PGDATABASE or use the -D option to specify the correct database to use.""" %
self.logger.info('Restoring original segments catalog tables')
orig_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(originalArray.getSegDbList())
for host in iter(orig_segment_info):
segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', orig_segment_info[host],
segCfgCmd = ConfigureNewSegment(name='gpexpand configure new segments', confinfo=orig_segment_info[host],
verbose=gplog.logging_is_verbose(), batchSize=self.options.batch_size,
ctxt=REMOTE, remoteHost=host)
self.pool.addCommand(segCfgCmd)
......
......@@ -545,6 +545,8 @@ SEGSTART_ERROR_SERVER_DID_NOT_RESPOND = 7
SEGSTART_ERROR_PG_CTL_FAILED = 8
SEGSTART_ERROR_CHECKING_CONNECTION_AND_LOCALE_FAILED = 9
SEGSTART_ERROR_PING_FAILED = 10 # not actually done inside GpSegStartCmd, done instead by caller
SEGSTART_ERROR_PG_CONTROLDATA_FAILED = 11
SEGSTART_ERROR_CHECKSUM_MISMATCH = 12
SEGSTART_ERROR_OTHER = 1000
......@@ -557,7 +559,7 @@ class GpSegStartArgs(CmdArgs):
"$GPHOME/sbin/gpsegstart.py -M mirrorless -V 'gpversion' -n 1 --era 123 -t 600"
"""
def __init__(self, mirrormode, gpversion, num_cids, era, timeout):
def __init__(self, mirrormode, gpversion, num_cids, era, master_checksum_value, timeout):
"""
@param mirrormode - mirror start mode (START_AS_PRIMARY_OR_MIRROR or START_AS_MIRRORLESS)
@param gpversion - version (from postgres --gp-version)
......@@ -565,14 +567,19 @@ class GpSegStartArgs(CmdArgs):
@param era - master era
@param timeout - seconds to wait before giving up
"""
CmdArgs.__init__(self, [
default_args = [
"$GPHOME/sbin/gpsegstart.py",
"-M", str(mirrormode),
"-V '%s'" % gpversion,
"-n", str(num_cids),
"--era", str(era),
"-t", str(timeout)
])
]
if master_checksum_value != None:
default_args.append("--master-checksum-version")
default_args.append(str(master_checksum_value))
CmdArgs.__init__(self, default_args)
def set_special(self, special):
"""
......@@ -597,7 +604,7 @@ class GpSegStartArgs(CmdArgs):
class GpSegStartCmd(Command):
def __init__(self, name, gphome, segments, gpversion,
mirrormode, numContentsInCluster, era,
mirrormode, numContentsInCluster, era, master_checksum_value=None,
timeout=SEGMENT_TIMEOUT_DEFAULT, verbose=False,
ctxt=LOCAL, remoteHost=None, pickledTransitionData=None,
specialMode=None, wrapper=None, wrapper_args=None,
......@@ -607,7 +614,7 @@ class GpSegStartCmd(Command):
self.dblist = [x for x in segments]
# build gpsegstart command string
c = GpSegStartArgs(mirrormode, gpversion, numContentsInCluster, era, timeout)
c = GpSegStartArgs(mirrormode, gpversion, numContentsInCluster, era, master_checksum_value,timeout)
c.set_verbose(verbose)
c.set_special(specialMode)
c.set_transition(pickledTransitionData)
......
......@@ -264,6 +264,9 @@ class PgControlData(Command):
self.data[n.strip()] = v.strip()
return self.data[name]
def get_datadir(self):
return self.datadir
class PgBaseBackup(Command):
def __init__(self, pgdata, host, port, excludePaths=[], ctxt=LOCAL, remoteHost=None):
cmd_tokens = ['pg_basebackup',
......@@ -292,4 +295,4 @@ class PgBaseBackup(Command):
cmd_tokens.append(path)
cmd_str = ' '.join(cmd_tokens)
Command.__init__(self, 'pg_basebackup', cmd_str, ctxt=ctxt, remoteHost=remoteHost)
\ No newline at end of file
Command.__init__(self, 'pg_basebackup', cmd_str, ctxt=ctxt, remoteHost=remoteHost)
......@@ -72,7 +72,7 @@ class StartSegmentsOperation:
"""
def __init__(self, workerPool, quiet, gpVersion,
gpHome, masterDataDirectory, timeout=SEGMENT_TIMEOUT_DEFAULT,
gpHome, masterDataDirectory, master_checksum_value=None, timeout=SEGMENT_TIMEOUT_DEFAULT,
specialMode=None, wrapper=None, wrapper_args=None,
logfileDirectory=False):
checkNotNone("workerPool", workerPool)
......@@ -86,6 +86,7 @@ class StartSegmentsOperation:
self.__specialMode = specialMode
self.__wrapper = wrapper
self.__wrapper_args = wrapper_args
self.master_checksum_value = master_checksum_value
self.logfileDirectory = logfileDirectory
def startSegments(self, gpArray, segments, startMethod, era):
......@@ -204,6 +205,7 @@ class StartSegmentsOperation:
mirroringModePreTransition,
numContentsInCluster,
era,
self.master_checksum_value,
self.__timeout,
verbose=logging_is_verbose(),
ctxt=base.REMOTE,
......
import sys, os
import gpsegstart
from mock import Mock, patch
from gppylib.test.unit.gp_unittest import GpTestCase
from gppylib.commands.base import CommandResult
from gppylib.commands.pg import PgControlData
from gppylib.commands import gp
class GpSegStart(GpTestCase):
def setUp(self):
self.subject = gpsegstart
self.subject.logger = Mock(
spec=['log', 'warn', 'info', 'debug', 'error', 'warning', 'fatal', 'warning_to_file_only'])
local_version = gp.GpVersion.local('local GP software version check', gp.get_gphome())
self.failing_data_dir = "/doesnt/exist/datadirs/dbfast_mirror1/demoDataDir0"
self.args_list = ["gpsegstart.py", "-V", local_version,
"-n", "3", "--era", "5c105ee373d42194_180105120208", "-t", "600", "-p", "pickled transition data",
"-D", "2|0|p|p|s|u|aspen|aspen|25432|/doesnt/exist/datadirs/dbfast1/demoDataDir0",
"-D", "5|0|m|m|s|u|aspen|aspen|25435|" + self.failing_data_dir ,
"-D", "3|1|p|p|s|u|aspen|aspen|25433|/doesnt/exist/datadirs/dbfast2/demoDataDir1",
"-D", "6|1|m|m|s|u|aspen|aspen|25436|/doesnt/exist/datadirs/dbfast_mirror2/demoDataDir1",
"-D", "4|2|p|p|s|u|aspen|aspen|25434|/doesnt/exist/datadirs/dbfast3/demoDataDir2",
"-D", "7|2|m|m|s|u|aspen|aspen|25437|/doesnt/exist/datadirs/dbfast_mirror3/demoDataDir2",
]
self.apply_patches([
patch('os.path.isdir'),
patch('os.path.exists'),
patch('os.kill'),
patch('gpsegstart.gp.recovery_startup', return_value=None),
patch('gpsegstart.base64.urlsafe_b64decode'),
patch('gpsegstart.pickle.loads', return_value="random string"),
patch('gpsegstart.base.WorkerPool'),
patch('gpsegstart.gp.read_postmaster_pidfile', return_value=111),
patch('gpsegstart.PgControlData.run')
])
self.mock_workerpool = self.get_mock_from_apply_patch('WorkerPool')
def tearDown(self):
super(GpSegStart, self).tearDown()
def test_startSegments(self):
sys.argv = self.args_list
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 0)
for result in gpsegstart.overall_status.results:
self.assertTrue(result.reasoncode == gp.SEGSTART_SUCCESS)
@patch.object(PgControlData, "get_results", return_value=CommandResult(0, '/tmp/f1', '', True, False))
@patch.object(PgControlData, "get_value", return_value="1")
def test_startSegments_when_checksums_match(self, mock1, mock2):
self.args_list.append("--master-checksum-version")
self.args_list.append("1")
sys.argv = self.args_list
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 0)
for result in gpsegstart.overall_status.results:
self.assertTrue(result.reasoncode == gp.SEGSTART_SUCCESS)
@patch.object(PgControlData, "get_results", return_value=CommandResult(0, '/tmp/f1', '', True, False))
@patch.object(PgControlData, "get_value", return_value="1")
def test_startSegments_when_checksums_mismatch(self, mock1, mock2):
self.args_list.append("--master-checksum-version")
self.args_list.append("0")
sys.argv = self.args_list
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 1)
for result in gpsegstart.overall_status.results:
self.assertTrue(result.reasoncode == gp.SEGSTART_ERROR_CHECKSUM_MISMATCH)
@patch.object(PgControlData, "get_results", return_value=CommandResult(1, '/tmp/f1', '', True, False))
def test_startSegments_when_pg_controldata_failed(self, mock1):
self.args_list.append("--master-checksum-version")
self.args_list.append("1")
sys.argv = self.args_list
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 1)
for result in gpsegstart.overall_status.results:
self.assertTrue(result.reasoncode == gp.SEGSTART_ERROR_PG_CONTROLDATA_FAILED)
@patch.object(PgControlData, "get_results", return_value=CommandResult(0, '/tmp/f1', '', True, False))
@patch.object(PgControlData, "get_value", return_value="1")
@patch.object(gp.SegmentStart, "get_results", return_value=CommandResult(1, '/tmp/f1', '', True, False))
def test_startSegments_when_pg_ctl_failed(self, mock1, mock2, mock_get_results):
self.args_list.append("--master-checksum-version")
self.args_list.append("1")
sys.argv = self.args_list
mock_get_results.segment = Mock()
mock_get_results.segment.getSegmentDataDirectory.return_value = self.failing_data_dir
parser = self.subject.GpSegStart.createParser()
options, args = parser.parse_args()
gpsegstart = self.subject.GpSegStart.createProgram(options, args)
with patch('gpsegstart.base.WorkerPool.return_value.getCompletedItems', return_value=[mock_get_results]):
exitCode = gpsegstart.run()
self.assertEqual(exitCode, 1)
for result in gpsegstart.overall_status.results:
if result.datadir == self.failing_data_dir:
self.assertTrue(result.reasoncode == gp.SEGSTART_ERROR_PG_CTL_FAILED)
else:
self.assertTrue(result.reasoncode == gp.SEGSTART_SUCCESS)
......@@ -8,6 +8,7 @@ from mock import Mock, patch
from gparray import Segment, GpArray
from gppylib.operations.startSegments import StartSegmentsResult
from gppylib.test.unit.gp_unittest import GpTestCase, run_tests
from gppylib.commands import gp
class GpStart(GpTestCase):
......@@ -133,62 +134,6 @@ class GpStart(GpTestCase):
self.assertEquals(self.mock_gplog_log_to_file_only.call_count, 0)
def test_output_to_stdout_and_log_differs_for_heap_checksum(self):
sys.argv = ["gpstart", "-a"]
self.mock_heap_checksum.return_value.are_segments_consistent.return_value = False
self.subject.unix.PgPortIsActive.local.return_value = False
self.mock_os_path_exists.side_effect = os_exists_check
self.primary1.heap_checksum = 0
self.master.heap_checksum = '1'
self.mock_heap_checksum.return_value.check_segment_consistency.return_value = (
[self.primary0], [self.primary1], self.master.heap_checksum)
parser = self.subject.GpStart.createParser()
options, args = parser.parse_args()
gpstart = self.subject.GpStart.createProgram(options, args)
return_code = gpstart.run()
self.assertEqual(return_code, 1)
self.subject.logger.fatal.assert_any_call('Cluster heap checksum setting differences reported.')
self.mock_gplog_log_to_file_only.assert_any_call('Failed checksum consistency validation:', logging.WARN)
self.mock_gplog_log_to_file_only.assert_any_call('dbid: %s '
'checksum set to %s differs from '
'master checksum set to %s' %
(self.primary1.getSegmentDbId(), 0, 1), logging.WARN)
self.subject.logger.fatal.assert_any_call("Shutting down master")
self.assertEquals(self.mock_gp.GpStop.call_count, 1)
def test_failed_to_contact_segments_causes_logging_and_failure(self):
sys.argv = ["gpstart", "-a"]
self.mock_heap_checksum.return_value.get_segments_checksum_settings.return_value = ([], [1])
self.subject.unix.PgPortIsActive.local.return_value = False
self.mock_os_path_exists.side_effect = os_exists_check
parser = self.subject.GpStart.createParser()
options, args = parser.parse_args()
gpstart = self.subject.GpStart.createProgram(options, args)
return_code = gpstart.run()
self.assertEqual(return_code, 1)
self.subject.logger.fatal.assert_any_call(
'No segments responded to ssh query for heap checksum. Not starting the array.')
def test_checksum_consistent(self):
sys.argv = ["gpstart", "-a"]
self.mock_heap_checksum.return_value.get_segments_checksum_settings.return_value = ([1], [1])
self.subject.unix.PgPortIsActive.local.return_value = False
self.mock_os_path_exists.side_effect = os_exists_check
parser = self.subject.GpStart.createParser()
options, args = parser.parse_args()
gpstart = self.subject.GpStart.createProgram(options, args)
return_code = gpstart.run()
self.assertEqual(return_code, 0)
self.subject.logger.info.assert_any_call('Heap checksum setting is consistent across the cluster')
def test_skip_checksum_validation_succeeds(self):
sys.argv = ["gpstart", "-a", "--skip-heap-checksum-validation"]
self.mock_heap_checksum.return_value.get_segments_checksum_settings.return_value = ([1], [1])
......@@ -207,24 +152,22 @@ class GpStart(GpTestCase):
'the GUC for data_checksums '
'will not be checked between master and segments')
def test_gpstart_fails_if_standby_heap_checksum_doesnt_match_master(self):
sys.argv = ["gpstart", "-a"]
self.gparray = GpArray([self.master, self.primary0, self.primary1, self.mirror0, self.mirror1, self.standby])
self.segments_by_content_id = GpArray.getSegmentsByContentId(self.gparray.getSegDbList())
def test_log_when_heap_checksum_validation_fails(self):
sys.argv = ["gpstart", "-a", "-S"]
self.mock_os_path_exists.side_effect = os_exists_check
self.subject.unix.PgPortIsActive.local.return_value = False
self.mock_heap_checksum.return_value.get_master_value.return_value = 1
self.mock_heap_checksum.return_value.get_standby_value.return_value = 0
start_failure = StartSegmentsResult()
start_failure.addFailure(self.mirror1, "fictitious reason", gp.SEGSTART_ERROR_CHECKSUM_MISMATCH)
self.mock_start_result.return_value.startSegments.return_value.getFailedSegmentObjs.return_value = start_failure.getFailedSegmentObjs()
parser = self.subject.GpStart.createParser()
options, args = parser.parse_args()
gpstart = self.subject.GpStart.createProgram(options, args)
with patch("gpstart.GpArray.initFromCatalog", return_value=self.gparray):
return_code = gpstart.run()
return_code = gpstart.run()
self.assertEqual(return_code, 1)
self.subject.logger.warning.assert_any_call("Heap checksum settings on standby master do not match master <<<<<<<<")
self.subject.logger.error.assert_any_call("gpstart error: Heap checksum settings are not consistent across the cluster.")
messages = [msg[0][0] for msg in self.subject.logger.info.call_args_list]
self.assertIn("DBID:5 FAILED host:'sdw1' datadir:'/data/mirror1' with reason:'fictitious reason'", messages)
def _createGpArrayWith2Primary2Mirrors(self):
self.master = Segment.initFromString(
......
......@@ -120,48 +120,13 @@ class GpStart:
return 0
if self.skip_heap_checksum_validation:
self.master_checksum_value = None
logger.warning("Because of --skip-heap-checksum-validation, the GUC for data_checksums "
"will not be checked between master and segments")
"will not be checked between master and segments")
else:
# check that checksum settings are consistent across the cluster
num_workers = min(len(self.gparray.get_hostlist()), self.parallel)
heap_checksum = HeapChecksum(gparray=self.gparray, num_workers=num_workers, logger=logger)
if self.gparray.hasStandbyMaster():
if heap_checksum.get_master_value() != heap_checksum.get_standby_value():
logger.warning("Heap checksum settings on standby master do not match master <<<<<<<<")
logger.error("gpstart error: Heap checksum settings are not consistent across the cluster.")
return 1
successes, failures = heap_checksum.get_segments_checksum_settings()
if len(successes) == 0:
logger.fatal("No segments responded to ssh query for heap checksum. Not starting the array.")
return 1
consistent, inconsistent, master_checksum_value = heap_checksum.check_segment_consistency(successes)
if not heap_checksum.are_segments_consistent(consistent, inconsistent):
logger.fatal("Cluster heap checksum setting differences reported.")
logger.fatal("Shutting down master")
cmd = gp.GpStop("Shutting down master", masterOnly=True,
fast=True, quiet=logging_is_quiet(),
verbose=logging_is_verbose(),
datadir=self.master_datadir,
logfileDirectory=self.logfileDirectory)
cmd.run()
logger.fatal("Heap checksum settings on %d of %d segment instances do not match master <<<<<<<<"
% (len(inconsistent), len(inconsistent) + len(consistent)))
logger.fatal("Review %s for details" % get_logfile())
log_to_file_only("Failed checksum consistency validation:", logging.WARN)
for gpdb in inconsistent:
segment_id = gpdb.getSegmentDbId()
checksum = gpdb.heap_checksum
log_to_file_only("dbid: %s "
"checksum set to %s differs from master checksum set to %s" %
(segment_id, checksum, master_checksum_value), logging.WARN)
return 1
logger.info("Heap checksum setting is consistent across the cluster")
self.master_checksum_value = HeapChecksum(gparray=self.gparray, num_workers=num_workers,
logger=logger).get_master_value()
# Do we have an even-number of recovered segments ?
if len(self.gparray.recoveredSegmentDbids) > 0 and len(self.gparray.recoveredSegmentDbids) % 2 == 0:
......@@ -463,8 +428,8 @@ class GpStart:
# this will eventually start gpsegstart.py
segmentStartOp = StartSegmentsOperation(self.pool, self.quiet, self.gpversion,
self.gphome, self.master_datadir, self.timeout,
self.specialMode, self.wrapper, self.wrapper_args,
self.gphome, self.master_datadir, self.master_checksum_value,
self.timeout, self.specialMode, self.wrapper, self.wrapper_args,
logfileDirectory=self.logfileDirectory)
segmentStartResult = segmentStartOp.startSegments(self.gparray, segmentsToStart, startMode, self.era)
......
......@@ -18,6 +18,7 @@ from gppylib.gpparseopts import OptParser, OptChecker
from gppylib import gparray, gplog
from gppylib.commands import base, gp
from gppylib.utils import parseKeyColonValueLines
from gppylib.commands.pg import PgControlData
logger = gplog.get_default_logger()
......@@ -131,7 +132,7 @@ class GpSegStart:
def __init__(self, dblist, gpversion, mirroringMode, num_cids, era,
timeout, pickledTransitionData, specialMode, wrapper, wrapper_args,
logfileDirectory=False):
master_checksum_version, logfileDirectory=False):
# validate/store arguments
#
......@@ -164,6 +165,7 @@ class GpSegStart:
self.overall_status = None
self.logfileDirectory = logfileDirectory
self.master_checksum_version = master_checksum_version
def getOverallStatusKeys(self):
return self.overall_status.dirmap.keys()
......@@ -238,11 +240,32 @@ class GpSegStart:
self.logger.info("Starting segments... (mirroringMode %s)" % self.mirroringMode)
for datadir, seg in self.overall_status.dirmap.items():
if self.master_checksum_version != None:
cmd = PgControlData(name='run pg_controldata', datadir=datadir)
cmd.run(validateAfter=True)
res = cmd.get_results()
if res.rc != 0:
msg = "pg_controldata failed.\nstdout:%s\nstderr:%s\n" % (res.stdout, res.stderr)
reasoncode = gp.SEGSTART_ERROR_PG_CONTROLDATA_FAILED
self.overall_status.mark_failed(datadir, msg, reasoncode)
continue
segment_heap_checksum_version = cmd.get_value('Data page checksum version')
if segment_heap_checksum_version != self.master_checksum_version:
msg = "Segment checksum %s does not match master checksum %s.\n" % (segment_heap_checksum_version,
self.master_checksum_version)
reasoncode = gp.SEGSTART_ERROR_CHECKSUM_MISMATCH
self.overall_status.mark_failed(datadir, msg, reasoncode)
continue
cmd = gp.SegmentStart("Starting seg at dir %s" % datadir,
seg,
self.num_cids,
self.era,
self.mirroringMode,
self.master_checksum_version,
timeout=self.timeout,
specialMode=self.specialMode,
wrapper=self.wrapper,
......@@ -445,6 +468,7 @@ class GpSegStart:
help='start the instance in upgrade or maintenance mode')
parser.add_option('', '--wrapper', dest="wrapper", default=None, type='string')
parser.add_option('', '--wrapper-args', dest="wrapper_args", default=None, type='string')
parser.add_option('', '--master-checksum-version', dest="master_checksum_version", default=None, type='string', action="store")
return parser
......@@ -464,6 +488,7 @@ class GpSegStart:
options.specialMode,
options.wrapper,
options.wrapper_args,
options.master_checksum_version,
logfileDirectory=logfileDirectory)
#-------------------------------------------------------------------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册