-- ss_upsert.sql
--
-- PURPOSE
--	A set of execsql scripts to check data in a staging table, or
--	a set of staging tables, and then update and insert rows of a base table
--	or base tables from the staging table(s) of the same name.
--
--	This script contains code specific to Microsoft SQL Server. It was developed 
--	and tested using SQL Server 2017 Developer, and also tested against SQL Server 2016.
--
-- HOW TO USE THESE SCRIPTS
--	In the following steps, "call" means to use an execsql "execute script"
--	metacommand.  Script names are displayed in uppercase to distinguish
--	them below, but execsql is not case-sensitive.
--
--	The simplest usage is:
--		1. Call STAGED_TO_LOAD to create and initially populate a table
--			with the names of staging tables to be loaded, and initial
--			control variables used by other scripts.
--		2. Call LOAD_STAGING to perform QA checks for erroneous nulls,
--			duplicated primary keys, and missing foreign keys, and to 
--			load the data using update and insert statements if there were 
--			no QA errors.
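--
--	For example (the schema and table names here are illustrative):
--		-- !x! execute script staged_to_load with (control_table=#stagingtables, table_list="tbla, tblb, tblc")
--		-- !x! execute script load_staging with (base_schema=dbo, staging=stg, control_table=#stagingtables, do_commit=Yes)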
--
--	The control table that is produced in step 1 above can be edited to
--	add a list of columns to exclude from the update, or to change the
--	defaults for display of changed data.  See the header notes for the
--	STAGED_TO_LOAD script.
--
--	The processes of performing QA checks and performing the upsert operations
--	can be further broken down into individual steps.  See Note #1 below
--	for the other scripts that can be used to carry out these steps.
--
-- NOTES
--	1. The scripts contained in this file that are intended to be called
--		directly by the user are:
--			STAGED_TO_LOAD	: Initialize a control table to load multiple tables.
--			LOAD_STAGING	: Perform all QA checks and load data from all staging tables.
--			QA_ALL			: Perform null and foreign key checks on all staging tables.
--			UPSERT_ALL		: Load data from all staging tables.
--			NULLQA_ONE		: Perform null column checks on one staging table.
--			PKQA_ONE		: Perform primary key check on one staging table.
--			FKQA_ONE		: Perform foreign key checks on one staging table.
--			UPSERT_ONE		: Load data from one staging table.
--			UPDTPK_ONE		: Perform PK updates for one table.
--			UPDTPKQA_ONE	: Perform QA checks related to PK updates, for one table.
--		This file contains other scripts that are intended to be used
--		only by one of the scripts listed above, and not called
--		directly by the user.
--	2. These scripts query the information schema to obtain
--		the information needed to perform the checks and changes.
--	3. These scripts were developed for Microsoft SQL Server; they were developed
--		and tested using SQL Server 2017 Developer Edition, and also tested using 
--		Microsoft SQL Server 2016 Professional; they will likely require
--		modification to run on other DBMSs, or with earlier versions of SQL Server.
--	4. These scripts take arguments that control their functions.  They
--		also use global variables pertinent to logging, if they are
--		defined.  The logging-control variables are global because
--		they may also be used by other code that uses these scripts,
--		and some of that code may be older and only recognize global
--		variables rather than script arguments; logging is intended
--		to be compatible with them.
--	5. The control table that is used to drive the loading of multiple
--		staging tables will be updated by the QA scripts with information
--		about any QA failures.  This information consists of a list of
--		the names of the columns or constraints that failed the check, 
--		with the number of failing rows in parentheses following the name.
--	6. The control table that is used to drive the loading of multiple
--		staging tables will be updated by the upsert operation with 
--		a count of the number of rows of the base table that are updated,
--		and a count of the number of rows that were inserted into the
--		base table.
--	7. All of these scripts assume that schema, table, and column
--		names need not be quoted.
--	8. These scripts create temporary tables.  All of these
--		have prefixes of "ups_".  Scripts that include this file
--		should not use this prefix to avoid possible conflicts.
--
-- COPYRIGHT AND LICENSE
--	Copyright (c) 2019, Elizabeth Shea
--	This program is free software: you can redistribute it and/or modify it
--	under the terms of the GNU General Public License as published by the
--	Free Software Foundation, either version 3 of the License, or (at your
--	option) any later version. This program is distributed in the hope that
--	it will be useful, but WITHOUT ANY WARRANTY; without even the implied
--	warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
--	GNU General Public License for more details. The GNU General Public
--	License is available at http://www.gnu.org/licenses/.

-- AUTHORS
--  Elizabeth Shea (ES)
--	Dreas Nielsen (RDN)
--
-- VERSION
--	2.0.0
--
-- HISTORY
--	 Date		 Remarks
--	-----------	 -----------------------------------------------------
--  2019-02-17	 Created. Began adapting from pg_upsert.sql (version 
--				 for PostgreSQL). ES.
--  2019-03-03	 Completed first draft. ES.
--	2019-03-07	Added primary key check and fixed console progress bar
--				for QA checks. ES.
--  2019-03-08	Updated header. ES.
--	2019-03-09	Minor updates and corrections to address RDN comments. 
--				ES.
--	2019-03-13	Added definition and execution of validation scripts
--				to validate base and staging schemas and tables. ES.
--	2019-03-14	Streamlined execution of validation scripts. ES. 
--	2019-05-01	Replaced one instance of 'trim()' with 'rtrim(ltrim())'.  RDN.
--  2019-05-08	Added notes on primary key update script to header
--				and added script UPDATEPK_ONE. 
--				Made edit to selection of #ups_cols in script UPSERT_ONE
--				to prevent an error if the staging table contains any
--				columns that are not in the base table (e.g., "new" cols
--				for PK updates). ES. 
--	2019-05-10	Changed name of PK update script; added header notes for
--				PK update QA script. Changed table_name length in 
--				control table from 8000 to 1700, max key length for a 
--				nonclustered index. ES.
--	2019-05-14	Added script UPDTPKQA_ONE and some (not all) QA checks. ES.
--  2019-05-21	Completed first draft of UPDTPKQA_ONE. ES.
--  2019-05-22	Rearranged, organized, and standardized variable names 
--				in UPDTPKQA_ONE. ES.
--	2019-05-29	Corrections to comments. ES.
--	2019-06-03	Correction to logging in UPDTPKQA_ONE_INNERLOOP. ES.
-- ==================================================================

-- ################################################################
--			Script TEMPTABLE_ISVALID
-- ===============================================================
--
-- Utility script to validate a SQL Server temp table name.
-- Confirms that the name passed by the user is prefixed with a '#',
-- and raises an error if it is not.
--
-- Input parameters:
--		temptable_name	: The name of the temporary table to be created
--						  by the calling script.
--
-- Columns in the table created:
--		temptbl_name	 : A value of 1 if the name passed by the calling
--						  script is a valid temp table name (i.e., has a
--						  '#' prefix), or 0 if it is not.
--
-- 
-- Example:
--		-- !x! execute script temptable_isvalid with (temptable_name=#stagingtables)
-- ===============================================================
-- !x! BEGIN SCRIPT TEMPTABLE_ISVALID with parameters (temptable_name)

-- Take the user-provided temp_table argument and validate syntax
if object_id('tempdb..#tmptbl_name') is not null drop table #tmptbl_name;
select
	case when left('!!#temptable_name!!',1) = '#' 
		then 1
		else 0
		end as temptbl_name
into #tmptbl_name
;
-- !x! subdata ~valid_tmptable #tmptbl_name
-- !x! if(is_zero(!!~valid_tmptable!!))
	-- !x! sub ~message Error in SQL Server temporary table name assignment, script !!$CURRENT_SCRIPT!!, line !!$SCRIPT_LINE!!.
	-- !x! sub_append ~message The name "!!#temptable_name!!" is not a valid SQL Server temporary table name.
	-- !x! sub_append ~message Temporary table names must include a '#' prefix (e.g., '#mytable')
	-- !x! halt message "!!~message!!"
-- !x! endif

-- !x! END SCRIPT
-- ##################  End of TEMPTABLE_ISVALID  #####################
-- ################################################################


-- ################################################################
--			Script STRING_AGG
-- ===============================================================
--
-- Utility script to perform string aggregation.
-- Beginning with the 2017 release, SQL Server includes the string_agg()
-- function, but earlier versions do not, so this script aggregates
-- strings using a recursive CTE instead.
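--
-- For comparison only, on SQL Server 2017 or later the same result could be
-- obtained with the built-in function (the table and column names below are
-- taken from the example at the end of this header):
--		select string_agg(cast(null_errors as varchar(max)), ', ')
--			within group (order by null_errors)
--		from #nonnulcols;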
--
-- Input parameters:
--		table_name	: The name of the table containing the column to be aggregated.
--						This may be a temp table containing pre-processed strings.
--		string_col  : The name of the column to be aggregated.
--		order_col   : The name of the column to order by.
--		delimiter   : The character(s) to be used as a delimiter.
--
--	Output parameters:
--		string_var	: The name of the variable to receive the aggregated string.
--
-- 
-- Example:
--		-- !x! execute script string_agg with (table_name=#nonnulcols, string_col=null_errors, order_col=null_errors, delimiter=", ", string_var=+nullcols)
-- ===============================================================
-- !x! BEGIN SCRIPT STRING_AGG with parameters (table_name, string_col, order_col, delimiter, string_var)

if object_id('tempdb..#agg_string') is not null drop table #agg_string;
with enum as 
	(
	select
		cast(!!#string_col!! as varchar(max)) as agg_string,
		row_number() over (order by !!#order_col!!) as row_num
	from
		!!#table_name!!
	),
agg as 
	(
	select
		one.agg_string,
		one.row_num
	from
		enum as one
	where
		one.row_num=1
	UNION ALL
	select
		agg.agg_string + '!!#delimiter!!' + enum.agg_string as agg_string,
		enum.row_num
	from 
		agg, enum
	where
		enum.row_num=agg.row_num+1
	)
select
agg_string 
into #agg_string
from agg
where row_num=(select max(row_num) from agg);
-- !x! if(hasrows(#agg_string))
	-- !x! subdata !!#string_var!! #agg_string
-- !x! endif

-- !x! END SCRIPT
-- ##################  End of STRING_AGG  #####################
-- ################################################################


-- ################################################################
--			Script VALIDATE_SCHEMAS
-- ===============================================================
--
-- Utility script to validate the base and staging schemas
--
-- Required input arguments:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--
--	Required output arguments:
--		error_list		: The name of the variable to receive a comma-
--							delimited list of the names of invalid
--							schema names.
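--
-- Example (schema and variable names are illustrative):
--		-- !x! execute script validate_schemas with args (base_schema=dbo, staging=stg, error_list=+schema_errors)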
-- ===============================================================
-- !x! BEGIN SCRIPT VALIDATE_SCHEMAS with parameters (base_schema, staging, error_list)

if object_id('tempdb..#ups_ctrl_invl_schema') is not null drop table #ups_ctrl_invl_schema;
select
	schemas.schema_name,
	schemas.schema_type,
	schemas.schema_name + ' (' + schema_type + ')' as schema_string
into #ups_ctrl_invl_schema
from
	(
	select	
		'!!#base_schema!!' as schema_name,
		'base' as schema_type
	union
	select
		'!!#staging!!' as schema_name,
		'staging' as schema_type
	) as schemas 
	left join information_schema.schemata as iss on schemas.schema_name=iss.schema_name
where	
	iss.schema_name is null
;

-- !x! if(hasrows(#ups_ctrl_invl_schema))
	-- !x! execute script string_agg with (table_name=#ups_ctrl_invl_schema, string_col=schema_string, order_col=schema_type, delimiter='; ', string_var=!!#error_list!!)
-- !x! endif
		
-- !x! END SCRIPT
-- ####################  End of VALIDATE_SCHEMAS  #################
-- ################################################################


-- ################################################################
--			Script VALIDATE_ONE
-- ===============================================================
--
-- Utility script to validate one table in both the base and staging schemas.
-- Halts script processing if either of the schemas does not exist,
--  or if the table is not present in either of those schemas.
--
-- Input parameters:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		table			: The name of the table--same for base and staging.
--		script          : The name of the script in which the
--							schemas and table were referenced 
--							(for error reporting).
--		script_line		: The script line in which they were referenced
--							(for error reporting).
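--
-- Example (schema and table names are illustrative):
--		-- !x! execute script validate_one with args (base_schema=dbo, staging=stg, table=tbla, script=!!$CURRENT_SCRIPT!!, script_line=!!$SCRIPT_LINE!!)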
-- ===============================================================
-- !x! BEGIN SCRIPT VALIDATE_ONE with parameters (base_schema, staging, table, script, script_line)

-- Initialize the strings used to compile error information
-- !x! sub_empty ~err_info
-- !x! sub_empty ~error_list


-- Validate schemas
-- !x! execute script validate_schemas with args (base_schema=!!#base_schema!!, staging=!!#staging!!, error_list=+err_info)


-- If no schemas are invalid, check tables
-- !x! if(is_null("!!~err_info!!"))
	if object_id('tempdb..#ups_invl_table') is not null drop table #ups_invl_table;
	select
		tt.schema_name,
		tt.schema_type,
		tt.schema_name + '.' + tt.table_name + ' (' + tt.schema_type + ')' as schema_table
	into #ups_invl_table
	from
		(
		select	
			'!!#base_schema!!' as schema_name,
			'base' as schema_type,
			'!!#table!!' as table_name
		union
		select
			'!!#staging!!' as schema_name,
			'staging' as schema_type,
			'!!#table!!' as table_name
		) as tt
		left join information_schema.tables as iss on tt.schema_name=iss.table_schema and tt.table_name=iss.table_name
	where	
		iss.table_name is null
	;

	-- !x! if(hasrows(#ups_invl_table))
		-- !x! execute script string_agg with (table_name=#ups_invl_table, string_col=schema_table, order_col=schema_table, delimiter='; ', string_var=+err_info)
		-- !x! sub ~error_list Non-existent table: !!~err_info!!	
	-- !x! endif
	
-- !x! else
	-- !x! sub ~error_list Non-existent schema(s): !!~err_info!!
-- !x! endif


-- Halt script if any schemas or tables were found to be invalid
-- !x! if(not is_null("!!~error_list!!"))

	-- !x! sub ~error_message ERROR - INVALID OBJECT IN SCRIPT ARGUMENT
	
	-- !x! if(sub_defined(log_changes))
	-- !x! andif(is_true(!!log_changes!!))
		-- !x! write "==================================================================" to !!logfile!!
		-- !x! write "!!$current_time!! -- !!~error_message!!" to !!logfile!!
		-- !x! write "Script: !!#script!!; Line: !!#script_line!!" to !!logfile!!
		-- !x! write "!!~error_list!!" to !!logfile!!
	-- !x! endif
	
	-- !x! sub_append ~error_message Script: !!#script!!; Line: !!#script_line!!
	-- !x! sub_append ~error_message !!~error_list!!
	-- !x! halt message "!!~error_message!!"
	
-- !x! endif


-- !x! END SCRIPT
-- ####################  End of VALIDATE_ONE  #####################
-- ################################################################



-- ################################################################
--			Script VALIDATE_CONTROL
-- ===============================================================
--
-- Utility script to validate the contents of the control table against
--	the base and staging schemas
--
-- Required input arguments:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		control_table  	: The name of a temporary table as created by the
--							script STAGED_TO_LOAD.
--		script          : The name of the script in which the
--							schemas and control table were referenced 
--							(for error reporting).
--		script_line		: The script line in which they were referenced
--							(for error reporting).
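--
-- Example (schema names are illustrative):
--		-- !x! execute script validate_control with args (base_schema=dbo, staging=stg, control_table=#stagingtables, script=!!$CURRENT_SCRIPT!!, script_line=!!$SCRIPT_LINE!!)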
-- ===============================================================
-- !x! BEGIN SCRIPT VALIDATE_CONTROL with parameters (base_schema, staging, control_table, script, script_line)

-- Initialize the strings used to compile error information
-- !x! sub_empty ~err_info
-- !x! sub_empty ~error_list

-- !x! execute script validate_schemas with args (base_schema=!!#base_schema!!, staging=!!#staging!!, error_list=+err_info)


-- If no schemas are invalid, check tables
-- !x! if(is_null("!!~err_info!!"))
	if object_id('tempdb..#ups_validate_control') is not null drop table #ups_validate_control;
	select
		'!!#base_schema!!' as base_schema,
		'!!#staging!!' as staging_schema,
		table_name, 
		cast(0 as bit) as base_exists,
		cast(0 as bit) as staging_exists
	into #ups_validate_control
	from !!#control_table!!
	;

	-- Update the control table
	update vc
		set base_exists = cast(case when bt.table_name is null then 0 else 1 end as bit),
			staging_exists = cast(case when st.table_name is null then 0 else 1 end as bit)
	from #ups_validate_control as vc
			left join information_schema.tables as bt on vc.base_schema=bt.table_schema and vc.table_name=bt.table_name
				and bt.table_type='BASE TABLE'
			left join information_schema.tables as st on vc.staging_schema=st.table_schema and vc.table_name=st.table_name
				and st.table_type='BASE TABLE'
	;

	if object_id('tempdb..#ups_ctrl_invl_table') is not null drop table #ups_ctrl_invl_table;
	select
		schema_table
	into #ups_ctrl_invl_table
	from
		(
		select base_schema + '.' + table_name as schema_table
		from #ups_validate_control
		where not base_exists=1
		union
		select staging_schema + '.' + table_name as schema_table
		from #ups_validate_control
		where not staging_exists=1
		) as it
	;

	-- If any tables do not exist, compile an error message.
	-- !x! if(hasrows(#ups_ctrl_invl_table))
		-- !x! execute script string_agg with (table_name=#ups_ctrl_invl_table, string_col=schema_table, order_col=schema_table, delimiter='; ', string_var=+err_info)
		-- !x! sub ~error_list Non-existent table(s): !!~err_info!!	
	-- !x! endif
	
-- !x! else
	-- !x! sub ~error_list Non-existent schema(s): !!~err_info!!
-- !x! endif


-- Halt script if any invalid objects found in control table
-- !x! if(not is_null("!!~error_list!!"))

	-- !x! sub ~error_message ERROR - INVALID OBJECTS IN CONTROL TABLE
	
	-- !x! if(sub_defined(log_changes))
	-- !x! andif(is_true(!!log_changes!!))
		-- !x! write "==================================================================" to !!logfile!!
		-- !x! write "!!$current_time!! -- !!~error_message!!" to !!logfile!!
		-- !x! write "Script: !!#script!!; Line: !!#script_line!!" to !!logfile!!
		-- !x! write "!!~error_list!!" to !!logfile!!
	-- !x! endif
	
	-- !x! sub_append ~error_message Script: !!#script!!; Line: !!#script_line!!
	-- !x! sub_append ~error_message !!~error_list!!
	-- !x! halt message "!!~error_message!!"
	
-- !x! endif
		

-- !x! END SCRIPT
-- ####################  End of VALIDATE_CONTROL  #################
-- ################################################################


-- ################################################################
--			Script STAGED_TO_LOAD
-- ===============================================================
--
-- Creates a table having the structure that is used to drive other
-- scripts that perform QA checks and the upsert operation on multiple
-- staging tables.
--
-- Input parameters:
--		control_table	: The name of the temporary table to be created.
--							In SQL Server, this *must* include a '#'
--							prefix. A validation step will raise 
--							an error if an invalid temp table name is passed.
--		table_list		: A string of comma-separated table names,
--							identifying the staging tables to be
--							checked or loaded.
--
-- Columns in the table created:
--		table_name		: The name of a base table that has a
--							corresponding table in a staging schema
--							containing data to be used to modify
--							the base table.
--		exclude_cols	: Contains a comma-separated list of columns
--							in the base table that are not to be
--							updated from the staging table.  This is
--							uninitialized.
--		exclude_null_checks :
--						  Contains a comma-separated list of single-quoted
--							column names identifying columns in the
--							staging table for which null checks
--							should not be performed.  This is
--							uninitialized.
--		display_changes	: A Boolean indicating whether the 'upsert'
--							operation should display the changes to
--							be made to the base table.  A separate
--							GUI display is used for updates and inserts.
--							Initialized to True.
--		display_final	: A Boolean indicating whether the 'upsert'
--							operation should display the entire base
--							table after updates and inserts have been
--							made.  Initialized to False.
--		null_errors		: Will contain a comma-separated list of
--							columns that are non-nullable in the base
--							table but that have nulls in the staging
--							table.  Initialized to null; may be filled
--							by the QA routines.
--		pk_errors		: Will contain a count of the number of distinct
--							primary key values having duplicate rows,
--							followed by the total row count for the
--							duplicated keys. Initialized to null; may
--							be filled by the QA routines.  
--		fk_errors		: Will contain a comma-separated list of
--							foreign-key constraint names that are
--							not met by data in the staging table.
--							Initialized to null; may be filled by the
--							QA routines.
--		rows_updated	: Will contain a count of the rows updated
--							in the table by the upsert operation.
--		rows_inserted	: Will contain a count of the rows inserted
--							in the table by the upsert operation.
--
-- Example:
--		-- !x! execute script staged_to_load with (control_table=#stagingtables, table_list="tbla, tblb, tblc")
-- ===============================================================

-- !x! BEGIN SCRIPT STAGED_TO_LOAD with parameters (control_table, table_list)

-- Run script to validate that control table name is valid temp table name
-- !x! execute script temptable_isvalid with args (temptable_name=!!#control_table!!)


if object_id('tempdb..!!#control_table!!') is not null drop table !!#control_table!!;
create table !!#control_table!! (
	table_name varchar(1700) not null unique,
	exclude_cols varchar(8000),
	exclude_null_checks varchar(8000),
	display_changes varchar(3) not null default 'Yes',
	display_final varchar(3) not null default 'No',
	null_errors varchar(8000),
	pk_errors varchar(8000),
	fk_errors varchar(8000),
	rows_updated integer,
	rows_inserted integer,
	check (display_changes in ('Yes', 'No')),
	check (display_final in ('Yes', 'No'))
);


-- Recursive CTE to parse table list argument
-- NOTE: SQL Server 2017 includes the trim() function, but SQL Server 2016 does not,
-- so a combination of ltrim and rtrim is used here instead.
with itemtable as (
	select 
		case when charindex(',',  table_string) = 0
			then rtrim(ltrim(table_string))
			else rtrim(ltrim(substring(table_string, 1,charindex(',', table_string)-1)))
			end as table_name,
		case when charindex(',',  table_string) = 0
			then NULL
			else rtrim(ltrim(substring(table_string, charindex(',', table_string)+1, len(table_string))))
			end as remaining_list
	from
		(select '!!#table_list!!' as table_string) as ts
	UNION ALL
	select 
		case when charindex(',', remaining_list) = 0
			then rtrim(ltrim(remaining_list))
			else rtrim(ltrim(substring(remaining_list, 1,charindex(',', remaining_list)-1)))
			end as table_name,
		case when charindex(',',  remaining_list) = 0
			then NULL
			else rtrim(ltrim(substring(remaining_list, charindex(',', remaining_list)+1, len(remaining_list))))
			end as remaining_list
	from 
		itemtable
	where 
		remaining_list is not null
		-- Guards against entries with trailing commas:
		-- e.g.,  'table1, table2,'
		and rtrim(ltrim(remaining_list))<>''
	)
insert into !!#control_table!!
	(table_name)
select table_name as item
from itemtable;


-- !x! END SCRIPT
-- ##################  End of STAGED_TO_LOAD  #####################
-- ################################################################



-- ################################################################
--			Script LOAD_STAGING
-- ===============================================================
--
-- Performs QA checks for nulls in non-null columns, for duplicated
-- primary key values, and for invalid foreign keys in a set of staging
-- tables to be loaded into base tables.  If there are failures in the 
-- QA checks, loading is not attempted.  If the loading step is carried 
-- out, it is done within a transaction.
--
-- The "null_errors", "pk_errors", and "fk_errors" columns of the 
-- control table will be updated to identify any errors that occur,
-- so that this information is available to the caller.
--
-- The "rows_updated" and "rows_inserted" columns of the control table
-- will be updated with counts of the number of rows affected by the
-- upsert operation for each table.
--
-- When the upsert operation updates the base table, all columns of the
-- base table that are also in the staging table are updated.  The
-- update operation does not test to see if column contents are different,
-- and so does not update only those values that are different.
--
-- Input parameters:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		control_table	: The name of a temporary table as created by the
--							script STAGED_TO_LOAD.
--		do_commit		: Whether or not the script should commit
--							the changes; should be 'Yes' or 'No'.
-- Global variables:
--		logfile			: The name of a log file to which error
--							messages will be written.  Optional.
--		log_sql			: A value of 'Yes' or 'No' to indicate whether
--							the SQL that is generated for each foreign
--							key check, and for each update and insert
--							statement, is written to the logfile.  Optional.
--		log_errors		: A value of 'Yes' or 'No' to indicate whether
--							foreign key errors are written to the logfile.
--							Optional.
--		log_changes		: A value of 'Yes' or 'No' indicating whether
--							the updated and inserted data should be
--							written to the logfile.  Optional.
--
-- Tables and views created or modified:
--		#ups_qa_fails			: temporary table
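--
-- Example (schema names are illustrative):
--		-- !x! execute script load_staging with (base_schema=dbo, staging=stg, control_table=#stagingtables, do_commit=Yes)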
-- ===============================================================

-- !x! BEGIN SCRIPT LOAD_STAGING with parameters (base_schema, staging, control_table, do_commit)


-- Clear the columns of return values from the control table, in case this control
-- table has been used previously.
update !!#control_table!!
set
	null_errors = null,
	pk_errors = null,
	fk_errors = null,
	rows_updated = null,
	rows_inserted = null
	;
	
-- Run QA checks.
-- !x! execute script QA_ALL with arguments (base_schema=!!#base_schema!!, staging=!!#staging!!, control_table=!!#control_table!!)
if object_id('tempdb..#ups_qa_fails') is not null drop table #ups_qa_fails;
select *
into #ups_qa_fails
from !!#control_table!!
where null_errors is not null or pk_errors is not null or fk_errors is not null;
-- !x! if(not hasrows(#ups_qa_fails))
	-- !x! sub ~preautocommit !!$autocommit_state!!
	-- !x! autocommit off
	-- Run the UPSERT operation.
	-- !x! execute script UPSERT_ALL with arguments (base_schema=!!#base_schema!!, staging=!!#staging!!, control_table=!!#control_table!!)
	-- Commit the changes and then restore the previous autocommit state.
	-- !x! if(is_true(!!#do_commit!!))
		-- !x! autocommit on with commit
		-- !x! write "CHANGES COMMITTED."
		-- !x! if(sub_defined(log_changes))
		-- !x! andif(is_true(!!log_changes!!))
			-- !x! write "" to !!logfile!!
			-- !x! write "==================================================================" to !!logfile!!
			-- !x! write "!!$current_time!! -- CHANGES COMMITTED." to !!logfile!!
		-- !x! endif
	-- !x! else 
		-- !x! write "CHANGES NOT COMMITTED ('do_commit' argument = !!#do_commit!!)"
		-- !x! if(sub_defined(log_changes))
		-- !x! andif(is_true(!!log_changes!!))
			-- !x! write "" to !!logfile!!
			-- !x! write "==================================================================" to !!logfile!!
			-- !x! write "!!$current_time!! -- CHANGES NOT COMMITTED ('do_commit' argument = !!#do_commit!!)" to !!logfile!!
		-- !x! endif
	-- !x! endif
	-- !x! autocommit !!~preautocommit!!
-- !x! endif
	

-- !x! END SCRIPT
-- ###################  End of LOAD_STAGING  ######################
-- ################################################################



-- ################################################################
--			Script NULLQA_ONE
-- ===============================================================
--
-- Checks that non-nullable columns are fully populated in a
-- staging table that is an image of the base table.
-- Reports any non-conforming columns to the console and optionally
-- to a log file.
--
-- Required input arguments:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		table			: The table name--same for base and staging.
-- Optional input arguments:
--		exclude_null_checks : A comma-separated list of singly-quoted
--								column names to be excluded from null checks.
--
--	Required output arguments:
--		error_list		: The name of the variable to receive a comma-
--							delimited list of the names of non-null
--							columns that contain nulls; each column name
--							will be followed by the number of rows with
--							nulls, in parentheses.
--
-- Global variables:
--		logfile			: The name of a log file to which error
--							messages will be written.  Optional.
--
-- Tables and views created or modified:
--		#ups_nonnull_cols		: temporary table
--		#ups_next_column		: temporary table
--		#ups_null_error_list	: temporary table
--		#ups_qa_nonnull_col		: temporary table
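--
-- Example (schema, table, and variable names are illustrative):
--		-- !x! execute script nullqa_one with args (base_schema=dbo, staging=stg, table=tbla, error_list=+null_errors)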
-- ===============================================================

-- !x! BEGIN SCRIPT NULLQA_ONE with parameters (base_schema, staging, table, error_list)

-- Write an initial header to the logfile.
-- !x! if(sub_defined(logfile))
	-- !x! write "" to !!logfile!!
	-- !x! write "==================================================================" to !!logfile!!
	-- !x! write "!!$current_time!! -- Non-null QA checks on table !!#staging!!.!!#table!!" to !!logfile!!
-- !x! endif


-- !x! write "Conducting non-null QA checks on table !!#staging!!.!!#table!!"

-- Validate inputs: base/staging schemas and table
-- !x! execute script validate_one with args (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!#table!!, script=!!$CURRENT_SCRIPT!!, script_line=!!$SCRIPT_LINE!!)

-- Initialize the return value to empty (no null errors)
-- !x! sub_empty !!#error_list!!

-- Create a table listing the columns of the base table that must
-- be non-null and that do not have a default expression.
-- Include a column for the number of rows with nulls in the staging table.
-- Include a 'processed' column for loop control.
-- !x! if(sub_defined(#exclude_null_checks))
	-- !x! sub ~omitnull and column_name not in (!!#exclude_null_checks!!)
-- !x! else
	-- !x! sub_empty ~omitnull
-- !x! endif
if object_id('tempdb..#ups_nonnull_cols') is not null drop table #ups_nonnull_cols;
select
	column_name,
	cast(0 as integer) as null_rows,
	cast(0 as bit) as processed
into
	#ups_nonnull_cols
from
	information_schema.columns
where
	table_schema = '!!#base_schema!!'
	and table_name = '!!#table!!'
	and is_nullable = 'NO'
	and column_default is null
	!!~omitnull!!
;


-- Create a script to select one column to process.
-- !x! begin script ups_get_next_column
if object_id('tempdb..#ups_next_column') is not null drop table #ups_next_column;
select top 1 column_name
into #ups_next_column
from #ups_nonnull_cols
where not processed=1;
-- !x! end script

-- Process all non-nullable columns.
-- !x! execute script nullqa_one_innerloop with (staging=!!#staging!!, table=!!#table!!)

-- Create the return value
if object_id('tempdb..#ups_null_error_list') is not null drop table #ups_null_error_list;
select 
	column_name + ' (' + cast(null_rows as varchar(max)) + ')' as prepped_col
into #ups_null_error_list
from
	#ups_nonnull_cols	
where
	coalesce(null_rows, 0) > 0
;
-- !x! execute script string_agg with (table_name="#ups_null_error_list", string_col=prepped_col, order_col=prepped_col, delimiter=", ", string_var=!!#error_list!!)


-- !x! END SCRIPT
-- End of          NULLQA_ONE
-- ****************************************************************
-- ****************************************************************
-- ****************************************************************
--			Script NULLQA_ONE_INNERLOOP
-- ---------------------------------------------------------------
-- !x! BEGIN SCRIPT NULLQA_ONE_INNERLOOP with parameters (staging, table)

-- !x! execute script ups_get_next_column
-- !x! if(hasrows(#ups_next_column))
	-- !x! subdata ~column_name #ups_next_column

	-- !x! if(sub_defined(logfile))
		-- !x! write "Checking column !!~column_name!!." to !!logfile!!
	-- !x! endif
	
	if object_id('tempdb..#ups_qa_nonnull_col') is not null drop table #ups_qa_nonnull_col;
	select top 1 nrows
	into #ups_qa_nonnull_col
	from (
		select count(*) as nrows
		from !!#staging!!.!!#table!!
		where !!~column_name!! is null
		) as nullcount
	where nrows > 0;
	-- !x! if(hasrows(#ups_qa_nonnull_col))
		-- !x! subdata ~nullrows #ups_qa_nonnull_col
		-- !x! write "    Column !!~column_name!! has !!~nullrows!! nulls."
		-- !x! if(sub_defined(logfile))
			-- !x! write "    Column !!~column_name!! has !!~nullrows!! nulls." to !!logfile!!
		-- !x! endif
		update #ups_nonnull_cols
		set null_rows = (select top 1 nrows from #ups_qa_nonnull_col)
		where column_name = '!!~column_name!!';
	-- !x! endif
	
	
	-- Mark this column as processed.
	update #ups_nonnull_cols
	set processed = 1
	where column_name = '!!~column_name!!';
	
	-- Loop.
	-- !x! execute script nullqa_one_innerloop with (staging=!!#staging!!, table=!!#table!!)
	
-- !x! endif

-- !x! END SCRIPT
-- ###################  End of NULLQA_ONE  ########################
-- ################################################################



-- ################################################################
--			Script PKQA_ONE
--
-- Check data in a staging table for violations of the primary key 
-- of the corresponding base table.
-- Reports any PK violations found to the console and optionally
-- to a log file.
--
-- Input parameters:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		table			: The table name--same for base and staging.
--		display_errors	: A value of 'Yes' or 'No' to indicate whether
--							duplicate key values should be displayed
--							in a GUI.
--	Output parameters:
--		error_list		: The name of the variable to receive a count
--							of the number of distinct primary key values
--							having duplicate rows, followed by the total
--							row count for the duplicated keys.
--
-- Global variables:
--		logfile			: The name of a log file to which update
--							messages will be written.  Optional.
--		log_sql			: A value of 'Yes' or 'No' to indicate whether
--							the SQL that is generated for the primary
--							key check is written to the logfile.  Optional.
--		log_errors		: A value of 'Yes' or 'No' to indicate whether
--							primary key errors are written to the logfile.
--							Optional.
--
-- Tables and views created or modified:
--		#ups_primary_key_columns		: temporary table
--		#ups_pk_check					: temporary table
--		#ups_ercnt						: temporary table
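--
-- Example (schema, table, and variable names are illustrative):
--		-- !x! execute script pkqa_one with args (base_schema=dbo, staging=stg, table=tbla, display_errors=Yes, error_list=+pk_errors)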
-- ===============================================================

-- !x! BEGIN SCRIPT PKQA_ONE with parameters (base_schema, staging, table, display_errors, error_list)

-- Write an initial header to the logfile.
-- !x! if(sub_defined(logfile))
	-- !x! write "" to !!logfile!!
	-- !x! write "==================================================================" to !!logfile!!
	-- !x! write "!!$current_time!! -- Primary key QA checks on table !!#staging!!.!!#table!!" to !!logfile!!
-- !x! endif

-- !x! write "Conducting primary key QA checks on table !!#staging!!.!!#table!!"

-- Validate inputs: base/staging schemas and table
-- !x! execute script validate_one with args (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!#table!!, script=!!$CURRENT_SCRIPT!!, script_line=!!$SCRIPT_LINE!!)

-- Initialize the return value to empty (no primary key errors)
-- !x! sub_empty !!#error_list!!

-- Create a table of primary key columns on this table
if object_id('tempdb..#ups_primary_key_columns') is not null drop table #ups_primary_key_columns;
select k.constraint_name, k.column_name, k.ordinal_position
into #ups_primary_key_columns
from information_schema.table_constraints as tc
inner join information_schema.key_column_usage as k
    on tc.constraint_type = 'PRIMARY KEY' 
    and tc.constraint_name = k.constraint_name
    and tc.constraint_catalog = k.constraint_catalog
    and tc.constraint_schema = k.constraint_schema
    and tc.table_schema = k.table_schema
    and tc.table_name = k.table_name
where
	k.table_name = '!!#table!!'
	and k.table_schema = '!!#base_schema!!'
order by k.ordinal_position
;

-- !x! if(hasrows(#ups_primary_key_columns))
	-- !x! subdata ~constraint_name #ups_primary_key_columns
	
	-- !x! if(sub_defined(logfile))
		-- !x! write "Checking constraint !!~constraint_name!!." to !!logfile!!
	-- !x! endif
	
	-- Get a comma-delimited list of primary key columns to build SQL selection for duplicate keys
	-- !x! sub_empty ~pkcollist
	-- !x! execute script string_agg with (table_name=#ups_primary_key_columns, string_col=column_name, order_col=ordinal_position, delimiter=", ", string_var=+pkcollist)
	
	-- Construct a query to test for duplicate values for pk columns.
	-- !x! sub 			  ~pk_check   if object_id('tempdb..#ups_pk_check') is not null drop table #ups_pk_check;
	-- !x! sub_append     ~pk_check   select !!~pkcollist!!, count(*) as row_count
	-- !x! sub_append     ~pk_check   into #ups_pk_check
	-- !x! sub_append     ~pk_check   from !!#staging!!.!!#table!! as s
	-- !x! sub_append     ~pk_check   group by !!~pkcollist!!
	-- !x! sub_append     ~pk_check   having count(*) > 1

	-- Write the SQL to the log file if requested.
	-- !x! if(sub_defined(logfile))
	-- !x! andif(sub_defined(log_sql))
	-- !x! andif(is_true(!!log_sql!!))
		-- !x! write "SQL for primary key check:" to !!logfile!!
		-- !x! write [!!~pk_check!!] to !!logfile!!
	-- !x! endif

	-- Run the check.
	!!~pk_check!!;
	-- !x! if(hasrows(#ups_pk_check))
		-- !x! write "    Duplicate key error on columns: !!~pkcollist!!."
		if object_id('tempdb..#ups_ercnt') is not null drop table #ups_ercnt;
		select count(*) as errcnt, sum(row_count) as total_rows
		into #ups_ercnt
		from #ups_pk_check;
		-- !x! select_sub #ups_ercnt
		-- !x! sub !!#error_list!! !!@errcnt!! duplicated key(s) (!!@total_rows!! rows)
		-- !x! if(sub_defined(logfile))
			-- !x! write "Duplicate primary key values in !!#staging!!.!!#table!!" to !!logfile!!
			-- !x! if(sub_defined(log_errors))
			-- !x! andif(is_true(!!log_errors!!))
				-- !x! export #ups_pk_check append to !!logfile!! as txt
			-- !x! endif
		-- !x! endif
		-- !x! if(is_true(!!#display_errors!!))
			-- !x! prompt message "Primary key violations in !!#table!!" display #ups_pk_check
		-- !x! endif
	-- !x! endif
-- !x! endif


-- !x! END SCRIPT
-- ####################  End of PKQA_ONE  ########################
-- ################################################################



-- ################################################################
--			Script FKQA_ONE
--
-- Checks foreign keys from a staging table against a base table
-- and, if it exists, another staging table that is an image of the
-- base table.
-- Reports any bad references found to the console and optionally
-- to a log file.
--
-- Input parameters:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		table			: The table name--same for base and staging.
--		display_errors	: A value of 'Yes' or 'No' to indicate whether
--							unrecognized values should be displayed
--							in a GUI.
--	Output parameters:
--		error_list		: The name of the variable to receive a comma-
--							delimited list of the names of foreign key
--							constraints that are not met.
--
-- Global variables:
--		logfile			: The name of a log file to which update
--							messages will be written.  Optional.
--		log_sql			: A value of 'Yes' or 'No' to indicate whether
--							the SQL that is generated for each foreign
--							key check is written to the logfile.  Optional.
--		log_errors		: A value of 'Yes' or 'No' to indicate whether
--							foreign key errors are written to the logfile.
--							Optional.
--
-- Tables and views created or modified:
--		#ups_foreign_key_columns	: temporary table
--		#ups_sel_fks				: temporary table
--		#ups_fk_constraints			: temporary table
--		#ups_next_constraint		: temporary table
--		#ups_fk_error_list			: temporary table
--		#ups_one_fk					: temporary table
--		#ups_fk_joins				: temporary table
--		#ups_fk_check				: temporary table
--		#ups_ercnt					: temporary table
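--
-- Example (schema, table, and variable names are illustrative):
--		-- !x! execute script fkqa_one with args (base_schema=dbo, staging=stg, table=tbla, display_errors=Yes, error_list=+fk_errors)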
-- ===============================================================

-- !x! BEGIN SCRIPT FKQA_ONE with parameters (base_schema, staging, table, display_errors, error_list)

-- Write an initial header to the logfile.
-- !x! if(sub_defined(logfile))
	-- !x! write "" to !!logfile!!
	-- !x! write "==================================================================" to !!logfile!!
	-- !x! write "!!$current_time!! -- Foreign key QA checks on table !!#staging!!.!!#table!!" to !!logfile!!
-- !x! endif

-- !x! write "Conducting foreign key QA checks on table !!#staging!!.!!#table!!"

-- Validate inputs: base/staging schemas and table
-- !x! execute script validate_one with args (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!#table!!, script=!!$CURRENT_SCRIPT!!, script_line=!!$SCRIPT_LINE!!)

-- Initialize the return value to empty (no foreign key errors)
-- !x! sub_empty !!#error_list!!


-- Create a table of *all* foreign key dependencies in this database.
-- Because this may be an expensive operation (in terms of time), the
-- table is not re-created if it already exists.  "Already exists"
-- means that a table with the expected name exists.  No check is
-- done to ensure that this table has the correct structure.  The
-- goal is to create the table of all foreign keys only once to
-- minimize the time required if QA checks are to be run on multiple
-- staging tables.
if object_id('tempdb..#ups_foreign_key_columns') is null
select
	rc.constraint_name,
	cu.table_schema,
	cu.table_name,
	cu.column_name,
	cu.ordinal_position,
	cu_uq.table_schema as uq_schema,
	cu_uq.table_name as uq_table,
	cu_uq.column_name as uq_column
into #ups_foreign_key_columns
from
	(select distinct constraint_catalog, constraint_schema, constraint_name,
		unique_constraint_catalog, unique_constraint_schema, unique_constraint_name
		from information_schema.referential_constraints) as rc
	inner join (select * from information_schema.table_constraints
		where constraint_type = 'FOREIGN KEY') as tc
		on tc.constraint_catalog = rc.constraint_catalog
		and tc.constraint_schema = rc.constraint_schema
		and tc.constraint_name = rc.constraint_name
	inner join (select * from information_schema.table_constraints
		where constraint_type not in ('FOREIGN KEY', 'CHECK') ) as tc_uq
		on tc_uq.constraint_catalog = rc.unique_constraint_catalog
		and tc_uq.constraint_schema = rc.unique_constraint_schema
		and tc_uq.constraint_name = rc.unique_constraint_name
	inner join information_schema.key_column_usage as cu
		on cu.constraint_catalog = tc.constraint_catalog
		and cu.constraint_schema = tc.constraint_schema
		and cu.constraint_name = tc.constraint_name
		and cu.table_catalog = tc.table_catalog
		and cu.table_schema = tc.table_schema
		and cu.table_name = tc.table_name
	inner join information_schema.key_column_usage as cu_uq
		on cu_uq.constraint_catalog = tc_uq.constraint_catalog
		and cu_uq.constraint_schema = tc_uq.constraint_schema
		and cu_uq.constraint_name = tc_uq.constraint_name
		and cu_uq.table_catalog = tc_uq.table_catalog
		and cu_uq.table_schema = tc_uq.table_schema
		and cu_uq.ordinal_position = cu.ordinal_position
;


-- Create a temporary table of just the foreign key relationships for the base
-- table corresponding to the staging table to check.
if object_id('tempdb..#ups_sel_fks') is not null drop table #ups_sel_fks;
select
	constraint_name, column_name,
	ordinal_position,
	uq_schema, uq_table, uq_column
into #ups_sel_fks
from
	#ups_foreign_key_columns
where
	table_schema = '!!#base_schema!!'
	and table_name = '!!#table!!';

-- Create a temporary table of all unique constraint names for
-- this table, with an integer column to be populated with the
-- number of rows failing the foreign key check, and a 'processed'
-- 	flag to control looping.
if object_id('tempdb..#ups_fk_constraints') is not null drop table #ups_fk_constraints;
select distinct
	constraint_name, 
	cast(0 as integer) as fkerror_rows,
	cast(0 as bit) as processed
into #ups_fk_constraints
from #ups_sel_fks;

-- Create a script to select one constraint to process
-- !x! begin script ups_get_next_constraint
if object_id('tempdb..#ups_next_constraint') is not null drop table #ups_next_constraint;
select top 1 constraint_name
into #ups_next_constraint
from #ups_fk_constraints
where not processed=1;
-- !x! end script


-- Process all constraints: check every foreign key.
-- !x! execute script fk_qa_one_innerloop with (staging=!!#staging!!, table=!!#table!!, display_errors=!!#display_errors!!)

-- Create the return value.
if object_id('tempdb..#ups_fk_error_list') is not null drop table #ups_fk_error_list;
select
	constraint_name + ' (' + cast(fkerror_rows as varchar(max)) + ')' as fkc_errors,
	constraint_name
into #ups_fk_error_list
from #ups_fk_constraints
where coalesce(fkerror_rows, 0) > 0;

-- !x! execute script string_agg with (table_name=#ups_fk_error_list, string_col=fkc_errors, order_col=constraint_name, delimiter=", ", string_var=!!#error_list!!)

-- !x! END SCRIPT
-- End of          FKQA_ONE
-- ****************************************************************
-- ****************************************************************
--			Script FK_QA_ONE_INNERLOOP
-- ----------------------------------------------------------------
-- !x! BEGIN SCRIPT FK_QA_ONE_INNERLOOP with parameters (staging, table, display_errors)

-- !x! execute script ups_get_next_constraint
-- !x! if(hasrows(#ups_next_constraint))
	-- !x! subdata ~constraint_name #ups_next_constraint

	-- !x! if(sub_defined(logfile))
		-- !x! write "Checking constraint !!~constraint_name!!." to !!logfile!!
	-- !x! endif

	
	if object_id('tempdb..#ups_one_fk') is not null drop table #ups_one_fk;
	select column_name, uq_schema, uq_table, uq_column, ordinal_position
	into #ups_one_fk
	from #ups_sel_fks
	where 
		constraint_name = '!!~constraint_name!!';

	-- Get the unique table schema and name into data variables.
	-- !x! select_sub #ups_one_fk

	-- Create join expressions from staging table (s) to unique table (u)
	-- and to staging table equivalent to unique table (su) (though we
	-- don't know yet if the latter exists).  Also create a 'where'
	-- condition to ensure that all columns being matched are non-null.
	-- Also create a comma-separated list of the columns being checked.
	if object_id('tempdb..#ups_fk_joins') is not null drop table #ups_fk_joins;
	select
		cast('s.' + column_name + ' = u.' + uq_column as varchar(max)) as u_join,
		cast('s.' + column_name + ' = su.' + uq_column as varchar(max)) as su_join,
		cast('s.' + column_name + ' is not null' as varchar(max)) as s_not_null,
		cast('s.' + column_name as varchar(max)) as s_checked,
		ordinal_position
	into #ups_fk_joins
	from 
		#ups_one_fk;
	
	-- Create local variables for the different parts of the join expressions
	-- !x! sub_empty ~u_join
	-- !x! sub_empty ~su_join
	-- !x! sub_empty ~s_not_null
	-- !x! sub_empty ~s_checked
	
	-- Then populate them using the string aggregation script
	-- !x! execute script string_agg with(table_name=#ups_fk_joins, string_col=u_join, order_col=ordinal_position, delimiter =" and ", string_var=+u_join)
	-- !x! execute script string_agg with(table_name=#ups_fk_joins, string_col=su_join, order_col=ordinal_position, delimiter =" and ", string_var=+su_join)
	-- !x! execute script string_agg with(table_name=#ups_fk_joins, string_col=s_not_null, order_col=ordinal_position, delimiter =" and ", string_var=+s_not_null)
	-- !x! execute script string_agg with(table_name=#ups_fk_joins, string_col=s_checked, order_col=ordinal_position, delimiter =", ", string_var=+s_checked)
	
	
	-- Determine whether a staging-table equivalent of the unique table exists.
	-- !x! sub su_exists No
	-- !x! if(table_exists(!!#staging!!.!!@uq_table!!))
		-- !x! sub su_exists Yes
	-- !x! endif

	-- Construct a query to test for missing unique values for fk columns.
	-- !x! sub 			  ~fk_check   if object_id('tempdb..#ups_fk_check') is not null drop table #ups_fk_check;
	-- !x! sub_append     ~fk_check   select !!~s_checked!!, count(*) as row_count
	-- !x! sub_append     ~fk_check   into #ups_fk_check
	-- !x! sub_append     ~fk_check   from !!#staging!!.!!#table!! as s
	-- !x! sub_append     ~fk_check   left join !!@uq_schema!!.!!@uq_table!! as u on !!~u_join!!
	-- !x! if(is_true(!!su_exists!!))
		-- !x! sub_append ~fk_check   left join !!#staging!!.!!@uq_table!! as su on !!~su_join!!
	-- !x! endif
	-- !x! sub_append     ~fk_check   where u.!!@uq_column!! is null
	-- !x! if(is_true(!!su_exists!!))
		-- !x! sub_append ~fk_check   and su.!!@uq_column!! is null
	-- !x! endif
	-- !x! sub_append     ~fk_check   and !!~s_not_null!!
	-- !x! sub_append     ~fk_check   group by !!~s_checked!!

	-- Write the SQL to the log file if requested.
	-- !x! if(sub_defined(logfile))
	-- !x! andif(sub_defined(log_sql))
	-- !x! andif(is_true(!!log_sql!!))
		-- !x! write "SQL for foreign key check:" to !!logfile!!
		-- !x! write [!!~fk_check!!] to !!logfile!!
	-- !x! endif

	-- Run the check.
	!!~fk_check!!;
	-- !x! if(hasrows(#ups_fk_check))
		-- !x! write "    Foreign key error referencing !!@uq_table!!."
		if object_id('tempdb..#ups_ercnt') is not null drop table #ups_ercnt;
		select count(*) as ercnt
		into #ups_ercnt
		from #ups_fk_check;
		-- !x! subdata ~errcnt #ups_ercnt
		update #ups_fk_constraints
		set fkerror_rows = !!~errcnt!!
		where constraint_name = '!!~constraint_name!!';
		-- !x! if(sub_defined(logfile))
			-- !x! write " Foreign key errors in !!#table!! referencing !!@uq_table!!" to !!logfile!!
			-- !x! if(sub_defined(log_errors))
			-- !x! andif(is_true(!!log_errors!!))
				-- !x! export #ups_fk_check append to !!logfile!! as txt
			-- !x! endif
		-- !x! endif
		-- !x! if(is_true(!!#display_errors!!))
			-- !x! prompt message "Foreign key errors in !!#table!! referencing !!@uq_table!!" display #ups_fk_check
		-- !x! endif
	-- !x! endif


	-- Mark this constraint as processed.
	update #ups_fk_constraints
	set processed = 1
	where constraint_name = '!!~constraint_name!!';

	-- Loop.
	-- !x! execute script fk_qa_one_innerloop with (staging=!!#staging!!, table=!!#table!!, display_errors=!!#display_errors!!)

-- !x! endif

-- !x! END SCRIPT
-- ####################  End of FKQA_ONE  #########################
-- ################################################################



-- ################################################################
--			Script UPSERT_ONE
--
-- Adds data from a staging table to a base table, using UPDATE
-- and INSERT statements.  Displays data to be modified to the
-- user before any modifications are done.  Reports the changes
-- made to the console and optionally to a log file.
--
-- Input parameters:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		table			: The table name--same for base and staging.
--		exclude_cols	: A comma-delimited list of single-quoted
--							column names within enclosing double quotes,
--							identifying the columns
--							of the base table that are not to be
--							modified.  These may be autonumber
--							columns or columns filled by triggers.
--		display_changes	: A boolean variable indicating whether
--							or not the changes to be made to the 
--							base table should be displayed in a GUI.
--		display_final	: A boolean variable indicating whether or
--							not the base table should be displayed
--							after updates and inserts are completed.
--		updcntvar		: The name of a substitution variable that
--							will be set to the number of rows updated.
--		inscntvar		: The name of a substitution variable that
--							will be set to the number of rows inserted.
--
--	Global variables:
--		logfile			: The name of a log file to which update
--							messages will be written.  Optional.
--		log_sql			: A value of 'Yes' or 'No' indicating whether
--							the update and insert statements should
--							also be written to the logfile.  Optional.
--		log_changes		: A value of 'Yes' or 'No' indicating whether
--							the updated and inserted data should be
--							written to the logfile.  Optional.
--
-- Tables and views created or modified:
--		#ups_cols				: temporary table
--		#ups_pks				: temporary table
--		#ups_allcollist			: temporary table
--		#ups_allbasecollist		: temporary table
--		#ups_allstgcollist		: temporary table
--		#ups_pkcollist			: temporary table
--		#ups_joinexpr			: temporary table
--		#ups_basematches		: temporary table
--		#ups_stgmatches			: temporary table
--		#ups_nk					: temporary table
--		#ups_assexpr			: temporary table
--		#ups_newrows			: temporary table
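--
-- Example (schema, table, column, and variable names are illustrative):
--		-- !x! execute script upsert_one with args (base_schema=dbo, staging=stg, table=tbla, exclude_cols="'rev_time', 'rev_user'", display_changes=Yes, display_final=No, updcntvar=+rows_updated, inscntvar=+rows_inserted)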
-- ===============================================================

-- !x! BEGIN SCRIPT UPSERT_ONE with parameters (base_schema, staging, table, exclude_cols, display_changes, display_final, updcntvar, inscntvar)

-- Remove substitution variables that will contain the generated
-- update and insert statements so that the existence of valid
-- statements can be later tested based on the existence of these variables.
-- !x! rm_sub ~updatestmt
-- !x! rm_sub ~insertstmt

-- !x! sub ~do_updates Yes
-- !x! sub ~do_inserts Yes

-- !x! if(sub_defined(logfile))
	-- !x! write "" to !!logfile!!
	-- !x! write "==================================================================" to !!logfile!!
	-- !x! write "!!$current_time!! -- Performing upsert on table !!#base_schema!!.!!#table!!" to !!logfile!!
-- !x! endif


-- !x! write "Performing upsert on table !!#base_schema!!.!!#table!!"


-- Validate inputs: base/staging schemas and table
-- !x! execute script validate_one with args (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!#table!!, script=!!$CURRENT_SCRIPT!!, script_line=!!$SCRIPT_LINE!!)


-- Populate a (temporary) table with the names of the columns
-- in the base table that are to be updated from the staging table.
-- Include only those columns from staging table that are also in base table.
-- !x! if(is_null("!!#exclude_cols!!"))
	-- !x! sub_empty ~col_excl
-- !x! else
	-- !x! sub ~col_excl and column_name not in (!!#exclude_cols!!)
-- !x! endif
if object_id('tempdb..#ups_cols') is not null drop table #ups_cols;
select s.column_name, s.ordinal_position
into #ups_cols
from 
	information_schema.columns as s
	inner join information_schema.columns as b on s.column_name=b.column_name
where
	s.table_schema = '!!#staging!!'
	and s.table_name = '!!#table!!'
	and b.table_schema = '!!#base_schema!!' 
	and b.table_name = '!!#table!!'
	!!~col_excl!!
;


-- Populate a (temporary) table with the names of the primary key
-- columns of the base table.
if object_id('tempdb..#ups_pks') is not null drop table #ups_pks;
select k.column_name, k.ordinal_position
into #ups_pks
from information_schema.table_constraints as tc
inner join information_schema.key_column_usage as k
    on tc.constraint_type = 'PRIMARY KEY' 
    and tc.constraint_name = k.constraint_name
    and tc.constraint_catalog = k.constraint_catalog
    and tc.constraint_schema = k.constraint_schema
    and tc.table_schema = k.table_schema
    and tc.table_name = k.table_name
where
	k.table_name = '!!#table!!'
	and k.table_schema = '!!#base_schema!!'
;

-- Get all base table columns that are to be updated into a comma-delimited list.
if object_id('tempdb..#ups_allcollist') is not null drop table #ups_allcollist;
select
	cast(column_name as varchar(max)) as column_list,
	ordinal_position
into #ups_allcollist
from #ups_cols
;
-- !x! sub_empty ~allcollist
-- !x! execute script string_agg with(table_name=#ups_allcollist, string_col=column_list, order_col=ordinal_position, delimiter=", ", string_var=+allcollist)


-- Get all base table columns that are to be updated into a comma-delimited list
-- with a "b." prefix.
if object_id('tempdb..#ups_allbasecollist') is not null drop table #ups_allbasecollist;
select 
	cast('b.' + column_name as varchar(max)) as column_list,
	ordinal_position
into #ups_allbasecollist
from #ups_cols;
-- !x! sub_empty ~allbasecollist 
-- !x! execute script string_agg with(table_name=#ups_allbasecollist, string_col=column_list, order_col=ordinal_position, delimiter=", ", string_var=+allbasecollist)


-- Get all staging table column names for columns that are to be updated
-- into a comma-delimited list with an "s." prefix.
if object_id('tempdb..#ups_allstgcollist') is not null drop table #ups_allstgcollist;
select 
	cast('s.' + column_name as varchar(max)) as column_list,
	ordinal_position
into #ups_allstgcollist
from #ups_cols;
-- !x! sub_empty ~allstgcollist
-- !x! execute script string_agg with(table_name=#ups_allstgcollist, string_col=column_list, order_col=ordinal_position, delimiter=", ", string_var=+allstgcollist)


-- Get the primary key columns in a comma-delimited list.
if object_id('tempdb..#ups_pkcollist') is not null drop table #ups_pkcollist;
select 
	cast(column_name as varchar(max)) as column_list,
	ordinal_position
into #ups_pkcollist
from #ups_pks;
-- !x! sub_empty ~pkcollist 
-- !x! execute script string_agg with(table_name=#ups_pkcollist, string_col=column_list, order_col=ordinal_position, delimiter=", ", string_var=+pkcollist)


-- Create a join expression for key columns of the base (b) and
-- staging (s) tables.
if object_id('tempdb..#ups_joinexpr') is not null drop table #ups_joinexpr;
select
	cast('b.' + column_name + ' = s.' + column_name as varchar(max)) as column_list,
	ordinal_position
into #ups_joinexpr
from
	#ups_pks;
-- !x! sub_empty ~joinexpr 
-- !x! execute script string_agg with(table_name=#ups_joinexpr, string_col=column_list, order_col=ordinal_position, delimiter=" and ", string_var=+joinexpr)


-- Create a FROM clause for an inner join between base and staging
-- tables on the primary key column(s).
-- !x! sub ~fromclause FROM !!#base_schema!!.!!#table!! as b INNER JOIN !!#staging!!.!!#table!! as s ON !!~joinexpr!!


-- Create SELECT queries to pull all columns with matching keys from both
-- base and staging tables.
if object_id('tempdb..#ups_basematches') is not null drop table #ups_basematches;
select !!~allbasecollist!!
into #ups_basematches
!!~fromclause!!;

if object_id('tempdb..#ups_stgmatches') is not null drop table #ups_stgmatches;
select !!~allstgcollist!!
into #ups_stgmatches
!!~fromclause!!;

--Get non-key columns to be updated
if object_id('tempdb..#ups_nk') is not null drop table #ups_nk;
select column_name
into #ups_nk
from 
		(
		select column_name from #ups_cols
		except
		select column_name from #ups_pks
		) as nk
;


-- If there are matching rows and non-key columns to update, optionally display
-- the changes and build the UPDATE statement.
-- !x! if(hasrows(#ups_stgmatches))
-- !x! andif(hasrows(#ups_nk))	
	-- Prompt user to examine matching data and commit, don't commit, or quit.
	-- !x! if(is_true(!!#display_changes!!))
		-- !x! prompt ask "Do you want to make these changes? For table !!#table!!, new data are shown in the top table below; existing data are in the lower table." sub ~do_updates compare #ups_stgmatches and #ups_basematches key (!!~pkcollist!!)
	-- !x! endif
	-- !x! if(is_true(!!~do_updates!!))
		-- Create an assignment expression to update non-key columns of the
		-- base table (as b) from columns of the staging table (as s).
		if object_id('tempdb..#ups_assexpr') is not null drop table #ups_assexpr;
		select
			cast('b.' + column_name + ' = s.' + column_name as varchar(max)) as column_list
		into #ups_assexpr
		from #ups_nk;
		-- !x! sub_empty ~assexpr
		-- !x! execute script string_agg with(table_name=#ups_assexpr, string_col=column_list, order_col=column_list, delimiter=", ", string_var=+assexpr)
		-- Create an UPDATE statement to update the base table with
		-- non-key columns from the staging table.  No semicolon terminating generated SQL.
		-- !x! sub ~updatestmt UPDATE b SET !!~assexpr!! FROM !!#base_schema!!.!!#table!! as b INNER JOIN !!#staging!!.!!#table!! as s on !!~joinexpr!! 
	-- !x! endif
-- !x! endif
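-- For illustration only (hypothetical names): for a base table d.tracts with
-- primary key tract_id and one non-key column area, ~updatestmt would expand
-- to approximately
--    UPDATE b SET b.area = s.area
--    FROM d.tracts as b INNER JOIN s.tracts as s on b.tract_id = s.tract_id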

-- Create a select statement to find all rows of the staging table
-- that are not in the base table.
if object_id('tempdb..#ups_newrows') is not null drop table #ups_newrows;
with newpks as (
	select !!~pkcollist!! from !!#staging!!.!!#table!!
	except
	select !!~pkcollist!! from !!#base_schema!!.!!#table!!
	)
select
	--s.*
	!!~allstgcollist!!
into #ups_newrows
from 
	!!#staging!!.!!#table!! as s
	inner join newpks as b on !!~joinexpr!!;
	
-- Prompt user to examine new data and continue or quit.
-- !x! if(hasrows(#ups_newrows))
	-- !x! if(is_true(!!#display_changes!!))
		-- !x! prompt ask "Do you want to add these new data to the !!#base_schema!!.!!#table!! table?" sub ~do_inserts display #ups_newrows
	-- !x! endif

	-- !x! if(is_true(!!~do_inserts!!))
		-- Create an insert statement.  No semicolon terminating generated SQL.
		-- !x! sub ~insertstmt INSERT INTO !!#base_schema!!.!!#table!! (!!~allcollist!!) SELECT !!~allcollist!! FROM #ups_newrows
	-- !x! endif
-- !x! endif
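-- For illustration only (same hypothetical names as above): ~insertstmt would
-- expand to approximately
--    INSERT INTO d.tracts (tract_id, area)
--    SELECT tract_id, area FROM #ups_newrows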
	
-- Run the update and insert statements.	


-- !x! if(sub_defined(~updatestmt))
-- !x! andif(is_true(!!~do_updates!!))
	-- !x! write "Updating !!#base_schema!!.!!#table!!"
	-- !x! if(sub_defined(logfile))
		-- !x! write "" to !!logfile!!
		-- !x! if(sub_defined(log_sql))
		-- !x! andif(is_true(!!log_sql!!))
			-- !x! write "UPDATE statement for !!#base_schema!!.!!#table!!:" to !!logfile!!
			-- !x! write [!!~updatestmt!!] to !!logfile!!
		-- !x! endif
		-- !x! if(sub_defined(log_changes))
		-- !x! andif(is_true(!!log_changes!!))
			-- !x! write "Updates:" to !!logfile!!
			-- !x! export #ups_stgmatches append to !!logfile!! as txt
		-- !x! endif
		-- !x! write "" to !!logfile!!
	-- !x! endif
	!!~updatestmt!!;
	-- !x! sub !!#updcntvar!! !!$last_rowcount!!
	-- !x! if(sub_defined(logfile))
		-- !x! write "!!$last_rowcount!! rows of !!#base_schema!!.!!#table!! updated." to !!logfile!!
	-- !x! endif
	-- !x! write "    !!$last_rowcount!! rows updated."
-- !x! endif

-- !x! if(sub_defined(~insertstmt))
-- !x! andif(is_true(!!~do_inserts!!))
	-- !x! write "Adding data to !!#base_schema!!.!!#table!!"
	-- !x! if(sub_defined(logfile))
		-- !x! write "" to !!logfile!!
		-- !x! if(sub_defined(log_sql))
		-- !x! andif(is_true(!!log_sql!!))
			-- !x! write "INSERT statement for !!#base_schema!!.!!#table!!:" to !!logfile!!
			-- !x! write [!!~insertstmt!!] to !!logfile!!
		-- !x! endif
		-- !x! if(sub_defined(log_changes))
		-- !x! andif(is_true(!!log_changes!!))
			-- !x! write "New data:" to !!logfile!!
			-- !x! export #ups_newrows append to !!logfile!! as txt
		-- !x! endif
		-- !x! write "" to !!logfile!!
	-- !x! endif
	!!~insertstmt!!;
	-- !x! sub !!#inscntvar!! !!$last_rowcount!!
	-- !x! if(sub_defined(logfile))
		-- !x! write "!!$last_rowcount!! rows added to !!#base_schema!!.!!#table!!." to !!logfile!!
	-- !x! endif
	-- !x! write "    !!$last_rowcount!! rows added."
-- !x! endif

-- !x! if(is_true(!!#display_final!!))
	-- !x! prompt message "Table !!#base_schema!!.!!#table!! after updates and inserts." display !!#base_schema!!.!!#table!!
-- !x! endif

-- !x! END SCRIPT
-- ###################  End of UPSERT_ONE  ########################
-- ################################################################




-- ################################################################
--			Script UPSERT_ALL
--
-- Updates multiple base tables with new or revised data from
-- staging tables, using the UPSERT_ONE script.
--
-- Input parameters:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		control_table	: The name of a table containing at least the
--							following four columns:
--								table_name	: The name of a table
--												  to be updated.
--								exclude_cols	: A comma-delimited
--													list of single-
--													quoted column
--													names, as required
--													by UPSERT_ONE.
--								display_changes	: A value of "Yes" or
--													"No" indicating
--													whether the changes
--													for the table should
--													be displayed.
--								display_final	: A value of "Yes" or
--													"No" indicating
--													whether the final
--													state of the table
--													should be displayed.
--							A table with these columns will be created
--							by the script STAGED_TO_LOAD.
--
-- Global variables:
--		logfile			: The name of a log file to which update
--							messages will be written.  Optional.
--		log_sql			: A value of 'Yes' or 'No' indicating whether
--							the update and insert statements should
--							also be written to the logfile.  Optional.
--		log_changes		: A value of 'Yes' or 'No' indicating whether
--							the updated and inserted data should be
--							written to the logfile.  Optional.
--		upsert_progress_denom	: Created or modified.
--
-- Tables and views used:
--		ups_dependencies		: temporary table
--		ups_ordered_tables		: temporary table
--		ups_proctables			: temporary table
--		ups_toprocess			: temporary table
--		ups_upsert_rows			: temporary table
--
-- Counter variables:
--		221585944		: Progress indicator.
--
-- ===============================================================
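-- Example usage (hypothetical schema and control table names): this script is
-- run with execsql's "execute script" metacommand, e.g.
--     execute script UPSERT_ALL with (base_schema=d, staging=s, control_table=#loadtables)
-- where #loadtables is a control table of the form created by STAGED_TO_LOAD,
-- with one row per staging table to be loaded.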

-- !x! BEGIN SCRIPT UPSERT_ALL with parameters (base_schema, staging, control_table)


-- Validate contents of control table
-- !x! execute script validate_control with (base_schema=!!#base_schema!!, staging=!!#staging!!, control_table=!!#control_table!!, script=!!$CURRENT_SCRIPT_NAME!!, script_line=!!$SCRIPT_LINE!!)

-- Initialize the status and progress bars if the console is running.
-- !x! if(console_on)
	-- !x! reset counter 221585944
	-- !x! console status "Merging data"
	-- !x! console progress 0
	if object_id('tempdb..#ups_upsert_rows') is not null drop table #ups_upsert_rows;
	select count(*) + 1 as upsert_rows
	into #ups_upsert_rows
	from !!#control_table!!;
	-- !x! subdata upsert_progress_denom #ups_upsert_rows
-- !x! endif

-- Get a table of all dependencies for the base schema.
if object_id('tempdb..#ups_dependencies') is not null drop table #ups_dependencies;
select 
	tc.table_name as child,
	tp.table_name as parent
into #ups_dependencies
from 
	information_schema.table_constraints as tc
	inner join information_schema.referential_constraints as cu on tc.constraint_name=cu.constraint_name
	inner join information_schema.table_constraints as tp on tp.constraint_name=cu.unique_constraint_name
where 
	tc.constraint_type = 'FOREIGN KEY'
	and tc.table_schema = '!!#base_schema!!'
	--Exclude cases where parent and child are same table (to protect against infinite recursion in table ordering)
	and tc.table_name<>tp.table_name;

	
-- Create a list of tables in the base schema ordered by dependency.
if object_id('tempdb..#ups_ordered_tables') is not null drop table #ups_ordered_tables;
with dep_depth as (
	select
		dep.child as first_child,
  		dep.child,
  		dep.parent,
  		1 as lvl
	from
		#ups_dependencies as dep
	union all
	select
		dd.first_child,
		dep.child,
		dep.parent,
		dd.lvl + 1 as lvl
	from
		dep_depth as dd
		inner join #ups_dependencies as dep on dep.parent = dd.child 
			and dep.child <> dd.parent
			and not (dep.parent = dd.first_child and dd.lvl > 2)
 	)
select
	table_name,
	table_order
into
	#ups_ordered_tables
from
	(
	--All parents
	select
		dd.parent as table_name,
		max(lvl) as table_order
	from
		dep_depth as dd
	group by
		dd.parent
	union
	--Children that are not parents
	select
		dd.child as table_name,
		max(lvl) + 1 as table_order
	from
		dep_depth as dd
		left join #ups_dependencies as dp on dp.parent = dd.child
	where
		dp.parent is null
	group by
		dd.child
	union
	--Neither parents nor children
	select distinct
		t.table_name,
		0 as table_order
	from
		information_schema.tables as t
		left join #ups_dependencies as p on t.table_name=p.parent
		left join #ups_dependencies as c on t.table_name=c.child
	where
		t.table_schema = '!!#base_schema!!'
		and t.table_type = 'BASE TABLE'
		and p.parent is null
		and c.child is null
	) as all_levels;
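-- For illustration only (hypothetical tables): if "addresses" has a foreign key
-- to "tracts" and "tracts" references nothing, the query above assigns "tracts"
-- a lower table_order than "addresses", so parent tables are loaded before the
-- tables that reference them.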

	
-- Create a list of the selected tables with ordering information.
if object_id('tempdb..#ups_proctables') is not null drop table #ups_proctables;
select
	ot.table_order,
	tl.table_name,
	tl.exclude_cols,
	tl.display_changes,
	tl.display_final,
	tl.rows_updated,
	tl.rows_inserted,
	cast(0 as bit) as processed
into
	#ups_proctables
from
	!!#control_table!! as tl
	inner join #ups_ordered_tables as ot on ot.table_name = tl.table_name
	;
	
	
-- Create a selection (this would be a view if temp views were allowed) returning a single unprocessed table, in order.
-- !x! begin script ups_get_toprocess
if object_id('tempdb..#ups_toprocess') is not null drop table #ups_toprocess;
select top 1
	table_name, exclude_cols,
	display_changes, display_final,
	rows_updated, rows_inserted
into #ups_toprocess
from #ups_proctables
where not processed=1
order by table_order;
-- !x! end script


-- Process all tables in order.
-- !x! execute script upsert_all_innerloop with (base_schema=!!#base_schema!!, staging=!!#staging!!)

-- Move the update/insert counts back into the control table.
update ct
set
	rows_updated = pt.rows_updated,
	rows_inserted = pt.rows_inserted
from !!#control_table!! as ct
inner join #ups_proctables as pt
on pt.table_name = ct.table_name;


-- Update the status bar if the console is running.
-- !x! if(console_on)
	-- !x! console status "Data merge complete"
	-- !x! console progress 0
-- !x! endif


-- !x! END SCRIPT
--					UPSERT_ALL
-- ****************************************************************
-- ****************************************************************
--		Script UPSERT_ALL_INNERLOOP
-- ---------------------------------------------------------------

-- !x! BEGIN SCRIPT UPSERT_ALL_INNERLOOP with parameters (base_schema, staging)

-- Update the status bar if the console is running.
-- !x! if(console_on)
	-- !x! console progress !!$counter_221585944!! / !!upsert_progress_denom!!
-- !x! endif

-- !x! execute script ups_get_toprocess
-- !x! if(hasrows(#ups_toprocess))
	-- Create local variables to store the row counts from updates and inserts.
	-- !x! sub ~rows_updated 0
	-- !x! sub ~rows_inserted 0

	-- !x! select_sub #ups_toprocess
	-- !x! execute script upsert_one with (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!@table_name!!, exclude_cols=[!!@exclude_cols!!], display_changes=!!@display_changes!!, display_final=!!@display_final!!, updcntvar=+rows_updated, inscntvar=+rows_inserted)

	update #ups_proctables
	set rows_updated = !!~rows_updated!!,
		rows_inserted = !!~rows_inserted!!
	where table_name = '!!@table_name!!';
	
	update #ups_proctables
	set processed = cast(1 as bit)
	where table_name = '!!@table_name!!';
	-- !x! execute script upsert_all_innerloop with (base_schema=!!#base_schema!!, staging=!!#staging!!)
-- !x! endif

-- !x! END SCRIPT
-- ###############  End of UPSERT_ALL_INNERLOOP  ##################
-- ################################################################


-- ################################################################
--			Script QA_ALL
--
-- Conducts null, primary key, and foreign key checks on multiple staging
-- tables containing new or revised data for base tables, using the
-- NULLQA_ONE, PKQA_ONE, and FKQA_ONE scripts.
--
-- Input parameters:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		control_table	: The name of a table containing at least the
--							following four columns:
--								table_name		: The name of a table
--												  to be updated.
--								null_errors		: For a comma-separated
--													list of columns that
--													are non-nullable in
--													the base table but
--													null in the staging
--													table.
--								pk_errors		: For a count of the number 
--													of distinct primary key
--													values that are duplicated,
--													followed by the total row
--													count for the duplicated keys.
--								fk_errors		: For a comma-separated
--													list of foreign-key
--													constraint names that
--													are not met by the
--													staging table.
--							A table with these columns will be created
--							by the script STAGED_TO_LOAD.
--
-- Global variables:
--		logfile			: The name of a log file to which error
--							messages will be written.  Optional.
--		log_sql			: A value of 'Yes' or 'No' to indicate whether
--							the SQL that is generated for each foreign
--							key check is written to the logfile.  Optional.
--		log_errors		: A value of 'Yes' or 'No' to indicate whether
--							foreign key errors are written to the logfile.
--							Optional.
--
-- Tables and views used:
--		ups_proctables				: temporary table
--		ups_toprocess				: temporary table
--		ups_upsert_rows				: temporary table
--
-- Counters used:
--		221585944					: Progress bar position
--
-- Global variables modified:
--		upsert_progress_denom		: Created or redefined.
--
-- ===============================================================
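-- Example usage (hypothetical names): this script is run with execsql's
-- "execute script" metacommand, e.g.
--     execute script QA_ALL with (base_schema=d, staging=s, control_table=#loadtables)
-- After it completes, the null_errors, pk_errors, and fk_errors columns of the
-- control table are populated for any staging table that failed a check.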

-- !x! BEGIN SCRIPT QA_ALL with parameters (base_schema, staging, control_table)


-- Define a helper script to initialize the status and progress bars if the console is running.
-- !x! begin script update_console_qa with parameters (check_type, control_table)
-- !x! if(console_on)
	-- !x! reset counter 221585944
	-- !x! console status "Performing !!#check_type!! QA checks"
	-- !x! console progress 0
	if object_id('tempdb..#ups_upsert_rows') is not null drop table #ups_upsert_rows;
	select count(*) + 1 as upsert_rows
	into #ups_upsert_rows
	from !!#control_table!!;
	-- !x! subdata upsert_progress_denom #ups_upsert_rows
-- !x! endif
-- !x! end script

-- Create a list of the selected tables with a loop control flag.
if object_id('tempdb..#ups_proctables') is not null drop table #ups_proctables;
select
	tl.table_name,
	tl.exclude_null_checks,
	tl.display_changes,
	cast(0 as bit) as processed
into #ups_proctables 
from
	!!#control_table!! as tl
;

-- Create a script returning a single unprocessed table.
-- !x! begin script ups_qa_get_toprocess
if object_id('tempdb..#ups_toprocess') is not null drop table #ups_toprocess;
select top 1
	table_name, exclude_null_checks, display_changes
into #ups_toprocess
from #ups_proctables
where not processed=1
;
-- !x! end script

-- Perform null QA checks on all tables.
-- !x! execute script update_console_qa with args (check_type=NULL, control_table=!!#control_table!!)
-- !x! execute script qa_all_nullloop with (base_schema=!!#base_schema!!, staging=!!#staging!!, control_table=!!#control_table!!)


-- Perform primary key QA checks on all tables.
update #ups_proctables set processed = 0;
-- !x! execute script update_console_qa with args (check_type=primary key, control_table=!!#control_table!!)
-- !x! execute script qa_all_pkloop with (base_schema=!!#base_schema!!, staging=!!#staging!!, control_table=!!#control_table!!)

-- Perform foreign key QA checks on all tables.
update #ups_proctables set processed = 0;
-- !x! execute script update_console_qa with args (check_type=foreign key, control_table=!!#control_table!!)
-- !x! execute script qa_all_fkloop with (base_schema=!!#base_schema!!, staging=!!#staging!!, control_table=!!#control_table!!)


-- Update the status bar if the console is running.
-- !x! if(console_on)
	-- !x! console status "Data QA checks complete"
	-- !x! console progress 0
-- !x! endif


-- !x! END SCRIPT
--					QA_ALL
-- ****************************************************************
-- ****************************************************************
--		Script QA_ALL_NULLLOOP
-- ---------------------------------------------------------------

-- !x! BEGIN SCRIPT QA_ALL_NULLLOOP with parameters (base_schema, staging, control_table)

-- !x! sub_empty ~ups_null_error_list

-- Update the status bar if the console is running.
-- !x! if(console_on)
	-- !x! console progress !!$counter_221585944!! / !!upsert_progress_denom!!
-- !x! endif

-- !x! execute script ups_qa_get_toprocess
-- !x! if(hasrows(#ups_toprocess))
	-- !x! select_sub #ups_toprocess
	-- !x! if(is_null("!!@exclude_null_checks!!"))
		-- !x! execute script nullqa_one with (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!@table_name!!, error_list=+ups_null_error_list)
	-- !x! else 
		-- !x! execute script nullqa_one with (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!@table_name!!, error_list=+ups_null_error_list, exclude_null_checks=[!!@exclude_null_checks!!])
	-- !x! endif
	-- !x! if(not is_null("!!~ups_null_error_list!!"))
		update !!#control_table!!
		set null_errors = '!!~ups_null_error_list!!'
		where table_name = '!!@table_name!!';
	-- !x! endif

	update #ups_proctables
	set processed = 1
	where table_name = '!!@table_name!!';
	-- !x! execute script qa_all_nullloop with (base_schema=!!#base_schema!!, staging=!!#staging!!, control_table=!!#control_table!!)
-- !x! endif


-- !x! END SCRIPT
--					QA_ALL_NULLLOOP
-- ****************************************************************
-- ****************************************************************
--		Script QA_ALL_PKLOOP
-- ---------------------------------------------------------------

-- !x! BEGIN SCRIPT QA_ALL_PKLOOP with parameters (base_schema, staging, control_table)

-- !x! sub_empty ~ups_pk_error_list
-- Update the status bar if the console is running.
-- !x! if(console_on)
	-- !x! console progress !!$counter_221585944!! / !!upsert_progress_denom!!
-- !x! endif

-- !x! execute script ups_qa_get_toprocess
-- !x! if(hasrows(#ups_toprocess))
	-- !x! select_sub #ups_toprocess
	-- !x! execute script pkqa_one with (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!@table_name!!, display_errors=!!@display_changes!!, error_list=+ups_pk_error_list)
	-- !x! if(not is_null("!!~ups_pk_error_list!!"))
		update !!#control_table!!
		set pk_errors = '!!~ups_pk_error_list!!'
		where table_name = '!!@table_name!!';
	-- !x! endif
	
	update #ups_proctables
	set processed = 1
	where table_name = '!!@table_name!!';
	-- !x! execute script qa_all_pkloop with (base_schema=!!#base_schema!!, staging=!!#staging!!, control_table=!!#control_table!!)
-- !x! endif


-- !x! END SCRIPT
--					QA_ALL_PKLOOP
-- ****************************************************************
-- ****************************************************************
--		Script QA_ALL_FKLOOP
-- ---------------------------------------------------------------
-- !x! BEGIN SCRIPT QA_ALL_FKLOOP with parameters (base_schema, staging, control_table)

-- !x! sub_empty ~ups_error_list

-- Update the status bar if the console is running.
-- !x! if(console_on)
	-- !x! console progress !!$counter_221585944!! / !!upsert_progress_denom!!
-- !x! endif

-- !x! execute script ups_qa_get_toprocess
-- !x! if(hasrows(#ups_toprocess))
	-- !x! select_sub #ups_toprocess
	-- !x! execute script fkqa_one with (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!@table_name!!, display_errors=!!@display_changes!!, error_list=+ups_error_list)
	-- !x! if(not is_null("!!~ups_error_list!!"))
		update !!#control_table!!
		set fk_errors = '!!~ups_error_list!!'
		where table_name = '!!@table_name!!';
	-- !x! endif

	update #ups_proctables
	set processed = 1
	where table_name = '!!@table_name!!';
	-- !x! execute script qa_all_fkloop with (base_schema=!!#base_schema!!, staging=!!#staging!!, control_table=!!#control_table!!)
-- !x! endif

-- !x! END SCRIPT
-- #####################  End of QA_ALL  ###########################
-- #################################################################


-- ################################################################
--			Script UPDTPK_ONE
--
-- Updates primary keys in base table, based on new and older
-- values of PK columns in a staging table, using UPDATE
-- statements.  Displays data to be modified to the
-- user before any modifications are done.  Reports the changes
-- made to the console and optionally to a log file.
--
-- Input parameters:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		table			: The table name--same for base and staging.
--		display_errors	: A value of 'Yes' or 'No' to indicate whether
--							any errors should be displayed in a GUI.
--		display_changes	: A value of 'Yes' or 'No' to indicate whether
--							or not the changes to be made to the 
--							base table should be displayed in a GUI.
--
--	Global variables:
--		logfile			: The name of a log file to which update
--							messages will be written.  Optional.
--		log_sql			: A value of 'Yes' or 'No' indicating whether
--							the update and insert statements should
--							also be written to the logfile.  Optional.
--		log_changes		: A value of 'Yes' or 'No' indicating whether
--							the updated and inserted data should be
--							written to the logfile.  Optional.
--
-- Tables and views created or modified:
--		ups_pkqa_errors			: temporary table
--		ups_pkcol_info			: temporary table
--		ups_pkupdates			: temporary table
-- ===============================================================
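-- For illustration only (hypothetical names): to re-key d.tracts, whose primary
-- key is tract_id, the staging table s.tracts would contain both tract_id (the
-- current key value) and new_tract_id (the replacement value), with
-- new_tract_id populated only on rows whose key is to change.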

-- !x! BEGIN SCRIPT UPDTPK_ONE with parameters (base_schema, staging, table, display_errors, display_changes)

-- !x! if(console_on)
	-- !x! console status "Primary key updates"
-- !x! endif

-- Validate inputs: base/staging schemas and table
-- !x! execute script validate_one with args (base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!#table!!, script=!!$CURRENT_SCRIPT!!, script_line=!!$SCRIPT_LINE!!)

-- Write an initial header to the logfile.
-- !x! if(sub_defined(logfile))
	-- !x! write "" to !!logfile!!
	-- !x! write "==================================================================" to !!logfile!!
	-- !x! write "!!$current_time!! -- Performing primary key updates on table !!#base_schema!!.!!#table!! from !!#staging!!.!!#table!!" to !!logfile!!
-- !x! endif

-- !x! write "Performing primary key updates on table !!#base_schema!!.!!#table!! from !!#staging!!.!!#table!!"

-- Create a temp table to store the results of the PK update QA checks
if object_id('tempdb..#ups_pkqa_errors') is not null drop table #ups_pkqa_errors; 
create table #ups_pkqa_errors (
	error_code varchar(40),
	error_description varchar(500)
);

-- Populate a (temporary) table with the names of the primary key columns of the base table.
-- Get the old and new primary key columns from staging table into various formats
-- to use later to construct SQL statement to select records in various ways for both updates and QA checks.
-- Include column lists, join expression, and where clause
if object_id('tempdb..#ups_pkcol_info') is not null drop table #ups_pkcol_info;
select 
	k.table_schema,
	k.table_name,
	k.column_name, 
	cast('b.' + column_name as varchar(max)) as base_aliased,
	cast('s.' + column_name as varchar(max)) as staging_aliased,
	cast('s.' + column_name + ' as staging_' + column_name as varchar(max)) as staging_aliased_prefix,
	cast('b.' + column_name + ' = s.' + column_name as varchar(max)) as join_expr,
	cast('new_' + column_name as varchar(max)) as newpk_col,
	cast('s.new_' + column_name as varchar(max)) as newpk_col_aliased,
	cast('new_' + column_name + ' is null' as varchar(max)) as newpk_col_empty,
	cast('new_' + column_name + ' is not null' as varchar(max)) as newpk_col_not_empty,
	cast('b.' + column_name + ' = s.new_' + column_name as varchar(max)) as assmt_expr,
	cast('s.new_' + column_name + ' = b.new_' + column_name as varchar(max)) as join_expr_new,
	k.ordinal_position
into #ups_pkcol_info
from information_schema.table_constraints as tc
inner join information_schema.key_column_usage as k
	on tc.constraint_type = 'PRIMARY KEY' 
	and tc.constraint_name = k.constraint_name
	and tc.constraint_catalog = k.constraint_catalog
	and tc.constraint_schema = k.constraint_schema
	and tc.table_schema = k.table_schema
	and tc.table_name = k.table_name
where
	k.table_name = '!!#table!!'
	and k.table_schema = '!!#base_schema!!'
;
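-- For illustration only: for a hypothetical single-column primary key named
-- tract_id, the row generated above would contain, among other strings,
--    base_aliased = 'b.tract_id'
--    newpk_col    = 'new_tract_id'
--    join_expr    = 'b.tract_id = s.tract_id'
--    assmt_expr   = 'b.tract_id = s.new_tract_id'
-- These fragments are aggregated below into complete SQL clauses.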

-- Run QA checks
-- !x! execute script UPDTPKQA_ONE with arguments(base_schema=!!#base_schema!!, staging=!!#staging!!, table=!!#table!!, pkinfo_table=#ups_pkcol_info, qaerror_table=#ups_pkqa_errors, display_errors=!!#display_errors!!)


-- Run the PK update ONLY if QA check script returned no errors
-- !x! if(not hasrows(#ups_pkqa_errors))
	-- !x! rm_sub ~updatestmt

	-- !x! sub ~do_updates Yes

	-- !x! if(sub_defined(logfile))
		-- !x! write "" to !!logfile!!
		-- !x! write "==================================================================" to !!logfile!!
		-- !x! write "!!$current_time!! -- Performing primary key update on table !!#base_schema!!.!!#table!!" to !!logfile!!
	-- !x! endif

	-- !x! if(console_on)
		-- !x! console status "Performing PK updates"
		-- !x! console progress 0
	-- !x! endif

	-- !x! write "Performing primary key update on table !!#base_schema!!.!!#table!!"

	-- Get list of old primary key columns prefixed with alias
	-- !x! sub_empty ~oldpk_cols 
	-- !x! execute script string_agg with(table_name=#ups_pkcol_info, string_col=base_aliased, order_col=ordinal_position, delimiter=", ", string_var=+oldpk_cols)

	-- Get list of columns containing new primary key values
	-- !x! sub_empty ~newpk_cols
	-- !x! execute script string_agg with(table_name=#ups_pkcol_info, string_col=newpk_col, order_col=ordinal_position, delimiter=", ", string_var=+newpk_cols)

	-- Create a join expression for an inner join between base and staging
	-- !x! sub_empty ~joinexpr 
	-- !x! execute script string_agg with(table_name=#ups_pkcol_info, string_col=join_expr, order_col=ordinal_position, delimiter=" and ", string_var=+joinexpr)

	-- Create a FROM clause for an inner join between base and staging
	-- tables on the primary key column(s).
	-- !x! sub ~fromclause FROM !!#base_schema!!.!!#table!! as b INNER JOIN !!#staging!!.!!#table!! as s ON !!~joinexpr!!


	-- Create a WHERE clause for the rows to include in the selection (only those having new PK columns populated in the staging table)
	-- !x! sub_empty ~wherecondition
	-- !x! execute script string_agg with(table_name=#ups_pkcol_info, string_col=newpk_col_not_empty, order_col=ordinal_position, delimiter=" and ", string_var=+wherecondition)
	-- !x! sub ~whereclause WHERE !!~wherecondition!!


	-- Select all matches for PK update into temp table
	if object_id('tempdb..#ups_pkupdates') is not null drop table #ups_pkupdates;
	select
		!!~oldpk_cols!!,
		!!~newpk_cols!!
	into #ups_pkupdates
	!!~fromclause!!
	!!~whereclause!!
	;


	-- Prompt user to examine matching data and commit, don't commit, or quit.
	-- !x! if(hasrows(#ups_pkupdates))
		-- !x! if(is_true(!!#display_changes!!))
			-- !x! prompt ask "Do you want to make these changes to primary key values for table !!#table!!?" sub ~do_updates display #ups_pkupdates
		-- !x! endif
		-- !x! if(is_true(!!~do_updates!!))
			-- Create an assignment expression to update key columns of the
			-- base table (as b) from "new_" columns of the staging table (as s).
			-- !x! sub_empty ~assmt_expr
			-- !x! execute script string_agg with(table_name=#ups_pkcol_info, string_col=assmt_expr, order_col=ordinal_position, delimiter=", ", string_var=+assmt_expr)
			
			-- Create an UPDATE statement to update PK columns of the base table with
			-- "new" PK columns from the staging table.  No semicolon terminating generated SQL.
			-- !x! sub ~updatestmt UPDATE b SET !!~assmt_expr!! FROM !!#base_schema!!.!!#table!! as b INNER JOIN !!#staging!!.!!#table!! as s on !!~joinexpr!! !!~whereclause!!
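			-- For illustration only (hypothetical names d.tracts with PK tract_id):
			-- ~updatestmt would expand to approximately
			--    UPDATE b SET b.tract_id = s.new_tract_id
			--    FROM d.tracts as b INNER JOIN s.tracts as s on b.tract_id = s.tract_id
			--    WHERE new_tract_id is not null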
			
			
			-- !x! write "Updating !!#base_schema!!.!!#table!!"
			-- !x! if(sub_defined(logfile))
				-- !x! write "" to !!logfile!!
				-- !x! if(sub_defined(log_sql))
				-- !x! andif(is_true(!!log_sql!!))
					-- !x! write "UPDATE statement for !!#base_schema!!.!!#table!!:" to !!logfile!!
					-- !x! write [!!~updatestmt!!] to !!logfile!!
				-- !x! endif
				-- !x! if(sub_defined(log_changes))
				-- !x! andif(is_true(!!log_changes!!))
					-- !x! write "Updates:" to !!logfile!!
					-- !x! export #ups_pkupdates append to !!logfile!! as txt
				-- !x! endif
				-- !x! write "" to !!logfile!!
			-- !x! endif
			!!~updatestmt!!;
			-- !x! if(sub_defined(logfile))
				-- !x! write "!!$last_rowcount!! rows of !!#base_schema!!.!!#table!! updated." to !!logfile!!
			-- !x! endif
			-- !x! write "    !!$last_rowcount!! rows updated."	
		-- !x! endif	
	-- !x! else
		--!x! write "No primary key updates specified for existing records in !!#base_schema!!.!!#table!!"	
	-- !x! endif
-- !x! endif


-- !x! END SCRIPT
-- ###################  End of UPDTPK_ONE  ########################
-- ################################################################



-- ################################################################
--			Script UPDTPKQA_ONE
--
-- Performs QA checks on requested primary key updates to a table,
-- based on old and new values of the table's primary key columns
-- in a staging table.
--
-- Input parameters:
--		base_schema		: The name of the base table schema.
--		staging			: The name of the staging schema.
--		table			: The table name--same for base and staging.
--		pkinfo_table	: The name of a temporary table to be passed by
--							the caller that contains information about the table PK,
--							including strings to be used in constructing
--							SQL for the checks.
--		qaerror_table  	: The name of a temporary table to
--							store any errors found by QA checks.
--		display_errors	: A value of 'Yes' or 'No' to indicate whether
--							any errors should be displayed in a GUI.
--	Output:
--		Any QA errors found are inserted into the table named by the
--							qaerror_table parameter; no other output is returned.
--
--	Global variables:
--		logfile			: The name of a log file to which update
--							messages will be written.  Optional.
--		log_sql			: A value of 'Yes' or 'No' indicating whether
--							the update and insert statements should
--							also be written to the logfile.  Optional.
--							Currently only writes SQL for foreign key checks
--							(final check) to log.
--		log_errors		: A value of 'Yes' or 'No' to indicate whether
--							errors are written to the logfile. Optional.
--
-- Tables and views created or modified:
--		ups_missing_pk_cols			: temporary table
--		ups_any_pk_cols				: temporary table
--		ups_empty_pk_cols			: temporary table
--		ups_invalid_old_pks			: temporary table
--		ups_existing_new_pks		: temporary table
--		ups_pk_mapping_conflict		: temporary table
--		ups_pk_duplicate_keys		: temporary table
--		ups_fkcol_refs				: temporary table
--		ups_pkcol_deps				: temporary table
--		ups_pkfk_ctrl				: temporary table
--		(plus several row-count work tables)
-- ===============================================================

-- !x! BEGIN SCRIPT UPDTPKQA_ONE with parameters (base_schema, staging, table, pkinfo_table, qaerror_table, display_errors)


-- Write an initial header to the logfile.
-- !x! if(sub_defined(logfile))
	-- !x! write "" to !!logfile!!
	-- !x! write "==================================================================" to !!logfile!!
	-- !x! write "!!$current_time!! -- QA checks for primary key updates on table !!#table!!" to !!logfile!!
-- !x! endif

-- !x! write "Conducting QA checks on table !!#staging!!.!!#table!! for primary key updates to table !!#base_schema!!.!!#table!!"

-- Initialize the status and progress bars if the console is running.
-- !x! if(console_on)
	-- !x! console status "QA checks for PK updates on !!#base_schema!!.!!#table!!"
-- !x! endif


-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- Check 1 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- No primary key constraint on base table
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- !x! if(not hasrows(!!#pkinfo_table!!))

	-- !x! sub ~error_description No primary key constraint on base table !!#base_schema!!.!!#table!!
	-- !x! write "    !!~error_description!!"
	-- !x! if(sub_defined(logfile))
		-- !x! write "" to !!logfile!!
		-- !x! write "!!~error_description!!" to !!logfile!!
	-- !x! endif
	insert into !!#qaerror_table!! (error_code, error_description)
	values ('No PK on base table', '!!~error_description!!')
	;

-- No other QA checks are conducted if this check fails:
-- Remaining QA checks are conducted ONLY if base table has PK
-- !x! else

	-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
	-- Check 2 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
	-- A "new" PK column exists in staging table for every PK column of base table
	-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
	
	-- Find any MISSING PK columns in staging table
	if object_id('tempdb..#ups_missing_pk_cols') is not null drop table #ups_missing_pk_cols;
	select
		newpk_col, ordinal_position
	into #ups_missing_pk_cols
	from
		--Base table PK columns, with expected name in staging table ("new_" prepended to column name)
		!!#pkinfo_table!! as pk
		--Staging table columns
		left join 
			(
			select table_name, column_name 
			from information_schema.columns
			where
				table_schema = '!!#staging!!'
			) as stag on pk.table_name=stag.table_name and pk.newpk_col=stag.column_name
	where
		stag.column_name is null		
	; 

	-- !x! if(hasrows(#ups_missing_pk_cols))
	
		-- !x! sub_empty ~error_info
		-- !x! execute script string_agg with(table_name=#ups_missing_pk_cols, string_col=newpk_col, order_col=ordinal_position, delimiter=", ", string_var=+error_info)
		
		-- !x! sub ~error_description New primary key column(s) missing from staging table: !!~error_info!!
		
		-- !x! write "    !!~error_description!!"
		-- !x! if(sub_defined(logfile))
			-- !x! write "" to !!logfile!!
			-- !x! write "!!~error_description!!" to !!logfile!!
		-- !x! endif
		insert into !!#qaerror_table!! (error_code, error_description)
		values ('Missing new PK column(s)', '!!~error_description!!')
	;
		
	-- No other QA checks are conducted if this check fails:
	-- Remaining QA checks are all conducted ONLY if all expected "new PK" columns exist in staging table	
	-- !x! else 
	
		-- Library of aggregated strings used to construct SQL for the remaining checks
	
		-- "Old" PK columns, column names only
		-- !x! sub_empty ~old_pkcol
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=column_name, order_col=ordinal_position, delimiter=", ", string_var=+old_pkcol)
		
		-- "Old" PK columns from aliased staging table
		-- !x! sub_empty ~old_pkcol_aliased
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=staging_aliased, order_col=ordinal_position, delimiter=", ", string_var=+old_pkcol_aliased)
		
		-- "Old" PK columns from aliased staging table, with a prefix indicating they're from staging table
		-- !x! sub_empty ~old_pkcol_aliased_prefix
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=staging_aliased_prefix, order_col=ordinal_position, delimiter=", ", string_var=+old_pkcol_aliased_prefix)
		
		-- "New" PK columns from staging table
		-- !x! sub_empty ~new_pkcol
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=newpk_col, order_col=ordinal_position, delimiter=", ", string_var=+new_pkcol)
		
		-- "NEW" PK columns from aliased staging table
		-- !x! sub_empty ~new_pkcol_aliased
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=newpk_col_aliased, order_col=ordinal_position, delimiter=", ", string_var=+new_pkcol_aliased)
		
		-- Just base table
		-- !x! sub ~base_table !!#base_schema!!.!!#table!!
		
		-- Just staging table
		-- !x! sub ~staging_table !!#staging!!.!!#table!!
		
		-- Join condition: Base to staging on original (not "new_") PK columns
		-- !x! sub_empty ~joincond_origorig
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=join_expr, order_col=ordinal_position, delimiter=" and ", string_var=+joincond_origorig)
		
		-- Join condition: Base ORIG PK to staging NEW PK
		-- !x! sub_empty ~joincond_oldnew
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=assmt_expr, order_col=ordinal_position, delimiter=" and ", string_var=+joincond_oldnew)
		
		-- Join condition: Two instances of NEW PK columns from staging table
		-- !x! sub_empty ~joincond_newnew 
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=join_expr_new, order_col=ordinal_position, delimiter=" and ", string_var=+joincond_newnew)
		
		-- Clause: ANY new PK cols populated
		-- !x! sub_empty ~any_newpk_col_not_empty
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=newpk_col_not_empty, order_col=ordinal_position, delimiter=" or ", string_var=+any_newpk_col_not_empty)
		
		-- Clause: ALL new PK cols populated
		-- !x! sub_empty ~all_newpk_col_not_empty
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=newpk_col_not_empty, order_col=ordinal_position, delimiter=" and ", string_var=+all_newpk_col_not_empty)
		
		-- Clause: ANY new PK cols empty
		-- !x! sub_empty ~any_newpk_col_empty
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=newpk_col_empty, order_col=ordinal_position, delimiter=" or ", string_var=+any_newpk_col_empty)
		
		-- Clause: ALL new PK cols empty
		-- !x! sub_empty ~all_newpk_col_empty
		-- !x! execute script string_agg with(table_name=!!#pkinfo_table!!, string_col=newpk_col_empty, order_col=ordinal_position, delimiter=" and ", string_var=+all_newpk_col_empty)
		
		-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
		-- Check 3 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
		-- There are any rows with PK updates specified.
		-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
	
		-- Find any populated new PK columns in staging table
		if object_id('tempdb..#ups_any_pk_cols') is not null drop table #ups_any_pk_cols;
		select * into #ups_any_pk_cols
		from !!~staging_table!! 
		where !!~any_newpk_col_not_empty!!;
		-- !x! if(not hasrows(#ups_any_pk_cols))
			-- !x! sub ~error_description No primary key updates specified in !!#staging!!.!!#table!!
			-- !x! write "    !!~error_description!!"
			-- !x! if(sub_defined(logfile))
				-- !x! write "" to !!logfile!!
				-- !x! write "!!~error_description!!" to !!logfile!!
			-- !x! endif
			insert into !!#qaerror_table!! (error_code, error_description)
			values ('No PK updates specified in staging table', '!!~error_description!!')
			;
		-- No other QA checks are conducted if this check fails
		-- !x! else
			
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- Check 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- Where any "new" PK column is populated in the staging table, they are all populated.
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

			-- Construct SQL statement looking for any NULLs in "new" PK columns in rows where any PK columns are populated
			-- Find any EMPTY PK columns in staging table
			if object_id('tempdb..#ups_empty_pk_cols') is not null drop table #ups_empty_pk_cols;
			select
				!!~old_pkcol!!,
				!!~new_pkcol!!
			into #ups_empty_pk_cols
			from	
				!!~staging_table!! 
			where
				not (!!~all_newpk_col_empty!!)
				and (!!~any_newpk_col_empty!!)
			;

			-- !x! if(hasrows(#ups_empty_pk_cols))
				if object_id('tempdb..#ups_empty_pk_cols_rwcnt') is not null drop table #ups_empty_pk_cols_rwcnt;
				select count(*) as rwcnt
				into #ups_empty_pk_cols_rwcnt
				from #ups_empty_pk_cols
				;
				-- !x! subdata ~rowcount #ups_empty_pk_cols_rwcnt
				-- !x! sub ~error_description Missing values in new PK columns in !!#staging!!.!!#table!!: !!~rowcount!! row(s)
				-- !x! write "    !!~error_description!!"
				insert into !!#qaerror_table!! (error_code, error_description)
				values ('Incomplete mapping', '!!~error_description!!')
				;	
				-- !x! if(sub_defined(logfile))
					-- !x! write "" to !!logfile!!
					-- !x! write "!!~error_description!!" to !!logfile!!
					-- !x! if(sub_defined(log_errors))
					-- !x! andif(is_true(!!log_errors!!))
						-- !x! export #ups_empty_pk_cols append to !!logfile!! as txt
					-- !x! endif
				-- !x! endif
				-- !x! if(is_true(!!#display_errors!!))
					-- !x! prompt message "Missing values in new PK columns in !!#staging!!.!!#table!!" display #ups_empty_pk_cols
				-- !x! endif	
			-- !x! endif
			
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- Check 5 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- Where any "new" PK column is populated in the staging table, the value of the original PK for that row is valid
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			
			-- Get the first original PK column (base-aliased) to test for unmatched rows in the left join below.
			if object_id('tempdb..#ups_old_pks_wc') is not null drop table #ups_old_pks_wc;
			select top 1 base_aliased
			into #ups_old_pks_wc
			from !!#pkinfo_table!!
			order by ordinal_position;
			-- !x! subdata ~old_pk_firstcol #ups_old_pks_wc	
			
			
			if object_id('tempdb..#ups_invalid_old_pks') is not null drop table #ups_invalid_old_pks;
			select
				!!~old_pkcol_aliased!!,
				!!~new_pkcol!!
			into #ups_invalid_old_pks
			from !!~staging_table!! as s
					left join !!~base_table!! as b on !!~joincond_origorig!!
			where !!~all_newpk_col_not_empty!! and !!~old_pk_firstcol!! is null
			;
			
			-- !x! if(hasrows(#ups_invalid_old_pks))
				if object_id('tempdb..#ups_invld_pk_rwcnt') is not null drop table #ups_invld_pk_rwcnt;
				select count(*) as rwcnt
				into #ups_invld_pk_rwcnt
				from #ups_invalid_old_pks
				;
				-- !x! subdata ~rowcount #ups_invld_pk_rwcnt
				-- !x! sub ~error_description Invalid original PK in !!#staging!!.!!#table!!: !!~rowcount!! row(s)
				-- !x! write "    !!~error_description!!"
				insert into !!#qaerror_table!! (error_code, error_description)
				values ('Invalid old PK value', '!!~error_description!!')
				;	
				-- !x! if(sub_defined(logfile))
					-- !x! write "" to !!logfile!!
					-- !x! write "!!~error_description!!" to !!logfile!!
					-- !x! if(sub_defined(log_errors))
					-- !x! andif(is_true(!!log_errors!!))
						-- !x! export #ups_invalid_old_pks append to !!logfile!! as txt
					-- !x! endif
				-- !x! endif
				-- !x! if(is_true(!!#display_errors!!))
					-- !x! prompt message "Invalid original PK in !!#staging!!.!!#table!!" display #ups_invalid_old_pks
				-- !x! endif		
			-- !x! endif
			
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- Check 6 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- None of the "new" PK values already exist in the base table
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			if object_id('tempdb..#ups_existing_new_pks') is not null drop table #ups_existing_new_pks;
			select 
				!!~old_pkcol_aliased_prefix!!,
				!!~new_pkcol!!,
				b.*
			into #ups_existing_new_pks
			from !!~staging_table!! as s
					inner join !!~base_table!! as b on !!~joincond_oldnew!!
			;
				
			-- !x! if(hasrows(#ups_existing_new_pks))
				if object_id('tempdb..#ups_exst_nwpk_rwcnt') is not null drop table #ups_exst_nwpk_rwcnt;
				select count(*) as rwcnt
				into #ups_exst_nwpk_rwcnt
				from #ups_existing_new_pks
				;
				-- !x! subdata ~rowcount #ups_exst_nwpk_rwcnt
				-- !x! sub ~error_description New PK already exists in !!#base_schema!!.!!#table!!: !!~rowcount!! row(s)
				-- !x! write "    !!~error_description!!"
				insert into !!#qaerror_table!! (error_code, error_description)
				values ('Existing new PK value', '!!~error_description!!')
				;	
				-- !x! if(sub_defined(logfile))
					-- !x! write "" to !!logfile!!
					-- !x! write "!!~error_description!!" to !!logfile!!
					-- !x! if(sub_defined(log_errors))
					-- !x! andif(is_true(!!log_errors!!))
						-- !x! export #ups_existing_new_pks append to !!logfile!! as txt
					-- !x! endif
				-- !x! endif
				-- !x! if(is_true(!!#display_errors!!))
					-- !x! prompt message "New PK already exists in !!#base_schema!!.!!#table!!" display #ups_existing_new_pks
				-- !x! endif		
			-- !x! endif
			
			
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- Check 7 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- No two (or more) original PK values map to same new PK value
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			if object_id('tempdb..#ups_pk_mapping_conflict') is not null drop table #ups_pk_mapping_conflict;
			select
				!!~old_pkcol_aliased!!,
				!!~new_pkcol_aliased!!
			into #ups_pk_mapping_conflict
			from !!~staging_table!! as s
				inner join 
				(
				select 
					!!~new_pkcol!!
				from 
				(select distinct !!~old_pkcol!!, !!~new_pkcol!! from !!~staging_table!! where !!~all_newpk_col_not_empty!!) as a
				group by 
					!!~new_pkcol!!
				having count(*) >1
				) as b on !!~joincond_newnew!!
			;
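			-- For illustration only: with a hypothetical single-column key, staging rows
			-- mapping old keys 101 and 102 both to new key 201 would be flagged here.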
			
			-- !x! if(hasrows(#ups_pk_mapping_conflict))
				if object_id('tempdb..#ups_map_conf_rwcnt') is not null drop table #ups_map_conf_rwcnt;
				select count(*) as rwcnt
				into #ups_map_conf_rwcnt
				from #ups_pk_mapping_conflict
				;
				-- !x! subdata ~rowcount #ups_map_conf_rwcnt
				-- !x! sub ~error_description Multiple original PKs mapped to same new PK in !!#staging!!.!!#table!!: !!~rowcount!! row(s)
				-- !x! write "    !!~error_description!!"
				insert into !!#qaerror_table!! (error_code, error_description)
				values ('Mapping conflict', '!!~error_description!!')
				;	
				-- !x! if(sub_defined(logfile))
					-- !x! write "" to !!logfile!!
					-- !x! write "!!~error_description!!" to !!logfile!!
					-- !x! if(sub_defined(log_errors))
					-- !x! andif(is_true(!!log_errors!!))
						-- !x! export #ups_pk_mapping_conflict append to !!logfile!! as txt
					-- !x! endif
				-- !x! endif
				-- !x! if(is_true(!!#display_errors!!))
					-- !x! prompt message "Multiple original PKs mapped to same new PK in !!#staging!!.!!#table!!" display #ups_pk_mapping_conflict
				-- !x! endif		
			-- !x! endif
			
			
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- Check 8 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- No single original PK value maps to multiple new PK values
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			
			if object_id('tempdb..#ups_pk_duplicate_keys') is not null drop table #ups_pk_duplicate_keys;
			select
				!!~old_pkcol_aliased!!,
				!!~new_pkcol_aliased!!
			into #ups_pk_duplicate_keys
			from !!~staging_table!! as s
				inner join
				(
				select
					!!~old_pkcol!!
				from
				(select distinct !!~old_pkcol!!, !!~new_pkcol!! from !!~staging_table!! where !!~all_newpk_col_not_empty!!) as a
				group by 
					!!~old_pkcol!!
				having count(*)>1
				) as b on !!~joincond_origorig!!
			;
			
			-- !x! if(hasrows(#ups_pk_duplicate_keys))
				if object_id('tempdb..#ups_dup_key_rwcnt') is not null drop table #ups_dup_key_rwcnt;
				select count(*) as rwcnt
				into #ups_dup_key_rwcnt
				from #ups_pk_duplicate_keys
				;
				-- !x! subdata ~rowcount #ups_dup_key_rwcnt
				-- !x! sub ~error_description Original PK mapped to multiple new PKs in !!#staging!!.!!#table!!: !!~rowcount!! row(s)
				-- !x! write "    !!~error_description!!"
				insert into !!#qaerror_table!! (error_code, error_description)
				values ('Duplicate keys', '!!~error_description!!')
				;	
				-- !x! if(sub_defined(logfile))
					-- !x! write "" to !!logfile!!
					-- !x! write "!!~error_description!!" to !!logfile!!
					-- !x! if(sub_defined(log_errors))
					-- !x! andif(is_true(!!log_errors!!))
						-- !x! export #ups_pk_duplicate_keys append to !!logfile!! as txt
					-- !x! endif
				-- !x! endif
				-- !x! if(is_true(!!#display_errors!!))
					-- !x! prompt message "Original PK mapped to multiple new PKs in !!#staging!!.!!#table!!" display #ups_pk_duplicate_keys
				-- !x! endif		
			-- !x! endif
			
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- Check 9 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			-- If any of the PK columns reference a parent table, all the "new" values of that column are valid
			-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
			
			-- Get ALL column references for the base table
			if object_id('tempdb..#ups_fkcol_refs') is not null drop table #ups_fkcol_refs;
			select 
				object_name(fk.constraint_object_id) as fk_constraint,
				'!!#staging!!' as staging_schema,
				schema_name(t.schema_id) as table_schema,
				t.name as table_name,
				cc.name as column_name,
				cc.column_id,
				schema_name(op.schema_id) as parent_schema,
				object_name(referenced_object_id) as parent_table,
				cp.name as parent_column,
				cp.column_id as parent_column_id
			into #ups_fkcol_refs
			from
				sys.tables as t
				inner join sys.foreign_key_columns as fk on fk.parent_object_id=t.object_id
				inner join sys.columns as cc on fk.parent_object_id=cc.object_id and fk.parent_column_id=cc.column_id
				inner join sys.columns as cp on fk.referenced_object_id=cp.object_id and fk.referenced_column_id=cp.column_id
				inner join sys.objects as op on op.object_id=cp.object_id
			where
				schema_name(t.schema_id)='!!#base_schema!!'
				and t.name = '!!#table!!'
			;	
			
			-- Narrow the list down to ONLY dependencies that affect PK columns
			-- Include not just the PK columns themselves, but ALL columns included in FKs
			-- that include ANY PK columns (probably rare/unlikely that a non-PK column would be
			-- part of the same foreign key as a PK column, but this ensures that ALL columns of the FK 
			-- are included, whether or not the column is part of the PK)
			if object_id('tempdb..#ups_pkcol_deps') is not null drop table #ups_pkcol_deps;
			select
				refs.*
			into #ups_pkcol_deps
			from 
				#ups_fkcol_refs as refs
				inner join 
				--Distinct list of FK constraints on the table that include ANY PK columns
				(
				select distinct 
					fk_constraint, r.table_schema, r.table_name 
				from 
					#ups_fkcol_refs as r
					inner join #ups_pkcol_info as p on r.table_schema=p.table_schema and r.table_name=p.table_name and r.column_name=p.column_name
				) as const on refs.fk_constraint=const.fk_constraint and refs.table_schema=const.table_schema and refs.table_name=const.table_name
			;
			
			-- Create a control table for looping to check each fk
			-- Include (for later use) some of the constructed strings that apply to the entire PK (not
			-- just the FK being checked)
			if object_id('tempdb..#ups_pkfk_ctrl') is not null drop table #ups_pkfk_ctrl;
			select 
				fk_constraint, 
				staging_schema, table_schema, table_name, parent_schema, parent_table,
				min(parent_column) as any_referenced_column,
				'!!~old_pkcol_aliased!!' as old_pkcol_aliased,
				'!!~new_pkcol!!' as new_pkcol,
				'!!~all_newpk_col_not_empty!!' as all_newpk_col_not_empty,
				cast(0 as bit) as processed
			into #ups_pkfk_ctrl
			from #ups_pkcol_deps
			group by	
				fk_constraint, 
				staging_schema, table_schema, table_name, parent_schema, parent_table;
				
			-- Create a script to select one constraint to process
			-- !x! begin script ups_get_next_fk
			if object_id('tempdb..#ups_next_fk') is not null drop table #ups_next_fk;
			select top 1 *
			into #ups_next_fk
			from #ups_pkfk_ctrl
			where not processed=1;
			-- !x! end script	
			
			--Process all constraints: check every foreign key
			--!x! execute script updtpkqa_one_innerloop with (qaerror_table=!!#qaerror_table!!, display_errors=!!#display_errors!!)
		-- !x! endif
	-- !x! endif
-- !x! endif
-- !x! END SCRIPT
-- ###################  UPDTPKQA_ONE  ########################
-- ################################################################
--			Script UPDTPKQA_ONE_INNERLOOP
-- ----------------------------------------------------------------
-- !x! BEGIN SCRIPT UPDTPKQA_ONE_INNERLOOP with parameters(qaerror_table, display_errors)

-- !x! execute script ups_get_next_fk
-- !x! if(hasrows(#ups_next_fk))

	-- !x! select_sub #ups_next_fk
	
	-- Compile FK info for the selected constraint
	if object_id('tempdb..#ups_sel_fk_cols') is not null drop table #ups_sel_fk_cols;
	select
		staging_schema, fk_constraint, table_schema, table_name, column_name, column_id, parent_schema,
		parent_table, parent_column, parent_column_id,
		's.new_' + column_name + '=' + 'b.' + parent_column as join_condition
	into #ups_sel_fk_cols
	from #ups_pkcol_deps
	where fk_constraint='!!@fk_constraint!!'
	;
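	-- For example (hypothetical names), if the staging column person_id references the
	-- parent column person_id, the join_condition string built above would be:
	--   s.new_person_id=b.person_id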
	
	-- Construct SQL to check the selected FK
	
	-- !x! sub_empty ~referenced_cols
	-- !x! execute script string_agg with (table_name=#ups_sel_fk_cols, string_col=parent_column, order_col=column_id, delimiter=", ", string_var=+referenced_cols)
	
	-- !x! sub_empty ~join_condition
	-- !x! execute script string_agg with (table_name=#ups_sel_fk_cols, string_col=join_condition, order_col=column_id, delimiter=" and ", string_var=+join_condition)
	
	
	-- !x! sub ~select_stmt select !!@old_pkcol_aliased!!, !!@new_pkcol!! into #ups_pk_fk_check from !!@staging_schema!!.!!@table_name!! as s
	-- !x! sub ~join_stmt left join !!@parent_schema!!.!!@parent_table!! as b on !!~join_condition!!
	-- !x! sub ~where_clause where !!@all_newpk_col_not_empty!! and b.!!@any_referenced_column!! is null
	
	-- !x! sub ~fk_check if object_id('tempdb..#ups_pk_fk_check') is not null drop table #ups_pk_fk_check;
	-- !x! sub_append ~fk_check !!~select_stmt!!
	-- !x! sub_append ~fk_check !!~join_stmt!!
	-- !x! sub_append ~fk_check !!~where_clause!!
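	-- Purely illustrative (hypothetical) example of the assembled ~fk_check, assuming a
	-- staging table stg.employee whose single-column PK (person_id) is also an FK to
	-- dbo.person, and assuming the variables built earlier take the forms
	-- old_pkcol_aliased = "s.person_id as old_person_id", new_pkcol = "s.new_person_id",
	-- and all_newpk_col_not_empty = "s.new_person_id is not null":
	--   if object_id('tempdb..#ups_pk_fk_check') is not null drop table #ups_pk_fk_check;
	--   select s.person_id as old_person_id, s.new_person_id into #ups_pk_fk_check from stg.employee as s
	--   left join dbo.person as b on s.new_person_id=b.person_id
	--   where s.new_person_id is not null and b.person_id is null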
	
	-- Write the SQL to the log file if requested.
	-- !x! if(sub_defined(logfile))
	-- !x! andif(sub_defined(log_sql))
	-- !x! andif(is_true(!!log_sql!!))
		-- !x! write "" to !!logfile!!
		-- !x! write "SQL for checking foreign key !!@fk_constraint!! for PK update to !!@table_schema!!.!!@table_name!!:" to !!logfile!!
		-- !x! write [!!~fk_check!!] to !!logfile!!
	-- !x! endif
	
	-- Run the check
	!!~fk_check!!;
		
	-- !x! if(hasrows(#ups_pk_fk_check))
		
		if object_id('tempdb..#ups_pk_fk_check_rwcnt') is not null drop table #ups_pk_fk_check_rwcnt;
		select count(*) as rwcnt
		into #ups_pk_fk_check_rwcnt
		from #ups_pk_fk_check
		;
		-- !x! subdata ~rowcount #ups_pk_fk_check_rwcnt
		-- !x! sub ~error_description !!@parent_schema!!.!!@parent_table!! (!!~referenced_cols!!): !!~rowcount!! row(s)
		
		-- !x! write "    Violation of foreign key !!@fk_constraint!! in new primary key columns in !!@staging_schema!!.!!@table_name!! referencing !!@parent_schema!!.!!@parent_table!!: !!~rowcount!! row(s)"
		
		if object_id('tempdb..#ups_pk_fk_qa_error') is not null drop table #ups_pk_fk_qa_error;
		select 
			error_code, error_description
		into #ups_pk_fk_qa_error
		from !!#qaerror_table!!
		where error_code='Invalid reference to parent table(s)';
		-- !x! if(hasrows(#ups_pk_fk_qa_error))
			update !!#qaerror_table!!
			set error_description=error_description + '; !!~error_description!!'
			where error_code='Invalid reference to parent table(s)';
		
		-- !x! else
			insert into !!#qaerror_table!! (error_code, error_description)
			values ('Invalid reference to parent table(s)', '!!~error_description!!')
			;			
		-- !x! endif
		
	
		-- !x! if(sub_defined(logfile))
			-- !x! write "" to !!logfile!!
			-- !x! write "Violation of foreign key !!@fk_constraint!! in new primary key columns in !!@staging_schema!!.!!@table_name!! referencing !!@parent_schema!!.!!@parent_table!!: !!~rowcount!! row(s)" to !!logfile!!
			-- !x! if(sub_defined(log_errors))
			-- !x! andif(is_true(!!log_errors!!))
				-- !x! export #ups_pk_fk_check append to !!logfile!! as txt
			-- !x! endif
		-- !x! endif
		-- !x! if(is_true(!!#display_errors!!))
			-- !x! prompt message "Violation of foreign key !!@fk_constraint!! in new primary key columns in !!@staging_schema!!.!!@table_name!! referencing !!@parent_schema!!.!!@parent_table!!" display #ups_pk_fk_check
		-- !x! endif	
	
	-- !x! endif

	-- Mark constraint as processed
	update #ups_pkfk_ctrl
	set processed=1
	where fk_constraint='!!@fk_constraint!!';

	-- LOOP: process the next constraint by re-invoking this script;
	-- recursion ends when all rows of #ups_pkfk_ctrl are marked as processed.
	-- !x! execute script updtpkqa_one_innerloop with (qaerror_table=!!#qaerror_table!!, display_errors=!!#display_errors!!)
	
-- !x! endif

-- !x! END SCRIPT
-- ####################  End of UPDTPKQA_ONE  ########################
-- ################################################################