From 67173d7534240f65fb4ced0372d8ef6bc4459a2b Mon Sep 17 00:00:00 2001 From: IG Date: Thu, 13 Oct 2022 10:17:12 +0100 Subject: [PATCH] Support all the compression methods via my own IronCompress library. --- .github/workflows/release.yml | 2 +- docs/README.md | 2 - docs/features.md | 24 ---- src/Parquet.Test/CompressionTest.cs | 78 +++++----- src/Parquet.Test/DecimalTypeTest.cs | 2 +- src/Parquet.Test/NonSeekableWriterTEst.cs | 111 +++++++-------- src/Parquet.Test/Parquet.Test.csproj | 6 +- src/Parquet.Test/SnappyStreamTest.cs | 50 ------- src/Parquet.Test/TestBase.cs | 3 +- src/Parquet/CompressionMethod.cs | 63 +++++--- src/Parquet/File/BytesOwner.cs | 51 ------- src/Parquet/File/Compressor.cs | 35 +++++ src/Parquet/File/Data/DataFactory.cs | 53 ------- src/Parquet/File/Data/GzipDataReader.cs | 40 ------ src/Parquet/File/Data/GzipDataWriter.cs | 40 ------ src/Parquet/File/Data/IDataReader.cs | 9 -- src/Parquet/File/Data/IDataWriter.cs | 10 -- src/Parquet/File/Data/SnappyDataReader.cs | 16 --- src/Parquet/File/Data/SnappyDataWriter.cs | 14 -- .../File/Data/UncompressedDataReader.cs | 16 --- .../File/Data/UncompressedDataWriter.cs | 12 -- src/Parquet/File/DataColumnReader.cs | 55 +++---- src/Parquet/File/DataColumnWriter.cs | 73 +++++----- src/Parquet/File/DataStreamFactory.cs | 134 ------------------ src/Parquet/File/Streams/GapStream.cs | 88 ------------ src/Parquet/File/Streams/IMarkStream.cs | 14 -- .../File/Streams/SnappyInMemoryStream.cs | 98 ------------- src/Parquet/File/ThriftFooter.cs | 14 +- src/Parquet/Parquet.csproj | 5 +- src/Parquet/ParquetRowGroupWriter.cs | 5 +- src/Parquet/ParquetWriter.cs | 11 +- 31 files changed, 246 insertions(+), 888 deletions(-) delete mode 100644 docs/features.md delete mode 100644 src/Parquet.Test/SnappyStreamTest.cs delete mode 100644 src/Parquet/File/BytesOwner.cs create mode 100644 src/Parquet/File/Compressor.cs delete mode 100644 src/Parquet/File/Data/DataFactory.cs delete mode 100644 src/Parquet/File/Data/GzipDataReader.cs delete mode 100644 src/Parquet/File/Data/GzipDataWriter.cs delete mode 100644 src/Parquet/File/Data/IDataReader.cs delete mode 100644 src/Parquet/File/Data/IDataWriter.cs delete mode 100644 src/Parquet/File/Data/SnappyDataReader.cs delete mode 100644 src/Parquet/File/Data/SnappyDataWriter.cs delete mode 100644 src/Parquet/File/Data/UncompressedDataReader.cs delete mode 100644 src/Parquet/File/Data/UncompressedDataWriter.cs delete mode 100644 src/Parquet/File/DataStreamFactory.cs delete mode 100644 src/Parquet/File/Streams/GapStream.cs delete mode 100644 src/Parquet/File/Streams/IMarkStream.cs delete mode 100644 src/Parquet/File/Streams/SnappyInMemoryStream.cs diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 57256842..b326ebbf 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -4,7 +4,7 @@ on: workflow_dispatch: env: - VERSION: 4.0.2 + VERSION: 4.1.0 ASM_VERSION: 4.0.0 jobs: diff --git a/docs/README.md b/docs/README.md index 3f623d84..f1c340f7 100644 --- a/docs/README.md +++ b/docs/README.md @@ -27,8 +27,6 @@ Parquet is a de facto physical storage format in big data applications, includin - [Row-Based API](rows.md) - [Fast Automatic Serialisation](serialisation.md) -You can track the [amount of features we have implemented so far](features.md). - ## Getting started **Parquet.Net** is redistributed as a [NuGet package](https://www.nuget.org/packages/Parquet.Net). 
All the code is managed and doesn't have any native dependencies, therefore you are ready to go after referencing the package. This also means the library works on **Windows**, **Linux** and **MacOS X** (including M1). diff --git a/docs/features.md b/docs/features.md deleted file mode 100644 index d6e36eaa..00000000 --- a/docs/features.md +++ /dev/null @@ -1,24 +0,0 @@ -# Supported features - -We are implementing Parquet features gradually, and the table below outlines the current status. If the feature is not listed here it's not supported yet. - -|Feature|Reader|Writer| -|-------|------|------| -|Plain encoding|yes|yes| -|Bit Packed encoding|yes|yes| -|RLE/Bitpacked Hybrid encoding|yes|yes| -|Plain Dictionary encoding|yes|yes| -|Delta encoding|no|no| -|Data-length byte array encoding|no|no| -|Delta strings encoding|no|no| -|Impala dates|yes|yes| -|Date with different precision|yes|yes| -|Time period|yes|yes| -|GZIP Compression|yes|yes| -|SNAPPY Compression|yes|yes| -|Row groups|yes|yes| -|Data pages|yes|yes| -|Append to files|-|yes| -|Simple repeatable fields|yes|yes| -|Nested structures|yes|yes| -|Map types|yes|yes| diff --git a/src/Parquet.Test/CompressionTest.cs b/src/Parquet.Test/CompressionTest.cs index c0324c91..b8dd8468 100644 --- a/src/Parquet.Test/CompressionTest.cs +++ b/src/Parquet.Test/CompressionTest.cs @@ -2,48 +2,40 @@ using Parquet.Data; using Xunit; -namespace Parquet.Test -{ - public class CompressionTest : TestBase - { - [Theory] - [InlineData(CompressionMethod.None)] - [InlineData(CompressionMethod.Gzip)] - [InlineData(CompressionMethod.Snappy)] - public async Task All_compression_methods_supported_for_simple_integeres(CompressionMethod compressionMethod) - { - const int value = 5; - object actual = await WriteReadSingle(new DataField("id"), value, compressionMethod); - Assert.Equal(5, (int)actual); - } +namespace Parquet.Test { + public class CompressionTest : TestBase { + [Theory] + [InlineData(CompressionMethod.None)] + [InlineData(CompressionMethod.Snappy)] + [InlineData(CompressionMethod.Gzip)] + [InlineData(CompressionMethod.Lzo)] + [InlineData(CompressionMethod.Brotli)] + [InlineData(CompressionMethod.LZ4)] + [InlineData(CompressionMethod.Zstd)] + public async Task All_compression_methods_supported_for_simple_integeres(CompressionMethod compressionMethod) { + const int value = 5; + object actual = await WriteReadSingle(new DataField("id"), value, compressionMethod); + Assert.Equal(5, (int)actual); + } - [Theory] - [InlineData(CompressionMethod.None)] - [InlineData(CompressionMethod.Gzip)] - [InlineData(CompressionMethod.Snappy)] - public async Task All_compression_methods_supported_for_simple_strings(CompressionMethod compressionMethod) - { - /* - * uncompressed: length - 14, levels - 6 - * - * - */ + [Theory] + [InlineData(CompressionMethod.None)] + [InlineData(CompressionMethod.Snappy)] + [InlineData(CompressionMethod.Gzip)] + [InlineData(CompressionMethod.Lzo)] + [InlineData(CompressionMethod.Brotli)] + [InlineData(CompressionMethod.LZ4)] + [InlineData(CompressionMethod.Zstd)] + public async Task All_compression_methods_supported_for_simple_strings(CompressionMethod compressionMethod) { + /* + * uncompressed: length - 14, levels - 6 + * + * + */ - const string value = "five"; - object actual = await WriteReadSingle(new DataField("id"), value, compressionMethod); - Assert.Equal("five", actual); - } - - [Theory] - [InlineData(-1)] - [InlineData(0)] - [InlineData(1)] - [InlineData(2)] - public async Task Gzip_all_levels(int level) - { - const string value = 
"five"; - object actual = await WriteReadSingle(new DataField("id"), value, CompressionMethod.Gzip, level); - Assert.Equal("five", actual); - } - } -} + const string value = "five"; + object actual = await WriteReadSingle(new DataField("id"), value, compressionMethod); + Assert.Equal("five", actual); + } + } +} \ No newline at end of file diff --git a/src/Parquet.Test/DecimalTypeTest.cs b/src/Parquet.Test/DecimalTypeTest.cs index bc3fefb9..9d1e602f 100644 --- a/src/Parquet.Test/DecimalTypeTest.cs +++ b/src/Parquet.Test/DecimalTypeTest.cs @@ -19,7 +19,7 @@ public void Validate_Scale_Zero_Should_Be_Allowed() { const int precision = 1; const int scale = 0; var field = new DecimalDataField("field-name", precision, scale); - Assert.Equal(field.Scale, scale); + Assert.Equal(scale, field.Scale); } [Fact] diff --git a/src/Parquet.Test/NonSeekableWriterTEst.cs b/src/Parquet.Test/NonSeekableWriterTEst.cs index 574bd69c..f3031521 100644 --- a/src/Parquet.Test/NonSeekableWriterTEst.cs +++ b/src/Parquet.Test/NonSeekableWriterTEst.cs @@ -5,81 +5,68 @@ using Parquet.Data; using Xunit; -namespace Parquet.Test -{ - public class NonSeekableWriterTest - { - [Fact] - public async Task Write_multiple_row_groups_to_forward_only_stream() - { - var ms = new MemoryStream(); - var forwardOnly = new WriteableNonSeekableStream(ms); - - var schema = new Schema( - new DataField("id"), - new DataField("nonsense")); - - using (ParquetWriter writer = await ParquetWriter.CreateAsync(schema, forwardOnly)) - { - using (ParquetRowGroupWriter rgw = writer.CreateRowGroup()) - { - await rgw.WriteColumnAsync(new DataColumn((DataField)schema[0], new[] { 1 })); - await rgw.WriteColumnAsync(new DataColumn((DataField)schema[1], new[] { "1" })); +namespace Parquet.Test { + public class NonSeekableWriterTest { + [Fact] + public async Task Write_multiple_row_groups_to_forward_only_stream() { + var ms = new MemoryStream(); + var forwardOnly = new WriteableNonSeekableStream(ms); + + var schema = new Schema( + new DataField("id"), + new DataField("nonsense")); + + using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, forwardOnly)) { + using(ParquetRowGroupWriter rgw = writer.CreateRowGroup()) { + await rgw.WriteColumnAsync(new DataColumn((DataField)schema[0], new[] { 1 })); + await rgw.WriteColumnAsync(new DataColumn((DataField)schema[1], new[] { "1" })); + } + + using(ParquetRowGroupWriter rgw = writer.CreateRowGroup()) { + await rgw.WriteColumnAsync(new DataColumn((DataField)schema[0], new[] { 2 })); + await rgw.WriteColumnAsync(new DataColumn((DataField)schema[1], new[] { "2" })); + } } - using (ParquetRowGroupWriter rgw = writer.CreateRowGroup()) - { - await rgw.WriteColumnAsync(new DataColumn((DataField)schema[0], new[] { 2 })); - await rgw.WriteColumnAsync(new DataColumn((DataField)schema[1], new[] { "2" })); - } - } + ms.Position = 0; + using(ParquetReader reader = await ParquetReader.CreateAsync(ms)) { + Assert.Equal(2, reader.RowGroupCount); - ms.Position = 0; - using (ParquetReader reader = await ParquetReader.CreateAsync(ms)) - { - Assert.Equal(2, reader.RowGroupCount); + using(ParquetRowGroupReader rgr = reader.OpenRowGroupReader(0)) { + Assert.Equal(1, rgr.RowCount); - using (ParquetRowGroupReader rgr = reader.OpenRowGroupReader(0)) - { - Assert.Equal(1, rgr.RowCount); + DataColumn column = await rgr.ReadColumnAsync((DataField)schema[0]); + Assert.Equal(1, column.Data.GetValue(0)); + } - DataColumn column = await rgr.ReadColumnAsync((DataField)schema[0]); - Assert.Equal(1, column.Data.GetValue(0)); - } + 
using(ParquetRowGroupReader rgr = reader.OpenRowGroupReader(1)) { + Assert.Equal(1, rgr.RowCount); - using (ParquetRowGroupReader rgr = reader.OpenRowGroupReader(1)) - { - Assert.Equal(1, rgr.RowCount); + DataColumn column = await rgr.ReadColumnAsync((DataField)schema[0]); + Assert.Equal(2, column.Data.GetValue(0)); - DataColumn column = await rgr.ReadColumnAsync((DataField)schema[0]); - Assert.Equal(2, column.Data.GetValue(0)); + } } + } - } - } - - public class WriteableNonSeekableStream : DelegatedStream - { - public WriteableNonSeekableStream(Stream master) : base(master) - { - } + public class WriteableNonSeekableStream : DelegatedStream { + public WriteableNonSeekableStream(Stream master) : base(master) { + } - public override bool CanSeek => false; + public override bool CanSeek => false; - public override bool CanRead => true; + public override bool CanRead => true; - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } + public override long Seek(long offset, SeekOrigin origin) { + throw new NotSupportedException(); + } - public override long Position - { - get => throw new NotSupportedException(); - set => throw new NotSupportedException(); - } - } + public override long Position { + get => base.Position; + set => throw new NotSupportedException(); + } + } - } + } } \ No newline at end of file diff --git a/src/Parquet.Test/Parquet.Test.csproj b/src/Parquet.Test/Parquet.Test.csproj index 51a88ac7..a9bc34a8 100644 --- a/src/Parquet.Test/Parquet.Test.csproj +++ b/src/Parquet.Test/Parquet.Test.csproj @@ -18,9 +18,9 @@ - - - + + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/Parquet.Test/SnappyStreamTest.cs b/src/Parquet.Test/SnappyStreamTest.cs deleted file mode 100644 index 5d55a36b..00000000 --- a/src/Parquet.Test/SnappyStreamTest.cs +++ /dev/null @@ -1,50 +0,0 @@ -using System.IO; -using System.IO.Compression; -using NetBox.Generator; -using Parquet.File.Streams; -using Parquet.Test.Xunit; -using Xunit; - -namespace Parquet.Test -{ - public class SnappyStreamTest - { - [Theory] - [Repeat(100)] -#pragma warning disable xUnit1026 // Theory methods should use all of their parameters - public void Compress_decompress_random_byte_chunks(int index) -#pragma warning restore xUnit1026 // Theory methods should use all of their parameters - { - byte[] stage1 = RandomGenerator.GetRandomBytes(2, 1000); - byte[] stage2; - byte[] stage3; - - using (var source = new MemoryStream()) - { - using (var snappy = new SnappyInMemoryStream(source, CompressionMode.Compress)) - { - snappy.Write(stage1, 0, stage1.Length); - snappy.MarkWriteFinished(); - } - stage2 = source.ToArray(); - } - - using (var source = new MemoryStream(stage2)) - { - using (var snappy = new SnappyInMemoryStream(source, CompressionMode.Decompress)) - { - using (var ms = new MemoryStream()) - { - snappy.CopyTo(ms); - stage3 = ms.ToArray(); - } - } - } - - // validate - - Assert.Equal(stage1, stage3); - - } - } -} diff --git a/src/Parquet.Test/TestBase.cs b/src/Parquet.Test/TestBase.cs index cec1da5b..2028bd80 100644 --- a/src/Parquet.Test/TestBase.cs +++ b/src/Parquet.Test/TestBase.cs @@ -92,7 +92,7 @@ protected async Task> WriteReadSingleRowGroup( } } - protected async Task WriteReadSingle(DataField field, object value, CompressionMethod compressionMethod = CompressionMethod.None, int compressionLevel = -1) { + protected async Task WriteReadSingle(DataField field, object value, CompressionMethod compressionMethod = CompressionMethod.None) { 
//for sanity, use disconnected streams byte[] data; @@ -101,7 +101,6 @@ protected async Task WriteReadSingle(DataField field, object value, Comp using(ParquetWriter writer = await ParquetWriter.CreateAsync(new Schema(field), ms)) { writer.CompressionMethod = compressionMethod; - writer.CompressionLevel = compressionLevel; using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) { Array dataArray = Array.CreateInstance(field.ClrNullableIfHasNullsType, 1); diff --git a/src/Parquet/CompressionMethod.cs b/src/Parquet/CompressionMethod.cs index c242bdc1..8bf86307 100644 --- a/src/Parquet/CompressionMethod.cs +++ b/src/Parquet/CompressionMethod.cs @@ -1,23 +1,46 @@ -namespace Parquet -{ - /// - /// Parquet compression method - /// - public enum CompressionMethod - { - /// - /// No compression - /// - None, +namespace Parquet { + /// + /// Parquet compression method + /// + public enum CompressionMethod { + /// + /// No compression + /// + None = 0, - /// - /// Gzip compression - /// - Gzip, + /// + /// Snappy compression + /// + Snappy = 1, - /// - /// Snappy compression - /// - Snappy - } + /// + /// Gzip compression + /// + Gzip = 2, + + /// + /// LZO + /// + Lzo = 3, + + /// + /// Brotli + /// + Brotli = 4, + + /// + /// LZ4 + /// + LZ4 = 5, + + /// + /// ZSTD + /// + Zstd = 6, + + /// + /// LZ4 raw + /// + Lz4Raw = 7, + } } diff --git a/src/Parquet/File/BytesOwner.cs b/src/Parquet/File/BytesOwner.cs deleted file mode 100644 index 7579d24b..00000000 --- a/src/Parquet/File/BytesOwner.cs +++ /dev/null @@ -1,51 +0,0 @@ -using System; -using System.IO; - -namespace Parquet.File -{ - /// - /// Provides a back-reference to an allocator which has created the original byte array. - /// For instance, if this byte array was rented from a pool, the client won't know how to return it so - /// he will just dispose it. - /// - class BytesOwner : IDisposable - { - private readonly byte[] _bytes; //original memory buffer - private readonly int startOffset; - private readonly Action _disposeAction; - private readonly bool _dispose; - - public BytesOwner(byte[] bytes, int startOffset, Memory memory, Action disposeAction, bool dispose) - { - _bytes = bytes ?? throw new ArgumentNullException(nameof(bytes)); - this.startOffset = startOffset; - Memory = memory; - _disposeAction = disposeAction ?? throw new ArgumentNullException(nameof(disposeAction)); - _dispose = dispose; - } - - public Memory Memory { get; } - - /// - /// Creates a stream from the current bytes data. - /// - /// - /// Returned stream should be dispose before disposing current object. - /// - /// - /// Stream for bytes represented by current bytes owner instance. 
- /// - public Stream ToStream() - { - return new MemoryStream(_bytes, startOffset, this.Memory.Length); - } - - public void Dispose() - { - if (_dispose) - { - _disposeAction(_bytes); - } - } - } -} diff --git a/src/Parquet/File/Compressor.cs b/src/Parquet/File/Compressor.cs new file mode 100644 index 00000000..5f57b990 --- /dev/null +++ b/src/Parquet/File/Compressor.cs @@ -0,0 +1,35 @@ +using System; +using IronCompress; + +namespace Parquet.File { + static class Compressor { + private static readonly Iron _iron = new Iron(); + + public static DataBuffer Compress(CompressionMethod method, ReadOnlySpan input) { + return _iron.Compress(ToCodec(method), input); + } + + public static DataBuffer Decompress(CompressionMethod method, ReadOnlySpan input, int outLength) { + return _iron.Decompress(ToCodec(method), input, outLength); + } + + private static Codec ToCodec(CompressionMethod method) { + switch(method) { + case CompressionMethod.Snappy: + return Codec.Snappy; + case CompressionMethod.Gzip: + return Codec.Gzip; + case CompressionMethod.Lzo: + return Codec.LZO; + case CompressionMethod.Brotli: + return Codec.Brotli; + case CompressionMethod.LZ4: + return Codec.LZ4; + case CompressionMethod.Zstd: + return Codec.Zstd; + default: + throw new NotSupportedException($"{method} not supported"); + } + } + } +} diff --git a/src/Parquet/File/Data/DataFactory.cs b/src/Parquet/File/Data/DataFactory.cs deleted file mode 100644 index b59f06ba..00000000 --- a/src/Parquet/File/Data/DataFactory.cs +++ /dev/null @@ -1,53 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using Parquet.Thrift; - -namespace Parquet.File.Data -{ - static class DataFactory - { - private static readonly Dictionary> CompressionMethodToWorker = new Dictionary>() - { - { CompressionMethod.None, new KeyValuePair(new UncompressedDataWriter(), new UncompressedDataReader()) }, - { CompressionMethod.Gzip, new KeyValuePair(new GzipDataWriter(), new GzipDataReader()) }, - { CompressionMethod.Snappy, new KeyValuePair(new SnappyDataWriter(), new SnappyDataReader()) } - }; - - private static readonly Dictionary CompressionMethodToCodec = new Dictionary - { - { CompressionMethod.None, Thrift.CompressionCodec.UNCOMPRESSED }, - { CompressionMethod.Gzip, Thrift.CompressionCodec.GZIP }, - { CompressionMethod.Snappy, CompressionCodec.SNAPPY } - }; - - public static Thrift.CompressionCodec GetThriftCompression(CompressionMethod method) - { - if (!CompressionMethodToCodec.TryGetValue(method, out Thrift.CompressionCodec thriftCodec)) - throw new NotSupportedException($"codec '{method}' is not supported"); - - return thriftCodec; - } - - public static IDataWriter GetWriter(CompressionMethod method) - { - return CompressionMethodToWorker[method].Key; - } - - public static IDataReader GetReader(CompressionMethod method) - { - return CompressionMethodToWorker[method].Value; - } - - public static IDataReader GetReader(Thrift.CompressionCodec thriftCodec) - { - if (!CompressionMethodToCodec.ContainsValue(thriftCodec)) - throw new NotSupportedException($"reader for compression '{thriftCodec}' is not supported."); - - CompressionMethod method = CompressionMethodToCodec.First(kv => kv.Value == thriftCodec).Key; - - return GetReader(method); - } - } -} diff --git a/src/Parquet/File/Data/GzipDataReader.cs b/src/Parquet/File/Data/GzipDataReader.cs deleted file mode 100644 index b1b498f7..00000000 --- a/src/Parquet/File/Data/GzipDataReader.cs +++ /dev/null @@ -1,40 +0,0 @@ -using System; -using System.IO; -using 
System.IO.Compression; - -namespace Parquet.File.Data -{ - class GzipDataReader : IDataReader - { - public byte[] Read(Stream source, int count) - { - byte[] srcBytes = new byte[count]; - source.Read(srcBytes, 0, srcBytes.Length); - return Decompress(srcBytes); - } - - private static byte[] Decompress(byte[] source) - { - using (var sourceStream = new MemoryStream(source)) - { - using (var destinationStream = new MemoryStream()) - { - Decompress(sourceStream, destinationStream); - return destinationStream.ToArray(); - } - } - } - - private static void Decompress(Stream source, Stream destination) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (destination == null) throw new ArgumentNullException(nameof(destination)); - - using (var decompressor = new GZipStream(source, CompressionMode.Decompress, true)) - { - decompressor.CopyTo(destination); - destination.Flush(); - } - } - } -} diff --git a/src/Parquet/File/Data/GzipDataWriter.cs b/src/Parquet/File/Data/GzipDataWriter.cs deleted file mode 100644 index 72f09795..00000000 --- a/src/Parquet/File/Data/GzipDataWriter.cs +++ /dev/null @@ -1,40 +0,0 @@ -using System; -using System.IO; -using System.IO.Compression; - -namespace Parquet.File.Data -{ - class GzipDataWriter : IDataWriter - { - public void Write(byte[] buffer, Stream destination) - { - byte[] compressed = Compress(buffer); - destination.Write(compressed, 0, compressed.Length); - } - - private static byte[] Compress(byte[] source) - { - using (var sourceStream = new MemoryStream(source)) - { - using (var destinationStream = new MemoryStream()) - { - Compress(sourceStream, destinationStream); - return destinationStream.ToArray(); - } - } - } - - private static void Compress(Stream source, Stream destination) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (destination == null) throw new ArgumentNullException(nameof(destination)); - - using (var compressor = new GZipStream(destination, CompressionLevel.Optimal, true)) - { - source.CopyTo(compressor); - compressor.Flush(); - } - } - - } -} diff --git a/src/Parquet/File/Data/IDataReader.cs b/src/Parquet/File/Data/IDataReader.cs deleted file mode 100644 index 5d68f93e..00000000 --- a/src/Parquet/File/Data/IDataReader.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.IO; - -namespace Parquet.File.Data -{ - interface IDataReader - { - byte[] Read(Stream source, int count); - } -} \ No newline at end of file diff --git a/src/Parquet/File/Data/IDataWriter.cs b/src/Parquet/File/Data/IDataWriter.cs deleted file mode 100644 index 0abcd6ce..00000000 --- a/src/Parquet/File/Data/IDataWriter.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System.IO; - -namespace Parquet.File.Data -{ - //note that this may be obsolete in next major version - interface IDataWriter - { - void Write(byte[] buffer, Stream destination); - } -} diff --git a/src/Parquet/File/Data/SnappyDataReader.cs b/src/Parquet/File/Data/SnappyDataReader.cs deleted file mode 100644 index 015b6b6f..00000000 --- a/src/Parquet/File/Data/SnappyDataReader.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.IO; -using IronSnappy; - -namespace Parquet.File.Data -{ - class SnappyDataReader : IDataReader - { - public byte[] Read(Stream source, int count) - { - byte[] buffer = new byte[count]; - source.Read(buffer, 0, count); - byte[] uncompressedBytes = Snappy.Decode(buffer); - return uncompressedBytes; - } - } -} \ No newline at end of file diff --git a/src/Parquet/File/Data/SnappyDataWriter.cs b/src/Parquet/File/Data/SnappyDataWriter.cs deleted 
file mode 100644 index 3baf1467..00000000 --- a/src/Parquet/File/Data/SnappyDataWriter.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System.IO; -using IronSnappy; - -namespace Parquet.File.Data -{ - class SnappyDataWriter : IDataWriter - { - public void Write(byte[] buffer, Stream destination) - { - byte[] compressed = Snappy.Encode(buffer); - destination.Write(compressed, 0, buffer.Length); - } - } -} diff --git a/src/Parquet/File/Data/UncompressedDataReader.cs b/src/Parquet/File/Data/UncompressedDataReader.cs deleted file mode 100644 index 7393cb9d..00000000 --- a/src/Parquet/File/Data/UncompressedDataReader.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.IO; - -namespace Parquet.File.Data -{ - class UncompressedDataReader : IDataReader - { - public byte[] Read(Stream source, int count) - { - byte[] result = new byte[count]; - - source.Read(result, 0, count); - - return result; - } - } -} diff --git a/src/Parquet/File/Data/UncompressedDataWriter.cs b/src/Parquet/File/Data/UncompressedDataWriter.cs deleted file mode 100644 index f8eaee4e..00000000 --- a/src/Parquet/File/Data/UncompressedDataWriter.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.IO; - -namespace Parquet.File.Data -{ - class UncompressedDataWriter : IDataWriter - { - public void Write(byte[] buffer, Stream destination) - { - destination.Write(buffer, 0, buffer.Length); - } - } -} diff --git a/src/Parquet/File/DataColumnReader.cs b/src/Parquet/File/DataColumnReader.cs index 6f562b16..195c9d89 100644 --- a/src/Parquet/File/DataColumnReader.cs +++ b/src/Parquet/File/DataColumnReader.cs @@ -1,10 +1,10 @@ using System; +using System.Buffers; using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; using Parquet.Data; -using Parquet.File.Streams; using Parquet.File.Values; namespace Parquet.File { @@ -71,14 +71,18 @@ public async Task ReadAsync(CancellationToken cancellationToken = de //there can be only one dictionary page in column Thrift.PageHeader ph = await _thriftStream.ReadAsync(cancellationToken); - if(TryReadDictionaryPage(ph, out colData.dictionary, out colData.dictionaryOffset)) { + (bool read, Array data, int offset) = await TryReadDictionaryPage(ph); + if(read) { + colData.dictionary = data; + colData.dictionaryOffset = offset; ph = await _thriftStream.ReadAsync(cancellationToken); + } int pagesRead = 0; while(true) { - ReadDataPage(ph, colData, maxValues); + await ReadDataPage(ph, colData, maxValues); pagesRead++; @@ -130,35 +134,38 @@ public async Task ReadAsync(CancellationToken cancellationToken = de return finalColumn; } - private bool TryReadDictionaryPage(Thrift.PageHeader ph, out Array dictionary, out int dictionaryOffset) { + private async Task ReadPageData(Thrift.PageHeader ph) { + + byte[] data = ArrayPool.Shared.Rent(ph.Compressed_page_size); + await _inputStream.ReadAsync(data, 0, ph.Compressed_page_size); + + if(_thriftColumnChunk.Meta_data.Codec == Thrift.CompressionCodec.UNCOMPRESSED) { + return new IronCompress.DataBuffer(data, ph.Compressed_page_size, ArrayPool.Shared); + } + + return Compressor.Decompress((CompressionMethod)(int)_thriftColumnChunk.Meta_data.Codec, + data.AsSpan(0, ph.Compressed_page_size), + ph.Uncompressed_page_size); + } + + private async Task<(bool, Array, int)> TryReadDictionaryPage(Thrift.PageHeader ph) { if(ph.Type != Thrift.PageType.DICTIONARY_PAGE) { - dictionary = null; - dictionaryOffset = 0; - return false; + return (false, null, 0); } //Dictionary page format: the entries in the dictionary - in dictionary order - using the plain encoding. 
- - using(BytesOwner bytes = ReadPageData(ph)) { + using(IronCompress.DataBuffer bytes = await ReadPageData(ph)) { //todo: this is ugly, but will be removed once other parts are migrated to System.Memory - using(var ms = new MemoryStream(bytes.Memory.ToArray())) { + using(var ms = new MemoryStream(bytes.AsSpan().ToArray())) { using(var dataReader = new BinaryReader(ms)) { - dictionary = _dataTypeHandler.GetArray(ph.Dictionary_page_header.Num_values, false, false); - - dictionaryOffset = _dataTypeHandler.Read(dataReader, _thriftSchemaElement, dictionary, 0); - - return true; + Array dictionary = _dataTypeHandler.GetArray(ph.Dictionary_page_header.Num_values, false, false); + int dictionaryOffset = _dataTypeHandler.Read(dataReader, _thriftSchemaElement, dictionary, 0); + return (true, dictionary, dictionaryOffset); } } } } - private BytesOwner ReadPageData(Thrift.PageHeader pageHeader) { - return DataStreamFactory.ReadPageData( - _inputStream, _thriftColumnChunk.Meta_data.Codec, - pageHeader.Compressed_page_size, pageHeader.Uncompressed_page_size); - } - private long GetFileOffset() { //get the minimum offset, we'll just read pages in sequence @@ -172,10 +179,10 @@ private long GetFileOffset() { .Min(); } - private void ReadDataPage(Thrift.PageHeader ph, ColumnRawData cd, long maxValues) { - using(BytesOwner bytes = ReadPageData(ph)) { + private async Task ReadDataPage(Thrift.PageHeader ph, ColumnRawData cd, long maxValues) { + using(IronCompress.DataBuffer bytes = await ReadPageData(ph)) { //todo: this is ugly, but will be removed once other parts are migrated to System.Memory - using(var ms = bytes.ToStream()) { + using(var ms = new MemoryStream(bytes.AsSpan().ToArray())) { int valueCount = ph.Data_page_header.Num_values; using(var reader = new BinaryReader(ms)) { if(_maxRepetitionLevel > 0) { diff --git a/src/Parquet/File/DataColumnWriter.cs b/src/Parquet/File/DataColumnWriter.cs index 4b993e4d..47705044 100644 --- a/src/Parquet/File/DataColumnWriter.cs +++ b/src/Parquet/File/DataColumnWriter.cs @@ -6,7 +6,6 @@ using System.Threading; using System.Threading.Tasks; using Parquet.Data; -using Parquet.File.Streams; using Parquet.File.Values; namespace Parquet.File { @@ -16,7 +15,6 @@ class DataColumnWriter { private readonly ThriftFooter _footer; private readonly Thrift.SchemaElement _schemaElement; private readonly CompressionMethod _compressionMethod; - private readonly int _compressionLevel; private readonly int _rowCount; private struct PageTag { @@ -30,14 +28,12 @@ public DataColumnWriter( ThriftFooter footer, Thrift.SchemaElement schemaElement, CompressionMethod compressionMethod, - int compressionLevel, int rowCount) { _stream = stream; _thriftStream = thriftStream; _footer = footer; _schemaElement = schemaElement; _compressionMethod = compressionMethod; - _compressionLevel = compressionLevel; _rowCount = rowCount; } @@ -68,6 +64,7 @@ private async Task> WriteColumnAsync(DataColumn column, int maxRepetitionLevel, int maxDefinitionLevel, CancellationToken cancellationToken = default) { + var pages = new List(); /* @@ -76,56 +73,56 @@ private async Task> WriteColumnAsync(DataColumn column, * the write efficiency. 
*/ - + byte[] uncompressedData; using(var ms = new MemoryStream()) { - Thrift.PageHeader dataPageHeader = _footer.CreateDataPage(column.Count); //chain streams together so we have real streaming instead of wasting undefraggable LOH memory - using(GapStream pageStream = DataStreamFactory.CreateWriter(ms, _compressionMethod, _compressionLevel, true)) { - using(var writer = new BinaryWriter(pageStream, Encoding.UTF8, true)) { - if(column.RepetitionLevels != null) { - WriteLevels(writer, column.RepetitionLevels, column.RepetitionLevels.Length, maxRepetitionLevel); - } + using(var writer = new BinaryWriter(ms, Encoding.UTF8, true)) { + if(column.RepetitionLevels != null) { + WriteLevels(writer, column.RepetitionLevels, column.RepetitionLevels.Length, maxRepetitionLevel); + } - ArrayView data = new ArrayView(column.Data, column.Offset, column.Count); + ArrayView data = new ArrayView(column.Data, column.Offset, column.Count); - if(maxDefinitionLevel > 0) { - data = column.PackDefinitions(maxDefinitionLevel, out int[] definitionLevels, out int definitionLevelsLength, out int nullCount); + if(maxDefinitionLevel > 0) { + data = column.PackDefinitions(maxDefinitionLevel, out int[] definitionLevels, out int definitionLevelsLength, out int nullCount); - //last chance to capture null count as null data is compressed now - column.Statistics.NullCount = nullCount; + //last chance to capture null count as null data is compressed now + column.Statistics.NullCount = nullCount; - try { - WriteLevels(writer, definitionLevels, definitionLevelsLength, maxDefinitionLevel); - } - finally { - if(definitionLevels != null) { - ArrayPool.Shared.Return(definitionLevels); - } - } + try { + WriteLevels(writer, definitionLevels, definitionLevelsLength, maxDefinitionLevel); } - else { - //no defitions means no nulls - column.Statistics.NullCount = 0; + finally { + if(definitionLevels != null) { + ArrayPool.Shared.Return(definitionLevels); + } } - - dataTypeHandler.Write(tse, writer, data, column.Statistics); - - writer.Flush(); + } + else { + //no defitions means no nulls + column.Statistics.NullCount = 0; } - pageStream.Flush(); //extremely important to flush the stream as some compression algorithms don't finish writing - pageStream.MarkWriteFinished(); - dataPageHeader.Uncompressed_page_size = (int)pageStream.Position; + dataTypeHandler.Write(tse, writer, data, column.Statistics); + + writer.Flush(); } - dataPageHeader.Compressed_page_size = (int)ms.Position; + uncompressedData = ms.ToArray(); + } + + using(IronCompress.DataBuffer compressedData = _compressionMethod == CompressionMethod.None + ? 
new IronCompress.DataBuffer(uncompressedData) + : Compressor.Compress(_compressionMethod, uncompressedData)) { + + Thrift.PageHeader dataPageHeader = _footer.CreateDataPage(column.Count); + dataPageHeader.Uncompressed_page_size = uncompressedData.Length; + dataPageHeader.Compressed_page_size = compressedData.AsSpan().Length; //write the header in dataPageHeader.Data_page_header.Statistics = column.Statistics.ToThriftStatistics(dataTypeHandler, _schemaElement); int headerSize = await _thriftStream.WriteAsync(dataPageHeader, false, cancellationToken); - ms.Position = 0; - ms.CopyTo(_stream); - + _stream.Write(compressedData); var dataTag = new PageTag { HeaderMeta = dataPageHeader, diff --git a/src/Parquet/File/DataStreamFactory.cs b/src/Parquet/File/DataStreamFactory.cs deleted file mode 100644 index c46bf94e..00000000 --- a/src/Parquet/File/DataStreamFactory.cs +++ /dev/null @@ -1,134 +0,0 @@ -using System; -using System.Buffers; -using System.Collections.Generic; -using System.IO; -using System.IO.Compression; -using IronSnappy; -using Parquet.File.Streams; - -namespace Parquet.File -{ - static class DataStreamFactory - { - private static ArrayPool BytesPool = ArrayPool.Shared; - - private static readonly Dictionary _compressionMethodToCodec = - new Dictionary - { - { CompressionMethod.None, Thrift.CompressionCodec.UNCOMPRESSED }, - { CompressionMethod.Gzip, Thrift.CompressionCodec.GZIP }, - { CompressionMethod.Snappy, Thrift.CompressionCodec.SNAPPY } - }; - - private static readonly Dictionary _codecToCompressionMethod = - new Dictionary - { - { Thrift.CompressionCodec.UNCOMPRESSED, CompressionMethod.None }, - { Thrift.CompressionCodec.GZIP, CompressionMethod.Gzip }, - { Thrift.CompressionCodec.SNAPPY, CompressionMethod.Snappy } - }; - - // this will eventually disappear once we fully migrate to System.Memory - public static GapStream CreateWriter( - Stream nakedStream, - CompressionMethod compressionMethod, int compressionLevel, - bool leaveNakedOpen) - { - Stream dest; - -/*#if !NET14 - nakedStream = new BufferedStream(nakedStream); //optimise writer performance -#endif*/ - - switch (compressionMethod) - { - case CompressionMethod.Gzip: - dest = new GZipStream(nakedStream, ToGzipCompressionLevel(compressionLevel), leaveNakedOpen); - leaveNakedOpen = false; - break; - case CompressionMethod.Snappy: - dest = new SnappyInMemoryStream(nakedStream, CompressionMode.Compress); - leaveNakedOpen = false; - break; - case CompressionMethod.None: - dest = nakedStream; - break; - default: - throw new NotImplementedException($"unknown compression method {compressionMethod}"); - } - - return new GapStream(dest, leaveOpen: leaveNakedOpen); - } - - private static CompressionLevel ToGzipCompressionLevel(int compressionLevel) - { - switch(compressionLevel) - { - case 0: - return CompressionLevel.NoCompression; - case 1: - return CompressionLevel.Fastest; - case 2: - return CompressionLevel.Optimal; - default: - return CompressionLevel.Optimal; - } - } - - public static BytesOwner ReadPageData(Stream nakedStream, Thrift.CompressionCodec compressionCodec, - int compressedLength, int uncompressedLength) - { - if (!_codecToCompressionMethod.TryGetValue(compressionCodec, out CompressionMethod compressionMethod)) - throw new NotSupportedException($"reader for compression '{compressionCodec}' is not supported."); - - int totalBytesRead = 0; - int currentBytesRead = int.MinValue; - byte[] data = BytesPool.Rent(compressedLength); - bool dataRented = true; - - // Some storage solutions (like Azure blobs) might 
require more than one 'Read' action to read the requested length. - while (totalBytesRead < compressedLength && currentBytesRead != 0) - { - currentBytesRead = nakedStream.Read(data, totalBytesRead, compressedLength - totalBytesRead); - totalBytesRead += currentBytesRead; - } - - if (totalBytesRead != compressedLength) - { - throw new ParquetException($"expected {compressedLength} bytes in source stream but could read only {totalBytesRead}"); - } - - switch (compressionMethod) - { - case CompressionMethod.None: - //nothing to do, original data is the raw data - break; - case CompressionMethod.Gzip: - using (var source = new MemoryStream(data, 0, compressedLength)) - { - byte[] unGzData = BytesPool.Rent(uncompressedLength); - using (var dest = new MemoryStream(unGzData, 0, uncompressedLength)) - { - using (var gz = new GZipStream(source, CompressionMode.Decompress)) - { - gz.CopyTo(dest); - } - } - BytesPool.Return(data); - data = unGzData; - } - break; - case CompressionMethod.Snappy: - byte[] uncompressed = Snappy.Decode(data.AsSpan(0, compressedLength)); - BytesPool.Return(data); - data = uncompressed; - dataRented = false; - break; - default: - throw new NotSupportedException("method: " + compressionMethod); - } - - return new BytesOwner(data, 0, data.AsMemory(0, uncompressedLength), d => BytesPool.Return(d), dataRented); - } - } -} diff --git a/src/Parquet/File/Streams/GapStream.cs b/src/Parquet/File/Streams/GapStream.cs deleted file mode 100644 index 7722b2bd..00000000 --- a/src/Parquet/File/Streams/GapStream.cs +++ /dev/null @@ -1,88 +0,0 @@ -using System; -using System.IO; - -namespace Parquet.File.Streams -{ - class GapStream : Stream, IMarkStream - { - private readonly Stream _parent; - private readonly long? _knownLength; - private readonly bool _leaveOpen; - private long _position; - - public GapStream(Stream parent, long? knownLength = null, bool leaveOpen = false) - { - _parent = parent; - _knownLength = knownLength; - _leaveOpen = leaveOpen; - } - - public override bool CanRead => _parent.CanRead; - - public override bool CanSeek => _parent.CanSeek; - - public override bool CanWrite => _parent.CanWrite; - - public override long Length => _knownLength ?? 
_parent.Length; - - public override long Position - { - get => _position; - set - { - _parent.Position = value; - _position = value; - } - } - - public override void Flush() - { - _parent.Flush(); - } - - public void MarkWriteFinished() - { - if(_parent is IMarkStream markStream) - { - markStream.MarkWriteFinished(); - } - } - - public override int Read(byte[] buffer, int offset, int count) - { - int read = _parent.Read(buffer, offset, count); - - _position += read; - - return read; - } - - public override long Seek(long offset, SeekOrigin origin) - { - long pos = _parent.Seek(offset, origin); - _position = pos; - return pos; - } - - public override void SetLength(long value) - { - _parent.SetLength(value); - } - - public override void Write(byte[] buffer, int offset, int count) - { - _parent.Write(buffer, offset, count); - _position += count; - } - - protected override void Dispose(bool disposing) - { - if (!_leaveOpen) - { - _parent.Dispose(); - } - - base.Dispose(disposing); - } - } -} diff --git a/src/Parquet/File/Streams/IMarkStream.cs b/src/Parquet/File/Streams/IMarkStream.cs deleted file mode 100644 index b003363f..00000000 --- a/src/Parquet/File/Streams/IMarkStream.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Parquet.File.Streams -{ - interface IMarkStream - { - /// - /// Crappy workaround to mark stream as finished for writing. To be deleted once Snappy supports streaming. - /// - void MarkWriteFinished(); - } -} diff --git a/src/Parquet/File/Streams/SnappyInMemoryStream.cs b/src/Parquet/File/Streams/SnappyInMemoryStream.cs deleted file mode 100644 index 350968c4..00000000 --- a/src/Parquet/File/Streams/SnappyInMemoryStream.cs +++ /dev/null @@ -1,98 +0,0 @@ -using System; -using System.Buffers; -using System.IO; -using System.IO.Compression; -using IronSnappy; - -namespace Parquet.File.Streams -{ - /// - /// In-memory hacky implementation of Snappy streaming as Snappy.Sharp's implementation is a work in progress - /// - class SnappyInMemoryStream : Stream, IMarkStream - { - private readonly Stream _parent; - private readonly CompressionMode _compressionMode; - private readonly MemoryStream _ms; - private bool _finishedForWriting; - - public SnappyInMemoryStream(Stream parent, CompressionMode compressionMode) - { - _parent = parent; - _compressionMode = compressionMode; - - if(compressionMode == CompressionMode.Compress) - { - _ms = new MemoryStream(); - } - else - { - _ms = DecompressFromStream(parent); - } - } - - public override bool CanRead => _compressionMode == CompressionMode.Decompress; - - public override bool CanSeek => false; - - public override bool CanWrite => _compressionMode == CompressionMode.Compress; - - public override long Length => throw new NotSupportedException(); - - public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } - - public void MarkWriteFinished() - { - if (_finishedForWriting) return; - - if(_compressionMode == CompressionMode.Compress) - { - byte[] compressed = Snappy.Encode(_ms.ToArray()); - _parent.Write(compressed, 0, compressed.Length); - } - - _finishedForWriting = true; - } - - protected override void Dispose(bool disposing) - { - Flush(); - - base.Dispose(disposing); - } - - public override int Read(byte[] buffer, int offset, int count) - { - return _ms.Read(buffer, offset, count); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } - - public 
override void SetLength(long value) - { - throw new NotSupportedException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - _ms.Write(buffer, offset, count); - } - - private MemoryStream DecompressFromStream(Stream source) - { - byte[] buffer = ArrayPool.Shared.Rent((int)source.Length); - source.Read(buffer, 0, (int)source.Length); - byte[] uncompressedBytes = Snappy.Decode(buffer.AsSpan(0, (int)source.Length)); - ArrayPool.Shared.Return(buffer); - return new MemoryStream(uncompressedBytes); - } - - public override void Flush() - { - - } - } -} diff --git a/src/Parquet/File/ThriftFooter.cs b/src/Parquet/File/ThriftFooter.cs index 056580a3..05855291 100644 --- a/src/Parquet/File/ThriftFooter.cs +++ b/src/Parquet/File/ThriftFooter.cs @@ -5,7 +5,6 @@ using System.Threading; using System.Threading.Tasks; using Parquet.Data; -using Parquet.File.Data; using Parquet.Thrift; namespace Parquet.File { @@ -145,7 +144,7 @@ public Thrift.RowGroup AddRowGroup() { } public Thrift.ColumnChunk CreateColumnChunk(CompressionMethod compression, Stream output, Thrift.Type columnType, List path, int valuesCount) { - Thrift.CompressionCodec codec = DataFactory.GetThriftCompression(compression); + Thrift.CompressionCodec codec = (Thrift.CompressionCodec)(int)compression; var chunk = new Thrift.ColumnChunk(); long startPos = output.Position; @@ -155,12 +154,11 @@ public Thrift.ColumnChunk CreateColumnChunk(CompressionMethod compression, Strea chunk.Meta_data.Type = columnType; chunk.Meta_data.Codec = codec; chunk.Meta_data.Data_page_offset = startPos; - chunk.Meta_data.Encodings = new List - { - Thrift.Encoding.RLE, - Thrift.Encoding.BIT_PACKED, - Thrift.Encoding.PLAIN - }; + chunk.Meta_data.Encodings = new List { + Thrift.Encoding.RLE, + Thrift.Encoding.BIT_PACKED, + Thrift.Encoding.PLAIN + }; chunk.Meta_data.Path_in_schema = path; chunk.Meta_data.Statistics = new Thrift.Statistics(); diff --git a/src/Parquet/Parquet.csproj b/src/Parquet/Parquet.csproj index 7621eccc..3d99fd4c 100644 --- a/src/Parquet/Parquet.csproj +++ b/src/Parquet/Parquet.csproj @@ -49,14 +49,15 @@ - + + diff --git a/src/Parquet/ParquetRowGroupWriter.cs b/src/Parquet/ParquetRowGroupWriter.cs index be8eceb5..701b901c 100644 --- a/src/Parquet/ParquetRowGroupWriter.cs +++ b/src/Parquet/ParquetRowGroupWriter.cs @@ -21,7 +21,6 @@ public class ParquetRowGroupWriter : IDisposable private readonly ThriftStream _thriftStream; private readonly ThriftFooter _footer; private readonly CompressionMethod _compressionMethod; - private readonly int _compressionLevel; private readonly ParquetOptions _formatOptions; private readonly Thrift.RowGroup _thriftRowGroup; private readonly long _rgStartPos; @@ -33,14 +32,12 @@ internal ParquetRowGroupWriter(Schema schema, ThriftStream thriftStream, ThriftFooter footer, CompressionMethod compressionMethod, - int compressionLevel, ParquetOptions formatOptions) { _schema = schema ?? throw new ArgumentNullException(nameof(schema)); _stream = stream ?? throw new ArgumentNullException(nameof(stream)); _thriftStream = thriftStream ?? throw new ArgumentNullException(nameof(thriftStream)); _footer = footer ?? 
throw new ArgumentNullException(nameof(footer)); _compressionMethod = compressionMethod; - _compressionLevel = compressionLevel; _formatOptions = formatOptions; _thriftRowGroup = _footer.AddRowGroup(); @@ -76,7 +73,7 @@ public async Task WriteColumnAsync(DataColumn column, CancellationToken cancella List path = _footer.GetPath(tse); var writer = new DataColumnWriter(_stream, _thriftStream, _footer, tse, - _compressionMethod, _compressionLevel, + _compressionMethod, (int)(RowCount ?? 0)); Thrift.ColumnChunk chunk = await writer.WriteAsync(path, column, dataTypeHandler, cancellationToken); diff --git a/src/Parquet/ParquetWriter.cs b/src/Parquet/ParquetWriter.cs index e6323d51..71ca0cb9 100644 --- a/src/Parquet/ParquetWriter.cs +++ b/src/Parquet/ParquetWriter.cs @@ -6,7 +6,6 @@ using System.Threading.Tasks; using Parquet.Data; using Parquet.File; -using Parquet.File.Streams; namespace Parquet { /// @@ -27,14 +26,8 @@ public class ParquetWriter : ParquetActor, IDisposable /// public CompressionMethod CompressionMethod { get; set; } = CompressionMethod.Snappy; - /// - /// Compression level to use, value is treated depending on compression algorithm. Defaults to -1 - /// meaning default compression level. - /// - public int CompressionLevel { get; set; } = -1; - private ParquetWriter(Schema schema, Stream output, ParquetOptions formatOptions = null, bool append = false) - : base(new GapStream(output)) { + : base(output) { if(output == null) throw new ArgumentNullException(nameof(output)); @@ -69,7 +62,7 @@ public ParquetRowGroupWriter CreateRowGroup() { _dataWritten = true; var writer = new ParquetRowGroupWriter(_schema, Stream, ThriftStream, _footer, - CompressionMethod, CompressionLevel, _formatOptions); + CompressionMethod, _formatOptions); _openedWriters.Add(writer);
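
Usage sketch (illustrative, not part of the patch): with this change a writer can select any of the new CompressionMethod values before creating row groups. The fragment below is a minimal example assembled from the API already visible in this diff (ParquetWriter.CreateAsync, the CompressionMethod property, CreateRowGroup, WriteColumnAsync); the field name "id", the sample data and the choice of Zstd are assumptions for illustration only.

    using System.IO;
    using System.Threading.Tasks;
    using Parquet;
    using Parquet.Data;

    static async Task WriteZstdExampleAsync()
    {
        // single int32 column; any other CompressionMethod value is used the same way
        var field = new DataField<int>("id");

        using var ms = new MemoryStream();
        using(ParquetWriter writer = await ParquetWriter.CreateAsync(new Schema(field), ms)) {
            writer.CompressionMethod = CompressionMethod.Zstd;   // default is Snappy
            using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
                await rg.WriteColumnAsync(new DataColumn(field, new[] { 1, 2, 3 }));
            }
        }
    }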