Changeset Extraction

CREATE TABLE src(src_id serial primary key, data text);
CREATE TABLE trgt(trgt_id serial primary key, src_id int, data text);

INSERT INTO src(data) VALUES ('andres'), ('freund');

WITH foo AS (
    DELETE FROM src RETURNING *
)
INSERT INTO trgt(src_id, data) SELECT src_id, data FROM foo;

Changeset Extraction - Output

=# SELECT * FROM start_logical_replication('my-slot-name', 'now', 'include-timestamp', 'yes');
 0/3BBF1A8 | 1822 | BEGIN 1822
 0/3BBF000 | 1822 | table "src": INSERT: src_id[int4]:1 data[text]:andres
 0/3BBF0C8 | 1822 | table "src": INSERT: src_id[int4]:2 data[text]:freund
 0/3BBF1A8 | 1822 | COMMIT 1822 (at 2013-11-08 09:22:02.78975+01)
 0/3BBF4B0 | 1823 | BEGIN 1823
 0/3BBF1A8 | 1823 | table "src": DELETE: src_id[int4]:1
 0/3BBF2B0 | 1823 | table "trgt": INSERT: trgt_id[int4]:1 src_id[int4]:1 data[text]:andres
 0/3BBF380 | 1823 | table "src": DELETE: src_id[int4]:2
 0/3BBF3C8 | 1823 | table "trgt": INSERT: trgt_id[int4]:2 src_id[int4]:2 data[text]:freund
 0/3BBF4B0 | 1823 | COMMIT 1823 (at 2013-11-08 09:22:02.794058+01)

Existing Solutions

Problems:

Different Usecases

⇒ Different Output Formats

New Architecture

index__1.png

Output Plugins

Example: JSON Output Plugin

{
    "xid": 1822,"timestamp": "2013-11-08 09:22:02.78975+01",
    "changes": [
        {"kind": "insert",
         "schema": "public",
         "table": "src",
         "columnnames": ["src_id", "data"],
         "columntypes": ["int","text"],
         "columnvalues": [1,"andres"]
        },
            ...
    ]
}
\0

Example: Binary Output Plugin

⇒ much smaller and much more effecient, more complex to use

Several Streams - Slots

Lifecycle

index__2.png

Decoding Lifecycle - INIT

Decoding Lifecycle - START

Decoding Lifecycle - FREE

Exported Snapshot #1

INIT_LOGICAL_REPLICATION "test" "test_decoding";
LOG:  01000: Initiating logical rep from 0/3BE82A0
 replication_id | consistent_point | snapshot_name |    plugin
----------------+------------------+---------------+---------------
 test           | 0/3BE82D8        | 00000723-1    | test_decoding

Exported Snapshot #2

postgres=# DELETE FROM trgt;
postgres=# SELECT * FROM trgt;
 trgt_id | src_id | data
---------+--------+------
(0 rows)
postgres=# START TRANSACTION ISOLATION LEVEL REPEATABLE READ;
postgres=# SET TRANSACTION SNAPSHOT '00000723-1';
SET
Time: 0.365 ms
postgres=# SELECT * FROM trgt;
 trgt_id | src_id |  data
---------+--------+--------
       1 |      1 | andres
       2 |      2 | freund
(2 rows)
postgres=# COMMIT;

Exported Snapshot #3

postgres=# SELECT * FROM start_logical_replication('test', 'now', 'include-timestamp', 'yes');
 location  | xid  |                     data
-----------+------+-----------------------------------------------
 0/3BE8548 | 1828 | BEGIN 1828
 0/3BE8400 | 1828 | table "trgt": DELETE: trgt_id[int4]:1
 0/3BE84D0 | 1828 | table "trgt": DELETE: trgt_id[int4]:2
 0/3BE8548 | 1828 | COMMIT 1828 (at 2013-11-08 11:26:06.11296+01)

Receiving Changes

Feedback Messages

Some Details

Status

Why do we do this

BDR

Related Pieces

Missing Pieces

Plans

Info