小布源码分析札记-05 :PG进程间通讯的方式

xiaobu 7天前 37

大家在阅读PG源代码时,会遇到进程之间通讯的代码。本文给初学者介绍一下PG的后台进程之间的通讯方式。


第一种方式是共享内存。所谓共享内存,就是两个或者多个进程可以访问的同一块内存。PG的共享内存是由主进程postmaster在启动阶段通过调用mmap()的系统调用创建的。PG的任何后台进程都是postmaster通过系统调用fork()产生的子进程,不存在孙进程的概念,所有的进程只有一个爸爸就是postmaster。

当一个进程通过fork()出一个子进程后,该子进程会继承父进程的很多资源,包括父进程创建的共享内存。所以postmaster创建完共享内存后,再通过fork()创建子进程,子进程自然而然地拥有了对这块共享内存的访问权限,包括读和写。

明白了这个基本原理,我们很容易理解两个PG进程之间使用共享内存通讯,即一个进程在约定好的内存里存放一些东西,另外一个进程隔一段时间去检查这个约定好的内存即可,它一旦看到约定的地方有小纸条,就能会心一笑:月上柳梢头,人约黄昏后。

fork process

第二种方式是信号signal。一个进程可以先拿到另外一个进程的进程号pid,然后通过kill(pid, SIGXXX)的方式给这个进程发送一个信号,类似一个人拍一下另外一个人的肩膀,示意他要注意一些事情。下面是kill()系统调用的函数原型:

#include <signal.h>

int kill(pid_t pid, int sig);

系统调用kill()非常简单,两个入口参数的含义一目了然。kill()的调用是软中断。我们可以想象一个人正在忙自己的事情,突然另外一个人拍了一下他的肩膀,他本能地会放下手中的事情,回头看一下,这说明他被“中断“了。这种机制可以确保这个人不需要通过反复检查某个信号来获悉某个事情的发生。很显然,反复检查一个东西的方式是比较低效、愚笨的方式。

信号方式的缺点是它能够传递的信息量有限,仅仅是引发对方注意,引发对方中断而已,所以它和共享内存两种方式各有优缺点。

有了信号和共享内存两种机制,我们就很容易构造一个更加优雅的进程之间通讯方式。假设进程A想和进程B通讯,进程A需要做两件事情:第一件事情是往共享内存中提前放置它想传递给B的信息,因为共享内存很大,A想放多少信息就放多少信息。第二件事情就是A给B通过kill()发送一个约定的信号SIGXXX,如SIGINT, SIGUSER1, SIGTERM等等。B收到这个信号后,就会被中断,它的执行指令流就会跳转到一个事先注册好的函数中,这个函数被称为事件处理函数。在这个函数中,B可以检查共享内存中的信息,进一步获悉A想传递的信息是什么。

 

https://cherry-creek.net/p/fork_process.svg

以上就是PG两个进程之间传递信息的主要方式。大家可以参考RequestCheckpoint()这个函数,它是由想触发检查点的进程调用。这个函数的要点是在

	/*
	 * Atomically set the request flags, and take a snapshot of the counters.
	 * When we see ckpt_started > old_started, we know the flags we set here
	 * have been seen by checkpointer.
	 *
	 * Note that we OR the flags with any existing flags, to avoid overriding
	 * a "stronger" request by another backend.  The flag senses must be
	 * chosen to make this work!
	 */
	SpinLockAcquire(&CheckpointerShmem->ckpt_lck);

	old_failed = CheckpointerShmem->ckpt_failed;
	old_started = CheckpointerShmem->ckpt_started;
         /// 设置共享内存中的ckpt_flags
	CheckpointerShmem->ckpt_flags |= (flags | CHECKPOINT_REQUESTED); 

	SpinLockRelease(&CheckpointerShmem->ckpt_lck);

进程checkpointer的主体是一个无限循环,它会反复检查CheckpointerShmem->ckpt_flags的值是否为0,如果是非0,则意味着有别的进程向它发出执行检查点的请求。如果没有,则checkpointer进程会睡眠一段时间,大约十几毫秒。

如果调用者想等待checkpointer执行检查点操作完毕后再往下执行它自己的操作,可以设置RequestCheckpoint()函数的入口参数flag为CHECKPOINT_WAIT,它表示”等待检查点执行完毕“后再返回的意思。

	/*
	 * If requested, wait for completion.  We detect completion according to
	 * the algorithm given above.
	 */
	if (flags & CHECKPOINT_WAIT)
	{
		int			new_started,
					new_failed;

		/* Wait for a new checkpoint to start. */
		ConditionVariablePrepareToSleep(&CheckpointerShmem->start_cv);
		for (;;)
		{
			SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
			new_started = CheckpointerShmem->ckpt_started;
			SpinLockRelease(&CheckpointerShmem->ckpt_lck);

			if (new_started != old_started)
				break;

			ConditionVariableSleep(&CheckpointerShmem->start_cv,
								   WAIT_EVENT_CHECKPOINT_START);
		}
		ConditionVariableCancelSleep();

		/*
		 * We are waiting for ckpt_done >= new_started, in a modulo sense.
		 */
		ConditionVariablePrepareToSleep(&CheckpointerShmem->done_cv);
		for (;;)
		{
			int			new_done;

			SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
			new_done = CheckpointerShmem->ckpt_done;
			new_failed = CheckpointerShmem->ckpt_failed;
			SpinLockRelease(&CheckpointerShmem->ckpt_lck);

			if (new_done - new_started >= 0)
				break;

			ConditionVariableSleep(&CheckpointerShmem->done_cv,
								   WAIT_EVENT_CHECKPOINT_DONE);
		}
		ConditionVariableCancelSleep();

		if (new_failed != old_failed)
			ereport(ERROR,
					(errmsg("checkpoint request failed"),
					 errhint("Consult recent messages in the server log for details.")));
	}

如何确保检查点被执行成功后才返回,涉及到一个算法,它在checkpointer.c的开头有描述。以后我们再探讨这个算法的细节。

 

 

最新回复 (1)
  • xiaobu 5天前
    引用 2

    在autovacuum.c源文件中的开头有这么一段注释:

    * The autovacuum launcher cannot start the worker processes by itself,
     * because doing so would cause robustness issues (namely, failure to shut
     * them down on exceptional conditions, and also, since the launcher is
     * connected to shared memory and is thus subject to corruption there, it is
     * not as robust as the postmaster).  So it leaves that task to the postmaster.
     *
     * There is an autovacuum shared memory area, where the launcher stores
     * information about the database it wants vacuumed.  When it wants a new
     * worker to start, it sets a flag in shared memory and sends a signal to the
     * postmaster.  Then postmaster knows nothing more than it must start a worker;
     * so it forks a new child, which turns into a worker.  This new process
     * connects to shared memory, and there it can inspect the information that the
     * launcher has set up.

    这段注释描述了autovacuum launcher进程(简称AVL)是如何启动autovacuum worker进程(简称AVW)的。当AVL进程想启动AVW进程时,它会在共享内存中设置一个标志,然后给postmaster进程发送一个SIGUSR2的信号。具体代码如下:

    SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);

    AVL就是调用这个函数,要求postmaster进程创建AVW进程的。这个函数的代码如下:

    /*
     * SendPostmasterSignal - signal the postmaster from a child process
     */
    void
    SendPostmasterSignal(PMSignalReason reason)
    {
    	/* If called in a standalone backend, do nothing */
    	if (!IsUnderPostmaster)
    		return;
    	/* Atomically set the proper flag */
    	PMSignalState->PMSignalFlags[reason] = true;
    	/* Send signal to postmaster */
    	kill(PostmasterPid, SIGUSR1);
    }
    

    这个函数的代码非常好理解。IsUnderPostmaster是一个布尔型变量,如果它为true,则意味着本进程是postmaster的亲儿子,是postmaster通过fork()系统调用产生的子进程。它实际上就是把PMSignalState->PMSignalFlags[PMSIGNAL_START_AUTOVAC_WORKER]设置为true,然后通过kill向postmaster进程发送了SIGUSR1信号而已。

    这就是AVL进程创建AVW进程的技术内幕!

返回
发新帖