parallel hash: increase hash buckets bug

This commit is contained in:
wuyuechuan 2020-11-28 09:08:31 +08:00
parent cf4d4cd7c9
commit 91064e7f9d
3 changed files with 99 additions and 7 deletions

View File

@@ -1579,7 +1579,7 @@ static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
Assert(batchno == 0);
/* add the tuple to the proper bucket */
ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], hashTuple, hashTuple->next.shared);
ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], hashTuple, hashTuple);
/* advance index past the tuple */
idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len);
@@ -2031,11 +2031,19 @@ bool ExecParallelScanHashBucket(HashJoinState* hjstate, ExprContext* econtext)
/* reset temp memory each time to avoid leaks from qual expr */
ResetExprContext(econtext);
if (ExecQual(hjclauses, econtext, false)) {
if (ExecQual(hjclauses, econtext, false) ||
(hjstate->js.nulleqqual != NIL && ExecQual(hjstate->js.nulleqqual, econtext, false))) {
hjstate->hj_CurTuple = hashTuple;
return true;
}
}
/*
* For right Semi/Anti join, we delete matched tuples in the HashTable to make subsequent matching faster,
* so pointer hj_PreTuple is designed to follow the hj_CurTuple and to help us to clear the HashTable.
*/
if (hjstate->js.jointype == JOIN_RIGHT_SEMI || hjstate->js.jointype == JOIN_RIGHT_ANTI) {
hjstate->hj_PreTuple = hashTuple;
}
hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
}
@@ -3132,11 +3140,8 @@ void ExecHashTableDetach(HashJoinTable hashtable)
*/
static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
{
HashJoinTuple tuple;
ParallelHashJoinState* parallelState = hashtable->parallel_state;
Assert(parallelState);
tuple = (HashJoinTuple)dsa_get_address(hashtable->area, hashtable->buckets.shared[bucketno]);
return tuple;
Assert(hashtable->parallel_state);
return (HashJoinTuple)dsa_get_address(hashtable->area, hashtable->buckets.shared[bucketno]);
}
/*

View File

@@ -94,6 +94,63 @@ select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on par
10 | 10
(10 rows)
-- parallel increase hash buckets
DROP TABLE IF EXISTS par_hash_incr_bucket_a;
NOTICE: table "par_hash_incr_bucket_a" does not exist, skipping
DROP TABLE IF EXISTS par_hash_incr_bucket_b;
NOTICE: table "par_hash_incr_bucket_b" does not exist, skipping
DROP TABLE IF EXISTS par_hash_incr_bucket_c;
NOTICE: table "par_hash_incr_bucket_c" does not exist, skipping
DROP TABLE IF EXISTS par_hash_incr_bucket_d;
NOTICE: table "par_hash_incr_bucket_d" does not exist, skipping
create table par_hash_incr_bucket_a(a int,b int,c int,d int,e int);
create table par_hash_incr_bucket_b(a int,b int,c int,d int,e int);
create table par_hash_incr_bucket_c(a int,b int,c int,d int,e int);
create table par_hash_incr_bucket_d(a int,b int,c int,d int,e int);
insert into par_hash_incr_bucket_a select n, n , n , n , n from generate_series(1,100000) n;
insert into par_hash_incr_bucket_b select n, n , n , n , n from generate_series(1,100000) n;
insert into par_hash_incr_bucket_c select n, n , n , n , n from generate_series(1,100000) n;
insert into par_hash_incr_bucket_d select n, n , n , n , n from generate_series(1,100000) n;
explain (costs off) select count(*) from par_hash_incr_bucket_a cross join par_hash_incr_bucket_b cross join par_hash_incr_bucket_c cross join par_hash_incr_bucket_d
where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b
and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d
and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a
and par_hash_incr_bucket_b.e %2 =0;
QUERY PLAN
--------------------------------------------------------------------------------------------
Aggregate
-> Hash Join
Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_d.b)
-> Gather
Number of Workers: 2
-> Parallel Hash Join
Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_c.a)
-> Parallel Hash Join
Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d)
-> Parallel Seq Scan on par_hash_incr_bucket_a
-> Parallel Hash
-> Parallel Seq Scan on par_hash_incr_bucket_b
Filter: ((e % 2) = 0)
-> Parallel Hash
-> Parallel Seq Scan on par_hash_incr_bucket_c
-> Hash
-> Seq Scan on par_hash_incr_bucket_d
(17 rows)
select count(*) from par_hash_incr_bucket_a cross join par_hash_incr_bucket_b cross join par_hash_incr_bucket_c cross join par_hash_incr_bucket_d
where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b
and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d
and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a
and par_hash_incr_bucket_b.e %2 =0;
count
-------
50000
(1 row)
DROP TABLE IF EXISTS par_hash_incr_bucket_a;
DROP TABLE IF EXISTS par_hash_incr_bucket_b;
DROP TABLE IF EXISTS par_hash_incr_bucket_c;
DROP TABLE IF EXISTS par_hash_incr_bucket_d;
drop table parallel_hashjoin_test_a;
drop table parallel_hashjoin_test_b;
reset parallel_setup_cost;

View File

@@ -19,6 +19,34 @@ select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b
explain (costs off)select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id order by parallel_hashjoin_test_a.id limit 10;
select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id order by parallel_hashjoin_test_a.id limit 10;
-- parallel increase hash buckets
DROP TABLE IF EXISTS par_hash_incr_bucket_a;
DROP TABLE IF EXISTS par_hash_incr_bucket_b;
DROP TABLE IF EXISTS par_hash_incr_bucket_c;
DROP TABLE IF EXISTS par_hash_incr_bucket_d;
create table par_hash_incr_bucket_a(a int,b int,c int,d int,e int);
create table par_hash_incr_bucket_b(a int,b int,c int,d int,e int);
create table par_hash_incr_bucket_c(a int,b int,c int,d int,e int);
create table par_hash_incr_bucket_d(a int,b int,c int,d int,e int);
insert into par_hash_incr_bucket_a select n, n , n , n , n from generate_series(1,100000) n;
insert into par_hash_incr_bucket_b select n, n , n , n , n from generate_series(1,100000) n;
insert into par_hash_incr_bucket_c select n, n , n , n , n from generate_series(1,100000) n;
insert into par_hash_incr_bucket_d select n, n , n , n , n from generate_series(1,100000) n;
explain (costs off) select count(*) from par_hash_incr_bucket_a cross join par_hash_incr_bucket_b cross join par_hash_incr_bucket_c cross join par_hash_incr_bucket_d
where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b
and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d
and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a
and par_hash_incr_bucket_b.e %2 =0;
select count(*) from par_hash_incr_bucket_a cross join par_hash_incr_bucket_b cross join par_hash_incr_bucket_c cross join par_hash_incr_bucket_d
where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b
and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d
and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a
and par_hash_incr_bucket_b.e %2 =0;
DROP TABLE IF EXISTS par_hash_incr_bucket_a;
DROP TABLE IF EXISTS par_hash_incr_bucket_b;
DROP TABLE IF EXISTS par_hash_incr_bucket_c;
DROP TABLE IF EXISTS par_hash_incr_bucket_d;
drop table parallel_hashjoin_test_a;
drop table parallel_hashjoin_test_b;
reset parallel_setup_cost;
@@ -26,3 +54,5 @@ reset min_parallel_table_scan_size;
reset parallel_tuple_cost;
reset enable_nestloop;
reset force_parallel_mode;