x
1
DECLARE
2
p_clone_id text;
3
p_excluded_columns_text text;
4
sqltemplate text;
5
sqltext text;
6
dblink_connection_name text;
7
dblink_msg text;
8
BEGIN
9
10
IF p_excluded_columns IS NULL THEN
11
p_excluded_columns_text = '';
12
ELSE
13
p_excluded_columns_text = array_to_string(p_excluded_columns, '@');
14
END IF;
15
16
-- Get clone server id
17
SELECT server_id::text INTO p_clone_id
18
FROM lizsync.server_metadata
19
LIMIT 1;
20
21
IF p_clone_id IS NULL THEN
22
RETURN QUERY
23
SELECT
24
NULL, NULL, NULL, NULL,
25
NULL, NULL, NULL, NULL, NULL, NULL
26
LIMIT 0;
27
END IF;
28
29
-- Create dblink connection
30
dblink_connection_name = (md5(((random())::text || (clock_timestamp())::text)))::text;
31
SELECT dblink_connect(
32
dblink_connection_name,
33
'central_server'
34
)
35
INTO dblink_msg;
36
37
sqltemplate = '
38
39
WITH
40
cid AS (
41
SELECT server_id::text AS p_central_id
42
FROM lizsync.server_metadata
43
LIMIT 1
44
),
45
last_sync AS (
46
SELECT
47
max_action_tstamp_tx,
48
max_event_id
49
FROM lizsync.history
50
WHERE True
51
AND server_from = (SELECT p_central_id FROM cid)
52
AND ''%1$s'' = ANY (server_to)
53
AND sync_status = ''done''
54
ORDER BY sync_time DESC
55
LIMIT 1
56
),
57
tables AS (
58
SELECT sync_tables
59
FROM lizsync.synchronized_tables
60
WHERE server_id = ''%1$s''::uuid
61
LIMIT 1
62
)
63
SELECT
64
a.event_id,
65
a.action_tstamp_tx AS action_tstamp_tx,
66
extract(epoch from a.action_tstamp_tx)::numeric AS action_tstamp_epoch,
67
concat(a.schema_name, ''.'', a.table_name) AS ident,
68
a.action AS action_type,
69
CASE
70
WHEN a.sync_data->>''origin'' IS NULL THEN ''central''
71
ELSE ''clone''
72
END AS origine,
73
Coalesce(
74
lizsync.get_event_sql(
75
a.event_id,
76
''%2$s''::text,
77
array_cat(
78
string_to_array(''%3$s'',''@''),
79
array_remove(akeys(a.changed_fields), s)
80
)
81
),
82
''''
83
) AS action,
84
s AS updated_field,
85
(a.row_data->''%2$s'')::uuid AS uid,
86
CASE
87
WHEN a.sync_data->>''action_tstamp_tx'' IS NOT NULL
88
AND a.sync_data->>''origin'' IS NOT NULL
89
THEN extract(epoch from Cast(a.sync_data->>''action_tstamp_tx'' AS TIMESTAMP WITH TIME ZONE))::numeric
90
ELSE extract(epoch from a.action_tstamp_tx)::numeric
91
END AS original_action_tstamp_tx
92
FROM lizsync.logged_actions AS a
93
-- Create as many lines as there are changed fields in UPDATE
94
LEFT JOIN skeys(a.changed_fields) AS s ON TRUE,
95
last_sync, tables
96
97
WHERE True
98
99
-- modifications do not come from clone database
100
AND (a.sync_data->>''origin'' != ''%1$s'' OR a.sync_data->>''origin'' IS NULL)
101
102
-- modifications have not yet been replayed in the clone database
103
AND (NOT (a.sync_data->''replayed_by'' ? ''%1$s'') OR a.sync_data->''replayed_by'' = jsonb_build_object() )
104
105
-- modifications after the last synchronisation
106
-- MAX_ACTION_TSTAMP_TX Par ex: 2019-04-20 12:00:00+02
107
AND a.action_tstamp_tx > last_sync.max_action_tstamp_tx
108
109
-- Event ID is bigger than last sync event id
110
-- MAX_EVENT_ID
111
AND a.event_id > last_sync.max_event_id
112
113
-- only for tables synchronised by the clone server ID
114
AND sync_tables ? concat(''"'', a.schema_name, ''"."'', a.table_name, ''"'')
115
116
ORDER BY a.event_id
117
;
118
';
119
120
sqltext = format(sqltemplate,
121
p_clone_id,
122
p_uid_field,
123
p_excluded_columns_text
124
);
125
--RAISE NOTICE '%', sqltext;
126
127
RETURN QUERY
128
SELECT *
129
FROM dblink(
130
dblink_connection_name,
131
sqltext
132
) AS t(
133
event_id bigint, action_tstamp_tx timestamp with time zone, action_tstamp_epoch numeric,
134
ident text, action_type text, origine text, action text, updated_field text,
135
uid uuid, original_action_tstamp_tx numeric
136
)
137
;
138
139
END;
140