From 7f92efc573f94b6c2352b769a173144ef2075a0a Mon Sep 17 00:00:00 2001 From: IG Date: Fri, 21 Apr 2023 11:16:17 +0100 Subject: [PATCH] minimize number of streaming calls when reading and writing thrift metadata --- .github/workflows/full.yml | 2 +- src/NetBox.cs | 5 +- .../Parquet.PerfRunner.csproj | 29 ++-- src/Parquet.PerfRunner/Program.cs | 1 + .../Client/TMemoryBufferTransport.cs | 156 ++++++++++++++++++ src/Parquet/Extensions/StreamExtensions.cs | 18 +- src/Parquet/File/DataColumnReader.cs | 4 +- src/Parquet/File/DataColumnWriter.cs | 5 +- src/Parquet/File/ThriftFooter.cs | 5 +- src/Parquet/File/ThriftStream.cs | 65 +++++--- src/Parquet/ParquetActor.cs | 9 +- src/Parquet/ParquetReader.cs | 2 +- src/Parquet/ParquetRowGroupReader.cs | 2 +- src/Parquet/ParquetRowGroupWriter.cs | 5 +- src/Parquet/ParquetWriter.cs | 4 +- 15 files changed, 233 insertions(+), 79 deletions(-) create mode 100644 src/Parquet/3rdparty/Thrift/Transport/Client/TMemoryBufferTransport.cs diff --git a/.github/workflows/full.yml b/.github/workflows/full.yml index c431a35c..7fde5307 100644 --- a/.github/workflows/full.yml +++ b/.github/workflows/full.yml @@ -1,7 +1,7 @@ name: 'Full Workflow' env: - VERSION: 4.9.0 + VERSION: 4.9.1 ASM_VERSION: 4.0.0 on: diff --git a/src/NetBox.cs b/src/NetBox.cs index e7a1c088..cda5046d 100644 --- a/src/NetBox.cs +++ b/src/NetBox.cs @@ -3,7 +3,7 @@ _ _ _ ____ | \ | | ___| |_| __ ) _____ __ | \| |/ _ \ __| _ \ / _ \ \/ / | |\ | __/ |_| |_) | (_) > < -|_| \_|\___|\__|____/ \___/_/\_\ v4.1.9 by @aloneguid +|_| \_|\___|\__|____/ \___/_/\_\ v4.1.11 by @aloneguid https://github.com/aloneguid/netbox */ @@ -824,9 +824,12 @@ public override void Write(byte[] buffer, int offset, int count) // FILE: src/NetBox/System/StreamExtensions.cs namespace System { + using System.Threading.Tasks; + using System.Threading; using global::System.Collections.Generic; using global::System.IO; using global::System.Text; + using System.Diagnostics; /// /// extension diff --git a/src/Parquet.PerfRunner/Parquet.PerfRunner.csproj b/src/Parquet.PerfRunner/Parquet.PerfRunner.csproj index 26ac86eb..201cbe66 100644 --- a/src/Parquet.PerfRunner/Parquet.PerfRunner.csproj +++ b/src/Parquet.PerfRunner/Parquet.PerfRunner.csproj @@ -1,19 +1,20 @@  - - Exe - net7.0 - enable - enable - + + Exe + netcoreapp3.1 + enable + enable + latest + - - - - + + + + - - - + + + - + \ No newline at end of file diff --git a/src/Parquet.PerfRunner/Program.cs b/src/Parquet.PerfRunner/Program.cs index afb2669e..8eb35137 100644 --- a/src/Parquet.PerfRunner/Program.cs +++ b/src/Parquet.PerfRunner/Program.cs @@ -22,4 +22,5 @@ //var c = new Classes(); //c.SetUp(); //c.Serialise(); + await ParquetReader.ReadTableFromFileAsync("C:\\Users\\alone\\Downloads\\wide_parquet\\wide_parquet.parquet"); } diff --git a/src/Parquet/3rdparty/Thrift/Transport/Client/TMemoryBufferTransport.cs b/src/Parquet/3rdparty/Thrift/Transport/Client/TMemoryBufferTransport.cs new file mode 100644 index 00000000..2053293d --- /dev/null +++ b/src/Parquet/3rdparty/Thrift/Transport/Client/TMemoryBufferTransport.cs @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + + +namespace Thrift.Transport.Client { + // ReSharper disable once InconsistentNaming + class TMemoryBufferTransport : TEndpointTransport { + private bool IsDisposed; + private byte[] Bytes; + private int _bytesUsed; + + public TMemoryBufferTransport(TConfiguration? config, int initialCapacity = 2048) + : base(config) { + Bytes = new byte[initialCapacity]; + } + + public TMemoryBufferTransport(byte[] buf, TConfiguration? config) + : base(config) { + Bytes = (byte[])buf.Clone(); + _bytesUsed = Bytes.Length; + UpdateKnownMessageSize(_bytesUsed); + } + + public int Position { get; set; } + + public int Capacity { + get { + Debug.Assert(_bytesUsed <= Bytes.Length); + return Bytes.Length; + } + set { + Array.Resize(ref Bytes, value); + _bytesUsed = value; + } + } + + public int Length { + get { + Debug.Assert(_bytesUsed <= Bytes.Length); + return _bytesUsed; + } + set { + if((Bytes.Length < value) || (Bytes.Length > (10 * value))) + Array.Resize(ref Bytes, Math.Max(2048, (int)(value * 1.25))); + _bytesUsed = value; + } + } + + public void SetLength(int value) { + Length = value; + Position = Math.Min(Position, value); + } + + public override bool IsOpen => true; + + public override Task OpenAsync(CancellationToken cancellationToken) { + cancellationToken.ThrowIfCancellationRequested(); + return Task.CompletedTask; + } + + public override void Close() { + /* do nothing */ + } + + public void Seek(int delta, SeekOrigin origin) { + int newPos; + switch(origin) { + case SeekOrigin.Begin: + newPos = delta; + break; + case SeekOrigin.Current: + newPos = Position + delta; + break; + case SeekOrigin.End: + newPos = _bytesUsed + delta; + break; + default: + throw new ArgumentException("Unrecognized value", nameof(origin)); + } + + if((0 > newPos) || (newPos > _bytesUsed)) + throw new ArgumentException("Cannot seek outside of the valid range", nameof(origin)); + Position = newPos; + + ResetConsumedMessageSize(); + CountConsumedMessageBytes(Position); + } + + public override ValueTask ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) { + int count = Math.Min(Length - Position, length); + Buffer.BlockCopy(Bytes, Position, buffer, offset, count); + Position += count; + CountConsumedMessageBytes(count); + return new ValueTask(count); + } + + public override Task WriteAsync(byte[] buffer, CancellationToken cancellationToken) { + return WriteAsync(buffer, 0, buffer.Length, cancellationToken); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { + int free = Length - Position; + Length = Length + count - free; + Buffer.BlockCopy(buffer, offset, Bytes, Position, count); + Position += count; + return Task.CompletedTask; + } + + public override Task FlushAsync(CancellationToken cancellationToken) { + cancellationToken.ThrowIfCancellationRequested(); + ResetConsumedMessageSize(); + return Task.CompletedTask; + } + + public byte[] GetBuffer() { + byte[] retval = new byte[Length]; + Buffer.BlockCopy(Bytes, 0, retval, 0, Length); + return retval; + } + + internal bool TryGetBuffer(out ArraySegment bufSegment) { + bufSegment = new ArraySegment(Bytes, 0, _bytesUsed); + return true; + } + + // IDisposable + protected override void Dispose(bool disposing) { + if(!IsDisposed) { + if(disposing) { + // nothing to do + } + } + IsDisposed = true; + } + } +} \ No newline at end of file diff --git a/src/Parquet/Extensions/StreamExtensions.cs b/src/Parquet/Extensions/StreamExtensions.cs index 0d9dc8e0..47198adc 100644 --- a/src/Parquet/Extensions/StreamExtensions.cs +++ b/src/Parquet/Extensions/StreamExtensions.cs @@ -27,22 +27,6 @@ public static long ReadInt64(this Stream s) { return BitConverter.ToInt64(tmp, 0); } - public static byte[] ReadBytesExactly(this Stream s, int count, bool allowLess = false) { - byte[] tmp = new byte[count]; - int read = 0; - while(read < count) { - int r = s.Read(tmp, read, count - read); - if(r == 0) - break; - else - read += r; - } - if(read < count && !allowLess) - throw new IOException($"only {read} out of {count} bytes are available"); - - return tmp; - } - public static async Task ReadBytesExactlyAsync(this Stream s, int count) { byte[] tmp = new byte[count]; #if NET7_0_OR_GREATER @@ -62,7 +46,7 @@ public static async Task ReadBytesExactlyAsync(this Stream s, int count) return tmp; } - + public static int ReadUnsignedVarInt(this Stream s) { int result = 0; int shift = 0; diff --git a/src/Parquet/File/DataColumnReader.cs b/src/Parquet/File/DataColumnReader.cs index 41d228c1..643ce0ed 100644 --- a/src/Parquet/File/DataColumnReader.cs +++ b/src/Parquet/File/DataColumnReader.cs @@ -18,7 +18,6 @@ class DataColumnReader { private readonly Thrift.SchemaElement? _thriftSchemaElement; private readonly ThriftFooter _footer; private readonly ParquetOptions _options; - private readonly ThriftStream _thriftStream; private readonly DataColumnStatistics? _stats; public DataColumnReader( @@ -35,7 +34,6 @@ public DataColumnReader( dataField.EnsureAttachedToSchema(nameof(dataField)); - _thriftStream = new ThriftStream(inputStream); _thriftSchemaElement = _footer.GetSchemaElement(_thriftColumnChunk); // read stats as soon as possible @@ -64,7 +62,7 @@ public async Task ReadAsync(CancellationToken cancellationToken = de _inputStream.Seek(fileOffset, SeekOrigin.Begin); while(pc.ValuesRead < totalValuesInChunk) { - Thrift.PageHeader ph = await _thriftStream.ReadAsync(cancellationToken); + Thrift.PageHeader ph = await ThriftIO.ReadAsync(_inputStream, cancellationToken); switch(ph.Type) { case PageType.DICTIONARY_PAGE: diff --git a/src/Parquet/File/DataColumnWriter.cs b/src/Parquet/File/DataColumnWriter.cs index 0e1e5f1c..e930f49c 100644 --- a/src/Parquet/File/DataColumnWriter.cs +++ b/src/Parquet/File/DataColumnWriter.cs @@ -12,7 +12,6 @@ namespace Parquet.File { class DataColumnWriter { private readonly Stream _stream; - private readonly ThriftStream _thriftStream; private readonly ThriftFooter _footer; private readonly Thrift.SchemaElement _schemaElement; private readonly CompressionMethod _compressionMethod; @@ -22,14 +21,12 @@ class DataColumnWriter { public DataColumnWriter( Stream stream, - ThriftStream thriftStream, ThriftFooter footer, Thrift.SchemaElement schemaElement, CompressionMethod compressionMethod, ParquetOptions options, CompressionLevel compressionLevel) { _stream = stream; - _thriftStream = thriftStream; _footer = footer; _schemaElement = schemaElement; _compressionMethod = compressionMethod; @@ -75,7 +72,7 @@ private async Task CompressAndWriteAsync( ph.Compressed_page_size = compressedData.AsSpan().Length; //write the header in - int headerSize = await _thriftStream.WriteAsync(ph, false, cancellationToken); + int headerSize = await ThriftIO.WriteAsync(_stream, ph, cancellationToken); #if NETSTANDARD2_0 byte[] tmp = compressedData.AsSpan().ToArray(); _stream.Write(tmp, 0, tmp.Length); diff --git a/src/Parquet/File/ThriftFooter.cs b/src/Parquet/File/ThriftFooter.cs index f9142dd5..7871e9e0 100644 --- a/src/Parquet/File/ThriftFooter.cs +++ b/src/Parquet/File/ThriftFooter.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -76,8 +77,8 @@ public void Add(long totalRowCount) { _fileMeta.Num_rows += totalRowCount; } - public async Task WriteAsync(ThriftStream thriftStream, CancellationToken cancellationToken = default) { - return await thriftStream.WriteAsync(_fileMeta, false, cancellationToken); + public async Task WriteAsync(Stream s, CancellationToken cancellationToken = default) { + return await ThriftIO.WriteAsync(s, _fileMeta, cancellationToken); } public Thrift.SchemaElement? GetSchemaElement(Thrift.ColumnChunk columnChunk) { diff --git a/src/Parquet/File/ThriftStream.cs b/src/Parquet/File/ThriftStream.cs index 5f6fc3c4..0febe557 100644 --- a/src/Parquet/File/ThriftStream.cs +++ b/src/Parquet/File/ThriftStream.cs @@ -1,6 +1,9 @@ -using System.IO; +using System; +using System.Buffers; +using System.IO; using System.Threading; using System.Threading.Tasks; +using Parquet.Extensions; using Thrift.Protocol; using Thrift.Transport; using Thrift.Transport.Client; @@ -9,44 +12,60 @@ namespace Parquet.File { /// /// Utility methods to work with Thrift data in a stream /// - class ThriftStream { - private readonly Stream _s; - private readonly TProtocol _protocol; + static class ThriftIO { + /// + /// Reads typed structure from incoming stream, using pre-read optimisation. + /// + /// + /// + /// size of the thrift data in side the stream + /// + /// + public static async ValueTask ReadAsync(Stream inputStream, int size, CancellationToken cancellationToken = default) where T : TBase, new() { - public ThriftStream(Stream s) { - _s = s; - TTransport transport = new TStreamTransport(s, s, null); - _protocol = new TCompactProtocol(transport); + if(size <= 0) + throw new ArgumentException("must be positive", nameof(size)); + + var r = new T(); + byte[] buffer = await inputStream.ReadBytesExactlyAsync(size); + var transport = new TMemoryBufferTransport(buffer, null); + var proto = new TCompactProtocol(transport); + await r.ReadAsync(proto, cancellationToken); + return r; } /// - /// Reads typed structure from incoming stream + /// Reads typed structure from incoming stream, potentially making a lot of reads and is slow /// /// + /// + /// /// - public async Task ReadAsync(CancellationToken cancellationToken = default) where T : TBase, new() { - var res = new T(); - await res.ReadAsync(_protocol, cancellationToken); - return res; + public static async ValueTask ReadAsync(Stream inputStream, CancellationToken cancellationToken = default) where T : TBase, new() { + + var r = new T(); + var transport = new TStreamTransport(inputStream, inputStream, null); + var proto = new TCompactProtocol(transport); + await r.ReadAsync(proto, cancellationToken); + return r; } + /// /// Writes types structure to the destination stream /// /// + /// /// - /// When true, rewinds to the original position before writing /// /// Actual size of the object written - public async Task WriteAsync(T obj, bool rewind = false, CancellationToken cancellationToken = default) where T : TBase, new() { - _s.Flush(); - long startPos = _s.Position; - await obj.WriteAsync(_protocol, cancellationToken); - _s.Flush(); - long size = _s.Position - startPos; - if(rewind) - _s.Seek(startPos, SeekOrigin.Begin); - return (int)size; + public static async ValueTask WriteAsync(Stream outputStream, T obj, CancellationToken cancellationToken = default) where T : TBase, new() { + var transport = new TMemoryBufferTransport(null); + var proto = new TCompactProtocol(transport); + await obj.WriteAsync(proto, cancellationToken); + byte[] buffer = transport.GetBuffer(); + await outputStream.WriteAsync(buffer, 0, buffer.Length); + return buffer.Length; } } } \ No newline at end of file diff --git a/src/Parquet/ParquetActor.cs b/src/Parquet/ParquetActor.cs index 53ba12ce..ce83ad6b 100644 --- a/src/Parquet/ParquetActor.cs +++ b/src/Parquet/ParquetActor.cs @@ -18,7 +18,6 @@ public class ParquetActor { private readonly Stream _fileStream; private BinaryWriter? _binaryWriter; - private ThriftStream? _thriftStream; internal ParquetActor(Stream? fileStream) => _fileStream = fileStream ?? throw new ArgumentNullException(nameof(fileStream)); @@ -30,8 +29,6 @@ internal ParquetActor(Stream? fileStream) => internal BinaryWriter Writer => _binaryWriter ??= new BinaryWriter(_fileStream); - internal ThriftStream ThriftStream => _thriftStream ??= new ThriftStream(_fileStream); - /// /// Validates that this file is a valid parquet file by reading head and tail of it /// @@ -48,9 +45,9 @@ protected async Task ValidateFileAsync() { throw new IOException($"not a parquet file, head: {head.ToHexString()}, tail: {tail.ToHexString()}"); } - internal async Task ReadMetadataAsync(CancellationToken cancellationToken = default) { - await GoBeforeFooterAsync(); - return await ThriftStream.ReadAsync(cancellationToken); + internal async ValueTask ReadMetadataAsync(CancellationToken cancellationToken = default) { + int footerLength = await GoBeforeFooterAsync(); + return await ThriftIO.ReadAsync(_fileStream, footerLength, cancellationToken); } internal async ValueTask GoBeforeFooterAsync() { diff --git a/src/Parquet/ParquetReader.cs b/src/Parquet/ParquetReader.cs index bea37f1b..d8e22653 100644 --- a/src/Parquet/ParquetReader.cs +++ b/src/Parquet/ParquetReader.cs @@ -161,7 +161,7 @@ private void InitRowGroupReaders() { throw new InvalidOperationException("no row groups in metadata"); foreach(Thrift.RowGroup thriftRowGroup in _meta.Row_groups) { - _groupReaders.Add(new ParquetRowGroupReader(thriftRowGroup, _footer!, Stream, ThriftStream, _parquetOptions)); + _groupReaders.Add(new ParquetRowGroupReader(thriftRowGroup, _footer!, Stream, _parquetOptions)); } } diff --git a/src/Parquet/ParquetRowGroupReader.cs b/src/Parquet/ParquetRowGroupReader.cs index 45434876..b220d968 100644 --- a/src/Parquet/ParquetRowGroupReader.cs +++ b/src/Parquet/ParquetRowGroupReader.cs @@ -21,7 +21,7 @@ public class ParquetRowGroupReader : IDisposable { internal ParquetRowGroupReader( Thrift.RowGroup rowGroup, ThriftFooter footer, - Stream stream, ThriftStream thriftStream, + Stream stream, ParquetOptions? parquetOptions) { _rowGroup = rowGroup ?? throw new ArgumentNullException(nameof(rowGroup)); _footer = footer ?? throw new ArgumentNullException(nameof(footer)); diff --git a/src/Parquet/ParquetRowGroupWriter.cs b/src/Parquet/ParquetRowGroupWriter.cs index d5e063c6..883e32e1 100644 --- a/src/Parquet/ParquetRowGroupWriter.cs +++ b/src/Parquet/ParquetRowGroupWriter.cs @@ -20,7 +20,6 @@ public class ParquetRowGroupWriter : IDisposable { private readonly ParquetSchema _schema; private readonly Stream _stream; - private readonly ThriftStream _thriftStream; private readonly ThriftFooter _footer; private readonly CompressionMethod _compressionMethod; private readonly CompressionLevel _compressionLevel; @@ -31,14 +30,12 @@ public class ParquetRowGroupWriter : IDisposable internal ParquetRowGroupWriter(ParquetSchema schema, Stream stream, - ThriftStream thriftStream, ThriftFooter footer, CompressionMethod compressionMethod, ParquetOptions formatOptions, CompressionLevel compressionLevel) { _schema = schema ?? throw new ArgumentNullException(nameof(schema)); _stream = stream ?? throw new ArgumentNullException(nameof(stream)); - _thriftStream = thriftStream ?? throw new ArgumentNullException(nameof(thriftStream)); _footer = footer ?? throw new ArgumentNullException(nameof(footer)); _compressionMethod = compressionMethod; _compressionLevel = compressionLevel; @@ -74,7 +71,7 @@ public async Task WriteColumnAsync(DataColumn column, CancellationToken cancella FieldPath path = _footer.GetPath(tse); - var writer = new DataColumnWriter(_stream, _thriftStream, _footer, tse, + var writer = new DataColumnWriter(_stream, _footer, tse, _compressionMethod, _formatOptions, _compressionLevel); diff --git a/src/Parquet/ParquetWriter.cs b/src/Parquet/ParquetWriter.cs index 5c6745cd..0fce0ff4 100644 --- a/src/Parquet/ParquetWriter.cs +++ b/src/Parquet/ParquetWriter.cs @@ -72,7 +72,7 @@ public static async Task CreateAsync( public ParquetRowGroupWriter CreateRowGroup() { _dataWritten = true; - var writer = new ParquetRowGroupWriter(_schema, Stream, ThriftStream, _footer!, + var writer = new ParquetRowGroupWriter(_schema, Stream, _footer!, CompressionMethod, _formatOptions, CompressionLevel); _openedWriters.Add(writer); @@ -146,7 +146,7 @@ public void Dispose() //finalize file //long size = _footer.WriteAsync(ThriftStream).Result; - var sizeTask = Task.Run(() => _footer!.WriteAsync(ThriftStream)); + var sizeTask = Task.Run(() => _footer!.WriteAsync(Stream)); sizeTask.Wait(); long size = sizeTask.Result;