From 3cd9b5cd37dfe72af81c6fb42b26b27ee9ab4c42 Mon Sep 17 00:00:00 2001 From: IG Date: Thu, 19 Jan 2023 12:53:24 +0000 Subject: [PATCH] slight improvement in string write performance --- .../Benchmarks/DataTypes.cs | 78 ++++--------------- src/Parquet.PerfRunner/Program.cs | 2 +- src/Parquet/Data/ParquetPlainEncoder.cs | 56 ++++++++----- 3 files changed, 53 insertions(+), 83 deletions(-) diff --git a/src/Parquet.PerfRunner/Benchmarks/DataTypes.cs b/src/Parquet.PerfRunner/Benchmarks/DataTypes.cs index 0c4a9959..e35aa9b7 100644 --- a/src/Parquet.PerfRunner/Benchmarks/DataTypes.cs +++ b/src/Parquet.PerfRunner/Benchmarks/DataTypes.cs @@ -12,6 +12,8 @@ internal class DataTypes { private const int DataSize = 1000000; private Parquet.Data.DataColumn _ints; private Parquet.Data.DataColumn _nullableInts; + private DataColumn _randomStrings; + private DataColumn _repeatingStrings; private static Random random = new Random(); public static string RandomString(int length) { @@ -28,6 +30,16 @@ public DataTypes() { .Range(0, DataSize) .Select(i => i % 4 == 0 ? (int?)null : i) .ToArray()); + + _randomStrings = new DataColumn(new DataField("c"), + Enumerable.Range(0, DataSize) + .Select(i => RandomString(50)) + .ToArray()); + + _repeatingStrings = new DataColumn(new DataField("c"), + Enumerable.Range(0, DataSize) + .Select(i => i < DataSize / 2 ? "first half" : "second half") + .ToArray()); } private async Task Run(DataColumn c) { @@ -53,70 +65,8 @@ public Task NullableInts() { return Run(_nullableInts); } - public async Task SimpleIntWriteRead() { - - // allocate stream large enough to avoid re-allocations during performance test - const int l = 10000000; - var ms = new MemoryStream(l * sizeof(int) * 2); - var schema = new ParquetSchema(new DataField("id")); - var rnd = new Random(); - int[] ints = new int[l]; - for(int i = 0; i < l; i++) { - ints[i] = rnd.Next(); - } - - using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms)) { - writer.CompressionMethod = CompressionMethod.None; - using(ParquetRowGroupWriter g = writer.CreateRowGroup()) { - await g.WriteColumnAsync(new DataColumn((DataField)schema[0], ints)); - } - } - - ms.Position = 0; - using(ParquetReader reader = await ParquetReader.CreateAsync(ms)) { - using(ParquetRowGroupReader g = reader.OpenRowGroupReader(0)) { - DataColumn data = await g.ReadColumnAsync((DataField)schema[0]); - } - } - } - - public async Task SimpleStringWriteRead() { - - - var col = new DataColumn(new DataField("c"), Enumerable.Range(0, 100000).Select(i => RandomString(100)).ToArray()); - var f = (DataField)col.Field; - var ms = new MemoryStream(); - var schema = new ParquetSchema(col.Field); - - using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms)) { - writer.CompressionMethod = CompressionMethod.None; - using(ParquetRowGroupWriter g = writer.CreateRowGroup()) { - await g.WriteColumnAsync(col); - } - } - - ms.Position = 0; - using(ParquetReader reader = await ParquetReader.CreateAsync(ms)) { - using(ParquetRowGroupReader g = reader.OpenRowGroupReader(0)) { - DataColumn data = await g.ReadColumnAsync(f); - } - } - } - - public async Task WriteRandomStrings() { - - - var col = new DataColumn(new DataField("c"), Enumerable.Range(0, 100000).Select(i => RandomString(100)).ToArray()); - var f = (DataField)col.Field; - var ms = new MemoryStream(); - var schema = new ParquetSchema(col.Field); - - using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms)) { - writer.CompressionMethod = CompressionMethod.None; - using(ParquetRowGroupWriter g = writer.CreateRowGroup()) { - await g.WriteColumnAsync(col); - } - } + public Task RandomStrings() { + return Run(_randomStrings); } } } diff --git a/src/Parquet.PerfRunner/Program.cs b/src/Parquet.PerfRunner/Program.cs index 690158a5..330c7e01 100644 --- a/src/Parquet.PerfRunner/Program.cs +++ b/src/Parquet.PerfRunner/Program.cs @@ -3,4 +3,4 @@ using Parquet.PerfRunner.Benchmarks; //new VsParquetSharp().Main(); -await new DataTypes().NullableInts(); +await new DataTypes().RandomStrings(); diff --git a/src/Parquet/Data/ParquetPlainEncoder.cs b/src/Parquet/Data/ParquetPlainEncoder.cs index db81c3c8..f83b2256 100644 --- a/src/Parquet/Data/ParquetPlainEncoder.cs +++ b/src/Parquet/Data/ParquetPlainEncoder.cs @@ -18,6 +18,7 @@ static class ParquetPlainEncoder { private static readonly System.Text.Encoding E = System.Text.Encoding.UTF8; private static readonly byte[] ZeroInt32 = BitConverter.GetBytes((int)0); + private static readonly ArrayPool BytePool = ArrayPool.Shared; public static bool Encode( Array data, int offset, int count, @@ -1028,27 +1029,46 @@ public static int Decode(Stream source, Span data) { } public static void Encode(ReadOnlySpan data, Stream destination) { - foreach(string s in data) { - if(string.IsNullOrEmpty(s)) { - destination.Write(ZeroInt32, 0, ZeroInt32.Length); - } else { - // transofrm to byte array first, as we need the length of the byte buffer, not string length - // todo: this can be improved to re-pool only when needed instead of in each iteration - byte[] b = ArrayPool.Shared.Rent(E.GetByteCount(s) + sizeof(int)); - try { - int len = E.GetBytes(s, 0, s.Length, b, sizeof(int)); -#if NETSTANDARD2_1 - Array.Copy(BitConverter.GetBytes(len), b, sizeof(int)); -#else - Unsafe.As(ref b[0]) = len; -#endif - len += sizeof(int); - destination.Write(b, 0, len); + + // rent a buffer large enough not to reallocate often and not call stream write often + byte[] rb = BytePool.Rent(1024 * 10); + int rbOffset = 0; + try { + + foreach(string s in data) { + int len = string.IsNullOrEmpty(s) ? 0 : E.GetByteCount(s); + int minLen = len + sizeof(int); + int rem = rb.Length - rbOffset; + + // check we have enough space left + if(rem < minLen) { + destination.Write(rb, 0, rbOffset); // dump current buffer + rbOffset = 0; + + // do we need to reallocate for more space? + if(minLen > rb.Length) { + BytePool.Return(rb); + rb = BytePool.Rent(minLen); + } } - finally { - ArrayPool.Shared.Return(b); + + // write our data + if(len == 0) { + Array.Copy(ZeroInt32, 0, rb, rbOffset, ZeroInt32.Length); + } else { + Array.Copy(BitConverter.GetBytes(len), 0, rb, rbOffset, sizeof(int)); } + rbOffset += sizeof(int); + E.GetBytes(s, 0, s.Length, rb, rbOffset); + rbOffset += len; } + + if(rbOffset > 0) { + destination.Write(rb, 0, rbOffset); + } + + } finally { + BytePool.Return(rb); } }