Make use of initReadOnlyStringInfo() in more places
f0efa5aec introduced the concept of "read-only" StringInfos which makes use of an existing, possibly not NUL terminated, buffer. Here we adjust two places that make use of StringInfos to receive data to avoid using appendBinaryStringInfo() in cases where a NUL termination character is not required. This saves a possible palloc() and saves having to needlessly memcpy() from one buffer to another. Here we adjust two places which were using appendBinaryStringInfo(). Neither of these cases seem particularly performance-critical. In the case of XLogWalRcvProcessMsg(), the appendBinaryStringInfo() was only appending 24 bytes. The change made here does mean that we can get rid of the incoming_message global variable and make that local instead. The apply_spooled_messages() case applies in logical decoding when applying (possibly large) changes which have been serialized to a file. Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/CAApHDvoxYUDHwqPf-ShvchsERf1RzmkGoLwg63JNvHCkDCuyKQ@mail.gmail.com
This commit is contained in:
parent
18b585155a
commit
ac7d6f5f83
@ -2019,7 +2019,6 @@ void
|
|||||||
apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
|
apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
|
||||||
XLogRecPtr lsn)
|
XLogRecPtr lsn)
|
||||||
{
|
{
|
||||||
StringInfoData s2;
|
|
||||||
int nchanges;
|
int nchanges;
|
||||||
char path[MAXPGPATH];
|
char path[MAXPGPATH];
|
||||||
char *buffer = NULL;
|
char *buffer = NULL;
|
||||||
@ -2057,7 +2056,6 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
|
|||||||
CurrentResourceOwner = oldowner;
|
CurrentResourceOwner = oldowner;
|
||||||
|
|
||||||
buffer = palloc(BLCKSZ);
|
buffer = palloc(BLCKSZ);
|
||||||
initStringInfo(&s2);
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldcxt);
|
MemoryContextSwitchTo(oldcxt);
|
||||||
|
|
||||||
@ -2079,6 +2077,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
|
|||||||
nchanges = 0;
|
nchanges = 0;
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
|
StringInfoData s2;
|
||||||
size_t nbytes;
|
size_t nbytes;
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
@ -2104,9 +2103,8 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
|
|||||||
|
|
||||||
BufFileTell(stream_fd, &fileno, &offset);
|
BufFileTell(stream_fd, &fileno, &offset);
|
||||||
|
|
||||||
/* copy the buffer to the stringinfo and call apply_dispatch */
|
/* init a stringinfo using the buffer and call apply_dispatch */
|
||||||
resetStringInfo(&s2);
|
initReadOnlyStringInfo(&s2, buffer, len);
|
||||||
appendBinaryStringInfo(&s2, buffer, len);
|
|
||||||
|
|
||||||
/* Ensure we are reading the data into our memory context. */
|
/* Ensure we are reading the data into our memory context. */
|
||||||
oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
|
oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
|
||||||
|
@ -132,7 +132,6 @@ typedef enum WalRcvWakeupReason
|
|||||||
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
|
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
|
||||||
|
|
||||||
static StringInfoData reply_message;
|
static StringInfoData reply_message;
|
||||||
static StringInfoData incoming_message;
|
|
||||||
|
|
||||||
/* Prototypes for private functions */
|
/* Prototypes for private functions */
|
||||||
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
|
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
|
||||||
@ -425,7 +424,6 @@ WalReceiverMain(void)
|
|||||||
/* Initialize LogstreamResult and buffers for processing messages */
|
/* Initialize LogstreamResult and buffers for processing messages */
|
||||||
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
|
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
|
||||||
initStringInfo(&reply_message);
|
initStringInfo(&reply_message);
|
||||||
initStringInfo(&incoming_message);
|
|
||||||
|
|
||||||
/* Initialize nap wakeup times. */
|
/* Initialize nap wakeup times. */
|
||||||
now = GetCurrentTimestamp();
|
now = GetCurrentTimestamp();
|
||||||
@ -843,19 +841,20 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
|
|||||||
TimestampTz sendTime;
|
TimestampTz sendTime;
|
||||||
bool replyRequested;
|
bool replyRequested;
|
||||||
|
|
||||||
resetStringInfo(&incoming_message);
|
|
||||||
|
|
||||||
switch (type)
|
switch (type)
|
||||||
{
|
{
|
||||||
case 'w': /* WAL records */
|
case 'w': /* WAL records */
|
||||||
{
|
{
|
||||||
/* copy message to StringInfo */
|
StringInfoData incoming_message;
|
||||||
|
|
||||||
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
|
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
|
||||||
if (len < hdrlen)
|
if (len < hdrlen)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
errmsg_internal("invalid WAL message received from primary")));
|
errmsg_internal("invalid WAL message received from primary")));
|
||||||
appendBinaryStringInfo(&incoming_message, buf, hdrlen);
|
|
||||||
|
/* initialize a StringInfo with the given buffer */
|
||||||
|
initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
|
||||||
|
|
||||||
/* read the fields */
|
/* read the fields */
|
||||||
dataStart = pq_getmsgint64(&incoming_message);
|
dataStart = pq_getmsgint64(&incoming_message);
|
||||||
@ -870,13 +869,16 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
|
|||||||
}
|
}
|
||||||
case 'k': /* Keepalive */
|
case 'k': /* Keepalive */
|
||||||
{
|
{
|
||||||
/* copy message to StringInfo */
|
StringInfoData incoming_message;
|
||||||
|
|
||||||
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
|
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
|
||||||
if (len != hdrlen)
|
if (len != hdrlen)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
errmsg_internal("invalid keepalive message received from primary")));
|
errmsg_internal("invalid keepalive message received from primary")));
|
||||||
appendBinaryStringInfo(&incoming_message, buf, hdrlen);
|
|
||||||
|
/* initialize a StringInfo with the given buffer */
|
||||||
|
initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
|
||||||
|
|
||||||
/* read the fields */
|
/* read the fields */
|
||||||
walEnd = pq_getmsgint64(&incoming_message);
|
walEnd = pq_getmsgint64(&incoming_message);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user