How to Compare Database Schema in SQL Server - Part 1

By:   |   Updated: 2020-05-26   |   Comments (3)   |   Related: 1 | 2 | 3 | More > Comparison Data and Objects


Problem

In this article, which is divided in three parts, I present a way to compare two MS SQL Server databases using database schema, and change the database based on the schema differences, without losing data. This database comparison is focused only on tables, columns and constraints (including indexes). It does not deal with other elements as such as views, stored procedures, etc. You can extend the program by adding these elements in the comparison.

I used for this presentation SQL Server 2019 and latest Visual Studio 2019. Some of the C# code will not work if you don’t use the latest .NET (minimum 4.8)

  • Part 1 - Presents how to save a database schema to a JSON file and how to create a database using that JSON file.
  • Part 2 - Presents how to compare a database against the schema saved in Part 1 and shows all differences, if any.
  • Part 3 - Presents how to change a database, without losing data, in order to match the schema. The program has full support for Always Encrypted columns. This means you can add a missing encrypted column, delete an extra one, or either encrypt or decrypt a column using Always Encrypted columns without using SSMS or PowerShell. Part 3 will have the whole project attached.
Solution

About 14 years ago I worked for a company who used Microsoft Access as the database. The database was not encrypted, so anybody with Access installed could open and change the database. Before I joined the company, several customers played with their database in such a way they added new tables or columns to existing ones, deleted some columns or tables they thought were not important, or even changed the order of columns in existing tables. Why column order was important? Because the program had been developed over the years such that in many places the columns were referred by their index in the table instead of their name (you can do this in Microsoft Access). Basically, Microsoft Access was used like an x-base database (dBase, FoxPro for those who remember). Those customers later complained that our software doesn’t work anymore, and they said nobody touched the database.

Long story short, my colleagues discovered the changes in the database, and thought of a way to prevent this in the future. As a reminder, this happened a while before I join the company. They came up with a solution. They created a separate module which was run at the startup of the application, which checked the consistency of the database. The database was compared against a database schema (all customers used the same database structure) such that:

  • Everything which was extra (tables, columns and constraints) were just deleted
  • Everything which was missing (tables, columns and constraints) were added
  • If the database structure was not exactly as the well-known structure, the database was modified to actually match that structure. That implied change the columns type and/or size, change the column order, etc.

After I joined the company, there were several customers who said they wanted to use SQL Server 2005 as database instead of Microsoft Access, since they already had paid for the license. My assignment was to create a library for dealing with SQL Server such that the program does not need to change (calls to database, queries used, etc.). Apart from having to write the comparison module, I also had to make SQL Server behave like Microsoft Access, meaning things like referring to a row in a table by its index, columns by their index, add first an empty row to a table with primary key, and only after that populating it, etc. It was a real challenge. You don’t need to believe me, just try it yourself and enjoy. This might be the story for another article if anybody would be interested.

The company product was written in C, and I converted it (as much as it was possible) to C++.

After some research on how to deal with SQL Server database comparison, I came up with the solution to write a C++ dll where the database handling was written in C++/CLI. That was another challenge, as I didn’t find any examples of C++/CLI programs dealing with SQL Server databases. All examples were either in C#, or VB.NET, and a direct write in C++/CLI of the same calls didn’t work for many functions.

First, I tried to find if there were any products which compared two SQL databases, and all I could find at the time were only commercial ones. But definitely none of them were able to make changes to the database according to the database schema provided. I first developed the comparison program in C#, as C++/CLI support in Visual Studio debugger was very poor (and still is in the latest Visual Studio 2019 version 16.5.3). As database schema I used XML. Recently I redesigned this program, and now I use JSON instead of XML.

Getting the SQL Server Database Schema

If you want to see if two databases have the same structure, you need a schema to compare with. This means two operations are also very important: reading a database schema, and saving the schema to a file. In this article I will show you how to read a database schema as a JSON file, how to save the schema to a JSON file and how to create the database from the schema.

In one of my previous articles I presented how to get the SQL database schema as a JSON structure. Here I present a slightly modified version of those SQL queries, which are better suited for making changes to database.

Get Database Columns

Getting the database columns can be obtained by using the string. As you can see we get all columns, including Always Encrypted ones.

string getColumns =
  "select \n" +
  "   [Db Schema].TABLE_SCHEMA [Table Schema]," +
  "   [Tables].name [Table Name], \n" +
  "   [Table Columns].name [Column Name], \n" +
  "   [Table Columns].is_nullable[is_nullable], \n" +
  "   [Table Columns].is_identity[is_identity], \n" +
  "   [Table Columns].encryption_algorithm_name, \n" +
  "   [Table Columns].encryption_type_desc, \n" +
  "   case when cek.name is null then null else cek.name end as CEK_Name, \n" +
  "   [Column Type].name [data_type], \n" +
  "   cast \n" +
  "       (case when [Column Type].name = 'text' \n" +
  "           then null \n" +
  "           else \n" +
  "               case when [Table Columns].precision = 0 and [Column Type].name <> 'text' \n" +
  "                   then [Table Columns].max_length \n" +
  "                   else null \n" +
  "               end \n" +
  "           end \n" +
  "       as smallint) [max_length], \n" +
  "   cast(case when [Table Columns].precision>0 and [Column Type].precision=[Column Type].scale \n" +
  "           then [Table Columns].precision else null end as tinyint) [precision], \n" +
  "   cast(case when [Table Columns].precision>0 and [Column Type].precision=[Column Type].scale \n" +
  "           then [Table Columns].scale else null end as tinyint) [scale], \n" +
  "   cast(case when [Table Columns].is_identity= 1 \n" +
  "           then seed_value else null end as sql_variant) [seed_value], \n" +
  "   cast(case when [Table Columns].is_identity= 1 \n" +
  "           then increment_value else null end as sql_variant) [increment_value], \n" +
  "   cast(case when [Table Columns].default_object_id>0 \n" +
  "           then definition else null end as varchar(4000)) [default_value] \n" +
  "from INFORMATION_SCHEMA.TABLES [Db Schema] \n" +
  "   join sys.objects [Tables] on [Db Schema].TABLE_SCHEMA = schema_name([Tables].[schema_id]) \n" +
  "       and [Db Schema].TABLE_NAME = [Tables].name \n" +
  "   join sys.columns [Table Columns] on [Tables].object_id=[Table Columns].object_id \n" +
  "   left join sys.column_encryption_keys cek on [Table Columns].column_encryption_key_id = \n" +
  "       CEK.column_encryption_key_id \n" +
  "   left join sys.identity_columns id on [Tables].object_id= id.object_id \n" +
  "   join sys.types [Column Type] on [Table Columns].system_type_id=[Column Type].system_type_id \n" +
  "       and [Column Type].system_type_id=[Column Type].user_type_id \n" +
  "   left join sys.default_constraints d on [Table Columns].default_object_id= d.object_id \n" +
  "where [Tables].type= 'u' \n" +
  "order by [Table Schema], [Table Name] \n" +
  "for json auto, root('DBColumns') \n";		

Get Foreign Keys

Getting foreign keys can be done with the string. As a note here, in SQL Server we can have a foreign key defined in a table from a schema which refers to a key defined in a table in a different schema, so we need to keep information about each table schema.

string getFkConstraints =
  "select distinct \n" +
  "   ConstraintColumns.TABLE_SCHEMA [Table Schema], \n" +
  "   ConstraintColumns.Table_Name [Table Name], \n" +
  "   [Constraints].CONSTRAINT_NAME [Constraint Name], \n" +
  "   [Constraints].[Referenced Schema], \n" +
  "   [Constraints].[Referenced Table], \n" +
  "   Columns.[Column Name], \n" +
  "   Columns.[Referenced Column], \n" +
  "   Columns.Colfk_id [Column id], \n" +
  "   Columns.RefCol_id [Referenced Column id] \n" +
  "from INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ConstraintColumns \n" +
  "join \n" +
  "  ( \n" +
  "    select \n" +
  "      tc.TABLE_SCHEMA, \n" +
  "      tc.TABLE_NAME, \n" +
  "      tc.CONSTRAINT_NAME, \n" +
  "      rc.UNIQUE_CONSTRAINT_SCHEMA [Referenced Schema], \n" +
  "      ccolumns.[Referenced Table] \n" +
  "    from INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc \n" +
  "    join INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc \n" +
  "      on tc.CONSTRAINT_SCHEMA= rc.CONSTRAINT_SCHEMA and tc.CONSTRAINT_NAME= rc.CONSTRAINT_NAME \n" +
  "    join \n" +
  "       ( \n" +
  "           select \n" +
  "               schema_name(fk.schema_id) Table_Schema, \n" +
  "               object_name(fkc.parent_object_id) Table_Name, \n" +
  "               object_name(fkc.constraint_object_id) Constraint_Name, \n" +
  "               object_name(fkc.referenced_object_id) [Referenced Table] \n" +
  "           from sys.foreign_key_columns fkc \n" +
  "           join sys.foreign_keys fk on fk.object_id= fkc.constraint_object_id \n" +
  "               and fk.parent_object_id=fkc.parent_object_id \n" +
  "       ) ccolumns \n" +
  "          on tc.TABLE_SCHEMA=ccolumns.TABLE_SCHEMA \n" +
  "              and tc.TABLE_NAME=ccolumns.TABLE_NAME \n" +
  "  ) [Constraints] \n" +
  "  on ConstraintColumns.CONSTRAINT_NAME=[Constraints].CONSTRAINT_NAME \n" +
  "      and ConstraintColumns.TABLE_NAME=[Constraints].TABLE_NAME \n" +
  "join \n" +
  "  ( \n" +
  "      select \n" +
  "          schema_name(fk.schema_id) Table_Schema, \n" +
  "          object_name(fkc.parent_object_id) Table_Name, \n" +
  "          object_name(fkc.constraint_object_id) Constraint_Name, \n" +
  "          colfk.name [Column Name], \n" +
  "          object_name(fkc.referenced_object_id) [Referenced Table], \n" +
  "          colref.name [Referenced Column],\n" +
  "          colfk.column_id [Colfk_id],\n" +
  "          colref.column_id [RefCol_id]\n" +
  "      from sys.foreign_key_columns fkc\n" +
  "      join sys.foreign_keys fk on fk.object_id= fkc.constraint_object_id\n" +
  "          and fk.parent_object_id= fkc.parent_object_id\n" +
  "      join sys.columns colfk on fkc.parent_object_id= colfk.object_id\n" +
  "          and fkc.parent_column_id= colfk.column_id\n" +
  "      join sys.columns colref on fkc.referenced_object_id= colref.object_id\n" +
  "          and fkc.referenced_column_id= colref.column_id\n" +
  "  ) Columns \n" +
  "      on ConstraintColumns.TABLE_SCHEMA=Columns.TABLE_SCHEMA \n" +
  "          and ConstraintColumns.TABLE_NAME=Columns.TABLE_NAME \n" +
  "          and ConstraintColumns.CONSTRAINT_NAME=Columns.Constraint_Name\n" +
  "order by ConstraintColumns.TABLE_SCHEMA, ConstraintColumns.Table_Name, \n" +
  "   [Constraint Name], [Referenced Schema], [Referenced Table], [Column id]\n" +
  "for json auto, root('DBFKConstraints') \n";
			

Get Constraints

The rest of the constraints (apart from foreign keys) can be obtained by using the follow string:

string getConstraints =
  "select distinct \n" +
  "    ConstraintColumns.TABLE_SCHEMA [Table Schema],\n" +
  "    ConstraintColumns.Table_Name [Table Name],\n" +
  "    [Constraints].CONSTRAINT_NAME [Constraint Name],\n" +
  "    [Constraints].type_desc,\n" +
  "    [Constraints].ignore_dup_key,\n" +
  "    [Constraints].CONSTRAINT_TYPE [Constraint Type],\n" +
  "    [Constraints].CHECK_CLAUSE,\n" +
  "    Columns.COLUMN_NAME,\n" +
  "    Columns.is_descending_key,\n" +
  "    Columns.ordinal_position\n" +
  "from INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ConstraintColumns\n" +
  "join \n" +
  "    (\n" +
  "        select tc.TABLE_SCHEMA, tc.CONSTRAINT_CATALOG, tc.CONSTRAINT_NAME, \n" +
  "            tc.CONSTRAINT_TYPE, tc.TABLE_NAME, ck.CHECK_CLAUSE, \n" +
  "            i.type_desc, I.ignore_dup_key\n" +
  "        from INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc\n" +
  "        left join sys.indexes i on OBJECT_name(i.object_id) = tc.TABLE_NAME\n" +
  "            and i.name = tc.CONSTRAINT_NAME\n" +
  "        left join INFORMATION_SCHEMA.CHECK_CONSTRAINTS ck \n" +
  "            on tc.CONSTRAINT_CATALOG=ck.CONSTRAINT_CATALOG\n" +
  "                and tc.CONSTRAINT_NAME=ck.CONSTRAINT_NAME\n" +
  "    ) [Constraints] \n" +
  "        on ConstraintColumns.CONSTRAINT_NAME=[Constraints].CONSTRAINT_NAME \n" +
  "            and ConstraintColumns.CONSTRAINT_CATALOG=[Constraints].CONSTRAINT_CATALOG\n" +
  "            and ConstraintColumns.TABLE_NAME=[Constraints].TABLE_NAME\n" +
  "join \n" +
  "    (\n" +
  "        select CheckColumns.TABLE_SCHEMA,\n" +
  "            CheckColumns.TABLE_NAME, \n" +
  "            CheckColumns.COLUMN_NAME, \n" +
  "            CheckColumns.CONSTRAINT_NAME, \n" +
  "            null as is_descending_key,\n" +
  "            null as ordinal_position\n" +
  "        from INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE CheckColumns \n" +
  "        where CheckColumns.CONSTRAINT_NAME in \n" +
  "            (\n" +
  "                select CONSTRAINT_NAME from INFORMATION_SCHEMA.CHECK_CONSTRAINTS\n" +
  "            )\n" +
  "        union all\n" +
  "        select \n" +
  "            ConstraintColumns.TABLE_SCHEMA,\n" +
  "            ConstraintColumns.TABLE_NAME, \n" +
  "            ConstraintColumns.COLUMN_NAME, \n" +
  "            ConstraintColumns.CONSTRAINT_NAME, \n" +
  "            ConstraintColumns.is_descending_key,\n" +
  "            ConstraintColumns.ordinal_position\n" +
  "        from \n" +
  "            (\n" +
  "                select\n" +
  "                    schema_name(obj.schema_id) TABLE_SCHEMA,\n" +
  "                    obj.name TABLE_NAME,\n" +
  "                    i.name CONSTRAINT_NAME,\n" +
  "                    index_column_id ordinal_position,\n" +
  "                    (\n" +
  "                        select name from sys.columns cols \n" +
  "                        where cols.object_id=obj.object_id and ic.column_id=cols.column_id\n" +
  "                    ) column_name,\n" +
  "                    is_descending_key\n" +
  "                from sys.indexes i\n" +
  "                    join sys.objects obj on obj.object_id=i.object_id and obj.type='u'\n" +
  "                    join sys.index_columns ic on obj.object_id=ic.object_id\n" +
  "                        and ic.index_id=i.index_id\n" +
  "                where is_primary_key=1 or is_unique_constraint=1\n" +
  "            ) ConstraintColumns\n" +
  "    ) Columns \n" +
  "        on ConstraintColumns.TABLE_SCHEMA=Columns.TABLE_SCHEMA\n" +
  "            and ConstraintColumns.TABLE_NAME=Columns.TABLE_NAME \n" +
  "            and ConstraintColumns.CONSTRAINT_NAME=Columns.CONSTRAINT_NAME\n" +
  "order by \n" +
  "    ConstraintColumns.TABLE_SCHEMA, \n" +
  "    ConstraintColumns.Table_Name, \n" +
  "    [Constraint Name], \n" +
  "    Columns.ordinal_position\n" +
  "for json auto, root('DBConstraints')";
			

Get Indexes

The final piece of the puzzle we need are indexes. We can get them by using the string:

string getIndexes =
  "select \n" +
  "   schema_name(obj.schema_id) [Table Schema], \n" +
  "   obj.name [Table Name], \n" +
  "   [Indexes].name index_name, \n" +
  "   [Indexes].type_desc, \n" +
  "   index_column_id [column_id], \n" +
  "   ( \n" +
  "       select name from sys.columns cols \n" +
  "       where cols.object_id=obj.object_id and Columns.column_id= cols.column_id \n" +
  "   ) column_name, \n" +
  "   ignore_dup_key, \n" +
  "   is_descending_key \n" +
  "from sys.indexes [Indexes] \n" +
  "   join sys.objects obj on obj.object_id=[Indexes].object_id and obj.type= 'u' \n" +
  "   join sys.index_columns Columns on obj.object_id= Columns.object_id \n" +
  "       and Columns.index_id=[Indexes].index_id \n" +
  "where is_primary_key=0 and is_unique_constraint=0 \n" +
  "order by [Table Schema], [Table Name], index_name, column_id \n" +
  "for json auto, root('DBIndexes') \n";
			

Saving Database Schema to a JSON File

If you want to compare a SQL database, first you need its schema. We will read the database elements mentioned above.

Reading database columns can be done with the following code:

  private void GetColumns(SqlCommand cmd)
{
_columnsFromDb.Clear();
cmd.CommandText = getColumns;
try
{
SqlDataReader reader = cmd.ExecuteReader();
StringBuilder sb = new StringBuilder();
// Need this cycle to read the full content, otherwise it gets truncated!!!
while(reader.Read())
{
sb.Append(reader.GetSqlString(0).Value);
}
reader.Close();
string json = sb.ToString();
dynamic cols = JsonConvert.DeserializeObject(json);

_columnsFromDb = cols == null ? new JArray() : (JArray)cols["DBColumns"];
// If we have an empty database, we don't have columns
if(_columnsFromDb.Count == 0)
{
_error += "\r\nMissing columns from schema file";
_haveErrors = true;
}
}
catch(Exception e)
{
_error += "\r\nError reading database: " + e.Message + ")";
_haveErrors = true;
}
}
}

As a note, a schema can be larger than SqlDataReader can return in one call, so we need to do a while loop in order to get the whole schema.

We collect all errors which might appear and present them to the user at the end of the program.

Reading other elements of the database, as foreign keys, other constraints and indexes, can be done with code similar to the code above.

If we don’t want the user to see the schema, we can use a simple "encryption" of the schema when we save it to the JSON file, like saving the schema as base64. If we don’t want the user to alters the schema saved in the JSON file, we can add a checksum of the schema as the first line of JSON file. I choose as checksum the base64 of SHA512 hash of the JSON file.

Once we have the database elements, we can save the schema to a JSON file with a code similar to this:

dynamic dbschema = new JObject();
try
{
_conn.ChangeDatabase(_dbName);
GetDatabaseElements();
dbschema.DBColumns = _columnsFromDb;
dbschema.DBFKConstraints = _fkConstraintsFromDb;
dbschema.DBConstraints = _constraintsFromDb;
dbschema.DBIndexes = _indexesFromDb;

// Delete the previous database structure file
File.Delete(_jsonFile);
string jsonstr = dbschema.ToString() + "\n";
byte[] data = Encoding.UTF8.GetBytes(jsonstr);
string json64 = Convert.ToBase64String(data);

// Make sure the database structure hasn't been tampered
using SHA512 sha512Hash = SHA512.Create();
byte[] tempSha512;
tempSha512 = sha512Hash.ComputeHash(data);
string hash = Convert.ToBase64String(tempSha512) + "\n";
using StreamWriter jsonout = new StreamWriter(_jsonFile);
if(!_ignoreChecksum)
jsonout.Write(hash);
jsonout.Write(_encryptSchema ? json64 : jsonstr);
}
catch(Exception e)
{
_error += "\r\nError saving database schema: " + e.Message + "\n";
_haveErrors = true;
}

Create a Database from a Schema

As an example, I used the database created in the article about importing a MS Access database (see references). This database has Always Encrypted columns. I will not talk again how to create CMK and CEK, you can find this information in this article.

The schema for an encrypted column looks like:

"Table Schema": "dbo",
"Tables": [
    {
      "Table Name": "Company",
      "Table Columns": [
        {
          "Column Name": "IBAN",
          "is_nullable": true,
          "is_identity": false,
          "encryption_algorithm_name": "AEAD_AES_256_CBC_HMAC_SHA_256",
          "encryption_type_desc": "DETERMINISTIC",
          "CEK_Name": "CEK_TestAlwaysEncrypted",
          "Column Type": [
            {
              "data_type": "varchar",
              "max_length": 34
             }
        }
      ]
    }
]			

If we want to have an encrypted column in a new database, we just need to add to the schema of the database these three lines and tell the program which creates the database that we use encrypted columns.

"encryption_algorithm_name": "AEAD_AES_256_CBC_HMAC_SHA_256",
"encryption_type_desc": "DETERMINISTIC",
"CEK_Name": "CEK_TestAlwaysEncrypted",

As a note, we will use CEK as the key we specify in the config file of the program, and ignore the CEK specified for a particular column. We assume we use the same key.

Read Database Schema

Reading database schema from the JSON file created previously can be done with the following code:

private void ReadDBStructure()
{
string jsonstr = string.Empty;
string hashstr = string.Empty;
try
{
using(StreamReader reader = new StreamReader(_jsonFile))
{
if(!_ignoreChecksum) // get the hash which is in first line
hashstr = reader.ReadLine();
if(_encryptSchema)
{
string tmpstr = reader.ReadToEnd();
byte[] data = Convert.FromBase64String(tmpstr);
jsonstr = Encoding.ASCII.GetString(data);
}
else
{
jsonstr = reader.ReadToEnd();
}
}
if(!_ignoreChecksum)
{
byte[] jsondata = Encoding.UTF8.GetBytes(jsonstr);

// Make sure the database structure hasn't been tampered
using SHA512 sha512Hash = SHA512.Create();
byte[] temphash512;
temphash512 = sha512Hash.ComputeHash(jsondata);
string hashjson = Convert.ToBase64String(temphash512);
if(hashstr != hashjson) // Database structure has been tampered!
{
_error += "\r\nDatabase structure has been tampered!";
_haveErrors = true;
return;
}
}

// When saved from SSMS the json string is encoded with html tags, like &gt; &lt; etc
// We don't need them
jsonstr = HttpUtility.HtmlDecode(jsonstr);
// Get the database json structure
dynamic array = JsonConvert.DeserializeObject(jsonstr);
_columnsFromJson = array["DBColumns"];
_fkConstraintsFromJson = array["DBFKConstraints"];
if(_fkConstraintsFromJson == null)
{
_fkConstraintsFromJson = new JArray();
}
_constraintsFromJson = array["DBConstraints"];
if(_constraintsFromJson == null)
{
_constraintsFromJson = new JArray();
}
_indexesFromJson = array["DBIndexes"];
if(_indexesFromJson == null)
{
_indexesFromJson = new JArray();
}
GetTablesFromJson();
}
catch(Exception e)
{
_error += "\r\nError parsing database structure file: " + e.Message;
_haveErrors = true;
}
}

You can see that we don’t have tables in the schema file, so we don’t read them back. But when we want to create a new database, we need tables, and each table with its own columns. Because every column belongs to a table, we already have all tables in the schema file, but not in a way which we need to create the database. The function GetTablesFromJson() collects all tables from DBColumns element and can be implemented as:

private void GetTablesFromJson()
{
// We have no columns, so we have no tables
_tablesFromJson = _columnsFromJson.Count == 0
? (dynamic)new ArrayList()
: (dynamic)(from schema in _columnsFromJson
select new
{
schemaName = (string)schema["Table Schema"],
tables = (from t in schema["Tables"]
select (string)t["Table Name"]).ToList()
}).ToList();
}

Create Database Tables

Now we have the database tables, so we can create them:

private void CreateTables(SqlCommand cmd)
{
string query = string.Empty;
try
{
foreach(JToken schema in _columnsFromJson)
{
// Table schema might be missing from Json file
string schemaName = (string)schema["Table Schema"];
if(string.IsNullOrWhiteSpace(schemaName))
schemaName = "dbo";

// Create schema if it doesn't exist
query = "IF(NOT EXISTS(SELECT * FROM sys.schemas WHERE name = '" + schemaName + "'))\n" +
"BEGIN\n" +
" EXEC('CREATE SCHEMA [" + schemaName + "] AUTHORIZATION [dbo]')\n" +
"END;";
cmd.CommandText = query;
cmd.ExecuteNonQuery();

JArray tablesFromSchema = (JArray)schema["Tables"];
foreach(var (table, fullTableName) in from JToken table in tablesFromSchema
let tableName = "[" + (string)table["Table Name"] + "]"
let fullTableName = "[" + schemaName + "]." + tableName
select (table, fullTableName))
{
// Start building the query for creating the table
query = "CREATE TABLE " + fullTableName + " (\n";
JArray tableColumns = (JArray)table["Table Columns"];
foreach(JToken column in tableColumns)
{
string columnName, datatype = string.Empty;
string encryptionType, encrypttext = string.Empty;
string defaultValue = null;
bool isNullable = false, isIdentity = false;
int? maxlen = null;
int seed = 0, increment = 0;
int? precision = null, scale = null;

columnName = (string)column["Column Name"];
isNullable = (bool)column["is_nullable"];
isIdentity = (bool)column["is_identity"];
encryptionType = (string)column["encryption_type_desc"];
//cekName = (string)column["CEK_Name"];
if(!string.IsNullOrWhiteSpace(encryptionType) && !string.IsNullOrWhiteSpace(_cek))
encrypttext = " COLLATE Latin1_General_BIN2 ENCRYPTED WITH (COLUMN_ENCRYPTION_KEY=["+
_cek + "], ENCRYPTION_TYPE = " + encryptionType +
", ALGORITHM = 'AEAD_AES_256_CBC_HMAC_SHA_256')";

JArray columnType = (JArray)column["Column Type"];
foreach(JToken coltype in columnType)
{
datatype = (string)coltype["data_type"];
defaultValue = (string)coltype["default_value"];
maxlen = (int?)coltype["max_length"];
if(isIdentity)
{
seed = (int)coltype["seed_value"];
increment = (int)coltype["increment_value"];
}
precision = (int?)coltype["precision"];
scale = (int?)coltype["scale"];
}
if(!string.IsNullOrWhiteSpace(datatype))
query += " [" + columnName + "] " + datatype;
else // We should NOT get here!!!
{
_error += "\r\nError in database schema, in table " +
fullTableName + ", column [" + columnName +
"] has no data type";
_haveErrors = true;
continue; // go to next column, trying to collect all errors
}

if(precision.HasValue)
{
query += "(" + precision.Value;
if(scale.HasValue)
query += ", " + scale.Value;
query += ") ";
}
else if(maxlen.HasValue)
query += "(" + maxlen.Value + ") ";
if(isIdentity)
{
query += " identity(" + seed + ", " + increment + ") NOT NULL";
}
else
{
if(!string.IsNullOrWhiteSpace(encrypttext))
query += encrypttext;
if(!isNullable)
{
query += " NOT NULL ";
}
}
if(!string.IsNullOrWhiteSpace(defaultValue))
query += " DEFAULT " + defaultValue;
query += ",\n";
}
// Replace the last , with )
query = query.Substring(0, query.LastIndexOf(',')) + "\n)";
// Create the table
cmd.CommandText = query;
cmd.ExecuteNonQuery();
_diffmsg += "\r\n\tCreate table " + fullTableName + "...";
}
}
}
catch(Exception e)
{
_error += "\r\nError creating tables: " + e.Message;
_haveErrors = true;
}
}

Create Indexes

After we create the tables, we will create indexes which can be done with code like this:

private void CreateIndexes(SqlCommand cmd)
{
string query = string.Empty;
try
{
foreach(var (fullTableName, arrayjs) in
from JToken table in _indexesFromJson
let schemaName = (string)table["Table Schema"]
let tableName = (string)table["Table Name"]
let fullTableName = "[" + schemaName + "].[" + tableName + "]"
let arrayjs = (JArray)table["Indexes"]
select (fullTableName, arrayjs))
{
foreach(var (index, indexname, columns, typedesc, ignoreDupKey) in
from JToken index in arrayjs
let indexname = "[" + (string)index["index_name"] + "]"
let columns = (JArray)index["Columns"]
let typedesc = (string)index["type_desc"]
let ignoreDupKey = (bool)index["ignore_dup_key"]
select (index, indexname, columns, typedesc, ignoreDupKey))
{
query = "CREATE " + typedesc + " INDEX " + indexname + " ON "
+ fullTableName + "\n(\n";
foreach(var column in columns)
{
bool isDescending = (bool)column["is_descending_key"];
query += " [" + (string)column["column_name"] + "] ";
if(isDescending)
query += " DESC";
query += ",\n";
}
// Replace the last , with )
query = query.Substring(0, query.LastIndexOf(',')) + "\n)";
if(ignoreDupKey)
query += " WITH (IGNORE_DUP_KEY = ON)\n";
cmd.CommandText = query;
cmd.ExecuteNonQuery();
_diffmsg += "\r\n\tCreate index [" + indexname + "] for table "
+ fullTableName + "...";
}
}
}
catch(Exception e)
{
_error += "\r\nError creating indexes: " + e.Message;
_haveErrors = true;
}
}

Create Constraints

Next step is to create constraints, except foreign keys.

private void CreateNonFkConstraints(SqlCommand cmd)
{
// Create Primary Keys, Unique and Check constraints
string query = string.Empty;
try
{
foreach(var (schemaName, tableName, fullTableName, arrayjs) in
from JToken table in _constraintsFromJson
let schemaName = (string)table["Table Schema"]
let tableName = (string)table["Table Name"]
let fullTableName = "[" + schemaName + "].[" + tableName + "]"
let arrayjs = (JArray)table["Constraints"]
select (schemaName, tableName, fullTableName, arrayjs))
{
foreach(var (constraintname,constrainttype,checkclause,columns,typedesc,ignoreDupKey) in
from JToken constraint in arrayjs
let constraintname = "[" + (string)constraint["Constraint Name"] + "]"
let constrainttype = (string)constraint["Constraint Type"]
let checkclause = (string)constraint["CHECK_CLAUSE"]
let columns = (JArray)constraint["Columns"]
let typedesc = (string)constraint["type_desc"]
let ignoreDupKey = (bool?)constraint["ignore_dup_key"]
select (constraintname, constrainttype, checkclause, columns, typedesc, ignoreDupKey))
{
query = "ALTER TABLE " + fullTableName;
if(constrainttype == "PRIMARY KEY" || constrainttype == "UNIQUE")
{
query += " ADD Constraint " + constraintname + " " + constrainttype + " " +
typedesc + "\n(\n";
foreach(var column in columns)
{
bool isDescending = (bool)column["is_descending_key"];
query += " [" + (string)column["COLUMN_NAME"] + "] ";
if(isDescending)
query += " DESC";
query += ",\n";
}
// Replace the last , with )
query = query.Substring(0, query.LastIndexOf(',')) + "\n)";
if(ignoreDupKey.HasValue && ignoreDupKey.Value)
query += " WITH (IGNORE_DUP_KEY = ON)\n";
}
else if(constrainttype == "CHECK")
{
query += " WITH CHECK Add constraint [" + constraintname + "] " +
constrainttype + " " + checkclause;
query += ";\nALTER TABLE " + fullTableName + "Check Constraint [" +
constraintname + "];\n";
}
else
{
_error += "\r\nError creating database constraints: Unknown constraint type [" +
constrainttype + "]";
_haveErrors = true;
continue;
}
cmd.CommandText = query;
cmd.ExecuteNonQuery();
_diffmsg += "\r\n\tAdd constraint " + constraintname + " to table " +
fullTableName + "...";
}
}
}
catch(Exception e)
{
_error += "\r\nError creating constraints: " + e.Message;
_haveErrors = true;
}
}

Create Foreign Keys

We can create foreign key only at the end, so now is the time to do it:

private void CreateFkConstraints(SqlCommand cmd)
{
// Create Foreign Keys
string query = string.Empty;
try
{
foreach(var (schemaName, tableName, fullTableName, arrayjs) in
from JToken table in _fkConstraintsFromJson
let schemaName = (string)table["Table Schema"]
let tableName = (string)table["Table Name"]
let fullTableName = "[" + schemaName + "].[" + tableName + "]"
let arrayjs = (JArray)table["Constraints"]
select (schemaName, tableName, fullTableName, arrayjs))
{
foreach(var (constraintname, referencedTable, columns) in
from JToken constraint in arrayjs
let constraintname = "[" + (string)constraint["Constraint Name"] + "]"
let refschema = (string)constraint["Referenced Schema"]
let reftable = (string)constraint["Referenced Table"]
let referencedTable = "[" + refschema + "].[" + reftable + "]"
let columns = (JArray)constraint["Columns"]
select (constraintname, referencedTable, columns))
{
query = "ALTER TABLE " + fullTableName + " WITH CHECK Add constraint ";
query += constraintname + " Foreign Key(";
foreach(var column in columns)
{
query += "[" + (string)column["Column Name"] + "], ";
}
// Replace the last , with )
query = query.Substring(0, query.LastIndexOf(',')) + ")\n";
query += " references " + referencedTable + " (";
foreach(var column in columns)
{
query += "[" + (string)column["Referenced Column"] + "], ";
}
// Replace the last , with )
query = query.Substring(0, query.LastIndexOf(',')) + ");\n";
query += "ALTER TABLE " + fullTableName + " CHECK constraint [" +
constraintname + "];";
cmd.CommandText = query;
cmd.ExecuteNonQuery();
_diffmsg += "\r\n\tAdd FOREIGN KEY " + constraintname + " to table " +
fullTableName + "...";
}
}
}
catch(Exception e)
{
_error += "\r\nError creating FOREIGN KEYs: " + e.Message;
_haveErrors = true;
}
}
Next Steps
  • Part II will compare two databases and show the differences
  • Part III will change database, without losing data, such that the database will have the same structure as the one we provide to compare. There is full support for always encrypted columns. This means if you want to decrypt an always encrypted column you can do it, as well as if you want to transform a normal column into an always encrypted, you can also do it. This will be done without using PowerShell SSMS.
  • Part III will have attached the full project and some JSON database structures to play with.

References



sql server categories

sql server webinars

subscribe to mssqltips

sql server tutorials

sql server white papers

next tip



About the author
MSSQLTips author Mircea Dragan Mircea Dragan is a Senior Computer Scientist with a strong mathematical background with over 18 years of hands-on experience.

This author pledges the content of this article is based on professional experience and not AI generated.

View all my tips


Article Last Updated: 2020-05-26

Comments For This Article




Tuesday, July 28, 2020 - 1:40:37 PM - Greg Robidoux Back To Top (86206)

You can find part 3 here - https://www.mssqltips.com/sqlservertip/6496/how-to-sync-two-sql-server-databases--part-3/


Tuesday, July 28, 2020 - 12:32:42 PM - Mircea Back To Top (86205)

Oscar >> Wait for part 3, you will have the config file there


Tuesday, July 28, 2020 - 4:02:14 AM - oscar Back To Top (86203)

There is no config file in your project code, whether you forget upload it, thank you very much :)















get free sql tips
agree to terms