Skip to content

Commit

Permalink
slight improvement in string write performance
Browse files Browse the repository at this point in the history
  • Loading branch information
aloneguid committed Jan 19, 2023
1 parent f7ea6fc commit 3cd9b5c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 83 deletions.
78 changes: 14 additions & 64 deletions src/Parquet.PerfRunner/Benchmarks/DataTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ internal class DataTypes {
private const int DataSize = 1000000;
private Parquet.Data.DataColumn _ints;
private Parquet.Data.DataColumn _nullableInts;
private DataColumn _randomStrings;
private DataColumn _repeatingStrings;

private static Random random = new Random();
public static string RandomString(int length) {
Expand All @@ -28,6 +30,16 @@ public DataTypes() {
.Range(0, DataSize)
.Select(i => i % 4 == 0 ? (int?)null : i)
.ToArray());

_randomStrings = new DataColumn(new DataField<string>("c"),
Enumerable.Range(0, DataSize)
.Select(i => RandomString(50))
.ToArray());

_repeatingStrings = new DataColumn(new DataField<string>("c"),
Enumerable.Range(0, DataSize)
.Select(i => i < DataSize / 2 ? "first half" : "second half")
.ToArray());
}

private async Task Run(DataColumn c) {
Expand All @@ -53,70 +65,8 @@ public Task NullableInts() {
return Run(_nullableInts);
}

/// <summary>
/// Benchmarks a full write/read round-trip of a single plain (uncompressed) int32 column.
/// </summary>
public async Task SimpleIntWriteRead() {

    const int count = 10000000;

    // pre-size the stream so buffer growth does not skew the measurement
    var stream = new MemoryStream(count * sizeof(int) * 2);
    var schema = new ParquetSchema(new DataField<int>("id"));

    var rnd = new Random();
    int[] values = new int[count];
    for(int i = 0; i < values.Length; i++) {
        values[i] = rnd.Next();
    }

    DataField field = (DataField)schema[0];

    // write phase: the writer must be disposed before reading so the file footer is flushed
    using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, stream)) {
        writer.CompressionMethod = CompressionMethod.None;
        using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
            await rg.WriteColumnAsync(new DataColumn(field, values));
        }
    }

    // read phase: rewind and read the column back
    stream.Position = 0;
    using(ParquetReader reader = await ParquetReader.CreateAsync(stream)) {
        using(ParquetRowGroupReader rg = reader.OpenRowGroupReader(0)) {
            DataColumn data = await rg.ReadColumnAsync(field);
        }
    }
}

/// <summary>
/// Benchmarks a full write/read round-trip of a single uncompressed string column
/// filled with random 100-character values.
/// </summary>
public async Task SimpleStringWriteRead() {

    string[] values = Enumerable
        .Range(0, 100000)
        .Select(_ => RandomString(100))
        .ToArray();

    var col = new DataColumn(new DataField<string>("c"), values);
    var field = (DataField)col.Field;
    var stream = new MemoryStream();
    var schema = new ParquetSchema(col.Field);

    // write phase: dispose the writer before reading so the file footer is flushed
    using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, stream)) {
        writer.CompressionMethod = CompressionMethod.None;
        using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
            await rg.WriteColumnAsync(col);
        }
    }

    // read phase: rewind and read the column back
    stream.Position = 0;
    using(ParquetReader reader = await ParquetReader.CreateAsync(stream)) {
        using(ParquetRowGroupReader rg = reader.OpenRowGroupReader(0)) {
            DataColumn data = await rg.ReadColumnAsync(field);
        }
    }
}

public async Task WriteRandomStrings() {


var col = new DataColumn(new DataField<string>("c"), Enumerable.Range(0, 100000).Select(i => RandomString(100)).ToArray());
var f = (DataField)col.Field;
var ms = new MemoryStream();
var schema = new ParquetSchema(col.Field);

using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms)) {
writer.CompressionMethod = CompressionMethod.None;
using(ParquetRowGroupWriter g = writer.CreateRowGroup()) {
await g.WriteColumnAsync(col);
}
}
// Benchmark entry point: writes the pre-built random-string column.
public Task RandomStrings() => Run(_randomStrings);
}
}
2 changes: 1 addition & 1 deletion src/Parquet.PerfRunner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
using Parquet.PerfRunner.Benchmarks;

//new VsParquetSharp().Main();
await new DataTypes().NullableInts();
await new DataTypes().RandomStrings();
56 changes: 38 additions & 18 deletions src/Parquet/Data/ParquetPlainEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ static class ParquetPlainEncoder {

private static readonly System.Text.Encoding E = System.Text.Encoding.UTF8;
private static readonly byte[] ZeroInt32 = BitConverter.GetBytes((int)0);
private static readonly ArrayPool<byte> BytePool = ArrayPool<byte>.Shared;

public static bool Encode(
Array data, int offset, int count,
Expand Down Expand Up @@ -1028,27 +1029,46 @@ public static int Decode(Stream source, Span<Interval> data) {
}

public static void Encode(ReadOnlySpan<string> data, Stream destination) {
foreach(string s in data) {
if(string.IsNullOrEmpty(s)) {
destination.Write(ZeroInt32, 0, ZeroInt32.Length);
} else {
// transform to byte array first, as we need the length of the byte buffer, not string length
// todo: this can be improved to re-pool only when needed instead of in each iteration
byte[] b = ArrayPool<byte>.Shared.Rent(E.GetByteCount(s) + sizeof(int));
try {
int len = E.GetBytes(s, 0, s.Length, b, sizeof(int));
#if NETSTANDARD2_1
Array.Copy(BitConverter.GetBytes(len), b, sizeof(int));
#else
Unsafe.As<byte, int>(ref b[0]) = len;
#endif
len += sizeof(int);
destination.Write(b, 0, len);

// rent a buffer large enough not to reallocate often and not call stream write often
byte[] rb = BytePool.Rent(1024 * 10);
int rbOffset = 0;
try {

foreach(string s in data) {
int len = string.IsNullOrEmpty(s) ? 0 : E.GetByteCount(s);
int minLen = len + sizeof(int);
int rem = rb.Length - rbOffset;

// check we have enough space left
if(rem < minLen) {
destination.Write(rb, 0, rbOffset); // dump current buffer
rbOffset = 0;

// do we need to reallocate for more space?
if(minLen > rb.Length) {
BytePool.Return(rb);
rb = BytePool.Rent(minLen);
}
}
finally {
ArrayPool<byte>.Shared.Return(b);

// write our data
if(len == 0) {
Array.Copy(ZeroInt32, 0, rb, rbOffset, ZeroInt32.Length);
} else {
Array.Copy(BitConverter.GetBytes(len), 0, rb, rbOffset, sizeof(int));
}
rbOffset += sizeof(int);
E.GetBytes(s, 0, s.Length, rb, rbOffset);
rbOffset += len;
}

if(rbOffset > 0) {
destination.Write(rb, 0, rbOffset);
}

} finally {
BytePool.Return(rb);
}
}

Expand Down

0 comments on commit 3cd9b5c

Please sign in to comment.