This section continues the discussion of the PostgreSQL background process walsender, focusing on two functions in its call stack: exec_replication_command and StartReplication.
The call stack is as follows:
(gdb) bt
#0 0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0,
nevents=1) at latch.c:1048
#2 0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1,
wait_event_info=83886092) at latch.c:1000
#3 0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999,
wait_event_info=83886092) at latch.c:385
#4 0x000000000085405b in WalSndLoop (send_data=0x8547fe ) at walsender.c:2229
#5 0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6 0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
at walsender.c:1539
#7 0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
at postgres.c:4178
#8 0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9 0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228
StringInfo
The StringInfoData struct holds information about an extensible string.
/*-------------------------
* StringInfoData holds information about an extensible string.
* data is the current buffer for the string (allocated with palloc).
* len is the current string length. There is guaranteed to be
* a terminating '\0' at data[len], although this is not very
* useful when the string holds binary data rather than text.
* maxlen is the allocated size in bytes of 'data', i.e. the maximum
* string size (including the terminating '\0' char) that we can
* currently store in 'data' without having to reallocate
* more space. We must always have maxlen > len.
* cursor is initialized to zero by makeStringInfo or initStringInfo,
* but is not otherwise touched by the stringinfo.c routines.
* Some routines use it to scan through a StringInfo.
*-------------------------
*/
typedef struct StringInfoData
{
char *data;
int len;
int maxlen;
int cursor;
} StringInfoData;
typedef StringInfoData *StringInfo;
exec_replication_command
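exec_replication_command executes a replication command. It returns true if cmd_string is recognized as a WalSender command, and false otherwise.
Its main steps are:
1. Perform initialization and sanity checks
2. Switch to a per-command memory context
3. Initialize the replication scanner and parse the command
4. Perform transaction-related checks
5. Initialize the input/output message buffers
6. Execute the command according to its type
6.1 For T_StartReplicationCmd, call StartReplication (physical replication) or StartLogicalReplication (logical replication)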
/*
* Execute an incoming replication command.
*
* Returns true if the cmd_string was recognized as WalSender command, false
* if not.
*/
bool
exec_replication_command(const char *cmd_string)
{
int parse_rc;
Node *cmd_node;
MemoryContext cmd_context;
MemoryContext old_context;
/*
* If WAL sender has been told that shutdown is getting close, switch its
* status accordingly to handle the next replication commands correctly.
*/
if (got_STOPPING)
WalSndSetState(WALSNDSTATE_STOPPING);
/*
* Throw error if in stopping mode. We need prevent commands that could
* generate WAL while the shutdown checkpoint is being written. To be
* safe, we just prohibit all new commands.
*/
if (MyWalSnd->state == WALSNDSTATE_STOPPING)
ereport(ERROR,
(errmsg("cannot execute new commands while WAL sender is in stopping mode")));
/*
* CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
* command arrives. Clean up the old stuff if there's anything.
*/
SnapBuildClearExportedSnapshot();
// check for interrupts
CHECK_FOR_INTERRUPTS();
// create a per-command memory context and switch to it
cmd_context = AllocSetContextCreate(CurrentMemoryContext,
"Replication command context",
ALLOCSET_DEFAULT_SIZES);
old_context = MemoryContextSwitchTo(cmd_context);
// initialize the replication scanner, then parse the command
replication_scanner_init(cmd_string);
parse_rc = replication_yyparse();
if (parse_rc != 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errmsg_internal("replication command parser returned %d",
parse_rc))));
cmd_node = replication_parse_result;
/*
* Log replication command if log_replication_commands is enabled. Even
* when it's disabled, log the command with DEBUG1 level for backward
* compatibility. Note that SQL commands are not logged here, and will be
* logged later if log_statement is enabled.
*/
if (cmd_node->type != T_SQLCmd)
ereport(log_replication_commands ? LOG : DEBUG1,
(errmsg("received replication command: %s", cmd_string)));
/*
* CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
* called outside of transaction the snapshot should be cleared here.
*/
if (!IsTransactionBlock())
SnapBuildClearExportedSnapshot();
/*
* For aborted transactions, don't allow anything except pure SQL, the
* exec_simple_query() will handle it correctly.
*/
if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
ereport(ERROR,
(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
errmsg("current transaction is aborted, "
"commands ignored until end of transaction block")));
CHECK_FOR_INTERRUPTS();
/*
* Allocate buffers that will be used for each outgoing and incoming
* message. We do this just once per command to reduce palloc overhead.
*/
initStringInfo(&output_message);
initStringInfo(&reply_message);
initStringInfo(&tmpbuf);
/* Report to pgstat that this process is running */
pgstat_report_activity(STATE_RUNNING, NULL);
// dispatch on the command type
switch (cmd_node->type)
{
case T_IdentifySystemCmd:
// IDENTIFY_SYSTEM
IdentifySystem();
break;
case T_BaseBackupCmd:
//BASE_BACKUP
PreventInTransactionBlock(true, "BASE_BACKUP");
SendBaseBackup((BaseBackupCmd *) cmd_node);
break;
case T_CreateReplicationSlotCmd:
// CREATE_REPLICATION_SLOT
CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
break;
case T_DropReplicationSlotCmd:
// DROP_REPLICATION_SLOT
DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
break;
case T_StartReplicationCmd:
//START_REPLICATION
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
PreventInTransactionBlock(true, "START_REPLICATION");
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
StartReplication(cmd);
else
StartLogicalReplication(cmd);
break;
}
case T_TimeLineHistoryCmd:
// TIMELINE_HISTORY: send a timeline history file
PreventInTransactionBlock(true, "TIMELINE_HISTORY");
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
break;
case T_VariableShowStmt:
// SHOW: report the value of a configuration variable
{
DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
VariableShowStmt *n = (VariableShowStmt *) cmd_node;
GetPGVariable(n->name, dest);
}
break;
case T_SQLCmd:
// plain SQL command
if (MyDatabaseId == InvalidOid)
ereport(ERROR,
(errmsg("cannot execute SQL commands in WAL sender for physical replication")));
/* Report to pgstat that this process is now idle */
pgstat_report_activity(STATE_IDLE, NULL);
/* Tell the caller that this wasn't a WalSender command. */
return false;
default:
// unrecognized command tag
elog(ERROR, "unrecognized replication command node tag: %u",
cmd_node->type);
}
/* done */
// switch back to the caller's memory context and free the command context
MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
/* Send CommandComplete message */
EndCommand("SELECT", DestRemote);
/* Report to pgstat that this process is now idle */
pgstat_report_activity(STATE_IDLE, NULL);
return true;
}
StartReplication
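StartReplication handles the START_REPLICATION command for physical replication.
Its main steps are:
1. Perform initialization and sanity checks
2. Select the timeline
3. Enter COPY mode
3.1 Set the walsender state to WALSNDSTATE_CATCHUP
3.2 Send a CopyBothResponse message to start streaming
3.3 Initialize related variables, such as the shared-memory status
3.4 Enter the main loop (WalSndLoop)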
/*
* Handle START_REPLICATION command.
*
* At the moment, this never returns, but an ereport(ERROR) will take us back
* to the main loop.
*/
static void
StartReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
XLogRecPtr FlushPtr;
if (ThisTimeLineID == 0)
// timeline check: IDENTIFY_SYSTEM must have been run first
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
/*
* We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain().
*
* NOTE: wal_level can only change at shutdown, so in most cases it is
* difficult for there to be WAL data that we can still see that was
* written at wal_level='minimal'.
*/
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname, true);
// #define SlotIsLogical(slot) (slot->data.database != InvalidOid)
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
(errmsg("cannot use a logical replication slot for physical replication"))));
}
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
* kept in ThisTimeLineID.
*/
if (am_cascading_walsender)
{
/* this also updates ThisTimeLineID */
FlushPtr = GetStandbyFlushRecPtr();
}
else
FlushPtr = GetFlushRecPtr();
if (cmd->timeline != 0)
{
XLogRecPtr switchpoint;
sendTimeLine = cmd->timeline;
if (sendTimeLine == ThisTimeLineID)
{
sendTimeLineIsHistoric = false;
sendTimeLineValidUpto = InvalidXLogRecPtr;
}
else
{
List *timeLineHistory;
sendTimeLineIsHistoric = true;
/*
* Check that the timeline the client requested exists, and the
* requested start location is on that timeline.
*/
timeLineHistory = readTimeLineHistory(ThisTimeLineID);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
&sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
/*
* Found the requested timeline in the history. Check that
* requested startpoint is on that timeline in our history.
*
* This is quite loose on purpose. We only check that we didn't
* fork off the requested timeline before the switchpoint. We
* don't check that we switched *to* it before the requested
* starting point. This is because the client can legitimately
* request to start replication from the beginning of the WAL
* segment that contains switchpoint, but on the new timeline, so
* that it doesn't end up with a partial segment. If you ask for
* too old a starting point, you'll get an error later when we
* fail to find the requested WAL segment in pg_wal.
*
* XXX: we could be more strict here and only allow a startpoint
* that's older than the switchpoint, if it's still in the same
* WAL segment.
*/
if (!XLogRecPtrIsInvalid(switchpoint) &&
switchpoint < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
(uint32) (cmd->startpoint >> 32),
(uint32) (cmd->startpoint),
cmd->timeline),
errdetail("This server's history forked from timeline %u at %X/%X.",
cmd->timeline,
(uint32) (switchpoint >> 32),
(uint32) (switchpoint))));
}
sendTimeLineValidUpto = switchpoint;
}
}
else
{
sendTimeLine = ThisTimeLineID;
sendTimeLineValidUpto = InvalidXLogRecPtr;
sendTimeLineIsHistoric = false;
}
streamingDoneSending = streamingDoneReceiving = false;
/* If there is nothing to stream, don't even enter COPY mode */
if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
{
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
*/
// set the walsender state
WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W'); // 'W' = CopyBothResponse message type
pq_sendbyte(&buf, 0); // overall COPY format: 0 = text
pq_sendint16(&buf, 0); // number of columns: 0
pq_endmessage(&buf);
pq_flush();
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
(uint32) (cmd->startpoint >> 32),
(uint32) (cmd->startpoint),
(uint32) (FlushPtr >> 32),
(uint32) (FlushPtr))));
}
/* Start streaming from the requested point */
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
/* Main loop of walsender */
// mark replication as active before entering the loop
replication_active = true;
WalSndLoop(XLogSendPhysical);
// the loop has returned: replication is no longer active
replication_active = false;
if (got_STOPPING)
proc_exit(0); // shutdown requested: exit the process
// go back to the STARTUP state
WalSndSetState(WALSNDSTATE_STARTUP);
Assert(streamingDoneSending && streamingDoneReceiving);
}
if (cmd->slotname)
ReplicationSlotRelease();
/*
* Copy is finished now. Send a single-row result set indicating the next
* timeline.
*/
if (sendTimeLineIsHistoric)
{
char startpos_str[8 + 1 + 8 + 1];
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[2];
bool nulls[2];
snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
(uint32) (sendTimeLineValidUpto >> 32),
(uint32) sendTimeLineValidUpto);
dest = CreateDestReceiver(DestRemoteSimple);
MemSet(nulls, false, sizeof(nulls));
/*
* Need a tuple descriptor representing two columns. int8 may seem
* like a surprising data type for this, but in theory int4 would not
* be wide enough for this, as TimeLineID is unsigned.
*/
tupdesc = CreateTemplateTupleDesc(2);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
INT8OID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
TEXTOID, -1, 0);
/* prepare for projection of tuple */
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
values[1] = CStringGetTextDatum(startpos_str);
/* send it to dest */
do_tup_output(tstate, values, nulls);
end_tup_output(tstate);
}
/* Send CommandComplete message */
pq_puttextmessage('C', "START_STREAMING");
}
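On the primary, attach gdb to the postmaster, make gdb follow the forked backend (set follow-fork-mode child) and set a breakpoint on exec_replication_command; then start the standby so that the breakpoint is hit.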
[xdb@localhost ~]$ ps -ef|grep postgres
xdb 1339 1 2 14:45 pts/0 00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1339
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b exec_replication_command
Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
(gdb) c
Continuing.
[New process 1356]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
1438 if (got_STOPPING)
(gdb)
The first command received is IDENTIFY_SYSTEM; the second, START_REPLICATION, is the one we want to trace.
(gdb) c
Continuing.
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
1438 if (got_STOPPING)
(gdb)
1. Perform initialization and sanity checks
(gdb) n
1446 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
(gdb)
1454 SnapBuildClearExportedSnapshot();
(gdb) p *MyWalSnd
$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0,
writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
(gdb) n
1456 CHECK_FOR_INTERRUPTS();
(gdb)
2. Switch memory context
(gdb)
1458 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
(gdb)
1461 old_context = MemoryContextSwitchTo(cmd_context);
(gdb)
3. Initialize the replication scanner
(gdb)
1463 replication_scanner_init(cmd_string);
(gdb) n
1464 parse_rc = replication_yyparse();
(gdb)
1465 if (parse_rc != 0)
(gdb) p parse_rc
$3 = 0
(gdb)
(gdb) n
1471 cmd_node = replication_parse_result;
(gdb)
(gdb)
1479 if (cmd_node->type != T_SQLCmd)
(gdb) n
1480 ereport(log_replication_commands ? LOG : DEBUG1,
(gdb) p cmd_node
$4 = (Node *) 0x1df4710
(gdb) p *cmd_node
$5 = {type = T_StartReplicationCmd}
(gdb)
4. Perform transaction-related checks
(gdb) n
1487 if (!IsTransactionBlock())
(gdb)
1488 SnapBuildClearExportedSnapshot();
(gdb)
1494 if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
(gdb)
1500 CHECK_FOR_INTERRUPTS();
(gdb)
5. Initialize the input/output message buffers
(gdb)
1506 initStringInfo(&output_message);
(gdb)
1507 initStringInfo(&reply_message);
(gdb)
1508 initStringInfo(&tmpbuf);
(gdb)
1511 pgstat_report_activity(STATE_RUNNING, NULL);
6. Execute the command according to its type
6.1 The command type is T_StartReplicationCmd, so StartReplication is called
(gdb) n
1513 switch (cmd_node->type)
(gdb)
1534 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
(gdb)
1536 PreventInTransactionBlock(true, "START_REPLICATION");
(gdb)
1538 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
(gdb)
1539 StartReplication(cmd);
Step into StartReplication:
1539 StartReplication(cmd);
(gdb) step
StartReplication (cmd=0x1df4710) at walsender.c:532
532 if (ThisTimeLineID == 0)
(gdb)
1. Perform initialization and sanity checks
(gdb) n
546 if (cmd->slotname)
(gdb)
560 if (am_cascading_walsender)
(gdb)
2. Select the timeline
(gdb) n
568 if (cmd->timeline != 0)
(gdb)
572 sendTimeLine = cmd->timeline;
(gdb)
573 if (sendTimeLine == ThisTimeLineID)
(gdb)
575 sendTimeLineIsHistoric = false;
(gdb) p FlushPtr
$9 = 1560397696
(gdb) n
576 sendTimeLineValidUpto = InvalidXLogRecPtr;
(gdb)
634 streamingDoneSending = streamingDoneReceiving = false;
(gdb) p sendTimeLine
$10 = 16
(gdb) p ThisTimeLineID
$11 = 16
(gdb) p *cmd
$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16,
startpoint = 1560281088, options = 0x0}
(gdb)
3. Enter COPY mode
(gdb) n
637 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
(gdb)
3.1 Set the state
648 WalSndSetState(WALSNDSTATE_CATCHUP);
(gdb) p sendTimeLineValidUpto
$13 = 0
(gdb) p cmd->startpoint
$14 = 1560281088
(gdb)
3.2 Send the CopyBothResponse message and start streaming
(gdb) n
651 pq_beginmessage(&buf, 'W');
(gdb)
652 pq_sendbyte(&buf, 0);
(gdb)
653 pq_sendint16(&buf, 0);
(gdb)
654 pq_endmessage(&buf);
(gdb) p buf
$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
(gdb) p buf->data
$16 = 0x1df53b0 ""
(gdb) x/hb buf->data
0x1df53b0: 0
(gdb) x/32hb buf->data
0x1df53b0: 0 0 0 127 127 127 127 127
0x1df53b8: 127 127 127 127 127 127 127 127
0x1df53c0: 127 127 127 127 127 127 127 127
0x1df53c8: 127 127 127 127 127 127 127 127
(gdb)
3.3 Initialize related variables, such as the shared-memory status
(gdb) n
655 pq_flush();
(gdb)
661 if (FlushPtr < cmd->startpoint)
(gdb) p FlushPtr
$17 = 1560397696
(gdb) p cmd->startpoint
$18 = 1560281088
(gdb) n
672 sentPtr = cmd->startpoint;
(gdb)
675 SpinLockAcquire(&MyWalSnd->mutex);
(gdb)
676 MyWalSnd->sentPtr = sentPtr;
(gdb)
677 SpinLockRelease(&MyWalSnd->mutex);
(gdb)
679 SyncRepInitConfig();
(gdb)
682 replication_active = true;
3.4 Enter the main loop (WalSndLoop)
(gdb)
684 WalSndLoop(XLogSendPhysical);
(gdb)
DONE!
PG Source Code