cdbdisp_async.c 24.7 KB
Newer Older
1 2 3 4 5 6 7

/*-------------------------------------------------------------------------
 *
 * cdbdisp_async.c
 *	  Functions for asynchronous implementation of dispatching
 *	  commands to QExecutors.
 *
8 9 10 11 12 13
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/dispatcher/cdbdisp_async.c
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include <limits.h>

#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif

#include "storage/ipc.h"		/* For proc_exit_inprogress  */
#include "tcop/tcopprot.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbdisp_async.h"
#include "cdb/cdbdispatchresult.h"
33 34
#include "libpq-fe.h"
#include "libpq-int.h"
35 36 37
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbvars.h"
A
Adam Lee 已提交
38
#include "cdb/cdbpq.h"
39 40
#include "miscadmin.h"

41 42 43 44 45 46 47 48
#define DISPATCH_WAIT_TIMEOUT_MSEC 2000

/*
 * Ideally, we should set timeout to zero to cancel QEs as soon as possible,
 * but considering the cost of sending cancel signal is high, we want to process
 * as many finishing QEs as possible before cancelling
 */
#define DISPATCH_WAIT_CANCEL_TIMEOUT_MSEC 100
49 50 51 52 53 54 55 56 57 58 59 60

typedef struct CdbDispatchCmdAsync
{

	/*
	 * dispatchResultPtrArray: Array[0..dispatchCount-1] of CdbDispatchResult*
	 * Each CdbDispatchResult object points to a SegmentDatabaseDescriptor
	 * that dispatcher will send the command to.
	 */
	struct CdbDispatchResult **dispatchResultPtrArray;

	/* Number of segment DBs dispatched */
61
	int			dispatchCount;
62 63 64

	/*
	 * Depending on this mode, we may send query cancel or query finish
65 66
	 * message to QE while we are waiting it to complete.  NONE means we
	 * expect QE to complete without any instruction.
67 68 69 70
	 */
	volatile DispatchWaitMode waitMode;

	/*
71 72
	 * Text information to dispatch: The format is type(1 byte) + length(size
	 * of int) + content(n bytes)
73
	 *
74 75 76
	 * For DTX command, type is 'T', it's built by function
	 * buildGpDtxProtocolCommand. For query, type is 'M', it's built by
	 * function buildGpQueryString.
77
	 */
78 79
	char	   *query_text;
	int			query_text_len;
80

81
} CdbDispatchCmdAsync;
82

83
static void *cdbdisp_makeDispatchParams_async(int maxSlices, char *queryText, int len);
84

85 86
static void cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds,
								  DispatchWaitMode waitMode);
87

88 89 90 91
static void cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds,
							 struct Gang *gp,
							 int sliceIndex,
							 CdbDispatchDirectDesc *dispDirect);
92
static void	cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds);
93

94 95
static bool	cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds);
static int cdbdisp_getWaitSocketFd_async(struct CdbDispatcherState *ds);
96 97 98 99 100

DispatcherInternalFuncs DispatcherAsyncFuncs =
{
	NULL,
	cdbdisp_checkForCancel_async,
101
	cdbdisp_getWaitSocketFd_async,
102 103
	cdbdisp_makeDispatchParams_async,
	cdbdisp_checkDispatchResult_async,
104 105
	cdbdisp_dispatchToGang_async,
	cdbdisp_waitDispatchFinish_async
106 107 108
};


109
static void dispatchCommand(CdbDispatchResult *dispatchResult,
110 111 112
				const char *query_text,
				int query_text_len);

113 114
static void checkDispatchResult(CdbDispatcherState *ds,
					bool wait);
115

116
static bool processResults(CdbDispatchResult *dispatchResult);
117 118

static void
119
			signalQEs(CdbDispatchCmdAsync *pParms);
120 121

static void
122
			checkSegmentAlive(CdbDispatchCmdAsync *pParms);
123

124
static void
125
			handlePollError(CdbDispatchCmdAsync *pParms);
126

127
static void
128
			handlePollSuccess(CdbDispatchCmdAsync *pParms, struct pollfd *fds);
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144

/*
 * Non-blocking check of the dispatch results.
 *
 * Runs one pass of checkDispatchResult() with wait = false, so it does not
 * wait for the dispatched commands to complete, and then reports the
 * outcome of cdbdisp_checkResultsErrcode() on the primary results, i.e.
 * true if any connection received an error.
 */
static bool
cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds)
{
	bool		gotError;

	Assert(ds);

	/* Collect whatever is already available; never block here. */
	checkDispatchResult(ds, false);

	gotError = cdbdisp_checkResultsErrcode(ds->primaryResults);

	return gotError;
}

145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
/*
 * Return a FD to wait for, after dispatching.
 */
static int
cdbdisp_getWaitSocketFd_async(struct CdbDispatcherState *ds)
{
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	int			i;

	Assert(ds);

	if (proc_exit_inprogress)
		return PGINVALID_SOCKET;

	/*
	 * This should match the logic in cdbdisp_checkForCancel_async(). In
	 * particular, when cdbdisp_checkForCancel_async() is called, it must
	 * process any incoming data from the socket we return here, or we
	 * will busy wait.
	 */
	for (i = 0; i < pParms->dispatchCount; i++)
	{
		CdbDispatchResult *dispatchResult;
		SegmentDatabaseDescriptor *segdbDesc;

		dispatchResult = pParms->dispatchResultPtrArray[i];
		segdbDesc = dispatchResult->segdbDesc;

		/*
		 * Already finished with this QE?
		 */
		if (!dispatchResult->stillRunning)
			continue;

		Assert(!cdbconn_isBadConnection(segdbDesc));

		return PQsocket(segdbDesc->conn);
	}

	return PGINVALID_SOCKET;
}

187 188 189 190 191 192 193 194
/*
 * Block until all data are dispatched.
 */
static void
cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds)
{
	const static int DISPATCH_POLL_TIMEOUT = 500;
	struct pollfd *fds;
195 196 197 198
	int			nfds,
				i;
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	int			dispatchCount = pParms->dispatchCount;
199 200 201

	fds = (struct pollfd *) palloc(dispatchCount * sizeof(struct pollfd));

202
	while (true)
203
	{
204
		int			pollRet;
205 206 207 208 209 210 211 212

		nfds = 0;
		memset(fds, 0, dispatchCount * sizeof(struct pollfd));

		for (i = 0; i < dispatchCount; i++)
		{
			CdbDispatchResult *qeResult = pParms->dispatchResultPtrArray[i];
			SegmentDatabaseDescriptor *segdbDesc = qeResult->segdbDesc;
213 214
			PGconn	   *conn = segdbDesc->conn;
			int			ret;
215 216 217 218 219

			/* skip already completed connections */
			if (conn->outCount == 0)
				continue;

220 221 222 223
			/*
			 * call send for this connection regardless of its POLLOUT status,
			 * because it may be writable NOW
			 */
224 225 226 227 228 229
			ret = pqFlushNonBlocking(conn);

			if (ret == 0)
				continue;
			else if (ret > 0)
			{
230 231
				int			sock = PQsocket(segdbDesc->conn);

232 233 234 235 236 237 238 239
				Assert(sock >= 0);
				fds[nfds].fd = sock;
				fds[nfds].events = POLLOUT;
				nfds++;
			}
			else if (ret < 0)
			{
				pqHandleSendFailure(conn);
240
				char	   *msg = PQerrorMessage(conn);
241 242 243 244

				qeResult->stillRunning = false;
				ereport(ERROR,
						(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
245
						 errmsg("Command could not be dispatch to segment %s: %s", qeResult->segdbDesc->whoami, msg ? msg : "unknown error")));
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
			}
		}

		if (nfds == 0)
			break;

		/* guarantee poll() is interruptible */
		do
		{
			CHECK_FOR_INTERRUPTS();

			pollRet = poll(fds, nfds, DISPATCH_POLL_TIMEOUT);
			if (pollRet == 0)
				ELOG_DISPATCHER_DEBUG("cdbdisp_waitDispatchFinish_async(): Dispatch poll timeout after %d ms", DISPATCH_POLL_TIMEOUT);
		}
		while (pollRet == 0 || (pollRet < 0 && (SOCK_ERRNO == EINTR || SOCK_ERRNO == EAGAIN)));

		if (pollRet < 0)
			elog(ERROR, "Poll failed during dispatch");
	}

	pfree(fds);
}

270 271 272
/*
 * Dispatch command to gang.
 *
273 274
 * Throw out error to upper try-catch block if anything goes wrong. This function only kicks off dispatching,
 * call cdbdisp_waitDispatchFinish_async to ensure the completion
275 276 277 278 279
 */
static void
cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds,
							 struct Gang *gp,
							 int sliceIndex,
280
							 CdbDispatchDirectDesc *dispDirect)
281
{
282 283
	int			i;
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
284 285 286 287 288 289

	/*
	 * Start the dispatching
	 */
	for (i = 0; i < gp->size; i++)
	{
290
		CdbDispatchResult *qeResult;
291 292

		SegmentDatabaseDescriptor *segdbDesc = &gp->db_descriptors[i];
293

294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
		Assert(segdbDesc != NULL);

		if (dispDirect->directed_dispatch)
		{
			/* We can direct dispatch to one segment DB only */
			Assert(dispDirect->count == 1);
			if (dispDirect->content[0] != segdbDesc->segindex)
				continue;
		}

		/*
		 * Initialize the QE's CdbDispatchResult object.
		 */
		qeResult = cdbdisp_makeResult(ds->primaryResults, segdbDesc, sliceIndex);
		if (qeResult == NULL)
		{
			/*
			 * writer_gang could be NULL if this is an extended query.
			 */
			if (ds->primaryResults->writer_gang)
				ds->primaryResults->writer_gang->dispatcherActive = true;

			elog(FATAL, "could not allocate resources for segworker communication");
		}
		pParms->dispatchResultPtrArray[pParms->dispatchCount++] = qeResult;

		dispatchCommand(qeResult, pParms->query_text, pParms->query_text_len);
	}
}

/*
 * Check dispatch result.
 *
 * Wait all dispatch work to complete, either success or fail.
 * (Set stillRunning to true when one dispatch work is completed)
 */
static void
cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds,
332
								  DispatchWaitMode waitMode)
333 334
{
	Assert(ds != NULL);
335
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
336 337

	/* cdbdisp_destroyDispatcherState is called */
338
	if (pParms == NULL)
339 340
		return;

341 342 343 344
	/*
	 * Don't overwrite DISPATCH_WAIT_CANCEL or DISPATCH_WAIT_FINISH with
	 * DISPATCH_WAIT_NONE
	 */
345 346 347 348 349 350
	if (waitMode != DISPATCH_WAIT_NONE)
		pParms->waitMode = waitMode;

	checkDispatchResult(ds, true);

	/*
351 352
	 * It looks like everything went fine, make sure we don't miss a user
	 * cancellation?
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
	 *
	 * The waitMode argument is NONE when we are doing "normal work".
	 */
	if (waitMode == DISPATCH_WAIT_NONE || waitMode == DISPATCH_WAIT_FINISH)
		CHECK_FOR_INTERRUPTS();
}

/*
 * Allocates memory for a CdbDispatchCmdAsync structure and do the initialization.
 *
 * Memory will be freed in function cdbdisp_destroyDispatcherState by deleting the
 * memory context.
 */
static void *
cdbdisp_makeDispatchParams_async(int maxSlices, char *queryText, int len)
{
369 370
	int			maxResults = maxSlices * getgpsegmentCount();
	int			size = 0;
371 372 373 374 375 376 377 378 379 380

	CdbDispatchCmdAsync *pParms = palloc0(sizeof(CdbDispatchCmdAsync));

	size = maxResults * sizeof(CdbDispatchResult *);
	pParms->dispatchResultPtrArray = (CdbDispatchResult **) palloc0(size);
	pParms->dispatchCount = 0;
	pParms->waitMode = DISPATCH_WAIT_NONE;
	pParms->query_text = queryText;
	pParms->query_text_len = len;

381
	return (void *) pParms;
382 383 384 385 386 387 388 389 390 391 392 393 394
}

/*
 * Receive and process results from all running QEs.
 *
 * wait: true, wait until all dispatch works are completed.
 *       false, return immediate when there's no more data.
 *
 * Don't throw out error, instead, append the error message to
 * CdbDispatchResult.error_message.
 */
static void
checkDispatchResult(CdbDispatcherState *ds,
395
					bool wait)
396
{
397
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
398
	CdbDispatchResults *meleeResults = ds->primaryResults;
399 400
	SegmentDatabaseDescriptor *segdbDesc;
	CdbDispatchResult *dispatchResult;
401 402 403 404
	int			i;
	int			db_count = 0;
	int			timeout = 0;
	bool		sentSignal = false;
405
	struct pollfd *fds;
406
	uint8 ftsVersion = 0;
407 408 409 410 411

	db_count = pParms->dispatchCount;
	fds = (struct pollfd *) palloc(db_count * sizeof(struct pollfd));

	/*
412 413
	 * OK, we are finished submitting the command to the segdbs. Now, we have
	 * to wait for them to finish.
414 415 416
	 */
	for (;;)
	{
417 418 419
		int			sock;
		int			n;
		int			nfds = 0;
420
		PGconn		*conn;
421 422

		/*
423 424
		 * bail-out if we are dying. Once QD dies, QE will recognize it
		 * shortly anyway.
425 426 427 428
		 */
		if (proc_exit_inprogress)
			break;

429
		/*
430 431 432
		 * escalate waitMode to cancel if: - user interrupt has occurred, - or
		 * an error has been reported by any QE, - in case the caller wants
		 * cancelOnError
433 434 435 436
		 */
		if ((InterruptPending || meleeResults->errcode) && meleeResults->cancelOnError)
			pParms->waitMode = DISPATCH_WAIT_CANCEL;

437 438 439 440 441 442 443
		/*
		 * Which QEs are still running and could send results to us?
		 */
		for (i = 0; i < db_count; i++)
		{
			dispatchResult = pParms->dispatchResultPtrArray[i];
			segdbDesc = dispatchResult->segdbDesc;
444
			conn = segdbDesc->conn;
445

P
Pengzhou Tang 已提交
446 447 448 449 450 451
			/*
			 * Already finished with this QE?
			 */
			if (!dispatchResult->stillRunning)
				continue;

452
			Assert(!cdbconn_isBadConnection(segdbDesc));
453

454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469
			/*
			 * Flush out buffer in case some commands are not fully
			 * dispatched to QEs, this can prevent QD from polling
			 * on such QEs forever.
			 */
			if (conn->outCount > 0)
			{
				/*
				 * Don't error out here, let following poll() routine to
				 * handle it.
				 */
				if (pqFlush(conn) < 0)
					elog(LOG, "Failed flushing outbound data to %s:%s",
						 segdbDesc->whoami, PQerrorMessage(conn));
			}

470 471 472
			/*
			 * Add socket to fd_set if still connected.
			 */
473
			sock = PQsocket(conn);
474 475 476 477 478 479 480 481 482 483 484 485 486
			Assert(sock >= 0);
			fds[nfds].fd = sock;
			fds[nfds].events = POLLIN;
			nfds++;
		}

		/*
		 * Break out when no QEs still running.
		 */
		if (nfds <= 0)
			break;

		/*
487 488
		 * Wait for results from QEs
		 *
489 490 491 492
		 * Don't wait if: - this is called from interconnect to check if
		 * there's any error.
		 *
		 * Lower the timeout if: - we need send signal to QEs.
493
		 */
494 495 496 497 498 499 500 501
		if (!wait)
			timeout = 0;
		else if (pParms->waitMode == DISPATCH_WAIT_NONE || sentSignal)
			timeout = DISPATCH_WAIT_TIMEOUT_MSEC;
		else
			timeout = DISPATCH_WAIT_CANCEL_TIMEOUT_MSEC;

		n = poll(fds, nfds, timeout);
502

503 504 505 506
		/*
		 * poll returns with an error, including one due to an interrupted
		 * call
		 */
507 508
		if (n < 0)
		{
509 510
			int			sock_errno = SOCK_ERRNO;

511 512 513 514
			if (sock_errno == EINTR)
				continue;

			elog(LOG, "handlePollError poll() failed; errno=%d", sock_errno);
515

516
			handlePollError(pParms);
517 518 519 520 521 522 523

			/*
			 * Since an error was detected for the segment, request
			 * FTS to perform a probe before checking the segment
			 * state.
			 */
			FtsNotifyProber();
524 525
			checkSegmentAlive(pParms);

526 527 528 529 530
			if (pParms->waitMode != DISPATCH_WAIT_NONE)
			{
				signalQEs(pParms);
				sentSignal = true;
			}
531 532 533

			if (!wait)
				break;
534 535 536 537
		}
		/* If the time limit expires, poll() returns 0 */
		else if (n == 0)
		{
538
			if (pParms->waitMode != DISPATCH_WAIT_NONE)
539
			{
540 541
				signalQEs(pParms);
				sentSignal = true;
542
			}
543

544 545 546 547 548 549 550 551
			/*
			 * This code relies on FTS being triggered at regular
			 * intervals. Iff FTS detects change in configuration
			 * then check segment state. FTS probe is not triggered
			 * explicitly in this case because this happens every
			 * DISPATCH_WAIT_TIMEOUT_MSEC.
			 */
			if (ftsVersion == 0 || ftsVersion != getFtsVersion())
552
			{
553
				ftsVersion = getFtsVersion();
554
				checkSegmentAlive(pParms);
555
			}
556 557 558

			if (!wait)
				break;
559 560 561 562 563 564 565 566 567 568 569 570 571
		}
		/* We have data waiting on one or more of the connections. */
		else
			handlePollSuccess(pParms, fds);
	}

	pfree(fds);
}

/*
 * Helper function that actually kicks off the command on the libpq connection.
 */
static void
572
dispatchCommand(CdbDispatchResult *dispatchResult,
573 574 575 576
				const char *query_text,
				int query_text_len)
{
	TimestampTz beforeSend = 0;
577 578
	long		secs;
	int			usecs;
579 580 581 582 583 584 585

	if (DEBUG1 >= log_min_messages)
		beforeSend = GetCurrentTimestamp();

	/*
	 * Submit the command asynchronously.
	 */
586
	if (PQsendGpQuery_shared(dispatchResult->segdbDesc->conn, (char *) query_text, query_text_len, true) == 0)
587
	{
588 589
		char	   *msg = PQerrorMessage(dispatchResult->segdbDesc->conn);

590 591 592 593
		dispatchResult->stillRunning = false;
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("Command could not be dispatch to segment %s: %s",
594
						dispatchResult->segdbDesc->whoami, msg ? msg : "unknown error")));
595 596 597 598 599 600 601 602 603 604 605
	}

	if (DEBUG1 >= log_min_messages)
	{
		TimestampDifference(beforeSend, GetCurrentTimestamp(), &secs, &usecs);

		if (secs != 0 || usecs > 1000)	/* Time > 1ms? */
			elog(LOG, "time for PQsendGpQuery_shared %ld.%06d", secs, usecs);
	}

	/*
606 607 608
	 * We'll keep monitoring this QE -- whether or not the command was
	 * dispatched -- in order to check for a lost connection or any other
	 * errors that libpq might have in store for us.
609 610 611 612 613 614 615
	 */
	dispatchResult->stillRunning = true;
	dispatchResult->hasDispatched = true;

	ELOG_DISPATCHER_DEBUG("Command dispatched to QE (%s)", dispatchResult->segdbDesc->whoami);
}

616
/*
617 618 619 620 621 622
 * Helper function to checkDispatchResult that handles errors that occur
 * during the poll() call.
 *
 * NOTE: The cleanup of the connections will be performed by handlePollTimeout().
 */
static void
623
handlePollError(CdbDispatchCmdAsync *pParms)
624
{
625
	int			i;
626 627 628 629 630 631 632 633 634 635 636 637 638

	for (i = 0; i < pParms->dispatchCount; i++)
	{
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/* Skip if already finished or didn't dispatch. */
		if (!dispatchResult->stillRunning)
			continue;

		/* We're done with this QE, sadly. */
		if (PQstatus(segdbDesc->conn) == CONNECTION_BAD)
		{
639 640
			char	   *msg = PQerrorMessage(segdbDesc->conn);

641 642 643 644 645 646 647
			if (msg)
				elog(LOG, "Dispatcher encountered connection error on %s: %s", segdbDesc->whoami, msg);

			elog(LOG, "Dispatcher noticed bad connection in handlePollError()");

			/* Save error info for later. */
			cdbdisp_appendMessageNonThread(dispatchResult, LOG,
648 649 650
										   "Error after dispatch from %s: %s",
										   segdbDesc->whoami,
										   msg ? msg : "unknown error");
651 652 653 654 655 656 657 658 659 660

			PQfinish(segdbDesc->conn);
			segdbDesc->conn = NULL;
			dispatchResult->stillRunning = false;
		}
	}

	return;
}

661 662 663 664
/*
 * Receive and process results from QEs.
 */
static void
665
handlePollSuccess(CdbDispatchCmdAsync *pParms,
666 667
				  struct pollfd *fds)
{
668 669
	int			currentFdNumber = 0;
	int			i = 0;
670 671 672 673 674 675

	/*
	 * We have data waiting on one or more of the connections.
	 */
	for (i = 0; i < pParms->dispatchCount; i++)
	{
676 677
		bool		finished;
		int			sock;
678 679 680 681 682 683 684 685 686 687
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/*
		 * Skip if already finished or didn't dispatch.
		 */
		if (!dispatchResult->stillRunning)
			continue;

		ELOG_DISPATCHER_DEBUG("looking for results from %d of %d (%s)",
688
							  i + 1, pParms->dispatchCount, segdbDesc->whoami);
689 690 691 692 693 694 695 696 697 698 699 700

		sock = PQsocket(segdbDesc->conn);
		Assert(sock >= 0);
		Assert(sock == fds[currentFdNumber].fd);

		/*
		 * Skip this connection if it has no input available.
		 */
		if (!(fds[currentFdNumber++].revents & POLLIN))
			continue;

		ELOG_DISPATCHER_DEBUG("PQsocket says there are results from %d of %d (%s)",
701
							  i + 1, pParms->dispatchCount, segdbDesc->whoami);
702 703 704 705 706

		/*
		 * Receive and process results from this QE.
		 */
		finished = processResults(dispatchResult);
707

708 709 710 711 712 713 714 715
		/*
		 * Are we through with this QE now?
		 */
		if (finished)
		{
			dispatchResult->stillRunning = false;

			ELOG_DISPATCHER_DEBUG("processResults says we are finished with %d of %d (%s)",
716
								  i + 1, pParms->dispatchCount, segdbDesc->whoami);
717 718 719

			if (DEBUG1 >= log_min_messages)
			{
720 721
				char		msec_str[32];

722 723 724 725 726
				switch (check_log_duration(msec_str, false))
				{
					case 1:
					case 2:
						elog(LOG, "duration to dispatch result received from %d (seg %d): %s ms",
727
							 i + 1, dispatchResult->segdbDesc->segindex, msec_str);
728 729 730 731 732 733 734 735 736
						break;
				}
			}

			if (PQisBusy(dispatchResult->segdbDesc->conn))
				elog(LOG, "We thought we were done, because finished==true, but libpq says we are still busy");
		}
		else
			ELOG_DISPATCHER_DEBUG("processResults says we have more to do with %d of %d (%s)",
737
								  i + 1, pParms->dispatchCount, segdbDesc->whoami);
738 739 740 741 742 743 744
	}
}

/*
 * Send finish or cancel signal to QEs if needed.
 */
static void
745
signalQEs(CdbDispatchCmdAsync *pParms)
746
{
747
	int			i;
748
	DispatchWaitMode waitMode = pParms->waitMode;
749 750 751

	for (i = 0; i < pParms->dispatchCount; i++)
	{
752 753
		char		errbuf[256];
		bool		sent = false;
754
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
755

756 757 758 759
		Assert(dispatchResult != NULL);
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/*
760 761
		 * Don't send the signal if - QE is finished or canceled - the signal
		 * was already sent - connection is dead
762 763
		 */

764 765 766 767
		if (!dispatchResult->stillRunning ||
			dispatchResult->wasCanceled ||
			cdbconn_isBadConnection(segdbDesc))
			continue;
768

769 770 771 772 773
		memset(errbuf, 0, sizeof(errbuf));

		sent = cdbconn_signalQE(segdbDesc, errbuf, waitMode == DISPATCH_WAIT_CANCEL);
		if (sent)
			dispatchResult->sentSignal = waitMode;
774
		else
775 776
			elog(LOG, "Unable to cancel: %s",
				 strlen(errbuf) == 0 ? "cannot allocate PGCancel" : errbuf);
777 778 779 780 781 782 783
	}
}

/*
 * Check if any segment DB down is detected by FTS.
 */
static void
784
checkSegmentAlive(CdbDispatchCmdAsync *pParms)
785
{
786
	int			i;
787 788

	/*
A
Ashwin Agrawal 已提交
789
	 * check the connection still valid
790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810
	 */
	for (i = 0; i < pParms->dispatchCount; i++)
	{
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/*
		 * Skip if already finished or didn't dispatch.
		 */
		if (!dispatchResult->stillRunning)
			continue;

		/*
		 * Skip the entry db.
		 */
		if (segdbDesc->segindex < 0)
			continue;

		ELOG_DISPATCHER_DEBUG("FTS testing connection %d of %d (%s)",
							  i + 1, pParms->dispatchCount, segdbDesc->whoami);

A
Ashwin Agrawal 已提交
811
		if (!FtsIsSegmentUp(segdbDesc->segment_database_info))
812
		{
813 814
			char	   *msg = PQerrorMessage(segdbDesc->conn);

815 816
			dispatchResult->stillRunning = false;
			cdbdisp_appendMessageNonThread(dispatchResult, LOG,
817 818
										   "FTS detected connection lost during dispatch to %s: %s",
										   dispatchResult->segdbDesc->whoami, msg ? msg : "unknown error");
819

820
			/*
821 822
			 * Not a good idea to store into the PGconn object. Instead, just
			 * close it.
823 824
			 */
			PQfinish(segdbDesc->conn);
825
			segdbDesc->conn = NULL;
826 827 828 829 830 831 832 833 834 835 836
		}
	}
}

/*
 * Receive and process input from one QE.
 *
 * Return true if all input are consumed or the connection went wrong.
 * Return false if there'er still more data expected.
 */
static bool
837
processResults(CdbDispatchResult *dispatchResult)
838 839
{
	SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
840
	char	   *msg;
841 842 843 844 845 846 847 848

	/*
	 * Receive input from QE.
	 */
	if (PQconsumeInput(segdbDesc->conn) == 0)
	{
		msg = PQerrorMessage(segdbDesc->conn);
		cdbdisp_appendMessageNonThread(dispatchResult, LOG,
849 850
									   "Error on receive from %s: %s",
									   segdbDesc->whoami, msg ? msg : "unknown error");
851 852 853 854 855 856 857 858 859
		return true;
	}

	/*
	 * If we have received one or more complete messages, process them.
	 */
	while (!PQisBusy(segdbDesc->conn))
	{
		/* loop to call PQgetResult; won't block */
860
		PGresult   *pRes;
861
		ExecStatusType resultStatus;
862
		int			resultIndex;
863 864

		/*
865 866 867
		 * PQisBusy() does some error handling, which can cause the connection
		 * to die -- we can't just continue on as if the connection is happy
		 * without checking first.
868
		 *
869 870
		 * For example, cdbdisp_numPGresult() will return a completely bogus
		 * value!
871 872 873 874 875
		 */
		if (cdbconn_isBadConnection(segdbDesc))
		{
			msg = PQerrorMessage(segdbDesc->conn);
			cdbdisp_appendMessageNonThread(dispatchResult, LOG,
876 877
										   "Connection lost when receiving from %s: %s",
										   segdbDesc->whoami, msg ? msg : "unknown error");
878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917
			return true;
		}

		/*
		 * Get one message.
		 */
		ELOG_DISPATCHER_DEBUG("PQgetResult");
		pRes = PQgetResult(segdbDesc->conn);

		/*
		 * Command is complete when PGgetResult() returns NULL. It is critical
		 * that for any connection that had an asynchronous command sent thru
		 * it, we call PQgetResult until it returns NULL. Otherwise, the next
		 * time a command is sent to that connection, it will return an error
		 * that there's a command pending.
		 */
		if (!pRes)
		{
			ELOG_DISPATCHER_DEBUG("%s -> idle", segdbDesc->whoami);
			/* this is normal end of command */
			return true;
		}

		/*
		 * Attach the PGresult object to the CdbDispatchResult object.
		 */
		resultIndex = cdbdisp_numPGresult(dispatchResult);
		cdbdisp_appendResult(dispatchResult, pRes);

		/*
		 * Did a command complete successfully?
		 */
		resultStatus = PQresultStatus(pRes);
		if (resultStatus == PGRES_COMMAND_OK ||
			resultStatus == PGRES_TUPLES_OK ||
			resultStatus == PGRES_COPY_IN ||
			resultStatus == PGRES_COPY_OUT ||
			resultStatus == PGRES_EMPTY_QUERY)
		{
			ELOG_DISPATCHER_DEBUG("%s -> ok %s",
918 919
								  segdbDesc->whoami,
								  PQcmdStatus(pRes) ? PQcmdStatus(pRes) : "(no cmdStatus)");
920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935

			if (resultStatus == PGRES_EMPTY_QUERY)
				ELOG_DISPATCHER_DEBUG("QE received empty query.");

			/*
			 * Save the index of the last successful PGresult. Can be given to
			 * cdbdisp_getPGresult() to get tuple count, etc.
			 */
			dispatchResult->okindex = resultIndex;

			/*
			 * SREH - get number of rows rejected from QE if any
			 */
			if (pRes->numRejected > 0)
				dispatchResult->numrowsrejected += pRes->numRejected;

936
			/*
937 938
			 * COPY FROM ON SEGMENT - get the number of rows completed by QE
			 * if any
939 940 941 942
			 */
			if (pRes->numCompleted > 0)
				dispatchResult->numrowscompleted += pRes->numCompleted;

943 944 945 946
			if (resultStatus == PGRES_COPY_IN ||
				resultStatus == PGRES_COPY_OUT)
				return true;
		}
947

948 949 950 951 952 953 954 955 956 957 958 959
		/*
		 * Note QE error. Cancel the whole statement if requested.
		 */
		else
		{
			/* QE reported an error */
			char	   *sqlstate = PQresultErrorField(pRes, PG_DIAG_SQLSTATE);
			int			errcode = 0;

			msg = PQresultErrorMessage(pRes);

			ELOG_DISPATCHER_DEBUG("%s -> %s %s  %s",
960 961 962 963
								  segdbDesc->whoami,
								  PQresStatus(resultStatus),
								  sqlstate ? sqlstate : "(no SQLSTATE)",
								  msg);
964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979

			/*
			 * Convert SQLSTATE to an error code (ERRCODE_xxx). Use a generic
			 * nonzero error code if no SQLSTATE.
			 */
			if (sqlstate && strlen(sqlstate) == 5)
				errcode = sqlstate_to_errcode(sqlstate);

			/*
			 * Save first error code and the index of its PGresult buffer
			 * entry.
			 */
			cdbdisp_seterrcode(errcode, resultIndex, dispatchResult);
		}
	}

980
	return false;				/* we must keep on monitoring this socket */
981
}