Process Change Data Capture in SQL Server Integration Services

By: Ray Barley   |   Updated: 2009-05-20   |   Comments (24)


Problem

I have a requirement to track all changes to specific tables.  The changes need to be stored in an audit table.  It looks like the Change Data Capture capability in SQL Server 2008 could be used to get the changes but I need a way of copying the changes to the audit table.  How can I do that? 

Solution

Change Data Capture (CDC) is a new capability in SQL Server 2008 Enterprise Edition and can be used to track all inserts, updates and deletes to a table.  A SQL Server Integration Services (SSIS) package would be a good choice for populating an audit table with the CDC data.  Let's discuss a scenario then walk through a solution.

Assume that we want to store all changes to our customer table in a customer_audit table.  We'll write an SSIS package to query the CDC data and copy it to the customer_audit table.  We would like the option of running the SSIS package on demand and/or on a schedule (e.g. a SQL Agent job).  Each time we run the SSIS package we want to pick up whatever changed since the last time we ran the package.

Customer Tables

We will use the following customer table:

create table dbo.customer (
 customer_id int identity primary key not null
, name  nvarchar(50) not null
, sales_rep nvarchar(50) not null
, region  nvarchar(50) not null
, credit_limit int not null
)

We will use the following customer_audit table to store changes to the customer table:

create table dbo.customer_audit (
 customer_audit_id int identity primary key not null
, customer_id  int not null
, name   nvarchar(50) not null
, sales_rep  nvarchar(50) not null
, region   nvarchar(50) not null
, credit_limit  int not null
, effective_date  datetime not null
, __$start_lsn  binary(10) not null
, __$seqval  binary(10) not null
, __$operation  int not null
, __$update_mask  varbinary(128) not null
)

The customer_audit table has each column from the customer table as well as the following additional columns provided by CDC:

  • effective_date will be populated from the cdc.lsn_time_mapping table (described later).  It is the date and time of the transaction that modified the source table.

  • __$start_lsn is the log sequence number from the transaction log.

  • __$seqval provides an ordering of the changes within a transaction.

  • __$operation has one of the following values: 1=delete, 2=insert, 3=update (before values) and 4=update (after values).

  • __$update_mask is a bit mask in which the bit for each column that was changed is set to 1.
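
As a rough illustration of the __$update_mask column, the query below decodes the mask for update rows that have already been copied to customer_audit.  It is a sketch, not part of the sample package: it assumes CDC has been enabled with the capture instance name customer_all (as in the script in the next section) and that the audit table already contains rows.  The *_changed column aliases are made up for the example.

```sql
use mssqltips
go
-- Sketch: which columns did each captured update touch?
-- Assumes the capture instance is named 'customer_all'.
declare @sales_rep_ord int =
  sys.fn_cdc_get_column_ordinal('customer_all', 'sales_rep');
declare @credit_limit_ord int =
  sys.fn_cdc_get_column_ordinal('customer_all', 'credit_limit');

select customer_id
     , __$operation
     , sys.fn_cdc_is_bit_set(@sales_rep_ord, __$update_mask) as sales_rep_changed
     , sys.fn_cdc_is_bit_set(@credit_limit_ord, __$update_mask) as credit_limit_changed
from dbo.customer_audit
where __$operation in (3, 4)   -- update rows only
order by __$start_lsn, __$seqval;
```

sys.fn_cdc_get_column_ordinal returns the position of a column within the capture instance, and sys.fn_cdc_is_bit_set tests that position in the mask, so the query does not depend on hard-coded bit positions.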

Enabling CDC

CDC needs to be enabled at the database level and then for each table whose changes you want to track.  CDC provides a stored procedure for each.  Here is a sample script to enable CDC for our mssqltips database and the customer table:

use mssqltips
go
exec sys.sp_cdc_enable_db
exec sys.sp_cdc_enable_table
 @source_schema = N'dbo'
,@source_name = N'customer'
,@role_name = N'cdc_admin'
,@capture_instance = N'customer_all'
,@supports_net_changes = 1
,@index_name = NULL
,@captured_column_list = NULL
,@filegroup_name = NULL

The main points about the above script are:

  • The stored procedure sys.sp_cdc_enable_db enables CDC for the current database.

  • The stored procedure sys.sp_cdc_enable_table enables CDC for a table.

  • A database role is created for the role_name parameter; members of this role have access to the CDC data.

  • The capture_instance parameter names the capture instance that identifies the CDC data; a table can have at most two capture instances.

  • Set supports_net_changes to 1 to optionally get the accumulation of changes to a row in a single row.

  • If you set supports_net_changes to 1 and the table does not have a primary key, you must specify a unique index in the index_name parameter.

  • You can specify a list of columns to track or NULL to track all columns.

  • You can specify a filegroup for the CDC files or NULL to use the default.

For additional details check Books Online for the sys.sp_cdc_enable_db and sys.sp_cdc_enable_table stored procedures.
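
To confirm that the enabling script did what was intended, a quick check along the following lines can help.  This is only a sketch that assumes the mssqltips database and dbo.customer table used in this tip; the source_schema and source_table aliases are illustrative.

```sql
use mssqltips
go
-- 1 means CDC is enabled for the database
select name, is_cdc_enabled
from sys.databases
where name = N'mssqltips';

-- 1 means the table is tracked by at least one capture instance
select s.name as source_schema
     , t.name as source_table
     , t.is_tracked_by_cdc
from sys.tables t
join sys.schemas s on s.schema_id = t.schema_id
where s.name = N'dbo'
  and t.name = N'customer';

-- details of the capture instance(s) defined on the table
exec sys.sp_cdc_help_change_data_capture
  @source_schema = N'dbo'
, @source_name = N'customer';
```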

Make sure that SQL Agent is running.  CDC creates two SQL Agent jobs: one scans the transaction log and copies changes made to CDC-enabled tables into individual change tables in the cdc schema; the second clears out old rows from those change tables.  You can view the values used by these SQL Agent jobs by executing the sys.sp_cdc_help_jobs stored procedure.  You can change the values by executing the sys.sp_cdc_change_job stored procedure.  By default the cleanup job removes change data after it is 3 days old.

If you enable change data capture on a table and SQL Agent is not running, you will see the following warning message:

SQLServerAgent is not currently running so it cannot be notified of this action.
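
The job settings discussed above can be inspected and adjusted along these lines.  This is a sketch only; the 7-day retention value is an assumption chosen for illustration, not a recommendation from this tip.

```sql
use mssqltips
go
-- show the capture and cleanup job settings for the current database
exec sys.sp_cdc_help_jobs;

-- Assumption for illustration: keep change data for 7 days instead of 3.
-- @retention is expressed in minutes (7 * 24 * 60 = 10080).
exec sys.sp_cdc_change_job
  @job_type = N'cleanup'
, @retention = 10080;
```

A changed setting may not take effect until the job is restarted; see sys.sp_cdc_stop_job and sys.sp_cdc_start_job in Books Online.  If the audit package runs less often than the retention period, changes can be cleaned up before they are copied, so the two need to be chosen together.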

Logging

In order to allow our SSIS package to pick up just the changes since the last time it was run, we'll populate a log table with the data we need.  We'll use the following log table:

create table dbo.cdc_capture_log (
 cdc_capture_log_id int identity not null
, capture_instance nvarchar(50) not null 
, start_time  datetime not null 
, min_lsn   binary(10) not null
, max_lsn   binary(10) not null
, end_time  datetime null
, insert_count  int not null default 0
, update_count  int not null default 0
, delete_count  int not null default 0
, status_code  int not null default 0
)

The main points about the log table are:

  • capture_instance is the value specified when enabling CDC on the table.

  • start_time and end_time are recorded to allow us to track how long it takes to copy the CDC data to our audit table.

  • min_lsn and max_lsn represent the range of log sequence numbers (LSN) to copy.  LSNs uniquely identify changes in the transaction log.  CDC records the LSN with each change.  We derive the min_lsn from the max_lsn of the last time our SSIS package was run.  CDC provides the function sys.fn_cdc_get_max_lsn to retrieve the maximum LSN.

  • insert_count, update_count and delete_count record the counts each time we run the SSIS package.

  • status_code is set to 1 when the SSIS package completes successfully.
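
As a small usage sketch, the log table can be queried like this to review recent runs.  The duration_seconds alias is illustrative; a row with status_code 0 and a null end_time indicates a run that was started but never completed.

```sql
use mssqltips
go
-- most recent runs first
select top (10)
  cdc_capture_log_id
, capture_instance
, start_time
, datediff(second, start_time, end_time) as duration_seconds
, insert_count
, update_count
, delete_count
, status_code     -- 0 = started but not completed, 1 = success
from dbo.cdc_capture_log
order by cdc_capture_log_id desc;
```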

We'll create two stored procedures to maintain the log:

  • init_cdc_capture_log will create a new row.

  • end_cdc_capture_log will update the row.

The init_cdc_capture_log is called at the beginning of our SSIS package.  It is shown below:

create procedure dbo.init_cdc_capture_log
  @capture_instance nvarchar(50)
as
begin
 set nocount on;
   
 declare 
   @begin_lsn binary(10)
 , @end_lsn binary(10)
 , @prev_max_lsn binary(10)
 -- get the max LSN for the capture instance from
 -- the last extract
 select @prev_max_lsn = max(max_lsn)
 from dbo.cdc_capture_log
 where capture_instance = @capture_instance
 -- if no row found in cdc_capture_log get the min lsn 
 -- for the capture instance
 if @prev_max_lsn is null
  set @begin_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
 else
  set @begin_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn)
 -- get the max lsn
 set @end_lsn = sys.fn_cdc_get_max_lsn()
 
 insert into dbo.cdc_capture_log
  (capture_instance,start_time,min_lsn,max_lsn)
 values
  (@capture_instance,getdate(),@begin_lsn,@end_lsn) 
 select cast(scope_identity() as int) cdc_capture_log_id
end

The main points about the above stored procedure are:

  • We query the log to get the max_lsn from the last time we ran the SSIS package.  If we find the row from our previous run we call the CDC function sys.fn_cdc_increment_lsn to increment the LSN, else we call the CDC function sys.fn_cdc_get_min_lsn to get the LSN of the first change record for our table.

  • We call the CDC function sys.fn_cdc_get_max_lsn to get the highest LSN (i.e. the LSN of the latest transaction).  We get all CDC data up to and including this LSN on the current run.

  • We insert a new row into the log and return the identity value; we need the identity value to update this row.
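
One consequence of incrementing the previous max_lsn is worth seeing in isolation: if nothing has changed since the last run, the computed begin LSN ends up greater than the current maximum LSN, and the change function will reject that range.  The read-only sketch below repeats the LSN calculation from the procedure without inserting a log row, assuming the capture instance customer_all; the range_status alias and its values are made up for the example.

```sql
use mssqltips
go
declare @prev_max_lsn binary(10)
      , @begin_lsn binary(10)
      , @end_lsn binary(10);

-- same lookup as init_cdc_capture_log, but nothing is written
select @prev_max_lsn = max(max_lsn)
from dbo.cdc_capture_log
where capture_instance = N'customer_all';

if @prev_max_lsn is null
  set @begin_lsn = sys.fn_cdc_get_min_lsn('customer_all');
else
  set @begin_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn);

set @end_lsn = sys.fn_cdc_get_max_lsn();

select @begin_lsn as begin_lsn
     , @end_lsn as end_lsn
     , case when @begin_lsn > @end_lsn
            then 'nothing to process'
            else 'range is valid'
       end as range_status;
```

A package that runs on a tight schedule would need to skip the data flow when the range is empty; the comments at the end of this tip describe one way to do that.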

The end_cdc_capture_log stored procedure updates the row created by the init_cdc_capture_log stored procedure.  It is shown below:

create procedure dbo.end_cdc_capture_log
  @cdc_capture_log_id int
, @insert_count int
, @update_count int
, @delete_count int
as
begin
 set nocount on;
 update dbo.cdc_capture_log set
   end_time = getdate()
 , insert_count = @insert_count
 , update_count = @update_count
 , delete_count = @delete_count
 , status_code = 1
 where cdc_capture_log_id = @cdc_capture_log_id
end

The main points about the above stored procedure are:

  • cdc_capture_log_id is the value returned by the init_cdc_capture_log stored procedure.

  • We update the row with the counts, end time, and set the status_code to 1.

Creating the SSIS Package

We will create an SSIS package that has the following control flow:

ssis control flow

The main points about the above control flow are:

  • Init Log is an Execute SQL task; it calls the init_cdc_capture_log stored procedure (described above) and saves the identity value of the created cdc_capture_log row in a package variable.

  • Process Changes is a Data Flow task that retrieves the latest changes from the CDC table and copies them to our audit table.

  • End Log is an Execute SQL task that calls the end_cdc_capture_log stored procedure (described above) to update the cdc_capture_log row.

The Process Changes Data Flow task is implemented as shown below:

ssis data flow

The main points about the above data flow are:

  • Extract Customer Changes is an OLE DB Source that executes the stored procedure extract_customer_capture_log to retrieve the customer changes since the last run.

  • Count Inserts Updates and Deletes is a Script Component Transform task that just counts the number of inserts, updates and deletes in the change data.

  • Save Customer Changes to Custom Audit Table is an OLE DB Destination used to insert each change row into the customer_audit table.
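
The counting in the package is done by the Script Component, whose code is not reproduced here.  As a cross-check, roughly the same counts can be produced in T-SQL.  The sketch below is not the package's logic: it assumes the capture instance customer_all and, to stay self-contained, counts over the whole LSN range currently available rather than the range stored in cdc_capture_log.

```sql
use mssqltips
go
declare @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('customer_all');
declare @to_lsn binary(10) = sys.fn_cdc_get_max_lsn();

-- With the 'all' row filter option an update is returned once,
-- as __$operation = 4 (the values after the update).
select
  isnull(sum(case when __$operation = 2 then 1 else 0 end), 0) as insert_count
, isnull(sum(case when __$operation = 4 then 1 else 0 end), 0) as update_count
, isnull(sum(case when __$operation = 1 then 1 else 0 end), 0) as delete_count
from cdc.fn_cdc_get_all_changes_customer_all(@from_lsn, @to_lsn, N'all');
```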

The extract_customer_capture_log stored procedure is shown below:

create procedure dbo.extract_customer_capture_log
  @cdc_capture_log_id int
as
begin
 set nocount on;
   
 declare 
   @begin_lsn binary(10)
 , @end_lsn  binary(10)
 
 -- get the lsn range to process
 select 
   @begin_lsn = min_lsn
 , @end_lsn = max_lsn
 from dbo.cdc_capture_log
 where cdc_capture_log_id = @cdc_capture_log_id   
 
 -- extract and return the changes
 select m.tran_end_time modified_ts, x.* 
 from cdc.fn_cdc_get_all_changes_customer_all(
  @begin_lsn, @end_lsn, 'all'
 ) x
 join cdc.lsn_time_mapping m 
 on m.start_lsn = x.__$start_lsn ; 
end

The main points about the above stored procedure are:

  • The cdc_capture_log_id parameter value is the value returned from the call to the init_cdc_capture_log stored procedure (described above in the Logging section). 

  • Retrieve the LSN range from the cdc_capture_log table row.  The LSN range represents all changes that have occurred since the last run of the SSIS package.

  • The cdc.fn_cdc_get_all_changes_customer_all function is generated when you enable CDC.  The function name includes the capture instance.  The function returns the changes that occurred in the LSN range.

  • The cdc.lsn_time_mapping table is populated by CDC with the mapping of transaction times to LSNs.  The join retrieves the transaction time.  This alleviates the need to manually track this in the source table.

Take a look at cdc.fn_cdc_get_all_changes_<capture_instance> in Books Online for additional information on retrieving the CDC change data.
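
If you would rather not join to the cdc.lsn_time_mapping table directly, the built-in function sys.fn_cdc_map_lsn_to_time returns the same transaction end time for a given LSN.  The following is a hedged sketch of that alternative, not the query used in the sample package; it assumes the capture instance customer_all and reads the whole available LSN range to stay self-contained.

```sql
use mssqltips
go
declare @begin_lsn binary(10) = sys.fn_cdc_get_min_lsn('customer_all');
declare @end_lsn binary(10) = sys.fn_cdc_get_max_lsn();

-- the function call replaces the join to cdc.lsn_time_mapping
select sys.fn_cdc_map_lsn_to_time(x.__$start_lsn) as modified_ts
     , x.*
from cdc.fn_cdc_get_all_changes_customer_all(
  @begin_lsn, @end_lsn, N'all'
) x;
```

The function is evaluated per row, so for large change sets the join used in the stored procedure may perform differently; that trade-off has not been measured here.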

Testing the SSIS Package

Before running the SSIS package, we need to execute a script that performs some inserts, updates and deletes.  We'll use the following script:

use mssqltips
go
insert into dbo.customer 
(name,sales_rep,region,credit_limit)
values
(N'BGM Systems', N'Cane', N'South', 2500) 
update dbo.customer 
set sales_rep = N'Smith'
where [name] = N'BGM Systems'
update dbo.customer 
set credit_limit = 3000
where [name] = N'BGM Systems'
delete dbo.customer
where [name] = N'BGM Systems'

After running the above script, execute the SSIS package, then check the customer_audit table to see that there are four rows, one for each change made in the script.  The partial contents of the customer_audit table are shown below:

results

The main points about the above table are:

  • effective_date is the date and time of the transaction, as retrieved from the cdc.lsn_time_mapping table.

  • Row 1 shows the insert; __$operation=2 for an insert.

  • Row 2 shows the update of the sales_rep; __$operation=4 for the update showing the values after the update.

  • Row 3 shows the update of the credit_limit.

  • Row 4 shows the delete; __$operation=1 for a delete.

The effective_date column provides the ability to query the customer_audit table and see the customer values at any point in time by filtering on the maximum effective_date that is less than or equal to some value.
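
A point-in-time query of that kind might look like the sketch below.  The @as_of value is a hypothetical date chosen for illustration, and the ranked and rn names are made up for the example.  For each customer it picks the latest audit row at or before @as_of and leaves out customers whose latest row is a delete.

```sql
use mssqltips
go
declare @as_of datetime = '2009-05-20T12:00:00';  -- hypothetical point in time

with ranked as (
  select a.*
       , row_number() over (
           partition by a.customer_id
           order by a.__$start_lsn desc, a.__$seqval desc
         ) as rn
  from dbo.customer_audit a
  where a.effective_date <= @as_of
)
select customer_id, name, sales_rep, region, credit_limit, effective_date
from ranked
where rn = 1
  and __$operation <> 1;   -- 1 = delete: the customer no longer existed
```

Ordering by __$start_lsn and __$seqval rather than by effective_date alone avoids ambiguity when several changes share the same transaction time.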

Next Steps
  • Change Data Capture is a potentially useful feature in SQL Server 2008.  It's like a custom replication scheme.  CDC captures all of the changes, then you determine what to do with them.
  • Download the sample code and experiment with it.  The archive includes the SSIS project and the scripts used in this tip.
  • Take a look at this Microsoft whitepaper for details on tuning the performance of CDC.





About the author
Ray Barley is a Principal Architect at IT Resource Partners and a MSSQLTips.com BI Expert.

This author pledges the content of this article is based on professional experience and not AI generated.


Comments For This Article




Wednesday, July 2, 2014 - 7:46:25 AM - Ray Barley

The intent of the audit table is to maintain the complete history of every insert, update or delete of a customer.  If you enabled CDC on an existing table, then you could get rows in the audit table for updates or deletes without any inserts on the existing table.  Anything other than that would be some sort of bug.


Tuesday, July 1, 2014 - 7:40:10 PM - choppa

Hi Ray,

Thanks for the article. I have one question: why would there be anything in the audit table when you did not issue an insert command?


Wednesday, May 8, 2013 - 10:39:05 AM - Raymond Barley

Change Data Capture scans the transaction log looking for changes to tables / columns that have been configured for CDC.  Any changes found are copied to the CDC tables.  When you query cdc.fn_cdc_get_all_changes_<capture_instance> you can only retrieve rows that have changed; you can't retrieve any row that has not changed.


Wednesday, May 8, 2013 - 4:55:16 AM - Shreyans Halani

Hi Ray Barley,

Can we use CDC to load data that has not changed in the source table into the destination table?

 

Thanks,

Shreyans


Tuesday, February 12, 2013 - 3:55:57 PM - Raymond Barley

I send emails from an SSIS package by using database mail in an Execute SQL task.  Here's a tip that discusses the setup of database mail: http://www.mssqltips.com/sqlservertip/1100/setting-up-database-mail-for-sql-2005/  Once you have it set up, use the sp_send_dbmail stored procedure in your Execute SQL task.

Here's a link to sp_send_dbmail: http://msdn.microsoft.com/en-us/library/ms190307.aspx

 


Tuesday, February 12, 2013 - 1:27:33 PM - JBP

In SSIS I have implemented change data capture. I have built an initial load and an incremental load for every table related to the data warehouse, and also made two master packages, one for the initial load and one for the incremental load. The initial load is a full load that is run once; the incremental load is run as business requirements dictate.

I need some information about risk and error handling, or any other information on maintaining the packages. How do I send an email notification? Do you have a script?


Thursday, July 12, 2012 - 11:14:08 AM - Luis

Perfect, now it's working like a charm.

Thank you for all your support, and once again congratulations on your great article.

 

Regards,

Luís Ivars


Thursday, July 12, 2012 - 9:57:23 AM - Ray Barley

I tested again with your change to the init_cdc_capture_log stored proc:

 if @prev_max_lsn is null

  set @begin_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
 else
  set @begin_lsn =  @prev_max_lsn  --- this is the issue


This change causes you to pick up a changed row a second time since the begin_lsn is the same as the end_lsn from the previous time you ran the package.

An alternative way to fix error 313 ("An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes.") is to do the following:

1) Add a precedence constraint after the Init Log Execute SQL task; set the evaluation operation to Expression and Constraint, Value = Success, and Expression = @[User::cdc_capture_log_id] > -1

2) Change the stored proc as shown below to return -1 if the begin_lsn is greater than the end_lsn.

Replace the code AFTER set @end_lsn = sys.fn_cdc_get_max_lsn() with:

 if @begin_lsn <= @end_lsn
 begin
  insert into dbo.cdc_capture_log
   (capture_instance,start_time,min_lsn,max_lsn)
  values
   (@capture_instance,getdate(),@begin_lsn,@end_lsn)

  select cast(scope_identity() as int) cdc_capture_log_id
 end
 else
  -- return -1 to indicate there is nothing to do
  select cast(-1 as int) cdc_capture_log_id

 

 



Thursday, July 12, 2012 - 6:22:25 AM - Luis

Here is an example for testing:

3 updates, a synchronization, and then one more update.

The resulting contents of the cdc_capture_log table are:

93    TiposArtigo_all    2012-07-12 11:18:41.730    0x000092090000004C0001    0x00009209000001250004    2012-07-12 11:18:42.237    0    3    0    1
94    TiposArtigo_all    2012-07-12 11:19:56.263    0x00009209000001250004    0x00009209000001430004    2012-07-12 11:19:56.563    0    2    0    1

and should be:

93    TiposArtigo_all    2012-07-12 11:18:41.730    0x000092090000004C0001    0x00009209000001250004    2012-07-12 11:18:42.237    0    3    0    1
94    TiposArtigo_all    2012-07-12 11:19:56.263    0x00009209000001250004    0x00009209000001430004    2012-07-12 11:19:56.563    0    1    0    1

 

I hope this helps.

Regards


Thursday, July 12, 2012 - 6:06:25 AM - Luis

Hi, thanks for your answer.

If you make one update and then check the log table you'll find the right count, but on later runs the count will be one too high, because the first row is counted again.

I've also made one change to one of the procedures (init_cdc_capture_log) so that it can run periodically without errors.

 if @prev_max_lsn is null
  set @begin_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
 else
  set @begin_lsn =  sys.fn_cdc_increment_lsn(@prev_max_lsn)

changed to:

 if @prev_max_lsn is null
  set @begin_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
 else
  set @begin_lsn =  @prev_max_lsn

 


Wednesday, July 11, 2012 - 3:38:41 PM - Ray Barley

I just did a couple of test runs and I can't reproduce the behavior "after the first synchronization, you will count one more transaction than is really happening".


Wednesday, July 11, 2012 - 10:20:36 AM - Luis

Hi Ray, thank you for your great article.

I used it as a guide, and I found one issue while implementing it.

Have you noticed that after the first synchronization, you will count one more transaction than is really happening?

If so, can you mention it and how you solved it?

If I find a workaround I'll let you know.

 

Regards.



Friday, December 10, 2010 - 10:57:42 AM - Ali Bousselot

Thanks for your feedback and help. I really appreciate it. We want to run an SSIS package (that copies the change data to a database on a separate server) every 1 minute. Is it a bad idea to run the package every 1 minute? Do you think it would be better to run it every 3 minutes? The database that we're copying the change data to will be used for reporting. We want reports to use this reporting database instead of connecting to the OLTP (source) database.


Thursday, December 9, 2010 - 5:11:36 PM - Ray Barley

I would only insert a row into cdc_capture_log if there are changes to process.  As far as using WAITFOR, that is certainly an option, especially if you want to process changes as soon as possible.


Thursday, December 9, 2010 - 3:44:46 PM - Ali Bousselot

Or do you recommend performing WAITFORs in the dbo.init_cdc_capture_log stored proc, to delay the execution of the stored procedure, if necessary?


Thursday, December 9, 2010 - 3:31:37 PM - Ali Bousselot

Thanks for your help. The issue you described is exactly what happened when I ran the package a second time. Do you recommend that the dbo.init_cdc_capture_log not insert a value in the cdc_capture_log table if the @begin_lsn is greater than the @end_lsn? Thanks again.


Thursday, December 9, 2010 - 3:20:41 PM - Ray Barley

I think the problem is caused by this code in the dbo.init_cdc_capture_log stored proc:

 if @prev_max_lsn is null
  set @begin_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
 else
  set @begin_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn)
 -- get the max lsn
 set @end_lsn = sys.fn_cdc_get_max_lsn()

When there are no changes, set @begin_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn) returns an LSN that is greater than the @end_lsn and you get error 313 ("An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes.")

I think what I did is to change the dbo.init_cdc_capture_log stored proc to return a value that indicates whether the @begin_lsn is greater than the @end_lsn.  The SSIS package would not continue if the @begin_lsn is greater than the @end_lsn since this means that there is nothing to do.


Thursday, December 9, 2010 - 12:37:11 PM - Ali Bousselot

This is a great article! I downloaded the sample code and it ran successfully. Then, I immediately ran the sample code again and it failed because no data had changed in the customer table. Is there a way to handle that issue? I'm trying to design a CDC package that will run every 1 minute and I want it to run successfully even if the data hasn't changed in the source tables (with CDC enabled).

Thanks,

Ali


Wednesday, September 29, 2010 - 11:18:46 AM - Ray Barley
There are 2 functions available - sys.fn_cdc_map_lsn_to_time(lsn_value ) returns the date and time value from the tran_end_time column in the cdc.lsn_time_mapping system table for the specified log sequence number (LSN) and sys.fn_cdc_map_time_to_lsn ('<relational_operator>', tracking_time ) which returns the log sequence number (LSN) value from the start_lsn column in the cdc.lsn_time_mapping table for the specified time and relational operator.

In my case I just wanted to process changes based on the LSN and not deal with dates and times at all. 

I don't like using date and time because I've actually had an instance where a last modified date and time was set on a row and the process that extracts data based on last modified date and time ran before the transaction was committed, so the uncommitted updates were missed.  Sounds hard to believe but I did see it.

 


Wednesday, September 29, 2010 - 10:35:25 AM - Ram
Thanks for posting this article. The code you provided gave me an excellent head start. I was able to customize it. Works great! The one comment I have is that Microsoft advises not to use the CDC tables directly and instead to use the functions they provide to get to the data. You have used the lsn_time_mapping table directly - is there a way to use the function instead?

Thanks for this article again. Great Post!

Ram

 


Monday, September 20, 2010 - 5:54:48 PM - Chris Skorlinski

I enjoyed reading your CDC\SSIS posting in MSSQLTIPS.  I thought it was very well written, easy to follow, and a very clear walk-through of CDC and SSIS.  I’ve linked your presentation to my MSDN “ReplTalk” blog.

 

http://blogs.msdn.com/b/repltalk/archive/2010/04/19/references-and-links-on-change-data-capture.aspx

 

Thanks for a great CDC\SSIS article

 

Chris Skorlinski

Microsoft SQL Server Escalation Services

 


Monday, February 22, 2010 - 1:12:32 PM - raybarley
There are a number of things that you have to double check - did you enable CDC on the database, did you enable CDC on the table, is SQL Server Agent running? After you get through those, have you performed any inserts, updates or deletes in the table that has CDC enabled? Query the cdc.lsn_time_mapping table. There will be rows in there if any inserts, updates or deletes have happened on a table that has CDC enabled. Lastly you can query the actual changes that have been captured; take a look at this topic in Books Online: cdc.<capture_instance>_CT

Monday, February 22, 2010 - 11:03:06 AM - [email protected]

Hi Ray Barley,

 

I have used your article How To Process Change Data Capture (CDC) in SQL Server Integration Services SSIS 2008.

I followed the same procedure after creating a new database.

The SSIS package works fine, but in the end I don't see any values in the Customer_Audit table.

I tried setting up roles and all the connections were fine. The package even reports success, but I don't see any updated values in the Customer_Audit table.

 

Please help me out, it's very urgent.

 

Thanks,

Varma

 














