openGauss-server/contrib/gsredistribute/gsredistribute--1.0.sql

408 lines
17 KiB
PL/PgSQL

/* contrib/gsredistribute/gsredistribute--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION gsredistribute" to load this file. \quit
CREATE OR REPLACE FUNCTION pg_catalog.pg_get_redis_rel_end_ctid(text, name, int, int)
RETURNS tid
AS 'MODULE_PATHNAME','pg_get_redis_rel_end_ctid'
LANGUAGE C STABLE NOT FENCED;
CREATE OR REPLACE FUNCTION pg_catalog.pg_get_redis_rel_start_ctid(text, name, int, int)
RETURNS tid
AS 'MODULE_PATHNAME','pg_get_redis_rel_start_ctid'
LANGUAGE C STABLE NOT FENCED;
CREATE OR REPLACE FUNCTION pg_catalog.pg_enable_redis_proc_cancelable()
RETURNS bool
AS 'MODULE_PATHNAME','pg_enable_redis_proc_cancelable'
LANGUAGE C IMMUTABLE NOT FENCED;
CREATE OR REPLACE FUNCTION pg_catalog.pg_disable_redis_proc_cancelable()
RETURNS bool
AS 'MODULE_PATHNAME','pg_disable_redis_proc_cancelable'
LANGUAGE C IMMUTABLE NOT FENCED;
CREATE OR REPLACE FUNCTION pg_catalog.pg_tupleid_get_ctid_to_bigint(tid)
RETURNS bigint
AS 'MODULE_PATHNAME','pg_tupleid_get_ctid_to_bigint'
LANGUAGE C STABLE SHIPPABLE NOT FENCED;
CREATE OR REPLACE FUNCTION pg_catalog.pg_tupleid_get_blocknum(tid)
RETURNS bigint
AS 'MODULE_PATHNAME','pg_tupleid_get_blocknum'
LANGUAGE C STABLE SHIPPABLE NOT FENCED;
CREATE OR REPLACE FUNCTION pg_catalog.pg_tupleid_get_offset(tid)
RETURNS int
AS 'MODULE_PATHNAME','pg_tupleid_get_offset'
LANGUAGE C STABLE SHIPPABLE NOT FENCED;
-- cancel the task of one table, redis_oid means the oid of redis_old_ table oid
CREATE OR REPLACE FUNCTION pg_catalog.cancel_redis_task(
IN redis_oid oid
)
RETURNS void
AS
$$
DECLARE
database_name name;
var_r record;
sql text;
BEGIN
select current_database() into database_name;
if exists (select * from pg_job j, pg_job_proc p
where j.dbname = database_name and j.job_id = p.job_id and
p.what like 'call redis_ts_table('||redis_oid||'%') then
for var_r in (select j.job_id
from pg_job j, pg_job_proc p
where j.dbname = database_name and j.job_id = p.job_id and
p.what like 'call redis_ts_table('||redis_oid||'%') loop
sql := 'select DBE_TASK.cancel('||var_r.job_id||');';
EXECUTE sql;
end loop;
end if;
END;
$$
LANGUAGE plpgsql;
-- cancel all the task in the database, task function is redis_ts_table
CREATE OR REPLACE FUNCTION pg_catalog.cancel_all_redis_task()
RETURNS void
AS
$$
DECLARE
v_count int;
database_name name;
sql text;
BEGIN
select current_database() into database_name;
if exists (select * from pg_job j, pg_job_proc p where j.dbname = database_name and j.job_id = p.job_id
and p.what like 'call redis_ts_table(%') then
for var_r in (select j.job_id
from pg_job j, pg_job_proc p
where j.dbname = database_name and j.job_id = p.job_id
and p.what like 'call redis_ts_table(%') loop
sql := 'select DBE_TASK.cancel('||var_r.job_id||');';
EXECUTE sql;
end loop;
end if;
END;
$$
LANGUAGE plpgsql;
-- submit the task using function cancel_unuse_redis
CREATE OR REPLACE FUNCTION pg_catalog.submit_cancel_redis_task(
IN schedule_interval interval default '1 hour')
RETURNS void
AS
$$
DECLARE
v_count int;
database_name name;
sql text;
job_id integer;
time_interval numeric;
BEGIN
select current_database() into database_name;
if schedule_interval < interval '30 minutes' then
raise exception 'This task interval cannot be less than 30 minutes';
end if;
if not exists (select j.job_id, p.what
from pg_job j, pg_job_proc p
where j.dbname = database_name and j.job_id = p.job_id and p.what like 'call redis_ts_table(%') then
raise notice 'No redis_ts_table task exists for database %, so do not need to cancel task', database_name;
return;
end if;
-- the job can only submit once
if exists (select * from pg_job j, pg_job_proc p
where j.dbname = database_name and j.job_id = p.job_id and p.what like 'call cancel_unuse_redis()') then
raise notice 'The task for cancel_unuse_redis exists, cannot sumbit again.';
return;
end if;
sql := ' SELECT EXTRACT(epoch FROM interval '''||schedule_interval||''')/3600';
EXECUTE sql INTO time_interval;
sql := 'SELECT DBE_TASK.submit(''call cancel_unuse_redis()'', sysdate, ''sysdate + '||time_interval||' / 24'');';
EXECUTE sql into job_id;
END;
$$
LANGUAGE plpgsql;
-- check the job using redis_ts_table, if satisfy some conditins, will cancal the task redis_ts_table
-- and delete from table redis_timeseries_detail, table redis_timeseries_detail records the table that does not
-- finish the redistribiute process
CREATE OR REPLACE FUNCTION pg_catalog.cancel_unuse_redis()
RETURNS void
AS
$$
DECLARE
database_name name;
sql text;
var_r record;
var_jobid record;
BEGIN
select current_database() into database_name;
if not exists (select * from public.redis_timeseries_detail) then
return;
end if;
for var_r in (select reloid from public.redis_timeseries_detail) loop
if exists(select * from pg_class where oid = var_r.reloid) then
continue;
end if;
for var_jobid in (select j.job_id, p.what
from pg_job j, pg_job_proc p
where j.dbname = database_name and j.job_id = p.job_id and
p.what like 'call redis_ts_table('||var_r.reloid||'%') loop
sql := 'select DBE_TASK.cancel('||var_jobid.job_id||');';
EXECUTE sql;
sql := 'delete from redis_timeseries_detail where reloid = '||var_r.reloid||';';
execute sql;
end loop;
end loop;
END;
$$
LANGUAGE plpgsql;
-- the job to transfer the data from redis_old to redis_new, the transfer is the time delta,
-- for timeseries table, have one tstime, using the condition by transfer_interval on tstime
-- drop the partition if it is empty, if it is last partition, just drop the table
CREATE OR REPLACE FUNCTION pg_catalog.redis_ts_table(
--the old table in old nodes, relname is redis_old_
IN redis_oid oid,
IN transfer_interval interval
)
RETURNS void
AS
$$
DECLARE
v_count int;
time_column_name name;
old_rel_name name;
new_rel_name name;
origin_relname name;
schemaname name;
schemaoid oid;
part_name name;
start_time timestamptz;
end_time timestamptz;
i_count bigint;
sql text;
rulename name;
max_try_transfer int;
BEGIN
if not exists(select * from pg_class where oid = redis_oid) then
raise notice 'oid % does not exists.', redis_oid;
return;
end if;
-- revoke this function must guarantee the redis_old and redis_new exists, and it is timeseries table
select c.relname, n.nspname, c.relnamespace from pg_class c, pg_namespace n
where c.oid = redis_oid and n.oid = c.relnamespace
into old_rel_name, schemaname, schemaoid;
origin_relname = right(old_rel_name, length(old_rel_name) - length('redis_old_'));
new_rel_name = 'redis_new_' || origin_relname;
--check if table exists, because last job may drop the table, but next job still execute
if not exists(select * from pg_class where relname = old_rel_name and relnamespace = schemaoid) then
raise notice 'Table %.% does not exists.', schemaname, old_rel_name;
return;
end if;
if not exists(select * from pg_class where relname = new_rel_name and relnamespace = schemaoid) then
raise notice 'Table %.% does not exists.', schemaname, new_rel_name;
return;
end if;
-- cannot just set in current transaction, the kernel check the usess, input parameter must be false
sql := 'select set_config(''enable_cluster_resize'', ''on'', false)';
execute sql;
select attname from pg_attribute where attkvtype = 3 and attrelid = redis_oid into time_column_name;
select relname, boundaries[1] from pg_partition where parttype = 'p' and parentid = redis_oid
order by boundaries[1] desc limit 1 into part_name, start_time;
sql := 'select count(*) from (select * from "'||schemaname||'"."'||old_rel_name||'" partition('||part_name||') limit 1);';
execute sql into v_count;
-- if this partition has no value, drop this partition or drop this table
if v_count = 0 then
sql := 'select count(*) from (select * from pg_partition where parttype = ''p'' and parentid = '||redis_oid||' limit 2);';
execute sql into v_count;
if v_count = 1 then
-- drop the redis_view and rule cascade
sql = 'drop table if exists "'||schemaname||'"."'||old_rel_name||'" cascade;';
execute sql;
--cannot drop table redis_old_ because this task need it, in the task, if we cancel the task, it will raise error
sql = 'alter table "'||schemaname||'"."'||new_rel_name||'" rename to "'||origin_relname||'";';
execute sql;
return;
else
sql = 'alter table "'||schemaname||'"."'||old_rel_name||'" drop partition "'||part_name||'";';
execute sql;
end if;
return;
end if;
-- the partition must has value, so execute delete this time
max_try_transfer = 0;
while 1 loop
sql := 'select timestamp '''||start_time||''' - interval '''||transfer_interval||''';';
execute sql into end_time;
sql := 'insert into "'||schemaname||'"."'||new_rel_name||
'" select * from "'||schemaname||'"."'||old_rel_name||'" partition("'||part_name||'")
where "'||time_column_name||'" between '''||start_time||''' and '''||end_time||''';';
execute sql;
GET DIAGNOSTICS i_count = ROW_COUNT;
if i_count = 0 then
start_time = end_time;
-- for the last partition, if we just try to drop by time interval, it may be very long, so we just execute 50 times and then transfer all the partition
if max_try_transfer >= 50 then
sql := 'select count(*) from (select * from pg_partition where parttype = ''p'' and parentid = '||redis_oid||' limit 2);';
execute sql into v_count;
if v_count = 1 then
-- next run time job will do the last drop rule and table
sql := 'insert into "'||schemaname||'"."'||new_rel_name||
'" select * from "'||schemaname||'"."'||old_rel_name||'" partition("'||part_name||'");';
execute sql;
sql := 'truncate "'||schemaname||'"."'||old_rel_name||'";';
execute sql;
return;
end if;
end if;
else
sql := 'delete from "'||schemaname||'"."'||old_rel_name||'" where "'||time_column_name||'" between '''
||start_time||''' and '''||end_time||''';';
execute sql;
return;
end if;
max_try_transfer := max_try_transfer + 1;
end loop;
END;
$$
LANGUAGE plpgsql;
-- submit the redis_ts_table task for one table, old_oid means redis_old_ which exists in the old group
CREATE OR REPLACE FUNCTION pg_catalog.submit_redis_task(
IN old_oid oid,
IN transfer_interval interval default '10 minutes',
IN schedule_interval interval default '1 minute'
)
RETURNS void
AS
$$
DECLARE
sql text;
time_interval numeric;
database_name name;
job_id int;
BEGIN
select current_database() into database_name;
if exists (select * from pg_job j, pg_job_proc p
where j.dbname = database_name and j.job_id = p.job_id and
p.what like 'call redis_ts_table('||old_oid||'%' limit 1) then
raise notice 'The task for table oid % exists, cannot sumbit again.', old_oid;
return;
end if;
sql := ' SELECT EXTRACT(epoch FROM interval '''||schedule_interval||''')/3600';
EXECUTE sql INTO time_interval;
sql := 'SELECT DBE_TASK.submit(''call redis_ts_table('||old_oid||','''''||transfer_interval||''''');'',
sysdate, ''sysdate + '||time_interval||' / 24'');';
EXECUTE sql into job_id;
-- if drop the table redis_new_ or redis_old_, must cancel the task mannual
END;
$$
LANGUAGE plpgsql;
-- the first is the time column that transfer to other table, the second is the intervel that call the job
CREATE OR REPLACE FUNCTION pg_catalog.submit_all_redis_task(
IN transfer_interval interval default '10 minutes',
IN schedule_interval interval default '1 minute')
RETURNS void
AS
$$
DECLARE
old_group_name name;
var_r record;
v_count int;
sql text;
BEGIN
select count(*) from pgxc_group where in_redistribution = 'y' into v_count;
if v_count = 0 then
raise exception 'No old group in pgxc_group, no need to redistribute';
end if;
select group_name from pgxc_group where in_redistribution = 'y' into old_group_name;
sql := 'select count(*) from pg_class c, pgxc_class xc where array[''orientation=timeseries''] && c.reloptions and
c.relkind = ''r'' and c.parttype=''p'' and xc.pgroup='''||old_group_name||''' and xc.pcrelid = c.oid';
execute sql into v_count;
if v_count = 0 then
raise notice 'No ts table to redis.';
return;
end if;
sql := 'select c.oid from pg_class c, pgxc_class xc where array[''orientation=timeseries''] && c.reloptions and
c.relkind = ''r'' and c.parttype=''p'' and xc.pgroup='''||old_group_name||'''
and xc.pcrelid = c.oid and c.relname like ''redis_old_%''';
for var_r in execute sql loop
sql := 'select pg_catalog.submit_redis_task('||var_r.oid||', '
||quote_literal(transfer_interval)||', '
||quote_literal(schedule_interval)||');';
execute sql;
end loop;
END;
$$
LANGUAGE plpgsql;
-- just flush redistribute instead rule, if do something, user may need to flush the rule
CREATE OR REPLACE FUNCTION pg_catalog.flush_depend_rule(relid Oid)
RETURNS void
AS
$$
DECLARE
relname name;
schemaname name;
schemaoid oid;
newname name;
oldname name;
def text;
leftpos int;
rightpos int;
var_r record;
sql text;
BEGIN
if not exists(select oid from pg_class where oid = relid) then
raise exception 'Table with oid % does not exists', relid;
end if;
select c.relname, c.relnamespace, n.nspname from pg_class c, pg_namespace n
where c.oid = relid and n.oid = c.relnamespace
into relname, schemaoid, schemaname;
if not exists(select rulename from pg_rewrite where ev_class = relid and is_instead = 't') then
raise notice 'Have no rule to flush, nothing need to do';
return;
end if;
sql := 'select set_config(''enable_cluster_resize'', ''on'', false)';
execute sql;
newname = 'redis_new_' ||relname||;
oldname = 'redis_old_' ||relname||;
if not exists(select oid from pg_class where relnamespace = schemaoid and relname = newname) then
raise notice '% does not exists, nothing need to do', newname;
return;
end if;
if not exists(select oid from pg_class where relnamespace = schemaoid and relname = oldname) then
raise notice '% does not exists, nothing need to do', oldname;
return;
end if;
for var_r in (select oid, rulename, ev_type from pg_catalog.pg_rewrite) loop
sql := 'select definition from pg_rules where rulename = '||quote_ident(var_r.rulename)||
' and schemaname = '||quote_ident(schemaname)||' and tablename = '||quote_ident(relname);
execute sql into def;
def = "CREATE OR REPLACE" || substring(def, 7, length(def));
if var_r.ev_type = 6 then
--copy rule and alter table rule
--do not need to rewrite when only alter table add column, the query tree in pg_rewrite does not have the column indo
execute def;
elsif var_r.ev_type = 3 then
sql = split_part(def, ') VALUES (', 1) || ') VALUES (new.*);';
execute sql;
elsif var_r.ev_type = 1 then
sql = split_part(def, 'DO INSTEAD SELECT', 1) || 'DO INSTEAD SELECT * FROM '||quote_ident(schemaname)||'.'||quote_ident(oldname)
||' UNION ALL SELECT * FROM '||quote_ident(schemaname)||'.'||quote_ident(newname)||';';
execute sql;
end if;
end loop;
END;
$$
LANGUAGE plpgsql;