Similar to how I’m uploading data to an SFTP server, the example below streams a SQL table directly to a Google Cloud Storage object. Full code for this is here.
/// <summary>
/// Reads every row of a SQL table and yields each one as soon as it has been read.
/// </summary>
/// <param name="schema">Schema that owns the table.</param>
/// <param name="tableName">Name of the table to read.</param>
/// <returns>
/// An async sequence of <see cref="DataRow"/>s. The rows are created from a schema-only
/// <see cref="DataTable"/> and are never added to it, so only one row needs to be alive at a time.
/// </returns>
/// <remarks>
/// <paramref name="schema"/> and <paramref name="tableName"/> are concatenated into the query text
/// as-is, so they must come from a trusted source.
/// </remarks>
public static async IAsyncEnumerable<DataRow> GetSqlRowsAsync(string schema, string tableName)
{
    // Wrapping the connection in a using block guarantees it is released even when the
    // query throws or the consumer stops enumerating before the last row.
    using (SqlConnection connection = new SqlConnection(_sqlConnectionString))
    using (SqlCommand command = new SqlCommand("select * from " + schema + "." + tableName, connection))
    {
        await connection.OpenAsync();

        DataTable table = new DataTable();

        // Get schema of SQL table
        using (SqlDataAdapter adapter = new SqlDataAdapter(command))
        {
            table.TableName = tableName;
            adapter.FillSchema(table, SchemaType.Source);
        }

        int columnCount = table.Columns.Count;

        // Return rows as they're read
        using (SqlDataReader reader = await command.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                DataRow row = table.NewRow();

                // Copy the current record column by column; table ordinals are used to index the reader.
                for (int ordinal = 0; ordinal < columnCount; ordinal++)
                {
                    row[ordinal] = reader.GetValue(ordinal);
                }

                yield return row;
            }
        }
    }
}
Usage
Here the StreamWriter needs to be flushed before the upload to Google Cloud Storage is initiated; otherwise the StorageClient will finish writing before the stream is complete, and the resulting file will be truncated.
GoogleCredential googleCredential = GoogleCredential.FromFile(_googleCredentialPath);
StorageClient storageClient = StorageClient.Create(googleCredential);

// The CSV is written into an in-memory stream and uploaded in a single call after the last row.
using (var memoryStream = new MemoryStream())
using (var streamWriter = new StreamWriter(memoryStream))
using (var csvWriter = new CsvWriter(streamWriter))
{
    // Quote all fields
    csvWriter.Configuration.ShouldQuote = (field, context) => true;

    // The header is taken from the first row's table, so nothing is written
    // (not even a header) when the query yields no rows.
    bool writeHeaders = true;

    await foreach (DataRow row in GetSqlRowsAsync(_schema, _table))
    {
        // Write header row
        if (writeHeaders)
        {
            foreach (DataColumn column in row.Table.Columns)
            {
                csvWriter.WriteField(column.ColumnName);
            }

            await csvWriter.NextRecordAsync();
            writeHeaders = false;
        }

        // Write data rows.
        // Read the column count once: DataRow.ItemArray builds a fresh copy of the row's
        // values on every access, so using it as the loop bound copied the row per column.
        int columnCount = row.Table.Columns.Count;
        for (var i = 0; i < columnCount; i++)
        {
            csvWriter.WriteField(row[i]);
        }

        await csvWriter.NextRecordAsync();
    }

    // Push everything still buffered in the writer into the MemoryStream before uploading.
    await streamWriter.FlushAsync();

    // NOTE(review): the stream is positioned at its end after writing. Rewind explicitly so the
    // upload starts at the first byte even if the client reads from the current position.
    memoryStream.Position = 0;

    await storageClient.UploadObjectAsync(_cloudStorageBucketName, _cloudStorageObjectName, "text/csv", memoryStream);
}
0 Comments