信号处理整体流程
- 信号从bgworker发出后,主进程将ParallelMessagePending置为true,下次CHECK_FOR_INTERRUPTS()时,会进入信号处理逻辑中:HandleParallelMessages。
- 进入信号处理逻辑后,首先遍历所有现存的ParallelContext,每一个ParallelContext都是由CreateParallelContext创建出来的,代表一个并行逻辑单元。例如vacuum有一个ParallelContext、并行执行计划也会创建一个ParallelContext。
- 每一个ParallelContext中,都会管理多个并行bgworker进程,而每一个bgworker都会提前申请一个共享内存mq,也就是error_mqh,具体在ParallelContext->worker[i].error_mqh中。
- 开始遍历每个进程的mq,看其中是否存在消息,如果有消息则安下面HandleParallelMessage逻辑处理掉即可。
  
信号处理逻辑
static void
HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
	char		msgtype;
	if (pcxt->known_attached_workers != NULL &&
		!pcxt->known_attached_workers[i])
	{
		pcxt->known_attached_workers[i] = true;
		pcxt->nknown_attached_workers++;
	}
	msgtype = pq_getmsgbyte(msg);
	switch (msgtype)
	{
		case 'K':				/* BackendKeyData */
			{
				int32		pid = pq_getmsgint(msg, 4);
				(void) pq_getmsgint(msg, 4);	/* discard cancel key */
				(void) pq_getmsgend(msg);
				pcxt->worker[i].pid = pid;
				break;
			}
		case 'E':				/* ErrorResponse */
		case 'N':				/* NoticeResponse */
			{
				ErrorData	edata;
				ErrorContextCallback *save_error_context_stack;
				/* Parse ErrorResponse or NoticeResponse. */
				pq_parse_errornotice(msg, &edata);
				/* Death of a worker isn't enough justification for suicide. */
				edata.elevel = Min(edata.elevel, ERROR);
				/*
				 * If desired, add a context line to show that this is a
				 * message propagated from a parallel worker.  Otherwise, it
				 * can sometimes be confusing to understand what actually
				 * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
				 * because it causes test-result instability depending on
				 * whether a parallel worker is actually used or not.)
				 */
				if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
				{
					if (edata.context)
						edata.context = psprintf("%s\n%s", edata.context,
												 _("parallel worker"));
					else
						edata.context = pstrdup(_("parallel worker"));
				}
				/*
				 * Context beyond that should use the error context callbacks
				 * that were in effect when the ParallelContext was created,
				 * not the current ones.
				 */
				save_error_context_stack = error_context_stack;
				error_context_stack = pcxt->error_context_stack;
				/* Rethrow error or print notice. */
				ThrowErrorData(&edata);
				/* Not an error, so restore previous context stack. */
				error_context_stack = save_error_context_stack;
				break;
			}
		case 'A':				/* NotifyResponse */
			{
				/* Propagate NotifyResponse. */
				int32		pid;
				const char *channel;
				const char *payload;
				pid = pq_getmsgint(msg, 4);
				channel = pq_getmsgrawstring(msg);
				payload = pq_getmsgrawstring(msg);
				pq_endmessage(msg);
				NotifyMyFrontEnd(channel, payload, pid);
				break;
			}
		case 'X':				/* Terminate, indicating clean exit */
			{
				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
				break;
			}
		default:
			{
				elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
					 msgtype, msg->len);
			}
	}
}
信号是在哪里发出的?
每一个bgworker都会进入ParallelWorkerMain函数,在ParallelWorkerMain中会根据执行情况发出不同的信号:
例如:
void
ParallelWorkerMain(Datum main_arg)
{
发'K'告知主进程当前子进程的PID。
	pq_beginmessage(&msgbuf, 'K');
	pq_sendint32(&msgbuf, (int32) MyProcPid);
	pq_sendint32(&msgbuf, (int32) MyCancelKey);
	pq_endmessage(&msgbuf);
发'X'告知主进程当前子进程正常退出了。
	pq_putmessage('X', NULL, 0);
}
还有一种情况,bgworker里面执行的逻辑如果出错了怎么办?
会走elog的底层逻辑,给这个mq发'E'或'N',代码逻辑在这里:
 
error mq的内存是在哪里申请的?
和其他dsm中的共享内存结构一样,先调toc分配一段空间,然后初始化这段空间,然后调toc插入。后面用的时候用toc find函数用key来找到这段空间即可,key就是PARALLEL_KEY_ERROR_QUEUE。
 



















