Skip to content

Commit

Permalink
minimize number of streaming calls when reading and writing thrift me…
Browse files Browse the repository at this point in the history
…tadata
  • Loading branch information
aloneguid committed Apr 21, 2023
1 parent 26ea729 commit 7f92efc
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/full.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: 'Full Workflow'

env:
VERSION: 4.9.0
VERSION: 4.9.1
ASM_VERSION: 4.0.0

on:
Expand Down
5 changes: 4 additions & 1 deletion src/NetBox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ _ _ _ ____
| \ | | ___| |_| __ ) _____ __
| \| |/ _ \ __| _ \ / _ \ \/ /
| |\ | __/ |_| |_) | (_) > <
|_| \_|\___|\__|____/ \___/_/\_\ v4.1.9 by @aloneguid
|_| \_|\___|\__|____/ \___/_/\_\ v4.1.11 by @aloneguid
https://github.com/aloneguid/netbox
*/
Expand Down Expand Up @@ -824,9 +824,12 @@ public override void Write(byte[] buffer, int offset, int count)
// FILE: src/NetBox/System/StreamExtensions.cs

namespace System {
using System.Threading.Tasks;
using System.Threading;
using global::System.Collections.Generic;
using global::System.IO;
using global::System.Text;
using System.Diagnostics;

/// <summary>
/// <see cref="Stream"/> extension
Expand Down
29 changes: 15 additions & 14 deletions src/Parquet.PerfRunner/Parquet.PerfRunner.csproj
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.5" />
<PackageReference Include="ParquetSharp" Version="10.0.1" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.5" />
<PackageReference Include="ParquetSharp" Version="10.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Parquet\Parquet.csproj" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Parquet\Parquet.csproj" />
</ItemGroup>

</Project>
</Project>
1 change: 1 addition & 0 deletions src/Parquet.PerfRunner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@
//var c = new Classes();
//c.SetUp();
//c.Serialise();
await ParquetReader.ReadTableFromFileAsync("C:\\Users\\alone\\Downloads\\wide_parquet\\wide_parquet.parquet");
}
156 changes: 156 additions & 0 deletions src/Parquet/3rdparty/Thrift/Transport/Client/TMemoryBufferTransport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;


namespace Thrift.Transport.Client {
// ReSharper disable once InconsistentNaming
class TMemoryBufferTransport : TEndpointTransport {
private bool IsDisposed;
private byte[] Bytes;
private int _bytesUsed;

public TMemoryBufferTransport(TConfiguration? config, int initialCapacity = 2048)
: base(config) {
Bytes = new byte[initialCapacity];
}

public TMemoryBufferTransport(byte[] buf, TConfiguration? config)
: base(config) {
Bytes = (byte[])buf.Clone();
_bytesUsed = Bytes.Length;
UpdateKnownMessageSize(_bytesUsed);
}

public int Position { get; set; }

public int Capacity {
get {
Debug.Assert(_bytesUsed <= Bytes.Length);
return Bytes.Length;
}
set {
Array.Resize(ref Bytes, value);
_bytesUsed = value;
}
}

public int Length {
get {
Debug.Assert(_bytesUsed <= Bytes.Length);
return _bytesUsed;
}
set {
if((Bytes.Length < value) || (Bytes.Length > (10 * value)))
Array.Resize(ref Bytes, Math.Max(2048, (int)(value * 1.25)));
_bytesUsed = value;
}
}

public void SetLength(int value) {
Length = value;
Position = Math.Min(Position, value);
}

public override bool IsOpen => true;

public override Task OpenAsync(CancellationToken cancellationToken) {
cancellationToken.ThrowIfCancellationRequested();
return Task.CompletedTask;
}

public override void Close() {
/* do nothing */
}

public void Seek(int delta, SeekOrigin origin) {
int newPos;
switch(origin) {
case SeekOrigin.Begin:
newPos = delta;
break;
case SeekOrigin.Current:
newPos = Position + delta;
break;
case SeekOrigin.End:
newPos = _bytesUsed + delta;
break;
default:
throw new ArgumentException("Unrecognized value", nameof(origin));
}

if((0 > newPos) || (newPos > _bytesUsed))
throw new ArgumentException("Cannot seek outside of the valid range", nameof(origin));
Position = newPos;

ResetConsumedMessageSize();
CountConsumedMessageBytes(Position);
}

public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) {
int count = Math.Min(Length - Position, length);
Buffer.BlockCopy(Bytes, Position, buffer, offset, count);
Position += count;
CountConsumedMessageBytes(count);
return new ValueTask<int>(count);
}

public override Task WriteAsync(byte[] buffer, CancellationToken cancellationToken) {
return WriteAsync(buffer, 0, buffer.Length, cancellationToken);
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
int free = Length - Position;
Length = Length + count - free;
Buffer.BlockCopy(buffer, offset, Bytes, Position, count);
Position += count;
return Task.CompletedTask;
}

public override Task FlushAsync(CancellationToken cancellationToken) {
cancellationToken.ThrowIfCancellationRequested();
ResetConsumedMessageSize();
return Task.CompletedTask;
}

public byte[] GetBuffer() {
byte[] retval = new byte[Length];
Buffer.BlockCopy(Bytes, 0, retval, 0, Length);
return retval;
}

internal bool TryGetBuffer(out ArraySegment<byte> bufSegment) {
bufSegment = new ArraySegment<byte>(Bytes, 0, _bytesUsed);
return true;
}

// IDisposable
protected override void Dispose(bool disposing) {
if(!IsDisposed) {
if(disposing) {
// nothing to do
}
}
IsDisposed = true;
}
}
}
18 changes: 1 addition & 17 deletions src/Parquet/Extensions/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,6 @@ public static long ReadInt64(this Stream s) {
return BitConverter.ToInt64(tmp, 0);
}

public static byte[] ReadBytesExactly(this Stream s, int count, bool allowLess = false) {
byte[] tmp = new byte[count];
int read = 0;
while(read < count) {
int r = s.Read(tmp, read, count - read);
if(r == 0)
break;
else
read += r;
}
if(read < count && !allowLess)
throw new IOException($"only {read} out of {count} bytes are available");

return tmp;
}

public static async Task<byte[]> ReadBytesExactlyAsync(this Stream s, int count) {
byte[] tmp = new byte[count];
#if NET7_0_OR_GREATER
Expand All @@ -62,7 +46,7 @@ public static async Task<byte[]> ReadBytesExactlyAsync(this Stream s, int count)

return tmp;
}

public static int ReadUnsignedVarInt(this Stream s) {
int result = 0;
int shift = 0;
Expand Down
4 changes: 1 addition & 3 deletions src/Parquet/File/DataColumnReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class DataColumnReader {
private readonly Thrift.SchemaElement? _thriftSchemaElement;
private readonly ThriftFooter _footer;
private readonly ParquetOptions _options;
private readonly ThriftStream _thriftStream;
private readonly DataColumnStatistics? _stats;

public DataColumnReader(
Expand All @@ -35,7 +34,6 @@ public DataColumnReader(

dataField.EnsureAttachedToSchema(nameof(dataField));

_thriftStream = new ThriftStream(inputStream);
_thriftSchemaElement = _footer.GetSchemaElement(_thriftColumnChunk);

// read stats as soon as possible
Expand Down Expand Up @@ -64,7 +62,7 @@ public async Task<DataColumn> ReadAsync(CancellationToken cancellationToken = de
_inputStream.Seek(fileOffset, SeekOrigin.Begin);

while(pc.ValuesRead < totalValuesInChunk) {
Thrift.PageHeader ph = await _thriftStream.ReadAsync<Thrift.PageHeader>(cancellationToken);
Thrift.PageHeader ph = await ThriftIO.ReadAsync<Thrift.PageHeader>(_inputStream, cancellationToken);

switch(ph.Type) {
case PageType.DICTIONARY_PAGE:
Expand Down
5 changes: 1 addition & 4 deletions src/Parquet/File/DataColumnWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
namespace Parquet.File {
class DataColumnWriter {
private readonly Stream _stream;
private readonly ThriftStream _thriftStream;
private readonly ThriftFooter _footer;
private readonly Thrift.SchemaElement _schemaElement;
private readonly CompressionMethod _compressionMethod;
Expand All @@ -22,14 +21,12 @@ class DataColumnWriter {

public DataColumnWriter(
Stream stream,
ThriftStream thriftStream,
ThriftFooter footer,
Thrift.SchemaElement schemaElement,
CompressionMethod compressionMethod,
ParquetOptions options,
CompressionLevel compressionLevel) {
_stream = stream;
_thriftStream = thriftStream;
_footer = footer;
_schemaElement = schemaElement;
_compressionMethod = compressionMethod;
Expand Down Expand Up @@ -75,7 +72,7 @@ private async Task CompressAndWriteAsync(
ph.Compressed_page_size = compressedData.AsSpan().Length;

//write the header in
int headerSize = await _thriftStream.WriteAsync(ph, false, cancellationToken);
int headerSize = await ThriftIO.WriteAsync(_stream, ph, cancellationToken);
#if NETSTANDARD2_0
byte[] tmp = compressedData.AsSpan().ToArray();
_stream.Write(tmp, 0, tmp.Length);
Expand Down
5 changes: 3 additions & 2 deletions src/Parquet/File/ThriftFooter.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -76,8 +77,8 @@ public void Add(long totalRowCount) {
_fileMeta.Num_rows += totalRowCount;
}

public async Task<long> WriteAsync(ThriftStream thriftStream, CancellationToken cancellationToken = default) {
return await thriftStream.WriteAsync(_fileMeta, false, cancellationToken);
public async Task<long> WriteAsync(Stream s, CancellationToken cancellationToken = default) {
return await ThriftIO.WriteAsync(s, _fileMeta, cancellationToken);
}

public Thrift.SchemaElement? GetSchemaElement(Thrift.ColumnChunk columnChunk) {
Expand Down
Loading

0 comments on commit 7f92efc

Please sign in to comment.