Support all the compression methods via my own IronCompress library.
aloneguid committed Oct 13, 2022
1 parent f421254 commit 67173d7
Showing 31 changed files with 246 additions and 888 deletions.
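For orientation before the per-file diffs, here is a minimal usage sketch (not part of this commit) of how the expanded compression support is consumed. Every type and member name is taken from the tests changed below; the choice of Zstd is illustrative, and any of the seven CompressionMethod values is used the same way.

    // Minimal sketch, assuming the API surface visible in the updated tests:
    // write a one-column Parquet file compressed with one of the new codecs.
    using System.IO;
    using System.Threading.Tasks;
    using Parquet;
    using Parquet.Data;

    public static class CompressionExample {
        public static async Task<byte[]> WriteWithZstdAsync() {
            var schema = new Schema(new DataField<int>("id"));
            using var ms = new MemoryStream();

            using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms)) {
                // None, Snappy, Gzip, Lzo, Brotli, LZ4 and Zstd are all accepted here
                writer.CompressionMethod = CompressionMethod.Zstd;

                using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
                    await rg.WriteColumnAsync(new DataColumn((DataField)schema[0], new[] { 5 }));
                }
            }

            return ms.ToArray();
        }
    }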
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -4,7 +4,7 @@ on:
   workflow_dispatch:
 
 env:
-  VERSION: 4.0.2
+  VERSION: 4.1.0
   ASM_VERSION: 4.0.0
 
 jobs:
2 changes: 0 additions & 2 deletions docs/README.md
@@ -27,8 +27,6 @@ Parquet is a de facto physical storage format in big data applications, includin
 - [Row-Based API](rows.md)
 - [Fast Automatic Serialisation](serialisation.md)
 
-You can track the [amount of features we have implemented so far](features.md).
-
 ## Getting started
 
 **Parquet.Net** is redistributed as a [NuGet package](https://www.nuget.org/packages/Parquet.Net). All the code is managed and doesn't have any native dependencies, therefore you are ready to go after referencing the package. This also means the library works on **Windows**, **Linux** and **MacOS X** (including M1).
24 changes: 0 additions & 24 deletions docs/features.md

This file was deleted.

78 changes: 35 additions & 43 deletions src/Parquet.Test/CompressionTest.cs
@@ -2,48 +2,40 @@
 using Parquet.Data;
 using Xunit;
 
-namespace Parquet.Test
-{
-    public class CompressionTest : TestBase
-    {
-        [Theory]
-        [InlineData(CompressionMethod.None)]
-        [InlineData(CompressionMethod.Gzip)]
-        [InlineData(CompressionMethod.Snappy)]
-        public async Task All_compression_methods_supported_for_simple_integeres(CompressionMethod compressionMethod)
-        {
-            const int value = 5;
-            object actual = await WriteReadSingle(new DataField<int>("id"), value, compressionMethod);
-            Assert.Equal(5, (int)actual);
-        }
-
-        [Theory]
-        [InlineData(CompressionMethod.None)]
-        [InlineData(CompressionMethod.Gzip)]
-        [InlineData(CompressionMethod.Snappy)]
-        public async Task All_compression_methods_supported_for_simple_strings(CompressionMethod compressionMethod)
-        {
-            /*
-             * uncompressed: length - 14, levels - 6
-             */
-
-            const string value = "five";
-            object actual = await WriteReadSingle(new DataField<string>("id"), value, compressionMethod);
-            Assert.Equal("five", actual);
-        }
-
-        [Theory]
-        [InlineData(-1)]
-        [InlineData(0)]
-        [InlineData(1)]
-        [InlineData(2)]
-        public async Task Gzip_all_levels(int level)
-        {
-            const string value = "five";
-            object actual = await WriteReadSingle(new DataField<string>("id"), value, CompressionMethod.Gzip, level);
-            Assert.Equal("five", actual);
-        }
-    }
-}
+namespace Parquet.Test {
+    public class CompressionTest : TestBase {
+        [Theory]
+        [InlineData(CompressionMethod.None)]
+        [InlineData(CompressionMethod.Snappy)]
+        [InlineData(CompressionMethod.Gzip)]
+        [InlineData(CompressionMethod.Lzo)]
+        [InlineData(CompressionMethod.Brotli)]
+        [InlineData(CompressionMethod.LZ4)]
+        [InlineData(CompressionMethod.Zstd)]
+        public async Task All_compression_methods_supported_for_simple_integeres(CompressionMethod compressionMethod) {
+            const int value = 5;
+            object actual = await WriteReadSingle(new DataField<int>("id"), value, compressionMethod);
+            Assert.Equal(5, (int)actual);
+        }
+
+        [Theory]
+        [InlineData(CompressionMethod.None)]
+        [InlineData(CompressionMethod.Snappy)]
+        [InlineData(CompressionMethod.Gzip)]
+        [InlineData(CompressionMethod.Lzo)]
+        [InlineData(CompressionMethod.Brotli)]
+        [InlineData(CompressionMethod.LZ4)]
+        [InlineData(CompressionMethod.Zstd)]
+        public async Task All_compression_methods_supported_for_simple_strings(CompressionMethod compressionMethod) {
+            /*
+             * uncompressed: length - 14, levels - 6
+             */
+
+            const string value = "five";
+            object actual = await WriteReadSingle(new DataField<string>("id"), value, compressionMethod);
+            Assert.Equal("five", actual);
+        }
+    }
+}
2 changes: 1 addition & 1 deletion src/Parquet.Test/DecimalTypeTest.cs
@@ -19,7 +19,7 @@ public void Validate_Scale_Zero_Should_Be_Allowed() {
     const int precision = 1;
     const int scale = 0;
     var field = new DecimalDataField("field-name", precision, scale);
-    Assert.Equal(field.Scale, scale);
+    Assert.Equal(scale, field.Scale);
 }
 
 [Fact]
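(The swap matters because xUnit's Assert.Equal takes the expected value first; with the arguments reversed, a failing test would report expected and actual the wrong way round.)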
111 changes: 49 additions & 62 deletions src/Parquet.Test/NonSeekableWriterTEst.cs
@@ -5,81 +5,68 @@
 using Parquet.Data;
 using Xunit;
 
-namespace Parquet.Test
-{
-    public class NonSeekableWriterTest
-    {
-        [Fact]
-        public async Task Write_multiple_row_groups_to_forward_only_stream()
-        {
-            var ms = new MemoryStream();
-            var forwardOnly = new WriteableNonSeekableStream(ms);
-
-            var schema = new Schema(
-                new DataField<int>("id"),
-                new DataField<string>("nonsense"));
-
-            using (ParquetWriter writer = await ParquetWriter.CreateAsync(schema, forwardOnly))
-            {
-                using (ParquetRowGroupWriter rgw = writer.CreateRowGroup())
-                {
-                    await rgw.WriteColumnAsync(new DataColumn((DataField)schema[0], new[] { 1 }));
-                    await rgw.WriteColumnAsync(new DataColumn((DataField)schema[1], new[] { "1" }));
-                }
-
-                using (ParquetRowGroupWriter rgw = writer.CreateRowGroup())
-                {
-                    await rgw.WriteColumnAsync(new DataColumn((DataField)schema[0], new[] { 2 }));
-                    await rgw.WriteColumnAsync(new DataColumn((DataField)schema[1], new[] { "2" }));
-                }
-            }
-
-            ms.Position = 0;
-            using (ParquetReader reader = await ParquetReader.CreateAsync(ms))
-            {
-                Assert.Equal(2, reader.RowGroupCount);
-
-                using (ParquetRowGroupReader rgr = reader.OpenRowGroupReader(0))
-                {
-                    Assert.Equal(1, rgr.RowCount);
-
-                    DataColumn column = await rgr.ReadColumnAsync((DataField)schema[0]);
-                    Assert.Equal(1, column.Data.GetValue(0));
-                }
-
-                using (ParquetRowGroupReader rgr = reader.OpenRowGroupReader(1))
-                {
-                    Assert.Equal(1, rgr.RowCount);
-
-                    DataColumn column = await rgr.ReadColumnAsync((DataField)schema[0]);
-                    Assert.Equal(2, column.Data.GetValue(0));
-                }
-            }
-        }
-    }
-
-    public class WriteableNonSeekableStream : DelegatedStream
-    {
-        public WriteableNonSeekableStream(Stream master) : base(master)
-        {
-        }
-
-        public override bool CanSeek => false;
-
-        public override bool CanRead => true;
-
-        public override long Seek(long offset, SeekOrigin origin)
-        {
-            throw new NotSupportedException();
-        }
-
-        public override long Position
-        {
-            get => throw new NotSupportedException();
-            set => throw new NotSupportedException();
-        }
-    }
-}
+namespace Parquet.Test {
+    public class NonSeekableWriterTest {
+        [Fact]
+        public async Task Write_multiple_row_groups_to_forward_only_stream() {
+            var ms = new MemoryStream();
+            var forwardOnly = new WriteableNonSeekableStream(ms);
+
+            var schema = new Schema(
+                new DataField<int>("id"),
+                new DataField<string>("nonsense"));
+
+            using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, forwardOnly)) {
+                using(ParquetRowGroupWriter rgw = writer.CreateRowGroup()) {
+                    await rgw.WriteColumnAsync(new DataColumn((DataField)schema[0], new[] { 1 }));
+                    await rgw.WriteColumnAsync(new DataColumn((DataField)schema[1], new[] { "1" }));
+                }
+
+                using(ParquetRowGroupWriter rgw = writer.CreateRowGroup()) {
+                    await rgw.WriteColumnAsync(new DataColumn((DataField)schema[0], new[] { 2 }));
+                    await rgw.WriteColumnAsync(new DataColumn((DataField)schema[1], new[] { "2" }));
+                }
+            }
+
+            ms.Position = 0;
+            using(ParquetReader reader = await ParquetReader.CreateAsync(ms)) {
+                Assert.Equal(2, reader.RowGroupCount);
+
+                using(ParquetRowGroupReader rgr = reader.OpenRowGroupReader(0)) {
+                    Assert.Equal(1, rgr.RowCount);
+
+                    DataColumn column = await rgr.ReadColumnAsync((DataField)schema[0]);
+                    Assert.Equal(1, column.Data.GetValue(0));
+                }
+
+                using(ParquetRowGroupReader rgr = reader.OpenRowGroupReader(1)) {
+                    Assert.Equal(1, rgr.RowCount);
+
+                    DataColumn column = await rgr.ReadColumnAsync((DataField)schema[0]);
+                    Assert.Equal(2, column.Data.GetValue(0));
+                }
+            }
+        }
+    }
+
+    public class WriteableNonSeekableStream : DelegatedStream {
+        public WriteableNonSeekableStream(Stream master) : base(master) {
+        }
+
+        public override bool CanSeek => false;
+
+        public override bool CanRead => true;
+
+        public override long Seek(long offset, SeekOrigin origin) {
+            throw new NotSupportedException();
+        }
+
+        public override long Position {
+            get => base.Position;
+            set => throw new NotSupportedException();
+        }
+    }
+}
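Amid the brace-style reflow above, the one behavioural change is in Position: the getter now returns base.Position instead of throwing, presumably so the writer can read the current offset even on a stream that does not support seeking.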
6 changes: 3 additions & 3 deletions src/Parquet.Test/Parquet.Test.csproj
@@ -18,9 +18,9 @@
   <ItemGroup>
     <PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
     <PackageReference Include="System.ValueTuple" Version="4.5.0" />
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.4" />
-    <PackageReference Include="xunit" Version="2.4.1" />
-    <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
+    <PackageReference Include="xunit" Version="2.4.2" />
+    <PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
       <PrivateAssets>all</PrivateAssets>
     </PackageReference>
50 changes: 0 additions & 50 deletions src/Parquet.Test/SnappyStreamTest.cs

This file was deleted.

3 changes: 1 addition & 2 deletions src/Parquet.Test/TestBase.cs
@@ -92,7 +92,7 @@ protected async Task<Tuple<DataColumn[], Schema>> WriteReadSingleRowGroup(
         }
     }
 
-    protected async Task<object> WriteReadSingle(DataField field, object value, CompressionMethod compressionMethod = CompressionMethod.None, int compressionLevel = -1) {
+    protected async Task<object> WriteReadSingle(DataField field, object value, CompressionMethod compressionMethod = CompressionMethod.None) {
         //for sanity, use disconnected streams
         byte[] data;
 
@@ -101,7 +101,6 @@ protected async Task<object> WriteReadSingle(DataField field, object value, Comp
 
         using(ParquetWriter writer = await ParquetWriter.CreateAsync(new Schema(field), ms)) {
             writer.CompressionMethod = compressionMethod;
-            writer.CompressionLevel = compressionLevel;
 
             using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
                 Array dataArray = Array.CreateInstance(field.ClrNullableIfHasNullsType, 1);