As part of a recent project I’ve had to download a gzipped file of product details from a client endpoint and to load these products into a local SQL database.

Although this is easy enough to do in discrete synchronous steps as below what I really wanted to do was to do this as efficiently as possible and essentially stream the product details directly into the target SQL database.

  • Download file
  • Unzip file
  • Read file into in-memory JSON object
  • Transform enumerable of items into a DataTable
  • Insert DataTable into SQL table

Obviously some or all of the above can be done in memory but given the gzipped file is 15MB in size and it’s uncompressed version is 160MB this probably isn’t the most efficient way of doing it.

What I ended up doing was decompressing the file stream in memory, parsing the items out of this stream and then streaming them through a DataReader into SQL with a SqlBulkCopy.

SqlBulkCopy can take either a DataTable, DataRow[] or a DataReader as an input and will then insert the rows into your SQL table. Both DataTable and DataRow[] are lumpy and so would all need to be stored in memory before being handed over to SqlBulkCopy for inserting. A DataReader however, allows results to be streamed into the SQL table which is much more efficient.

The only problem with using a DataReader is that there is not one already available that allows streaming of objects, just the regular SqlDataReader.

The first task was to create this and handily I managed to find this project which provided a good starting point. The ObjectDataReader here has only implemented some of the required interface methods so I tried to fill it in as much as possible, the only method I couldn’t manage to implement was the GetData one.

ObjectDataReader.cs

public sealed class ObjectDataReader<TData> : IDataReader
{
	private class PropertyAccessor
	{
		public List<Func<TData, object>> Accessors { get; set; }
		public Dictionary<string, int> Lookup { get; set; }
		public Dictionary<int, string> ReversedLookup { get; set; }
	}

	private static readonly Lazy<PropertyAccessor> s_propertyAccessorCache =
		new Lazy<PropertyAccessor>(() =>
		{
			var propertyAccessors = typeof(TData)
			.GetProperties(BindingFlags.Instance | BindingFlags.Public)
			.Where(p => p.CanRead)
			.Select((p, i) => new
			{
				Index = i,
				Property = p,
				Accessor = CreatePropertyAccessor(p)
			})
			.ToArray();

			return new PropertyAccessor
			{
				Accessors = propertyAccessors.Select(p => p.Accessor).ToList(),
				Lookup = propertyAccessors.ToDictionary(
				p => p.Property.Name, p => p.Index, StringComparer.OrdinalIgnoreCase),
				ReversedLookup = propertyAccessors.ToDictionary(
				p => p.Index, p => p.Property.Name),
			};
		});

	private static Func<TData, object> CreatePropertyAccessor(PropertyInfo p)
	{
		var parameter = Expression.Parameter(typeof(TData), "input");
		var propertyAccess = Expression.Property(parameter, p.GetGetMethod());
		var castAsObject = Expression.TypeAs(propertyAccess, typeof(object));
		var lamda = Expression.Lambda<Func<TData, object>>(castAsObject, parameter);
		return lamda.Compile();
	}

	private IEnumerator<TData> m_dataEnumerator;

	public ObjectDataReader(IEnumerable<TData> data)
	{
		m_dataEnumerator = data.GetEnumerator();
	}

	#region IDisposable Members

	public void Dispose()
	{
		this.Dispose(true);
		GC.SuppressFinalize(this);
	}

	protected void Dispose(bool disposing)
	{
		if (disposing)
		{
			if (m_dataEnumerator != null)
			{
				m_dataEnumerator.Dispose();
				m_dataEnumerator = null;
			}
		}
	}

	#endregion

	#region IDataReader Members

	public void Close()
	{
		Dispose();
	}

	public int Depth => 1;

	public DataTable GetSchemaTable()
	{
		return null;
	}

	public bool IsClosed => m_dataEnumerator == null;

	public bool NextResult()
	{
		return false;
	}

	public bool Read()
	{
		if (IsClosed)
			throw new ObjectDisposedException(GetType().Name);
		return m_dataEnumerator.MoveNext();
	}

	public int RecordsAffected => -1;

	#endregion

	// IDisposable Members

	#region IDataRecord Members

	public int GetOrdinal(string name)
	{
		int ordinal;
		if (!s_propertyAccessorCache.Value.Lookup.TryGetValue(name, out ordinal))
			throw new InvalidOperationException("Unknown parameter name: " + name);
		return ordinal;
	}

	public object GetValue(int i)
	{
		if (m_dataEnumerator == null)
			throw new ObjectDisposedException(GetType().Name);
		return s_propertyAccessorCache.Value.Accessors[i](m_dataEnumerator.Current);
	}

	public int GetValues(Object[] values)
	{
		int i = 0;

		while (i < values.Length && i < FieldCount)
		{
			values[i] = GetValue(i);

			i++;
		}

		return i;
	}

	public bool GetBoolean(int i)
	{
		return (bool)GetValue(i);
	}

	public byte GetByte(int i)
	{
		return (byte)GetValue(i);
	}

	public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferOffset, int length)
	{
		var array = (byte[])GetValue(i);

		var count = 0;
		while ((count++) < length)
		{
			if (fieldOffset >= array.Length)
			{
				break;
			}

			buffer[bufferOffset++] = array[fieldOffset++];
		}

		return count;
	}

	public char GetChar(int i)
	{
		return (char)GetValue(i);
	}

	public long GetChars(int i, long fieldOffset, char[] buffer, int bufferOffset, int length)
	{
		var array = (char[])GetValue(i);

		var count = 0;
		while ((count++) < length)
		{
			if (fieldOffset >= array.Length)
			{
				break;
			}

			buffer[bufferOffset++] = array[fieldOffset++];
		}

		return count;
	}

	public IDataReader GetData(int i)
	{
		throw new NotImplementedException();
	}

	public String GetDataTypeName(int i)
	{
		var value = GetValue(i);
		if (value == null)
		{
			return null;
		}

		return value.GetType().Name;
	}

	public DateTime GetDateTime(int i)
	{
		return (DateTime)GetValue(i);
	}

	public decimal GetDecimal(int i)
	{
		return (decimal)GetValue(i);
	}

	public double GetDouble(int i)
	{
		return (double)GetValue(i);
	}

	public Type GetFieldType(int i)
	{
		var value = GetValue(i);
		if (value == null)
		{
			return null;
		}

		return value.GetType();
	}

	public float GetFloat(int i)
	{
		return (float)GetValue(i);
	}

	public Guid GetGuid(int i)
	{
		return (Guid)GetValue(i);
	}

	public short GetInt16(int i)
	{
		return (short)GetValue(i);
	}

	public int GetInt32(int i)
	{
		return (int)GetValue(i);
	}

	public long GetInt64(int i)
	{
		return (long)GetValue(i);
	}

	public String GetName(int i)
	{
		string name;
		if (!s_propertyAccessorCache.Value.ReversedLookup.TryGetValue(i, out name))
			throw new InvalidOperationException("Unknown parameter ordinal: " + i.ToString());
		return name;
	}

	public String GetString(int i)
	{
		return (String)GetValue(i);
	}

	public bool IsDBNull(int i)
	{
		var value = GetValue(i);
		return (value == null || Convert.IsDBNull(value));
	}

	public Object this[String name]
	{
		get
		{
			var index = GetOrdinal(name);
			return GetValue(index);
		}
	}

	public Object this[int i]
	{
		get { return GetValue(i); }
	}

	public int FieldCount => s_propertyAccessorCache.Value.Accessors.Count;

	#endregion
}

There also seems to be another version of ObjectDataReader floating around which doesn’t use lambda expressions for determining the property types. I’d assumed this would perform worse than the version that I’m using but they seem to be about the same, possibly this is because my input is not large enough to allow the difference to be discerned. There’s also an archived version in the Microsoft code gallery which I haven’t tried yet.

JSON

My input JSON is formatted as an array within an object so requires a bit more work to stream than if it was a simple array that was being returned.

{
	"items": [
		{
			"UniqueId": 123,
			"ProductName": "Product 1",
			"InStock": true
		},
		{
			"UniqueId": 456,
			"ProductName": "Product 2",
			"InStock": true
		},
		{
			"UniqueId": 789,
			"ProductName": "Product 3",
			"InStock": false
		}
	]
}

This is streamed in using the following function.

private static IEnumerable<T> DeserializeIEnumerableFromStream<T>(Stream stream)
{
	using (StreamReader streamReader = new StreamReader(stream))
	using (JsonTextReader jsonTextReader = new JsonTextReader(streamReader))
	{
		while (jsonTextReader.Read())
		{
			if (jsonTextReader.TokenType == JsonToken.StartArray)
			{
				while (jsonTextReader.Read())
				{
					if (jsonTextReader.TokenType == JsonToken.StartObject)
					{
						JObject jObject = JObject.Load(jsonTextReader);
						yield return JsonConvert.DeserializeObject<T>(jObject.ToString());
					}
				}
			}
		}
	}
}

The JsonTextReader TokenType checks ensure that we skip out the rot items object.

Program.cs

class Program
{
	static int Main(string[] args)
	{
		try
		{
			// Start!
			MainAsync(args).Wait();
			return 0;
		}
		catch
		{
			return 1;
		}
	}

	static async Task MainAsync(string[] args)
	{
		using (HttpClient client = new HttpClient(handler))
		using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, new Uri("http://files.MYSOURCE.com/PRODUCTS.json.gz")))
		using (HttpResponseMessage response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
		using (GZipStream stream = new GZipStream(await response.Content.ReadAsStreamAsync(), CompressionMode.Decompress))
		{
			using (SqlConnection connection = new SqlConnection("Server=MYSERVER;Database=MYDATABASE;Trusted_Connection=True;"))
			{
				await connection.OpenAsync();
				DataTable table = new DataTable();

				using (SqlDataAdapter adapter = new SqlDataAdapter("select top 0 * from ProductItem", connection))
				{
					adapter.FillSchema(table, SchemaType.Source);
				}

				using (SqlBulkCopy bulk = new SqlBulkCopy(connection))
				{
					bulk.DestinationTableName = table.TableName;

					using (ObjectDataReader<ProductItem> objectDataReader = new ObjectDataReader<ProductItem>(DeserializeIEnumerableFromStream<ProductItem>(stream)))
					{
						await bulk.WriteToServerAsync(objectDataReader);
					}
				}
			}
		}
	}
}

Enhancements

The above Program.cs is just an example but it manages to load 88k product records into SQL in 25-28s which feels like a reasonable speed.

The main improvement I’d like to make to this would be to to allow the ObjectDataReader to use the new IAsyncEnumerable as an input type rather than using IEnumerable though this seems to require recreating the SqlBulkCopy class as well which I haven’t had time to do yet.


Leave a Reply

Your email address will not be published. Required fields are marked *


Fatal error: Uncaught GuzzleHttp\Exception\ClientException: Client error: `POST https://dc.services.visualstudio.com/v2/track` resulted in a `400 Invalid instrumentation key` response: {"itemsReceived":1,"itemsAccepted":0,"errors":[{"index":0,"statusCode":400,"message":"Invalid instrumentation key"}]} in D:\home\site\wwwroot\wp-content\plugins\application-insights\vendor\guzzlehttp\guzzle\src\Exception\RequestException.php:113 Stack trace: #0 D:\home\site\wwwroot\wp-content\plugins\application-insights\vendor\guzzlehttp\guzzle\src\Middleware.php(66): GuzzleHttp\Exception\RequestException::create(Object(GuzzleHttp\Psr7\Request), Object(GuzzleHttp\Psr7\Response)) #1 D:\home\site\wwwroot\wp-content\plugins\application-insights\vendor\guzzlehttp\promises\src\Promise.php(203): GuzzleHttp\Middleware::GuzzleHttp\{closure}(Object(GuzzleHttp\Psr7\Response)) #2 D:\home\site\wwwroot\wp-content\plugins\application-insights\vendor\guzzlehttp\promises\src\Promise.php(156): GuzzleHttp\Promise\Promise::callHandler(1, Object(GuzzleHttp\P in D:\home\site\wwwroot\wp-content\plugins\application-insights\vendor\guzzlehttp\guzzle\src\Exception\RequestException.php on line 113