Similar to how I’m uploading data to an SFTP server, the example below streams a SQL table directly to a Google Cloud Storage object. Full code for this is here.

/// <summary>
/// Streams the rows of a SQL table one at a time as they are read from the server.
/// </summary>
/// <param name="schema">Schema that owns the table; may be bare (dbo) or bracket-quoted ([dbo]).</param>
/// <param name="tableName">Table to read; may be bare or bracket-quoted. Also used as the name of the schema DataTable.</param>
/// <returns>
/// An async sequence of <see cref="DataRow"/> instances created from a single schema-only
/// <see cref="DataTable"/>. The rows are created with NewRow and are not added to the table.
/// </returns>
/// <remarks>
/// The connection, command and reader are disposed when enumeration completes, when the
/// consumer stops enumerating early, or when an exception is thrown.
/// </remarks>
public static async IAsyncEnumerable<DataRow> GetSqlRowsAsync(string schema, string tableName)
{
	// Identifiers cannot be sent as SQL parameters, so bracket-quote them rather than
	// concatenating the raw text into the statement. Unquoting first keeps names that
	// callers have already wrapped in [] working unchanged.
	string qualifiedName;
	using (SqlCommandBuilder quoter = new SqlCommandBuilder())
	{
		qualifiedName =
			quoter.QuoteIdentifier(quoter.UnquoteIdentifier(schema))
			+ "."
			+ quoter.QuoteIdentifier(quoter.UnquoteIdentifier(tableName));
	}

	// Disposing the connection via using closes it even if the query throws or the
	// consumer abandons the enumeration before the last row.
	using (SqlConnection connection = new SqlConnection(_sqlConnectionString))
	using (SqlCommand command = new SqlCommand("select * from " + qualifiedName, connection))
	{
		await connection.OpenAsync();

		DataTable table = new DataTable();

		// Get schema of SQL table
		using (SqlDataAdapter adapter = new SqlDataAdapter(command))
		{
			table.TableName = tableName;
			adapter.FillSchema(table, SchemaType.Source);
		}

		// Return rows as they're read
		using (SqlDataReader reader = await command.ExecuteReaderAsync())
		{
			while (await reader.ReadAsync())
			{
				DataRow row = table.NewRow();

				// Copy each field by ordinal; a plain loop avoids allocating a list per row.
				foreach (DataColumn column in table.Columns)
				{
					row[column] = reader.GetValue(column.Ordinal);
				}

				yield return row;
			}
		}
	}
}

Usage

Here the StreamWriter needs to be flushed before the upload to GCS is initiated; otherwise the StorageClient will upload the MemoryStream before all buffered data has been written to it, and the resulting file will be truncated.

// Build a Cloud Storage client from the credential file.
GoogleCredential googleCredential = GoogleCredential.FromFile(_googleCredentialPath);
StorageClient storageClient = StorageClient.Create(googleCredential);

// CSV text is written through the StreamWriter into an in-memory buffer, which is then uploaded.
using (var buffer = new MemoryStream())
using (var textWriter = new StreamWriter(buffer))
using (var csv = new CsvWriter(textWriter))
{
	// Every field is quoted, regardless of its content
	csv.Configuration.ShouldQuote = (value, ctx) => true;

	bool headerWritten = false;

	await foreach (DataRow row in GetSqlRowsAsync(_schema, _table))
	{
		// The header is emitted once, using the columns of the first row's table
		if (!headerWritten)
		{
			DataColumnCollection columns = row.Table.Columns;

			for (var c = 0; c < columns.Count; c++)
			{
				csv.WriteField(columns[c].ColumnName);
			}

			await csv.NextRecordAsync();
			headerWritten = true;
		}

		// One CSV record per data row
		foreach (object value in row.ItemArray)
		{
			csv.WriteField(value);
		}

		await csv.NextRecordAsync();
	}

	// Push any text still held by the StreamWriter into the buffer before uploading it
	await textWriter.FlushAsync();
	await storageClient.UploadObjectAsync(_cloudStorageBucketName, _cloudStorageObjectName, "text/csv", buffer);
}

0 Comments

Leave a Reply

Avatar placeholder

Your email address will not be published. Required fields are marked *