Azure Data Warehouse has a nice feature where it can access GZipped file blobs directly from Azure Storage as linked tables. The code below downloads zipped files from an SFTP site, extracts them, gzips them and then uploads them to Azure Storage. This all takes place in memory, so nothing is saved to disk. It feels like there should be a more elegant way of doing this, transforming the stream as it goes rather than creating copies, but this works for now and hopefully isn’t too resource-intensive.

/// <summary>
/// Downloads a zip archive from the SFTP server, re-compresses its first entry as GZip
/// and uploads the result to Azure Blob Storage. Everything is buffered in memory;
/// nothing is written to local disk.
/// </summary>
/// <param name="sftpFile">The remote zip file to transfer.</param>
/// <returns>A task that completes once the blob upload has finished.</returns>
/// <exception cref="InvalidOperationException">The downloaded archive contains no entries.</exception>
public async Task TransferFile(SftpFile sftpFile)
{
	_logger.LogInformation("Connecting to SFTP");
	// NOTE(review): plain concatenation means PrivateKeyPath must start with a directory
	// separator; Path.Combine would discard the base directory for such a value, so the
	// concatenation is kept as-is.
	string keyPath = Directory.GetParent(AppContext.BaseDirectory).FullName + sftpConfig.PrivateKeyPath;

	using (MemoryStream remoteFileMemoryStream = new MemoryStream())
	{
		// The key stream, key file and connection info are all disposable; scope them to
		// the download so they are released even when the connection or the copy fails.
		using (FileStream keyStream = File.OpenRead(keyPath))
		using (PrivateKeyFile privateKeyFile = new PrivateKeyFile(keyStream, sftpConfig.Password))
		using (PrivateKeyConnectionInfo privateKeyConnectionInfo = new PrivateKeyConnectionInfo(sftpConfig.Host, sftpConfig.UserName, privateKeyFile))
		{
			_logger.LogInformation("Reading SFTP file into memory");
			using (SftpClient sftp = new SftpClient(privateKeyConnectionInfo))
			{
				sftp.Connect();
				using (Stream remoteStream = sftp.OpenRead(sftpFile.FullName))
				{
					remoteStream.CopyTo(remoteFileMemoryStream);
				}
				sftp.Disconnect();
			}
		}

		_logger.LogInformation("Decompressing file");
		using (ZipArchive zipArchive = new ZipArchive(remoteFileMemoryStream))
		{
			// Only the first entry of the archive is transferred.
			ZipArchiveEntry zipArchiveEntry = zipArchive.Entries.FirstOrDefault();
			if (zipArchiveEntry == null)
			{
				throw new InvalidOperationException("The zip archive '" + sftpFile.FullName + "' contains no entries.");
			}

			_logger.LogInformation("Connecting to Azure Storage");
			// Retrieve a reference to a container.
			CloudBlobContainer container = new CloudBlobContainer(new Uri(azureStorageConfig.SasUrl));

			// Retrieve reference to a blob. Only the file extension is swapped, so a ".zip"
			// occurring elsewhere in the name is left alone and the match is not case-sensitive.
			string blobName = Program.processingDateTime.ToString("yyyyMMddHHmmss") + "/" + Path.ChangeExtension(sftpFile.Name, ".gz");
			CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);

			BlobRequestOptions blockBlobOptions = new BlobRequestOptions
			{
				ParallelOperationThreadCount = azureStorageConfig.ParallelOperationThreadCount,
				SingleBlobUploadThresholdInBytes = azureStorageConfig.SingleBlobUploadThresholdInBytes
			};

			using (MemoryStream blobStream = new MemoryStream())
			{
				_logger.LogInformation("GZipping file");
				using (Stream entryStream = zipArchiveEntry.Open())
				using (GZipStream gZipStream = new GZipStream(blobStream, CompressionMode.Compress, true))
				{
					await entryStream.CopyToAsync(gZipStream);
				}

				// The GZip stream is disposed above, which flushes the remaining compressed
				// data into blobStream; leaveOpen keeps blobStream usable for the upload.
				blobStream.Seek(0, SeekOrigin.Begin);
				_logger.LogInformation("Uploading file");
				await blockBlob.UploadFromStreamAsync(blobStream, null, blockBlobOptions, new OperationContext());
			}
		}
	}
}
	

0 Comments

Leave a Reply

Avatar placeholder

Your email address will not be published. Required fields are marked *