Thread-Safe DataTable Inserts and SqlBulkCopy

As part of a project where I was running multiple threads making API calls, in the way described in my prior post, I was calling a stored procedure to log each returned result to a SQL database. This worked fine with a low number of threads, but with more threads I started to experience long delays due to the sheer number of connections being opened and closed against the SQL server and the number of rows being written. To get around this I decided to store the returned results in memory and do a single bulk insert once all the data had been gathered. Storing a large amount of data in memory before committing it is not the best idea, but the amount of data returned wasn’t enough to cause any problems.

I did, however, experience problems inserting rows into the in-memory DataTable I was using to hold the returned data before the bulk insert. It turns out DataTable is not thread safe for writes, and I was frequently getting “DataTable internal index is corrupted” errors when adding rows. I got around this by having each thread take an exclusive lock on the DataTable when adding a row, which isn’t the optimal solution but was good enough for my use case.

Building on the project from my prior post mentioned above, I did the following to store the returned data in memory and do a single bulk insert into SQL once the data had been gathered.

Program

Prepare a DataSet containing all the required DataTables, call BeginLoadData on each table to suspend constraints and allow faster inserts, start the threads that add rows to the tables, and finally bulk insert the results into SQL and merge them.


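using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

class Program
{
	public static IConfigurationRoot configuration;
	public static DataSet dataSet;

	static void Main(string[] args)
	{
		// GetAwaiter().GetResult() rethrows the original exception instead of wrapping it in an AggregateException
		MainAsync(args).GetAwaiter().GetResult();
	}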

	static async Task MainAsync(string[] args)
	{
		// Create service collection
		ServiceCollection serviceCollection = new ServiceCollection();
		ConfigureServices(serviceCollection);

		// Create service provider
		IServiceProvider serviceProvider = serviceCollection.BuildServiceProvider();

		// Get desired product IDs
		DateTime processingDateTime = DateTime.Now;

		List<string> productIds = await Common.GetProductIds(configuration.GetConnectionString("DataConnection"));

		// Create dataset
		await Common.TruncateLoadingTables(configuration.GetConnectionString("DataConnection"));
		List<string> loadingTables = configuration.GetSection("LoadingTables").Get<List<string>>();
		dataSet = Common.GetDataSet(loadingTables, configuration.GetConnectionString("DataConnection"));

		// Prepare datatables for loading
		foreach (DataTable table in dataSet.Tables)
		{
			table.BeginLoadData();
		}

		// Create a block with an asynchronous action
		var block = new ActionBlock<string>(
			async x => await serviceProvider.GetService<App>().Run(x),
			new ExecutionDataflowBlockOptions
			{
				BoundedCapacity = int.Parse(configuration["Threading:BoundedCapacity"]), // Cap the item count
				MaxDegreeOfParallelism = int.Parse(configuration["Threading:MaxDegreeOfParallelism"])
			});

		// Add items to the block and asynchronously wait if BoundedCapacity is reached
		foreach (string productId in productIds.Distinct())
		{
			await block.SendAsync(productId);
		}

		block.Complete();
		await block.Completion;

		// Accept changes in datatables
		foreach (DataTable table in dataSet.Tables)
		{
			table.EndLoadData();
			table.AcceptChanges();
		}

		// Insert into loading tables
		await Common.BulkCopyDataSetToSql(dataSet, configuration.GetConnectionString("DataConnection"));
		await Common.MergeData(configuration.GetConnectionString("DataConnection"));
	}
}
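
The ActionBlock in the listing above is what limits how many calls run at once and how many items may queue up. As a minimal, self-contained sketch of that behaviour (the class name BoundedBlockDemo, the method RunAsync and the Task.Delay stand-in for the API call are all hypothetical and not part of the project), the following processes a range of integers and records the peak number of concurrent workers:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBlockDemo
{
    // Hypothetical helper: pushes itemCount items through an ActionBlock and
    // reports how many were processed and the peak number of concurrent workers.
    public static async Task<(int Processed, int PeakConcurrency)> RunAsync(
        int itemCount, int boundedCapacity, int maxParallelism)
    {
        int processed = 0, running = 0, peak = 0;
        object peakLock = new object();

        var block = new ActionBlock<int>(
            async item =>
            {
                int now = Interlocked.Increment(ref running);
                lock (peakLock) { if (now > peak) peak = now; }

                await Task.Delay(10); // stand-in for the real API call

                Interlocked.Decrement(ref running);
                Interlocked.Increment(ref processed);
            },
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = boundedCapacity,       // cap on queued plus in-flight items
                MaxDegreeOfParallelism = maxParallelism  // cap on concurrent workers
            });

        foreach (int item in Enumerable.Range(0, itemCount))
        {
            // Completes straight away while the block has room, otherwise waits for space
            await block.SendAsync(item);
        }

        block.Complete();        // no more items will be sent
        await block.Completion;  // wait for the in-flight items to finish

        return (processed, peak);
    }
}
```

Calling `await BoundedBlockDemo.RunAsync(50, 8, 4)` should process all 50 items while never running more than 4 at the same time. Because BoundedCapacity counts items that are being processed as well as queued ones, setting it below MaxDegreeOfParallelism would also limit the effective parallelism.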

Common


public static DataSet GetDataSet(List<string> tableNames, string connectionString)
{
	DataSet dataSet = new DataSet();

	// One connection is enough to read the schema of every table
	using (SqlConnection connection = new SqlConnection(connectionString))
	{
		connection.Open();

		foreach (string tableName in tableNames)
		{
			// Table names are concatenated into the query, so they must come from trusted configuration
			using (SqlCommand command = new SqlCommand("select * from " + tableName, connection))
			using (SqlDataAdapter adapter = new SqlDataAdapter(command))
			{
				// FillSchema copies the column definitions and key constraints only; no rows are loaded
				DataTable dataTable = new DataTable(tableName);
				adapter.FillSchema(dataTable, SchemaType.Source);
				dataSet.Tables.Add(dataTable);
			}
		}
	}

	return dataSet;
}

public static async Task BulkCopyDataSetToSql(DataSet dataSet, string connectionString)
{
	using (SqlConnection connection = new SqlConnection(connectionString))
	{
		// Open the connection once and reuse it for every table
		await connection.OpenAsync();

		foreach (DataTable table in dataSet.Tables)
		{
			using (SqlBulkCopy copy = new SqlBulkCopy(connection))
			{
				copy.DestinationTableName = table.TableName;

				// Map columns by name so the copy does not depend on column order
				foreach (DataColumn column in table.Columns)
				{
					copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
				}

				await copy.WriteToServerAsync(table);
			}
		}
	}
}
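
Program also calls Common.MergeData, which is not shown here. Purely as an illustration of what a merge from a loading table into a final table could look like, here is a hypothetical sketch. Only the loading table name loading__product and its id and name columns come from this post; the target table name product, the choice of id as the key, the class and method names, and the use of Microsoft.Data.SqlClient are all assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient; // assumption: System.Data.SqlClient exposes the same types

public static class MergeSketch
{
    // Hypothetical helper: builds a T-SQL MERGE that upserts rows from a loading
    // table into a target table. Identifiers must come from trusted configuration.
    public static string BuildMergeSql(
        string loadingTable, string targetTable, string keyColumn, IReadOnlyList<string> columns)
    {
        string Quote(string name) => "[" + name.Replace("]", "]]") + "]";

        string key = Quote(keyColumn);
        List<string> nonKey = columns.Where(c => c != keyColumn).ToList();

        // Skip the UPDATE branch when there is nothing besides the key to update
        string matched = nonKey.Count == 0
            ? ""
            : "WHEN MATCHED THEN UPDATE SET "
                + string.Join(", ", nonKey.Select(c => "t." + Quote(c) + " = s." + Quote(c))) + " ";

        return "MERGE INTO " + Quote(targetTable) + " AS t "
            + "USING " + Quote(loadingTable) + " AS s ON t." + key + " = s." + key + " "
            + matched
            + "WHEN NOT MATCHED BY TARGET THEN INSERT ("
            + string.Join(", ", columns.Select(c => Quote(c))) + ") "
            + "VALUES (" + string.Join(", ", columns.Select(c => "s." + Quote(c))) + ");";
    }

    // Hypothetical stand-in for MergeData, limited to the product table
    public static async Task MergeProductsAsync(string connectionString)
    {
        string sql = BuildMergeSql("loading__product", "product", "id", new[] { "id", "name" });

        using (SqlConnection connection = new SqlConnection(connectionString))
        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            await connection.OpenAsync();
            await command.ExecuteNonQueryAsync();
        }
    }
}
```

Doing the merge as one set-based statement on the server keeps the final step to a single round trip per table, in keeping with the single bulk insert that precedes it.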

ProductService

Take an exclusive lock on the shared DataTable and add the row while holding it.


public static void SetProduct(Product productDetails)
{
	DataTable table = Program.dataSet.Tables["loading__product"];

	// DataTable is not safe for concurrent writes, so only one thread may add a row at a time
	lock (table)
	{
		DataRow row = table.NewRow();

		row["id"] = productDetails.Data.Id;
		row["name"] = productDetails.Data.Name;

		table.Rows.Add(row);
	}
}
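
The exclusive lock is not the optimal solution, as noted earlier. One possible alternative, which is not what this project does, is to have the worker threads put plain values into a ConcurrentQueue and let a single thread copy them into the DataTable after all workers have finished, so the DataTable is only ever touched by one thread. A minimal sketch, assuming an integer id and using the hypothetical names ProductBuffer, Enqueue, DrainInto and Demo:

```csharp
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Threading.Tasks;

public static class ProductBuffer
{
    // Hypothetical buffer: Enqueue may be called from many threads at once
    private static readonly ConcurrentQueue<(int Id, string Name)> Pending =
        new ConcurrentQueue<(int Id, string Name)>();

    public static void Enqueue(int id, string name) => Pending.Enqueue((id, name));

    // Call from a single thread once all workers have completed.
    // Returns the number of rows added to the table.
    public static int DrainInto(DataTable table)
    {
        int added = 0;

        while (Pending.TryDequeue(out var item))
        {
            DataRow row = table.NewRow();
            row["id"] = item.Id;
            row["name"] = item.Name;
            table.Rows.Add(row);
            added++;
        }

        return added;
    }

    // Small demonstration: many threads enqueue, one thread fills the table
    public static DataTable Demo(int rowCount)
    {
        var table = new DataTable("loading__product");
        table.Columns.Add("id", typeof(int));      // assumption: the real column type may differ
        table.Columns.Add("name", typeof(string));

        Parallel.For(0, rowCount, i => Enqueue(i, "product " + i));
        DrainInto(table);

        return table;
    }
}
```

The trade-off is that every result is held twice for a short time, once in the queue and once in the DataTable, in exchange for removing lock contention between the worker threads.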
