Queue-based Concurrent Stats Prototype Implementation

This is just a prototype of a queue-based concurrent statistics implementation - using the same basic implementation I've used a a couple of years ago to create indexes concurrently.There are reasons why such an implementation might be useful - in 11.2.0.x the built-in Concurrent Stats feature might turn out to be not really that efficient by creating lots of jobs that potentially attempt to gather statistics for different sub-objects of the same table at the same time - which can lead to massive contention on Library Cache level due to the exclusive Library Cache locks required by DDL / DBMS_STATS calls.In 12.1 the Concurrent Stats feature obviously got a major re-write by using some more intelligent processing what and how should be processed concurrently - some of the details are exposed via the new view DBA_OPTSTAT_OPERATION_TASKS, but again I've seen it running lots of very small jobs serially one of the other in the main session which can be a performance problem if the sheer number of objects to analyze is huge.This prototype here tries to work around these problems by using a queue-based approach for true concurrent processing in combination with an attempt to use some "intelligent" ordering of the objects to analyze in the hope to minimize contention on Library Cache level.This prototype determines the objects to gather statistics on by calling DBMS_STATS.GATHER_DATABASE_STATS using one of the available LIST* options - so it's supposed to replace a call to GATHER_DATABASE_STATS or the built-in default nightly statistics job.The jobs for concurrent stats gathering are created using DBMS_SCHEDULER and a custom job class, which offers the feature of binding the jobs to a specific service, which can come handy if you for example want these jobs only to execute on certain node(s) in a RAC cluster database.It comes with the following (known) limitations:- The current implementation offers only rudimentary logging to a very simple log table, which also gets truncated at the start of each run, so no history from previous runs gets retained. In 12c this is not such an issue since DBA_OPTSTAT_OPERATION_TASKS contains a lot of details for each individual stats gathering call.- Currently only objects of type TABLE returned by GATHER_DATABASE_STATS are considered assuming the CASCADE option will take care of any indexes to gather statistics for- The default behaviour attempts to make use of all available CPUs of a single node by starting as many threads as defined via CPU_COUNT. If you explicitly specify the number of concurrent threads the default behaviour is to use a DEGREE (or DOP) per gather stats operation that again makes use of all available CPU resources by using a DEGREE = CPU_COUNT divided by number_of_threads. If you don't want that you will have to specify the DEGREE / DOP explicitly, too- I haven't spend time to investigate how this behaves with regards to incremental statistics - since it's possible that GLOBAL and (sub-)partition statistics of the same object get gathered in different jobs, potentially at the same time and in random order (so GLOBAL prior to partition for example) the outcome and behaviour with incremental statistics turned on could be a problemMore details can be found in the comments section of the code, in particular what privileges might be required and how you could replace the default statistics job if desired.The script provided includes a de-installation and installation part of the code. All that needs to be called then to start a database-wide gather statistics processing is the main entry point "pk_stats_concurrent.stats_concurrent" using any optional parameters as desired - consider in particular the parameters to control the number of threads and the intra-operation DOP as just outlined. See the code comments for a detailed description of the available parameters.


--------------------------------------------------------------------------------
--
-- Script: pk_stats_concurrent.sql
--
-- Author: Randolf Geist
--
-- Copyright: http://oracle-randolf.blogspot.com
--
-- Purpose: A queue based concurrent stats implementation - installation script
--
-- Usage: @pk_stats_concurrent
--
-- The script will first drop all objects (can be skipped)
--
-- And then attempt to create the objects required (can be skipped)
--
--------------------------------------------------------------------------------

spool pk_stats_concurrent.log

prompt Concurrent Stats - Deinstallation
prompt *----------------------------------------------------*
prompt This script will now attempt to drop the objects
prompt belonging to this installation
accept skip_deinstall prompt 'Hit Enter to continue, CTRL+C to cancel or enter S to skip deinstall: '

set serveroutput on

declare
procedure exec_ignore_fail(p_sql in varchar2)
as
begin
execute immediate p_sql;
exception
when others then
dbms_output.put_line('Error executing: ' || p_sql);
dbms_output.put_line('Error message: ' || SQLERRM);
end;
begin
if upper('&skip_deinstall') = 'S' then
null;
else
exec_ignore_fail('begin pk_stats_concurrent.teardown_aq; end;');
exec_ignore_fail('drop table stats_concurrent_log');
exec_ignore_fail('drop type stats_concurrent_info force');
exec_ignore_fail('drop package pk_stats_concurrent');
exec_ignore_fail('begin dbms_scheduler.drop_job_class(''CONC_STATS''); end;');
end if;
end;
/

prompt Concurrent Stats - Installation
prompt *----------------------------------------------------*
prompt This script will now attempt to create the objects
prompt belonging to this installation
PAUSE Hit CTRL+C to cancel, ENTER to continue...

/**
* The log table for minimum logging of the concurrent execution threads
* Since we cannot access the DBMS_OUTPUT of these separate processes
* This needs to be cleaned up manually if / when required
**/
create table stats_concurrent_log (log_timestamp timestamp, sql_statement clob, message clob);

/**
* The single object type used as payload in the AQ queue for concurrent execution
* Each message will have a description of the index plus the actual DDL text as payload
**/
create or replace type stats_concurrent_info as object
(
ownname varchar2(30)
, tabname varchar2(30)
, partname varchar2(30)
, degree number
, granularity varchar2(30)
);
/

show errors

create or replace package pk_stats_concurrent authid current_user
as

------------------------------------------------------------------------------
-- $Id$
------------------------------------------------------------------------------

/**
* PK_STATS_CONCURRENT.SQL
*
* Created By : Randolf Geist (http://oracle-randolf.blogspot.com)
* Creation Date : 31-OCT-2016
* Last Update : 06-DEC-2016
* Authors : Randolf Geist (RG)
*
* History :
*
* When | Who | What
* ----------------------------------
* 31-OCT-2016 | RG | Created
* 06-DEC-2016 | RG | This header comment updated
*
* Description :
*
* This is a simple prototype implementation for the given task of gathering database stats
* concurrently, in case you are not satisfied with the built-in concurrent stats option available since 11.2.0.x
*
* In 11.2, the CONCURRENT stats option creates as many jobs as there are objects to gather
* And the JOB_QUEUE_PROCESSES parameter then controls the number of concurrent jobs running
* Since the ordering of execution isn't really optimized, many of these concurrent jobs might attempt to gather stats on the same object in case it is (sub)partitioned
* This can lead to significant contention on Library Cache level (due to exclusive Library Cache Locks required by DDL / DBMS_STATS)
*
* In 12.1 the CONCURRENT stats option was obviously completed rewritten and uses some more intelligent processing
* by calculating if and yes how many jobs should run concurrently for what kind of objects (see for example the new DBA_OPTSTAT_OPERATION_TASKS view that exposes some of these details)
* Still I've observed many occasions with this new implementation where lots of objects were deliberately gathered in the main session
* one after the other which doesn't really make good use of available resources in case many objects need to be analyzed
*
* This implementation tries to work around these points by using a simple queue-based approach for true concurrent stats processing
* combined with an attempt to distribute the tables to analyze across the different threads in a way that minimizes the contention on Library Cache level
*
* It needs to be installed / executed under a suitable account that has the privileges to create queues, types, packages, tables, jobs and job classes and gather stats on the whole database
*
* A sample user profile could look like this:

create user conc_stats identified by conc_stats;

grant create session, create table, create procedure, create type, create job, manage scheduler, analyze any, analyze any dictionary to conc_stats;

grant execute on sys.dbms_aq to conc_stats;

grant execute on sys.dbms_aqadm to conc_stats;

grant select on sys.v_$parameter to conc_stats;

alter user conc_stats default tablespace users;

alter user conc_stats quota unlimited on users;

* Parameters to be checked, depending on concurrency desired:
*
* job_queue_processes: Needs to be set high enough to accommodate for the concurrent stats threads spawned. By default this package spawns CPU_COUNT concurrent threads
* parallel_max_servers: If a stats thread is supposed to use Parallel Execution (degree > 1) for gathering stats you'll need at least threads * degree Parallel Slaves configured
* services: It's possible to specify a service to have the stats threads only executed on RAC nodes that run that service
*
* The jobs are created under the same job class, currently hard coded value CONC_STATS - this makes the handling easier in case you want to stop / drop the jobs submitted manually
*
* The job class name can be passed to calls to DBMS_SCHEDULER.DROP_JOB or STOP_JOB - remember that job classes are owned by SYS, so you have to specify SYS.CONC_STATS for the job class name used here
*
* The main entry point STATS_CONCURRENT is all you need to call to start concurrent stats gathering on the database
* similar to GATHER_DATABASE_STATS using one of the options GATHER, GATHER STALE or GATHER AUTO (default) - here you have to use LIST EMPTY, LIST STALE or LIST AUTO (default)
*
* The default behaviour when not specifying any parameters is to start as many threads as there are CPUs by using the CPU_COUNT parameter
* If you want this to be multiplied by the number of instances in a RAC cluster uncomment the CLUSTER_DATABASE_INSTANCES reference below in the code (assuming same CPU_COUNT on all nodes)
*
* This also means that the "intra" parallelism per gather_table_stats call will be one in such a case since the intra parallelism is calculated by default as CPU_COUNT / number of threads
*
* If you don't want to have that many threads / PX slaves started, specify the number of concurrent threads and an intra-operation DOP explicitly when calling STATS_CONCURRENT
*
* If you want to replace the default nightly stats job with this here, the following steps should achieve this:

BEGIN DBMS_AUTO_TASK_ADMIN.disable(
client_name => 'auto optimizer stats collection',
operation => NULL,
window_name => NULL);
END;

BEGIN DBMS_SCHEDULER.CREATE_JOB(
job_name => '',
schedule_name => 'MAINTENANCE_WINDOW_GROUP',
job_type => 'PLSQL_BLOCK',
job_action => 'begin pk_stats_concurrent.stats_concurrent(); end;',
comments => 'auto optimizer stats collection replacement using concurrent stats operations based on AQ'
enabled => true);
END;

* Please ensure the job is submitted under the account it's supposed to be run - using a different account like SYS to submit the job for a different schema
* seems to cause problems with privileges (insufficient privileges error messages), at least this was reported to me
*
* The code at present only processes objects of type TABLE returned by GATHER_DATABASE_STATS but not indexes
* assuming that these should be covered by the CASCADE functionality
*
* Note: This script is a prototype and comes with NO warranty. Use at your own risk and test/adjust in your environment as necessary
* before using it in any production-like case
*
* @headcom
**/

subtype oracle_object is varchar2(30);

/**
* Let the procedure stats_concurrent decide itself which degree to use.
* At present this means simply to spawn as many child threads as defined by the CPU_COUNT parameter
**/
G_AUTO_PARALLEL_DEGREE constant integer := null;

/**
* The main entry point to gather statistics via parallel threads / AQ
* @param p_parallel_degree The number of threads to start G_AUTO_PARALLEL_DEGREE means use the CPU_COUNT parameter to determine number of threads automatically
* @param p_intra_degree The DOP to use per stats operation, by default calculate DOP based on CPU_COUNT and number of threads to run concurrently (Default DOP = CPU_COUNT / number of threads)
* @param p_service Specify a service if you want the jobs to be assigned to that particular service, default NULL
* @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
* @param p_optional_init Optionally a SQL can be passed usually used to initialize the session
for example forcing a particular parallel degree
**/
procedure stats_concurrent(
p_parallel_degree in integer default G_AUTO_PARALLEL_DEGREE
, p_intra_degree in integer default null
, p_service in varchar2 default null
, p_gather_option in varchar2 default 'LIST AUTO'
, p_optional_init in varchar2 default null
);

/**
* Setup the AQ infrastructure (Queue tables, Queues)
**/
procedure setup_aq;

/**
* Teardown the AQ infrastructure (Queue tables, Queues)
**/
procedure teardown_aq;

/**
* Helper function to populate the AQ queue with data to process
* @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
**/
function list_stale_database_stats (
p_gather_option in varchar2 default 'LIST AUTO'
)
return dbms_stats.objecttab pipelined;

/**
* Populate the AQ queue with data to process
* @param p_parallel_degree The number threads to use - will be used for proper data preparation / queueing order
* @param p_intra_degree The DOP to use per stats operation, by default calculate DOP based on CPU_COUNT and number of threads to run concurrently
* @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
**/
procedure populate_queue(
p_parallel_degree in integer
, p_intra_degree in integer default null
, p_gather_option in varchar2 default 'LIST AUTO'
);

/**
* This gets called for every stats thread
* It pulls the object to gather from the AQ queue
* @param p_optional_init Optionally a SQL can be passed usually used to initialize the session
for example forcing a particular parallel degree
**/
procedure stats_thread(
p_optional_init in varchar2 default null
);

end pk_stats_concurrent;
/

show errors

create or replace package body pk_stats_concurrent
as
------------------------------------------------------------------------------
-- $Id$
------------------------------------------------------------------------------

/**
* PK_STATS_CONCURRENT.SQL
*
* Created By : Randolf Geist (http://oracle-randolf.blogspot.com)
* Creation Date : 31-OCT-2016
* Last Update : 06-DEC-2016
* Authors : Randolf Geist (RG)
*
* History :
*
* When | Who | What
* ----------------------------------
* 31-OCT-2016 | RG | Created
* 06-DEC-2016 | RG | This header comment updated
*
* Description :
*
* This is a simple prototype implementation for the given task of gathering database stats
* concurrently, in case you are not satisfied with the built-in concurrent stats option available since 11.2.0.x
*
* In 11.2, the CONCURRENT stats option creates as many jobs as there are objects to gather
* And the JOB_QUEUE_PROCESSES parameter then controls the number of concurrent jobs running
* Since the ordering of execution isn't really optimized, many of these concurrent jobs might attempt to gather stats on the same object in case it is (sub)partitioned
* This can lead to significant contention on Library Cache level (due to exclusive Library Cache Locks required by DDL / DBMS_STATS)
*
* In 12.1 the CONCURRENT stats option was obviously completed rewritten and uses some more intelligent processing
* by calculating if and yes how many jobs should run concurrently for what kind of objects (see for example the new DBA_OPTSTAT_OPERATION_TASKS view that exposes some of these details)
* Still I've observed many occasions with this new implementation where lots of objects were deliberately gathered in the main session
* one after the other which doesn't really make good use of available resources in case many objects need to be analyzed
*
* This implementation tries to work around these points by using a simple queue-based approach for true concurrent stats processing
* combined with an attempt to distribute the tables to analyze across the different threads in a way that minimizes the contention on Library Cache level
*
* It needs to be installed / executed under a suitable account that has the privileges to create queues, types, packages, tables, jobs and job classes and gather stats on the whole database
*
* A sample user profile could look like this:

create user conc_stats identified by conc_stats;

grant create session, create table, create procedure, create type, create job, manage scheduler, analyze any, analyze any dictionary to conc_stats;

grant execute on sys.dbms_aq to conc_stats;

grant execute on sys.dbms_aqadm to conc_stats;

grant select on sys.v_$parameter to conc_stats;

alter user conc_stats default tablespace users;

alter user conc_stats quota unlimited on users;

* Parameters to be checked, depending on concurrency desired:
*
* job_queue_processes: Needs to be set high enough to accommodate for the concurrent stats threads spawned. By default this package spawns CPU_COUNT concurrent threads
* parallel_max_servers: If a stats thread is supposed to use Parallel Execution (degree > 1) for gathering stats you'll need at least threads * degree Parallel Slaves configured
* services: It's possible to specify a service to have the stats threads only executed on RAC nodes that run that service
*
* The jobs are created under the same job class, currently hard coded value CONC_STATS - this makes the handling easier in case you want to stop / drop the jobs submitted manually
*
* The job class name can be passed to calls to DBMS_SCHEDULER.DROP_JOB or STOP_JOB - remember that job classes are owned by SYS, so you have to specify SYS.CONC_STATS for the job class name used here
*
* The main entry point STATS_CONCURRENT is all you need to call to start concurrent stats gathering on the database
* similar to GATHER_DATABASE_STATS using one of the options GATHER, GATHER STALE or GATHER AUTO (default) - here you have to use LIST EMPTY, LIST STALE or LIST AUTO (default)
*
* The default behaviour when not specifying any parameters is to start as many threads as there are CPUs by using the CPU_COUNT parameter
* If you want this to be multiplied by the number of instances in a RAC cluster uncomment the CLUSTER_DATABASE_INSTANCES reference below in the code (assuming same CPU_COUNT on all nodes)
*
* This also means that the "intra" parallelism per gather_table_stats call will be one in such a case since the intra parallelism is calculated by default as CPU_COUNT / number of threads
*
* If you don't want to have that many threads / PX slaves started, specify the number of concurrent threads and an intra-operation DOP explicitly when calling STATS_CONCURRENT
*
* If you want to replace the default nightly stats job with this here, the following steps should achieve this:

BEGIN DBMS_AUTO_TASK_ADMIN.disable(
client_name => 'auto optimizer stats collection',
operation => NULL,
window_name => NULL);
END;

BEGIN DBMS_SCHEDULER.CREATE_JOB(
job_name => '',
schedule_name => 'MAINTENANCE_WINDOW_GROUP',
job_type => 'PLSQL_BLOCK',
job_action => 'begin pk_stats_concurrent.stats_concurrent(); end;',
comments => 'auto optimizer stats collection replacement using concurrent stats operations based on AQ'
enabled => true);
END;

* Please ensure the job is submitted under the account it's supposed to be run - using a different account like SYS to submit the job for a different schema
* seems to cause problems with privileges (insufficient privileges error messages), at least this was reported to me
*
* The code at present only processes objects of type TABLE returned by GATHER_DATABASE_STATS but not indexes
* assuming that these should be covered by the CASCADE functionality
*
* Note: This script is a prototype and comes with NO warranty. Use at your own risk and test/adjust in your environment as necessary
* before using it in any production-like case
*
* @headcom
**/

-- The queue name to use for AQ operations
G_QUEUE_NAME constant varchar2(24) := 'STATS_QUEUE';

/**
* Rudimentary logging required by the parallel threads since the
* serveroutput generated can not be accessed
* @param p_sql The SQL to log that raised the error
* @param p_error_msg The error message to log
**/
procedure log(
p_sql in clob
, p_msg in clob
)
as
-- We do this in an autonomous transaction since we want the logging
-- to be visible while any other main transactions might be still going on
pragma autonomous_transaction;
begin
insert into stats_concurrent_log(
log_timestamp
, sql_statement
, message
) values (
systimestamp
, p_sql
, p_msg
);
commit;
end log;

/**
* Execute a SQL statement potentially in a different schema (dummy implementation here).
* The string will be put to serveroutput before being executed
* @param p_owner The schema to execute
* @param p_sql The SQL to execute
* @param p_log_error Should an error be logged or not. Default is true
**/
procedure execute(
p_owner in oracle_object
, p_sql in clob
, p_log_error in boolean default true
)
as
begin
-- dbms_output.put_line('Owner: ' || p_owner || ' SQL: ' || substr(p_sql, 1, 4000));
$if dbms_db_version.ver_le_10 $then
declare
a_sql dbms_sql.varchar2a;
n_start_line number;
n_end_line number;
c integer;
n integer;
LF constant varchar2(10) := '
';
len_LF constant integer := length(LF);
begin
n_start_line := 1 - len_LF;
loop
n_end_line := instr(p_sql, LF, n_start_line + len_LF);
a_sql(a_sql.count + 1) := substr(p_sql, n_start_line + len_LF, case when n_end_line = 0 then length(p_sql) else n_end_line end - (n_start_line + len_LF) + len_LF);
-- dbms_output.put_line(a_sql.count || ':' || a_sql(a_sql.count));
exit when n_end_line = 0;
n_start_line := n_end_line;
end loop;
c := dbms_sql.open_cursor;
dbms_sql.parse(c, a_sql, 1, a_sql.count, false, dbms_sql.NATIVE);
n := dbms_sql.execute(c);
dbms_sql.close_cursor(c);
end;
$elsif dbms_db_version.ver_le_11 $then
execute immediate p_sql;
$else
execute immediate p_sql;
$end
exception
when others then
dbms_output.put_line('Error: ' || SQLERRM);
if p_log_error then
log(p_sql, SQLERRM);
end if;
raise;
end execute;

/**
* Execute a SQL statement potentially in a different schema (dummy implementation here).
* This one uses an autonomous transaction.
* The string will be put to serveroutput before being executed
* @param p_owner The schema to execute
* @param p_sql The SQL to execute
* @param p_log_error Should an error be logged or not. Default is true
**/
procedure execute_autonomous(
p_owner in oracle_object
, p_sql in clob
, p_log_error in boolean default true
)
as
pragma autonomous_transaction;
begin
execute(p_owner, p_sql, p_log_error);
end execute_autonomous;

/**
* Setup the AQ infrastructure (Queue tables, Queues)
**/
procedure setup_aq
as
begin
begin
execute(
null
, 'begin dbms_aqadm.create_queue_table(
queue_table => ''' || G_QUEUE_NAME || '''
, queue_payload_type => ''stats_concurrent_info''
); end;'
);
exception
when others then
dbms_output.put_line('Error creating Queue table: ' || SQLERRM);
raise;
end;

begin
execute(
null
, 'begin dbms_aqadm.create_queue(
queue_name => ''' || G_QUEUE_NAME || '''
, queue_table => ''' || G_QUEUE_NAME || '''
); end;'
);
exception
when others then
dbms_output.put_line('Error creating Queue: ' || SQLERRM);
raise;
end;

begin
execute(
null
, 'begin dbms_aqadm.start_queue(
queue_name => ''' || G_QUEUE_NAME || '''
); end;'
);
exception
when others then
dbms_output.put_line('Error starting Queue: ' || SQLERRM);
raise;
end;
end setup_aq;

/**
* Teardown the AQ infrastructure (Queue tables, Queues)
**/
procedure teardown_aq
as
begin
begin
execute(
null
, 'begin dbms_aqadm.stop_queue(
queue_name => ''' || G_QUEUE_NAME || '''
, wait => true
); end;'
, false
);
exception
when others then
dbms_output.put_line('Error stopping Queue: ' || SQLERRM);
-- raise;
end;

begin
execute(
null
, 'begin dbms_aqadm.drop_queue(
queue_name => ''' || G_QUEUE_NAME || '''
); end;'
, false
);
exception
when others then
dbms_output.put_line('Error dropping Queue: ' || SQLERRM);
-- raise;
end;

begin
execute(
null
, 'begin dbms_aqadm.drop_queue_table(
queue_table => ''' || G_QUEUE_NAME || '''
, force => true
); end;'
, false
);
exception
when others then
dbms_output.put_line('Error dropping Queue table: ' || SQLERRM);
-- raise;
end;

end teardown_aq;

/**
* Helper function to populate the AQ queue with data to process
* @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
**/
function list_stale_database_stats (
p_gather_option in varchar2 default 'LIST AUTO'
)
return dbms_stats.objecttab pipelined
as
pragma autonomous_transaction;
m_object_list dbms_stats.objecttab;
begin
if p_gather_option not in (
'LIST AUTO', 'LIST STALE','LIST EMPTY'
) then
null;
else
dbms_stats.gather_database_stats(
options => p_gather_option,
objlist => m_object_list
);
for i in 1..m_object_list.count loop
pipe row (m_object_list(i));
end loop;
end if;
return;
end list_stale_database_stats;

/**
* Populate the AQ queue with data to process
* @param p_parallel_degree The number threads to use - will be used for proper data preparation / queueing order
* @param p_intra_degree The DOP to use per stats operation, by default calculate DOP based on CPU_COUNT and number of threads to run concurrently
* @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
**/
procedure populate_queue(
p_parallel_degree in integer
, p_intra_degree in integer default null
, p_gather_option in varchar2 default 'LIST AUTO'
)
as
enq_msgid raw(16);
payload stats_concurrent_info := stats_concurrent_info(null, null, null, null, null);
n_dop integer;
begin
-- By default determine what intra-operation DOP to use depending on how many concurrent stats threads are supposed to run
select nvl(p_intra_degree, ceil((select to_number(value) from v$parameter where name = 'cpu_count') / p_parallel_degree)) as dop
into n_dop
from dual;
-- Populate the queue and use some "intelligent" ordering attempting to minimize (library cache) contention on the objects
for rec in (
with
-- The baseline, all TABLE objects returned by GATHER_DATABASE_STATS LIST* call
a as (
select /*+ materialize */ rownum as rn, a.* from table(pk_stats_concurrent.list_stale_database_stats(p_gather_option)) a where objtype = 'TABLE'
),
-- Assign all table, partitions and subpartitions to p_parallel_degree buckets
concurrent_stats as (
select ntile(p_parallel_degree) over (order by rn) as new_order, a.* from a where partname is null
union all
select ntile(p_parallel_degree) over (order by rn) as new_order, a.* from a where partname is not null and subpartname is null
union all
select ntile(p_parallel_degree) over (order by rn) as new_order, a.* from a where partname is not null and subpartname is not null
),
-- Now assign a row number within each bucket
b as (
select c.*, row_number() over (partition by new_order order by rn) as new_rn from concurrent_stats c
)
-- And pick one from each bucket in turn for queuing order
select
ownname
, objname as tabname
, coalesce(subpartname, partname) as partname
, n_dop as degree
, case when partname is null then 'GLOBAL' when partname is not null and subpartname is null then 'PARTITION' else 'SUBPARTITION' end as granularity
from
b
order by
new_rn, new_order
) loop
payload.ownname := rec.ownname;
payload.tabname := rec.tabname;
payload.partname := rec.partname;
payload.degree := rec.degree;
payload.granularity := rec.granularity;
-- TODO: Enqueue via array using ENQUEUE_ARRAY
execute immediate '
declare
eopt dbms_aq.enqueue_options_t;
mprop dbms_aq.message_properties_t;
begin
dbms_aq.enqueue(
queue_name => ''' || G_QUEUE_NAME || ''',
enqueue_options => eopt,
message_properties => mprop,
payload => :payload,
msgid => :enq_msgid);
end;'
using payload, out enq_msgid;
end loop;
commit;
end populate_queue;

/**
* This gets called for every stats thread
* It pulls the object to gather from the AQ queue
* @param p_optional_init Optionally a SQL can be passed usually used to initialize the session
for example forcing a particular parallel degree
**/
procedure stats_thread(
p_optional_init in varchar2 default null
)
as
deq_msgid RAW(16);
payload stats_concurrent_info;
no_messages exception;
pragma exception_init(no_messages, -25228);
s_sql clob;
begin
if p_optional_init is not null then
execute(null, p_optional_init);
end if;

-- If the VISIBILITY is set to IMMEDIATE
-- it will cause the "queue transaction" to be committed
-- Which means that the STOP_QUEUE call with the WAIT option will
-- be able to stop the queue while the processing takes place
-- and the queue table can be monitored for progress
loop
begin
execute immediate '
declare
dopt dbms_aq.dequeue_options_t;
mprop dbms_aq.message_properties_t;
begin
dopt.visibility := dbms_aq.IMMEDIATE;
dopt.wait := dbms_aq.NO_WAIT;
dbms_aq.dequeue(
queue_name => ''' || G_QUEUE_NAME || ''',
dequeue_options => dopt,
message_properties => mprop,
payload => :payload,
msgid => :deq_msgid);
end;'
using out payload, out deq_msgid;
s_sql := '
begin
dbms_stats.gather_table_stats(
ownname => ''' || payload.ownname || '''
, tabname => ''' || payload.tabname || '''
, partname => ''' || payload.partname || '''
, degree => ' || payload.degree || '
, granularity => ''' || payload.granularity || '''
);
end;
';
-- Execute the command
log(s_sql, 'Ownname: ' || payload.ownname || ' Tabname: ' || payload.tabname || ' Partname: ' || payload.partname || ' Degree: ' || payload.degree || ' Granularity: ' || payload.granularity);
begin
execute_autonomous(payload.ownname, s_sql);
exception
/*
when object_already_exists then
null;
when object_does_not_exist then
null;
*/
when others then
null;
end;
exception
when no_messages then
exit;
end;
end loop;
commit;
end stats_thread;

/**
* The main entry point to gather statistics via parallel threads / AQ
* @param p_parallel_degree The number of threads to start G_AUTO_PARALLEL_DEGREE means use the CPU_COUNT (but not
CLUSTER_DATABASE_INSTANCES parameter, commented out below) to determine number of threads automatically
* @param p_intra_degree The DOP to use per stats operation, by default calculate DOP based on CPU_COUNT and number of threads to run concurrently
* @param p_service Specify a service if you want the jobs to be assigned to that particular service, default NULL
* @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
* @param p_optional_init Optionally a SQL can be passed usually used to initialize the session
for example forcing a particular parallel degree
**/
procedure stats_concurrent(
p_parallel_degree in integer default G_AUTO_PARALLEL_DEGREE
, p_intra_degree in integer default null
, p_service in varchar2 default null
, p_gather_option in varchar2 default 'LIST AUTO'
, p_optional_init in varchar2 default null
)
as
n_cpu_count binary_integer;
n_instance_count binary_integer;
n_thread_count binary_integer;
strval varchar2(256);
partyp binary_integer;
e_job_class_exists exception;
pragma exception_init(e_job_class_exists, -27477);
s_job_class constant varchar2(30) := 'CONC_STATS';
begin
-- Truncate the log table
execute immediate 'truncate table stats_concurrent_log';
-- Just in case something has been left over from a previous run
teardown_aq;
setup_aq;
-- Populate the queue
populate_queue(p_parallel_degree, p_intra_degree, p_gather_option);
-- Determine auto degree of parallelism
partyp := dbms_utility.get_parameter_value('cpu_count', n_cpu_count, strval);
partyp := dbms_utility.get_parameter_value('cluster_database_instances', n_instance_count, strval);
n_thread_count := nvl(p_parallel_degree, n_cpu_count/* * n_instance_count*/);
-- Create/use a common job class, makes job handling easier and allows binding to a specific service
begin
dbms_scheduler.create_job_class(s_job_class);
exception
when e_job_class_exists then
null;
end;
-- Assign jobs to a particular service if requested
if p_service is null then
dbms_scheduler.set_attribute_null('SYS.' || s_job_class, 'SERVICE');
else
dbms_scheduler.set_attribute('SYS.' || s_job_class, 'SERVICE', p_service);
end if;
-- Submit the jobs
for i in 1..n_thread_count loop
dbms_scheduler.create_job(
job_name => s_job_class || '_' || i
, job_type => 'PLSQL_BLOCK'
, job_class => s_job_class
, enabled => true
, job_action => 'begin dbms_session.set_role(''ALL''); pk_stats_concurrent.stats_thread(''' || p_optional_init || '''); end;'
);
end loop;
-- Just in case anyone wants to use DBMS_JOB instead we need to commit the DBMS_JOB.SUBMIT
commit;
--execute immediate 'begin dbms_lock.sleep(:p_sleep_seconds); end;' using p_sleep_seconds;
--teardown_aq;
end stats_concurrent;
end pk_stats_concurrent;
/

show errors

spool off