By: Ian Fogelman | Updated: 2020-07-07 | Comments (4) | Related: > Change Data Capture
Problem
In this tip we review a concept of change data capture in SQL Server utilizing a custom-made stored procedure. This concept is inspired by the CDC control flow task in SQL Server Integration Services (SSIS). Change data capture aka CDC is a feature enabled at a SQL Server database and table level, it allows you to monitor changes (UPDATES, INSERTS, DELETES) from a target table to help monitor data changes. CDC is a great feature, but I wanted to capture the same functionality in a stored procedure and this tip will review how that can be done.
Solution
The below diagram shows what we want to accomplish. Take data from our OLTP system and move the data (UPDATES, INSERTS, DELETES) to our OLAP system.
To accomplish this data comparison, we will utilize the system views and retrieve all columns except for identity columns and the checksum column of our source table. All those values will be concatenated and plugged into the HASHBYTES function returning a singular checksum value. That checksum value will be the determining factor if a row will be updated or not.
The stored procedure will then check an OLTP view and dynamically merge the data into our data OLTP Employees.
First the requirements:
- 1 Target Table with a primary key in this case EmployeeID
- 1 Source Table
- 1 Stored Procedure
- Access to sys.obects / sys.columns DMVs
Create database, tables and data
First, we will create a sample database, two tables and insert one record as shown below.
CREATE DATABASE Example GO USE Example GO -- this is the target with an additional CHECKSUM column CREATE TABLE EMPLOYEES_Target ( EMPLOYEEID VARCHAR(12) PRIMARY KEY, FIRSTNAME VARCHAR(50), LASTNAME VARCHAR(50), PHONENUM VARCHAR(20), BUILDING INT, CHECKSUM VARCHAR(MAX) ) -- this is the source it can be on any server or database CREATE TABLE EMPLOYEES_Source ( EMPLOYEEID VARCHAR(12) PRIMARY KEY, FIRSTNAME VARCHAR(50), LASTNAME VARCHAR(50), PHONENUM VARCHAR(20), BUILDING INT ) -- insert one source record INSERT INTO EMPLOYEES_Source VALUES('1','IAN','FOGELMAN','',100) -- select data from tables SELECT * FROM EMPLOYEES_Target SELECT * FROM EMPLOYEES_Source
Create procedure that will refresh target table
Below is the SQL Server stored procedure code that will determine if an INSERT or UPDATE should occur.
CREATE PROC [dbo].[SPX_REFRESH_TABLE] (@TARGET VARCHAR(MAX),@SOURCE VARCHAR(MAX),@PK VARCHAR(MAX)) --PARAMETERS INCLUDE TARGET (TABLE), SOURCE (VIEW OF INCOMING DATA), PRIMARY KEY THE SINGLE COLUMN ID OF THE INCOMING DATA. AS IF OBJECT_ID('tempdb..#temp_CHECKSUM') IS NOT NULL DROP TABLE #temp_CHECKSUM CREATE TABLE #temp_CHECKSUM ( PKEY VARCHAR(MAX), --CHANGE TO DYNAMIC FOR PK CHECKSUM VARCHAR(MAX) ) DECLARE @COLUMNS VARCHAR(MAX),@SQL VARCHAR(MAX),@OID BIGINT --TARGET AND SOURCE HAVE THE SAME COLUMNS... SET @OID = (SELECT object_id FROM sys.objects WHERE name = @TARGET AND TYPE = 'U') SET @COLUMNS = (SELECT STUFF((SELECT ',[' + NAME + ']' FROM sys.all_columns WHERE OBJECT_ID = @OID AND is_identity <> 1 AND NAME != 'CheckSum' AND NAME != 'STARTTIME' AND NAME != 'ENDTIME' ORDER BY NAME FOR XML PATH('')), 1, 1, '')) SET @SQL = 'SELECT [' + @PK + '],CONVERT(VARCHAR(32),HASHBYTES(''MD5'',CONCAT(' + @COLUMNS + ')),2) AS CHECKSUM FROM ' + @SOURCE INSERT INTO #temp_CHECKSUM EXEC(@SQL) IF OBJECT_ID('tempdb..##TEMP_B') IS NOT NULL DROP TABLE ##TEMP_B PRINT '---' SET @SQL = 'SELECT X.*,TIC.CHECKSUM INTO ##TEMP_OLTP FROM(SELECT * FROM '+ @SOURCE + ') AS X JOIN #temp_CHECKSUM AS TIC ON X.['+@PK+'] = TIC.PKEY' EXEC(@SQL) DECLARE @COLUMNSUPDATE VARCHAR(MAX),@COLUMNSINSERT VARCHAR(MAX),@COLUMNSVALUES VARCHAR(MAX),@SQL2 VARCHAR(MAX),@OUTPUTCOLUMNS VARCHAR(MAX) SET @OID = (SELECT object_id FROM sys.objects WHERE name = @TARGET AND TYPE = 'U') SET @COLUMNSUPDATE = (SELECT STUFF(( SELECT ',TARGET.[' + NAME + '] = SOURCE.[' + NAME + '] AND ' FROM sys.all_columns WHERE OBJECT_ID = @OID AND is_identity != 1 ORDER BY COLUMN_ID FOR XML PATH('')), 1, 1, '')) SET @COLUMNSINSERT = (SELECT STUFF((SELECT ',[' + NAME + ']' FROM sys.all_columns WHERE OBJECT_ID = @OID AND is_identity != 1 ORDER BY COLUMN_ID FOR XML PATH('')), 1, 1, '')) SET @COLUMNSVALUES = (SELECT STUFF((SELECT ',SOURCE.[' + NAME + ']' FROM sys.all_columns WHERE OBJECT_ID = @OID AND is_identity != 1 ORDER BY COLUMN_ID FOR XML PATH('')), 1, 1, '')) SET @OUTPUTCOLUMNS = (SELECT STUFF((SELECT ',CASE WHEN $ACTION = ''INSERT'' THEN INSERTED.[' + NAME + '] ELSE DELETED.[' + NAME + '] END AS [' + NAME + ']' FROM sys.all_columns WHERE OBJECT_ID = @OID AND is_identity != 1 ORDER BY COLUMN_ID FOR XML PATH('')), 1, 1, '')) SET @SQL2 = 'MERGE '+ @TARGET +' AS TARGET USING ##TEMP_OLTP AS SOURCE ON (TARGET.['+ @PK +'] = SOURCE.[' + @PK + ']) WHEN MATCHED AND (TARGET.CHECKSUM != SOURCE.CHECKSUM) THEN UPDATE SET ' + REPLACE(@COLUMNSUPDATE,'AND','') + ' WHEN NOT MATCHED BY SOURCE THEN DELETE WHEN NOT MATCHED BY TARGET THEN INSERT ('+ @COLUMNSINSERT +') VALUES ('+ @COLUMNSVALUES +') OUTPUT $ACTION, ' + @OUTPUTCOLUMNS + ';' PRINT @SQL2 EXEC(@SQL2) DROP TABLE ##TEMP_OLTP GO
Executing Stored Procedure
Now we fire the stored procedure, using the OUTPUT clause on the MERGE statement, we will return the appropriate action for the data difference between our view and target table. In this instance since the view representing our production data is different, we can expect an update to the target table from the source view.
--FIRST EXECUTE, SINGLE INSERT EXEC SPX_REFRESH_TABLE 'EMPLOYEES_Target','EMPLOYEES_Source','EmployeeId'
Below we can see the row was inserted, which was the row we inserted above into the source table.
Let's say the source record was updated as follows and then we run the stored procedure again.
--SECOND EXECUTE, SINGLE UPDATE UPDATE EMPLOYEES_Source SET LastName = 'FOG', PhoneNum = '1231231234' EXEC SPX_REFRESH_TABLE 'EMPLOYEES_Target','EMPLOYEES_Source','EmployeeId'
And voila, an update is indeed made, the employees table had a value of "Fogelman" while the OLTP live view has a value of "FOG". This will cause the checksums to be different and in return trigger an UPDATE.
Then let's insert a record to the source table and run the procedure again.
--THIRD EXECUTE AN ADDITION EMPLOYEE TO SOURCE INSERT INTO EMPLOYEES_Source VALUES ('2', 'JIM', 'BROWN', '4354354356', 200) EXEC SPX_REFRESH_TABLE 'EMPLOYEES_Target','EMPLOYEES_Source','EmployeeId'
We can see the new row was inserted.
--FOURTH SHOW DELETE AND INSERT LOGGING, EMPID 2 IS DELETED, 3 & 4 ARE INSERTED INSERT INTO EMPLOYEES_Source VALUES('3','BOB','WILSON','341234123',300) INSERT INTO EMPLOYEES_Source VALUES('4','SAMMY','WHITE','127896543',300) UPDATE EMPLOYEES_Source SET LASTNAME = 'Fogelman' WHERE EMPLOYEEID = 1 DELETE FROM EMPLOYEES_Source WHERE EMPLOYEEID = 2 EXEC SPX_REFRESH_TABLE 'EMPLOYEES_Target','EMPLOYEES_Source','EmployeeID'
We can then use this to see what the final tables look like.
--FINAL CHECK ON EMPLOYEES_Target Table SELECT * FROM EMPLOYEES_Target SELECT * FROM EMPLOYEES_Source
And that’s it, this is a small working concept of CDC achieved with some dynamic SQL.
Next Steps
- Test this out with some of your tables.
- This was a small dataset, so try working with larger tables to see how this works.
About the author
This author pledges the content of this article is based on professional experience and not AI generated.
View all my tips
Article Last Updated: 2020-07-07