Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
language: c
compiler: gcc
dist: xenial
addons:
apt:
packages:
- libreadline-dev
- zlib1g-dev
install:
- curl -L https://cpanmin.us | perl - App::cpanminus
- ~/perl5/bin/cpanm --local-lib=~/perl5 local::lib && eval $(perl -I ~/perl5/lib/perl5/ -Mlocal::lib)
- ~/perl5/bin/cpanm IPC::Run
- ~/perl5/bin/cpanm Test::More
- ~/perl5/bin/cpanm Time::HiRes
script:
- export CFLAGS="-Og"
- ./configure --enable-debug --enable-cassert --enable-depend --enable-tap-tests BISONFLAGS="-v"
- make check-world
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# PostgreSQL mirror for development purposes

[![Travis-ci Status](https://travis-ci.org/ololobus/postgres.svg?branch=master)](https://travis-ci.org/ololobus/postgres)

This directory contains the source code distribution of the PostgreSQL
database management system.

PostgreSQL is an advanced object-relational database management system
that supports an extended subset of the SQL standard, including
transactions, foreign keys, subqueries, triggers, user-defined types
and functions. This distribution also contains C language bindings.

PostgreSQL has many language interfaces, many of which are listed here:

https://www.postgresql.org/download

See the file INSTALL for instructions on how to build and install
PostgreSQL. That file also lists supported operating systems and
hardware platforms and contains information regarding any other
software packages that are required to build or run the PostgreSQL
system. Copyright and license information can be found in the
file COPYRIGHT. A comprehensive documentation set is included in this
distribution; it can be read as described in the installation
instructions.

The latest version of this software may be obtained at
https://www.postgresql.org/download/. For more information look at our
web site located at https://www.postgresql.org/.
85 changes: 85 additions & 0 deletions contrib/test_decoding/test_decoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
static void pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
static void pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr apply_lsn);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);

void
_PG_init(void)
Expand All @@ -83,6 +101,12 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
cb->stream_message_cb = pg_decode_stream_message;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_abort_cb = pg_decode_stream_abort;
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_start_cb = pg_decode_stream_start;
cb->stream_stop_cb = pg_decode_stream_stop;
}


Expand Down Expand Up @@ -549,3 +573,64 @@ pg_decode_message(LogicalDecodingContext *ctx,
appendBinaryStringInfo(ctx->out, message, sz);
OutputPluginWrite(ctx, true);
}

static void
pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
const char *prefix, Size sz, const char *message)
{
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
transactional, prefix, sz);
appendBinaryStringInfo(ctx->out, message, sz);
OutputPluginWrite(ctx, true);
}

static void
pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change)
{
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
OutputPluginWrite(ctx, true);
}

static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn)
{
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
OutputPluginWrite(ctx, true);
}

static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr apply_lsn)
{
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
OutputPluginWrite(ctx, true);
}

static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
OutputPluginWrite(ctx, true);
}

static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
OutputPluginWrite(ctx, true);
}
32 changes: 32 additions & 0 deletions doc/src/sgml/config.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,38 @@ include_dir 'conf.d'
</listitem>
</varlistentry>

<varlistentry id="guc-logical-work-mem" xreflabel="logical_work_mem">
<term><varname>logical_work_mem</varname> (<type>integer</type>)
<indexterm>
<primary><varname>logical_work_mem</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the maximum amount of memory to be used by logical decoding,
before some of the decoded changes are either written to local disk
or streamed over network to the downstream node. This both limits the
amount of memory used by logical streaming replication connections, and
helps to reduce the apply lag by transferring changes from in-progress
transactions. It defaults to the same value as
<varname>maintenance_work_mem</varname>, that is 64 megabytes
(<literal>64MB</literal>). Since each replication connection only uses
a single buffer of this size, and an installation normally doesn't have
many such connections concurrently (as limited by
<varname>max_wal_senders</varname>), it's safe to set this value
significantly higher than <varname>work_mem</varname>, reducing the
amount of decoded changes written to disk.
</para>
<para>
Note that when an output plugin supports streaming of in-progress
transactions, using higher values may run counter the goal of reducing
apply lag by streaming fewer transactions. In these cases it may be
beneficial to use a smaller values, to stream decoded changes more
aggressively.
</para>
</listitem>
</varlistentry>

<varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
<term><varname>max_stack_depth</varname> (<type>integer</type>)
<indexterm>
Expand Down
184 changes: 184 additions & 0 deletions doc/src/sgml/logicaldecoding.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,12 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
Expand All @@ -400,6 +406,14 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
If <function>truncate_cb</function> is not set but a
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
</para>

<para>
An output plugin may also define functions to support streaming of large,
in-progress transactions. The <function>stream_change_cb</function>,
<function>stream_commit_cb</function>, <function>stream_abort_cb</function>,
<function>stream_start_cb</function> and <function>stream_stop_cb</function>
are required, while <function>stream_message_cb</function> is optional.
</para>
</sect2>

<sect2 id="logicaldecoding-capabilities">
Expand Down Expand Up @@ -678,6 +692,89 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-start">
<title>Stream Start Callback</title>
<para>
The <function>stream_start_cb</function> callback is called when opening
a block of streamed changes from an in-progress transaction.
<programlisting>
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-stop">
<title>Stream Stop Callback</title>
<para>
The <function>stream_stop_cb</function> callback is called when closing
a block of streamed changes from an in-progress transaction.
<programlisting>
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-change">
<title>Stream Change Callback</title>
<para>
The <function>stream_change_cb</function> callback is called when sending
a change in a block of streamed changes (demarcated by
<function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
<programlisting>
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-message">
<title>Stream Message Callback</title>
<para>
The <function>stream_message_cb</function> callback is called when sending
a generic message in a block of streamed changes (demarcated by
<function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
<programlisting>
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-commit">
<title>Stream Commit Callback</title>
<para>
The <function>stream_commit_cb</function> callback is called to commit
a previously streamed transaction.
<programlisting>
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-abort">
<title>Stream Abort Callback</title>
<para>
The <function>stream_abort_cb</function> callback is called to abort
a previously streamed transaction.
<programlisting>
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
</programlisting>
</para>
</sect3>

</sect2>

<sect2 id="logicaldecoding-output-plugin-output">
Expand Down Expand Up @@ -746,4 +843,91 @@ OutputPluginWrite(ctx, true);
</para>
</note>
</sect1>

<sect1 id="logicaldecoding-streaming">
<title>Streaming of Large Transactions for Logical Decoding</title>

<para>
The basic output plugin callbacks (e.g. <function>begin_cb</function>,
<function>change_cb</function>, <function>commit_cb</function> and
<function>message_cb</function>) are only invoked when the transaction
actually commits. The changes are still decoded from the transaction
log, but are only passed to the output plugin at commit (and discarded
if the transaction aborts).
</para>

<para>
This means that while the decoding happens incrementally, and may spill
to disk to keep memory usage under control, all the decoded changes have
to be transmitted when the transaction finally commits (or more precisely,
when the commit is decoded from the transaction log). Depending on the
size of the transaction size and network bandwidth, the transfer time
may significantly increase the apply lag.
</para>

<para>
To reduce the apply lag caused by large transactions, an output plugin
may provide additional callback to support incremental streaming of
in-progress transactions. There are multiple required streaming callbacks
(<function>stream_change_cb</function>, <function>stream_commit_cb</function>,
<function>stream_abort_cb</function>, <function>stream_start_cb</function>
and <function>stream_stop_cb</function>) and one optional callback
(<function>stream_message_cb</function>).
</para>

<para>
When streaming an in-progress transaction, the changes (and messages) are
streamed in blocks demarcated by <function>stream_start_cb</function>
and <function>stream_stop_cb</function> callbacks. Once all the decoded
changes are transmitted, the transaction is committed using the
<function>stream_commit_cb</function> callback (or possibly aborted using
the <function>stream_abort_cb</function> callback).
</para>

<para>
One example sequence of streaming callback calls for one transaction may
look like this:
<programlisting>
stream_start_cb(...); &lt;-- start of first block of changes
stream_change_cb(...);
stream_change_cb(...);
stream_message_cb(...);
stream_change_cb(...);
...
stream_change_cb(...);
stream_stop_cb(...); &lt;-- end of first block of changes

stream_start_cb(...); &lt;-- start of second block of changes
stream_change_cb(...);
stream_change_cb(...);
stream_change_cb(...);
...
stream_message_cb(...);
stream_change_cb(...);
stream_stop_cb(...); &lt;-- end of second block of changes

stream_commit_cb(...); &lt;-- commit of the streamed transaction
</programlisting>
</para>

<para>
The actual sequence of callback calls may be more complicated, of course.
There may be blocks for multiple streamed transactions, some of the
transactions may get aborted, etc.
</para>

<para>
Similarly to spill-to-disk behavior, streaming is triggered when the total
amount of changes decoded from the WAL (for all in-progress transactions)
exceeds limit defined by <varname>logical_work_mem</varname> setting. At
that point the largest toplevel transaction (measured by amount of memory
currently used for decoded changes) is selected and streamed.
</para>

<para>
Even when streaming large transactions, the changes are still applied in
commit order, preserving the same guarantees as the non-streaming mode.
</para>

</sect1>
</chapter>
Loading