From 26ea729ac32de27b43f25ae08f9bb2f5a6efdfb9 Mon Sep 17 00:00:00 2001 From: Ivan G Date: Fri, 21 Apr 2023 09:08:52 +0100 Subject: [PATCH] Library-Wide Schema improvements (#308) - low-level: full logical types support in Parquet Thrift schema - class serializer supports custom attributes to customize serialisation of dates, times, and decimals. - added support for .NET 6 DateOnly type. --- .github/workflows/full.yml | 2 +- docs/serialisation.md | 35 ++ src/Parquet.PerfRunner/Program.cs | 7 +- .../File/Values/Primitives/BigDecimalTest.cs | 24 +- src/Parquet.Test/Schema/SchemaTest.cs | 26 - .../Serialisation/SchemaReflectorTest.cs | 74 +++ src/Parquet.Test/TestBase.cs | 8 +- src/Parquet.Test/Types/EndToEndTypeTest.cs | 18 +- src/Parquet/Data/DecimalFormatDefaults.cs | 1 + src/Parquet/Encodings/ParquetPlainEncoder.cs | 57 +++ src/Parquet/Encodings/SchemaEncoder.cs | 483 +++++++++--------- src/Parquet/Extensions/OtherExtensions.cs | 7 + src/Parquet/Extensions/SpanExtensions.cs | 13 + src/Parquet/File/ThriftFooter.cs | 6 +- src/Parquet/ParquetOptions.cs | 8 + src/Parquet/Schema/TimeSpanDataField.cs | 7 +- .../Attributes/ParquetDecimalAttribute.cs | 31 ++ .../ParquetMicroSecondsTimeAttribute.cs | 11 + .../Attributes/ParquetTimestampAttribute.cs | 12 + .../Serialization/ParquetSerializer.cs | 8 +- .../Serialization/ParquetSerializerOptions.cs | 5 + src/Parquet/Serialization/TypeExtensions.cs | 33 +- 22 files changed, 576 insertions(+), 300 deletions(-) create mode 100644 src/Parquet/Serialization/Attributes/ParquetDecimalAttribute.cs create mode 100644 src/Parquet/Serialization/Attributes/ParquetMicroSecondsTimeAttribute.cs create mode 100644 src/Parquet/Serialization/Attributes/ParquetTimestampAttribute.cs diff --git a/.github/workflows/full.yml b/.github/workflows/full.yml index 53b1b737..c431a35c 100644 --- a/.github/workflows/full.yml +++ b/.github/workflows/full.yml @@ -1,7 +1,7 @@ name: 'Full Workflow' env: - VERSION: 4.8.1 + VERSION: 4.9.0 ASM_VERSION: 4.0.0 on: diff --git a/docs/serialisation.md b/docs/serialisation.md index 5aad1b64..2df47e9d 100644 --- a/docs/serialisation.md +++ b/docs/serialisation.md @@ -53,6 +53,41 @@ Serialisation tries to fit into C# ecosystem like a ninja 🥷, including custom - [`JsonIgnore`](https://learn.microsoft.com/en-us/dotnet/api/system.text.json.serialization.jsonignoreattribute?view=net-7.0) - ignores property when reading or writing. - [`JsonPropertyOrder`](https://learn.microsoft.com/en-us/dotnet/api/system.text.json.serialization.jsonpropertyorderattribute?view=net-6.0) - allows to reorder columns when writing to file (by default they are written in class definition order). Only root properties and struct (classes) properties can be ordered (it won't make sense to do the others). +Where built-in JSON attributes are not sufficient, extra attributes are added. + +### Dates + +By default, dates (`DateTime`) are serialized as `INT96` number, which include nanoseconds in the day. In general, `INT96` is obsolete in Parquet, however older systems such as Impala and Hive are still actively using it to represent dates. + +Therefore, when this library sees `INT96` type, it will automatically treat it as a date for both serialization and deserialization. + +If you need to rather use a normal non-legacy date type, just annotate a property with `[ParquetTimestamp]`: + +```csharp +[ParquetTimestamp] +public DateTime TimestampDate { get; set; } +``` + +### Times + +By default, time (`TimeSpan`) is serialised with millisecond precision. but you can increase it by adding `[ParquetMicroSecondsTime]` attribute: + +```csharp +[ParquetMicroSecondsTime] +public TimeSpan MicroTime { get; set; } +``` + +### Decimals Numbers + +By default, `decimal` is serialized with precision (number of digits in a number) of `38` and scale (number of digits to the right of the decimal point in a number) of `18`. If you need to use different precision/scale pair, use `[ParquetDecimal]` attribute: + +```csharp +[ParquetDecimal(40, 20)] +public decimal With_40_20 { get; set; } +``` + + + ## Nested Types You can also serialize [more complex types](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types) supported by the Parquet format. Sometimes you might want to store more complex data in your parquet files, like lists or maps. These are called *nested types* and they can be useful for organizing your information. However, they also come with a trade-off: they make your code slower and use more CPU resources. That's why you should only use them when you really need them and not just because they look cool. Simple columns are faster and easier to work with, so stick to them whenever you can. diff --git a/src/Parquet.PerfRunner/Program.cs b/src/Parquet.PerfRunner/Program.cs index 0584f509..afb2669e 100644 --- a/src/Parquet.PerfRunner/Program.cs +++ b/src/Parquet.PerfRunner/Program.cs @@ -1,6 +1,7 @@ // for performance tests only using BenchmarkDotNet.Running; +using Parquet; using Parquet.PerfRunner.Benchmarks; if(args.Length == 1) { @@ -18,7 +19,7 @@ } else { //new VsParquetSharp().Main(); //await new DataTypes().NullableInts(); - var c = new Classes(); - c.SetUp(); - c.Serialise(); + //var c = new Classes(); + //c.SetUp(); + //c.Serialise(); } diff --git a/src/Parquet.Test/File/Values/Primitives/BigDecimalTest.cs b/src/Parquet.Test/File/Values/Primitives/BigDecimalTest.cs index 23e5f3ce..74d7497c 100644 --- a/src/Parquet.Test/File/Values/Primitives/BigDecimalTest.cs +++ b/src/Parquet.Test/File/Values/Primitives/BigDecimalTest.cs @@ -1,19 +1,13 @@ -using System; -using System.Collections.Generic; -using System.Text; -using Parquet.File.Values.Primitives; +using Parquet.File.Values.Primitives; using Xunit; -namespace Parquet.Test.File.Values.Primitives -{ - public class BigDecimalTest - { - [Fact] - public void Valid_but_massive_bigdecimal() - { - var bd = new BigDecimal(83086059037282.54m, 38, 16); +namespace Parquet.Test.File.Values.Primitives { + public class BigDecimalTest { + [Fact] + public void Valid_but_massive_bigdecimal() { + var bd = new BigDecimal(83086059037282.54m, 38, 16); - //if exception is not thrown (overflow) we're OK - } - } + //if exception is not thrown (overflow) we're OK + } + } } diff --git a/src/Parquet.Test/Schema/SchemaTest.cs b/src/Parquet.Test/Schema/SchemaTest.cs index e794b1f1..419c1121 100644 --- a/src/Parquet.Test/Schema/SchemaTest.cs +++ b/src/Parquet.Test/Schema/SchemaTest.cs @@ -312,32 +312,6 @@ public async Task ReadSchemaActuallyEqualToWriteSchema() { } } - [Theory] - [InlineData(typeof(bool), TT.BOOLEAN, null)] - [InlineData(typeof(byte), TT.INT32, CT.UINT_8)] - [InlineData(typeof(sbyte), TT.INT32, CT.INT_8)] - [InlineData(typeof(short), TT.INT32, CT.INT_16)] - [InlineData(typeof(ushort), TT.INT32, CT.UINT_16)] - [InlineData(typeof(int), TT.INT32, CT.INT_32)] - [InlineData(typeof(uint), TT.INT32, CT.UINT_32)] - [InlineData(typeof(long), TT.INT64, CT.INT_64)] - [InlineData(typeof(ulong), TT.INT64, CT.UINT_64)] - [InlineData(typeof(BigInteger), TT.INT96, null)] - [InlineData(typeof(float), TT.FLOAT, null)] - [InlineData(typeof(double), TT.DOUBLE, null)] - [InlineData(typeof(byte[]), TT.BYTE_ARRAY, null)] - [InlineData(typeof(string), TT.BYTE_ARRAY, CT.UTF8)] - // decimal - [InlineData(typeof(DateTime), TT.INT96, null)] - // TimeSpan - // Interval - public void SystemTypeToThriftMapping(Type t, TT expectedTT, CT? expectedCT) { - Assert.True(SchemaEncoder.FindTypeTuple(t, out TT foundTT, out CT? foundCT)); - - Assert.Equal(expectedTT, foundTT); - Assert.Equal(expectedCT, foundCT); - } - [Fact] public void Decode_list_normal() { ParquetSchema schema = ThriftFooter.Parse( diff --git a/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs b/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs index eaaefc31..32c89e49 100644 --- a/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs +++ b/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs @@ -3,6 +3,7 @@ using System.Text.Json.Serialization; using Parquet.Schema; using Parquet.Serialization; +using Parquet.Serialization.Attributes; using Xunit; namespace Parquet.Test.Serialisation { @@ -239,5 +240,78 @@ public void ListOfStructs() { expectedSchema.Equals(actualSchema), expectedSchema.GetNotEqualsMessage(actualSchema, "expected", "actual")); } + + class DatesPoco { + + public DateTime ImpalaDate { get; set; } + + [ParquetTimestamp] + public DateTime TimestampDate { get; set; } + + public TimeSpan DefaultTime { get; set; } + + [ParquetMicroSecondsTime] + public TimeSpan MicroTime { get; set; } + } + + [Fact] + public void Date_default_impala() { + ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true); + + Assert.True(s.DataFields[0] is DateTimeDataField); + Assert.Equal(DateTimeFormat.Impala, ((DateTimeDataField)s.DataFields[0]).DateTimeFormat); + } + + [Fact] + public void Date_timestamp() { + ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true); + + Assert.True(s.DataFields[1] is DateTimeDataField); + Assert.Equal(DateTimeFormat.DateAndTime, ((DateTimeDataField)s.DataFields[1]).DateTimeFormat); + } + + [Fact] + public void Time_default() { + ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true); + + Assert.True(s.DataFields[2] is TimeSpanDataField); + Assert.Equal(TimeSpanFormat.MilliSeconds, ((TimeSpanDataField)s.DataFields[2]).TimeSpanFormat); + } + + [Fact] + public void Time_micros() { + ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true); + + Assert.True(s.DataFields[3] is TimeSpanDataField); + Assert.Equal(TimeSpanFormat.MicroSeconds, ((TimeSpanDataField)s.DataFields[3]).TimeSpanFormat); + } + + + + class DecimalPoco { + public decimal Default { get; set; } + + [ParquetDecimal(40, 20)] + public decimal With_40_20 { get; set; } + } + + [Fact] + public void Decimal_default() { + ParquetSchema s = typeof(DecimalPoco).GetParquetSchema(true); + + Assert.True(s.DataFields[0] is DecimalDataField); + Assert.Equal(38, ((DecimalDataField)s.DataFields[0]).Precision); + Assert.Equal(18, ((DecimalDataField)s.DataFields[0]).Scale); + } + + [Fact] + public void Decimal_override() { + ParquetSchema s = typeof(DecimalPoco).GetParquetSchema(true); + + Assert.True(s.DataFields[1] is DecimalDataField); + Assert.Equal(40, ((DecimalDataField)s.DataFields[1]).Precision); + Assert.Equal(20, ((DecimalDataField)s.DataFields[1]).Scale); + } + } } \ No newline at end of file diff --git a/src/Parquet.Test/TestBase.cs b/src/Parquet.Test/TestBase.cs index 1abb18b2..176de1bb 100644 --- a/src/Parquet.Test/TestBase.cs +++ b/src/Parquet.Test/TestBase.cs @@ -87,10 +87,16 @@ protected async Task WriteReadSingle(DataField field, object? value, Com //for sanity, use disconnected streams byte[] data; + var options = new ParquetOptions(); +#if !NETCOREAPP3_1 + if(value is DateOnly) + options.UseDateOnlyTypeForDates = true; +#endif + using(var ms = new MemoryStream()) { // write single value - using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(field), ms)) { + using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(field), ms, options)) { writer.CompressionMethod = compressionMethod; using ParquetRowGroupWriter rg = writer.CreateRowGroup(); diff --git a/src/Parquet.Test/Types/EndToEndTypeTest.cs b/src/Parquet.Test/Types/EndToEndTypeTest.cs index bfdcba75..224d6846 100644 --- a/src/Parquet.Test/Types/EndToEndTypeTest.cs +++ b/src/Parquet.Test/Types/EndToEndTypeTest.cs @@ -48,6 +48,9 @@ public class EndToEndTypeTest : TestBase { ["dateDateAndTime local kind"] = (new DateTimeDataField("dateDateAndTime unknown kind", DateTimeFormat.DateAndTime), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)), // don't want any excess info in the offset INT32 doesn't contain or care about this data ["dateDate"] = (new DateTimeDataField("dateDate", DateTimeFormat.Date), DateTime.UtcNow.RoundToDay()), +#if !NETCOREAPP3_1 + ["dateOnly"] = (new DataField("dateOnly"), DateOnly.FromDateTime(DateTime.UtcNow)), +#endif ["interval"] = (new DataField("interval"), new Interval(3, 2, 1)), // time test(loses precision slightly) ["time_micros"] = (new TimeSpanDataField("timeMicros", TimeSpanFormat.MicroSeconds), new TimeSpan(DateTime.UtcNow.TimeOfDay.Ticks / 10 * 10)), @@ -108,6 +111,9 @@ public class EndToEndTypeTest : TestBase { [InlineData("impala date local kind")] [InlineData("dateDateAndTime local kind")] [InlineData("dateDate")] +#if !NETCOREAPP3_1 + [InlineData("dateOnly")] +#endif [InlineData("interval")] [InlineData("time_micros")] [InlineData("time_millis")] @@ -143,9 +149,11 @@ public async Task Type_writes_and_reads_end_to_end(string name) { object actual = await WriteReadSingle(input.field, input.expectedValue); bool equal; - if(input.expectedValue == null && actual == null) equal = true; -else if(actual.GetType().IsArrayOf() && input.expectedValue != null) equal = ((byte[])actual).SequenceEqual((byte[])input.expectedValue); -else if(actual.GetType() == typeof(DateTime)) { + if(input.expectedValue == null && actual == null) + equal = true; + else if(actual.GetType().IsArrayOf() && input.expectedValue != null) { + equal = ((byte[])actual).SequenceEqual((byte[])input.expectedValue); + } else if(actual.GetType() == typeof(DateTime)) { var dtActual = (DateTime)actual; Assert.Equal(DateTimeKind.Utc, dtActual.Kind); var dtExpected = (DateTime)input.expectedValue!; @@ -153,7 +161,9 @@ public async Task Type_writes_and_reads_end_to_end(string name) { ? DateTime.SpecifyKind(dtExpected, DateTimeKind.Utc) // assumes value is UTC : dtExpected.ToUniversalTime(); equal = dtActual.Equals(dtExpected); - } else equal = actual.Equals(input.expectedValue); + } else { + equal = actual.Equals(input.expectedValue); + } Assert.True(equal, $"{name}| expected: [{input.expectedValue}], actual: [{actual}], schema element: {input.field}"); } diff --git a/src/Parquet/Data/DecimalFormatDefaults.cs b/src/Parquet/Data/DecimalFormatDefaults.cs index 7f96007b..37def8cb 100644 --- a/src/Parquet/Data/DecimalFormatDefaults.cs +++ b/src/Parquet/Data/DecimalFormatDefaults.cs @@ -10,6 +10,7 @@ public class DecimalFormatDefaults { /// The Default Precision value used when not explicitly defined; this is the value used prior to parquet-dotnet v3.9. /// public const int DefaultPrecision = 38; + /// /// The Default Scale value used when not explicitly defined; this is the value used prior to parquet-dotnet v3.9. /// diff --git a/src/Parquet/Encodings/ParquetPlainEncoder.cs b/src/Parquet/Encodings/ParquetPlainEncoder.cs index 44eaaec4..ec08713c 100644 --- a/src/Parquet/Encodings/ParquetPlainEncoder.cs +++ b/src/Parquet/Encodings/ParquetPlainEncoder.cs @@ -100,6 +100,13 @@ public static void Encode( Encode(span, destination, tse); if(stats != null) FillStats(span, stats); +#if NET6_0_OR_GREATER + } else if(t == typeof(DateOnly[])) { + Span span = ((DateOnly[])data).AsSpan(offset, count); + Encode(span, destination, tse); + if(stats != null) + FillStats(span, stats); +#endif } else if(t == typeof(TimeSpan[])) { Span span = ((TimeSpan[])data).AsSpan(offset, count); Encode(span, destination, tse); @@ -171,6 +178,11 @@ public static void Decode( } else if(t == typeof(DateTime[])) { Span span = ((DateTime[])data).AsSpan(offset, count); elementsRead = Decode(source, span, tse); +#if NET6_0_OR_GREATER + } else if(t == typeof(DateOnly[])) { + Span span = ((DateOnly[])data).AsSpan(offset, count); + elementsRead = Decode(source, span, tse); +#endif } else if(t == typeof(TimeSpan[])) { Span span = ((TimeSpan[])data).AsSpan(offset, count); elementsRead = Decode(source, span, tse); @@ -238,6 +250,10 @@ public static bool TryEncode(object? value, SchemaElement tse, out byte[]? resul return true; } else if(t == typeof(DateTime)) return TryEncode((DateTime)value, tse, out result); +#if NET6_0_OR_GREATER + else if(t == typeof(DateOnly)) + return TryEncode((DateOnly)value, tse, out result); +#endif else if(t == typeof(TimeSpan)) return TryEncode((TimeSpan)value, tse, out result); else if(t == typeof(Interval)) { @@ -339,6 +355,14 @@ private static bool TryEncode(DateTime value, SchemaElement tse, out byte[] resu } } +#if NET6_0_OR_GREATER + private static bool TryEncode(DateOnly value, SchemaElement tse, out byte[] result) { + int days = value.ToUnixDays(); + result = BitConverter.GetBytes(days); + return true; + } +#endif + private static bool TryEncode(decimal value, SchemaElement tse, out byte[] result) { try { switch(tse.Type) { @@ -766,6 +790,16 @@ public static void Encode(ReadOnlySpan data, Stream destination, Schem } } +#if NET6_0_OR_GREATER + public static void Encode(ReadOnlySpan data, Stream destination, SchemaElement tse) { + foreach(DateOnly element in data) { + int days = element.ToUnixDays(); + byte[] raw = BitConverter.GetBytes(days); + destination.Write(raw, 0, raw.Length); + } + } +#endif + public static int Decode(Span source, Span data, SchemaElement tse) { switch(tse.Type) { case Thrift.Type.INT32: @@ -816,6 +850,20 @@ public static int Decode(Span source, Span data, SchemaElement t } } +#if NET6_0_OR_GREATER + public static int Decode(Span source, Span data, SchemaElement tse) { + int[] ints = ArrayPool.Shared.Rent(data.Length); + try { + int intsRead = Decode(source, ints.AsSpan(0, data.Length)); + for(int i = 0; i < intsRead; i++) + data[i] = DateOnly.FromDateTime(ints[i].AsUnixDaysInDateTime()); + return intsRead; + } finally { + ArrayPool.Shared.Return(ints); + } + } +#endif + public static void Encode(ReadOnlySpan data, Stream destination, SchemaElement tse) { switch(tse.Type) { case Thrift.Type.INT32: @@ -1076,6 +1124,15 @@ public static void FillStats(ReadOnlySpan data, DataColumnStatistics s stats.MaxValue = max; } +#if NET6_0_OR_GREATER + public static void FillStats(ReadOnlySpan data, DataColumnStatistics stats) { + data.MinMax(out DateOnly min, out DateOnly max); + stats.MinValue = min; + stats.MaxValue = max; + } + +#endif + public static void FillStats(ReadOnlySpan data, DataColumnStatistics stats) { data.MinMax(out TimeSpan min, out TimeSpan max); stats.MinValue = min; diff --git a/src/Parquet/Encodings/SchemaEncoder.cs b/src/Parquet/Encodings/SchemaEncoder.cs index 200cf8a7..4fbefbc9 100644 --- a/src/Parquet/Encodings/SchemaEncoder.cs +++ b/src/Parquet/Encodings/SchemaEncoder.cs @@ -12,137 +12,24 @@ namespace Parquet.Encodings { static class SchemaEncoder { - class LookupItem { - public Thrift.Type ThriftType { get; set; } - - public SType? SystemType { get; set; } - - public Dictionary? ConvertedTypes { get; set; } - } - - class LookupTable : List { - - class SystemTypeInfo { - public Thrift.Type tt; - public Thrift.ConvertedType? tct; - public int priority; - } - - // all the cached lookups built during static initialisation - private readonly HashSet _supportedTypes = new(); - private readonly Dictionary _typeToDefaultType = new(); - private readonly Dictionary, SType> _typeAndConvertedTypeToType = new(); - private readonly Dictionary _systemTypeToTypeTuple = new(); - - public void Add(Thrift.Type thriftType, SType t, params object[] options) { - Add(thriftType, t, int.MaxValue, options); - } - - private void AddSystemTypeInfo(Thrift.Type thriftType, SType t, Thrift.ConvertedType? ct, int priority) { - if(_systemTypeToTypeTuple.TryGetValue(t, out SystemTypeInfo? sti)) { - if(priority <= sti.priority) { - sti.tt = thriftType; - sti.tct = ct; - sti.priority = priority; - } - } else { - _systemTypeToTypeTuple[t] = new SystemTypeInfo { - tt = thriftType, - tct = ct, - priority = priority - }; - } - } - - public void Add(Thrift.Type thriftType, SType t, int priority, params object[] options) { - - _typeToDefaultType[thriftType] = t; - - AddSystemTypeInfo(thriftType, t, null, priority); - - _supportedTypes.Add(t); - for(int i = 0; i < options.Length; i += 2) { - var ct = (Thrift.ConvertedType)options[i]; - var clr = (System.Type)options[i + 1]; - _typeAndConvertedTypeToType.Add(new KeyValuePair(thriftType, ct), clr); - - // more specific version overrides less specific - AddSystemTypeInfo(thriftType, clr, ct, int.MaxValue); - - _supportedTypes.Add(clr); - } - } - - public SType? FindSystemType(Thrift.SchemaElement se) { - if(!se.__isset.type) - return null; - - if(se.__isset.converted_type && - _typeAndConvertedTypeToType.TryGetValue( - new KeyValuePair(se.Type, se.Converted_type), - out SType? match)) { - return match; - } - - if(_typeToDefaultType.TryGetValue(se.Type, out match)) { - return match; - } - - return null; - } - - public bool FindTypeTuple(SType type, out Thrift.Type thriftType, out Thrift.ConvertedType? convertedType) { - - if(!_systemTypeToTypeTuple.TryGetValue(type, out SystemTypeInfo? sti)) { - thriftType = default; - convertedType = null; - return false; - } - - thriftType = sti.tt; - convertedType = sti.tct; - return true; - } - - public bool IsSupported(SType? t) => t != null && _supportedTypes.Contains(t); - - public SType[] SupportedTypes => _supportedTypes.ToArray(); - } - - private static readonly LookupTable _lt = new() { - { Thrift.Type.BOOLEAN, typeof(bool) }, - { Thrift.Type.INT32, typeof(int), - Thrift.ConvertedType.UINT_8, typeof(byte), - Thrift.ConvertedType.INT_8, typeof(sbyte), - Thrift.ConvertedType.UINT_16, typeof(ushort), - Thrift.ConvertedType.INT_16, typeof(short), - Thrift.ConvertedType.UINT_32, typeof(uint), - Thrift.ConvertedType.INT_32, typeof(int), - Thrift.ConvertedType.DATE, typeof(DateTime), - Thrift.ConvertedType.DECIMAL, typeof(decimal), - Thrift.ConvertedType.TIME_MILLIS, typeof(TimeSpan), - Thrift.ConvertedType.TIMESTAMP_MILLIS, typeof(DateTime) - }, - { Thrift.Type.INT64 , typeof(long), - Thrift.ConvertedType.INT_64, typeof(long), - Thrift.ConvertedType.UINT_64, typeof(ulong), - Thrift.ConvertedType.TIME_MICROS, typeof(TimeSpan), - Thrift.ConvertedType.TIMESTAMP_MICROS, typeof(DateTime), - Thrift.ConvertedType.TIMESTAMP_MILLIS, typeof(DateTime), - Thrift.ConvertedType.DECIMAL, typeof(decimal) - }, - { Thrift.Type.INT96, typeof(DateTime), 1 }, - { Thrift.Type.INT96, typeof(BigInteger) }, - { Thrift.Type.FLOAT, typeof(float) }, - { Thrift.Type.DOUBLE, typeof(double) }, - { Thrift.Type.BYTE_ARRAY, typeof(byte[]), 1, - Thrift.ConvertedType.UTF8, typeof(string), - Thrift.ConvertedType.DECIMAL, typeof(decimal) - }, - { Thrift.Type.FIXED_LEN_BYTE_ARRAY, typeof(byte[]), - Thrift.ConvertedType.DECIMAL, typeof(decimal), - Thrift.ConvertedType.INTERVAL, typeof(Interval) - } + public static readonly SType[] SupportedTypes = new[] { + typeof(bool), + typeof(byte), typeof(sbyte), + typeof(short), typeof(ushort), + typeof(int), typeof(uint), + typeof(long), typeof(ulong), + typeof(float), + typeof(double), + typeof(decimal), + typeof(BigInteger), + typeof(DateTime), +#if NET6_0_OR_GREATER + typeof(DateOnly), +#endif + typeof(TimeSpan), + typeof(Interval), + typeof(byte[]), + typeof(string) }; static bool TryBuildList(List schema, @@ -244,9 +131,7 @@ static bool TryBuildStruct(List schema, return true; } - public static bool IsSupported(SType? t) => t == typeof(DateTime) || _lt.IsSupported(t); - - public static SType[] SupportedTypes => _lt.SupportedTypes; + public static bool IsSupported(SType? t) => SupportedTypes.Contains(t); /// /// Builds from thrift schema @@ -257,7 +142,7 @@ static bool TryBuildStruct(List schema, /// /// public static Field? Decode(List schema, - ParquetOptions? options, + ParquetOptions options, ref int index, out int ownedChildCount) { Thrift.SchemaElement se = schema[index]; @@ -266,29 +151,8 @@ static bool TryBuildStruct(List schema, Field? f = null; ownedChildCount = 0; - SType? t = _lt.FindSystemType(se); - if(t != null) { - // correction taking int account passed options - if(options != null && options.TreatBigIntegersAsDates && t == typeof(BigInteger)) - t = typeof(DateTime); - - if(options != null && options.TreatByteArrayAsString && t == typeof(byte[])) - t = typeof(string); - - DataField? df; - if(t == typeof(DateTime)) { - df = GetDateTimeDataField(se); - } - else{ - // successful field built - df = new DataField(se.Name, t); - } - - df.IsNullable = isNullable; - df.IsArray = isArray; - df.ThriftSchemaElement = se; + if(TryBuildDataField(se, options, out DataField? df)) { f = df; - index++; return f; } @@ -304,12 +168,88 @@ static bool TryBuildStruct(List schema, return f; } - private static DataField GetDateTimeDataField(SchemaElement se) - { - switch (se.Converted_type) - { + private static bool TryBuildDataField(SchemaElement se, ParquetOptions options, out DataField? df) { + df = null; + + if(!se.__isset.type) + return false; + + SType? st = se.Type switch { + Thrift.Type.BOOLEAN => typeof(bool), + + Thrift.Type.INT32 when se.__isset.converted_type => se.Converted_type switch { + Thrift.ConvertedType.INT_8 => typeof(sbyte), + Thrift.ConvertedType.UINT_8 => typeof(byte), + Thrift.ConvertedType.INT_16 => typeof(short), + Thrift.ConvertedType.UINT_16 => typeof(ushort), + Thrift.ConvertedType.INT_32 => typeof(int), + Thrift.ConvertedType.UINT_32 => typeof(uint), +#if NET6_0_OR_GREATER + Thrift.ConvertedType.DATE => options.UseDateOnlyTypeForDates ? typeof(DateOnly) : typeof(DateTime), +#else + Thrift.ConvertedType.DATE => typeof(DateTime), +#endif + Thrift.ConvertedType.DECIMAL => typeof(decimal), + Thrift.ConvertedType.TIME_MILLIS => typeof(TimeSpan), + Thrift.ConvertedType.TIMESTAMP_MILLIS => typeof(DateTime), + _ => typeof(int) + }, + Thrift.Type.INT32 => typeof(int), + + Thrift.Type.INT64 when se.__isset.converted_type => se.Converted_type switch { + Thrift.ConvertedType.INT_64 => typeof(long), + Thrift.ConvertedType.UINT_64 => typeof(ulong), + Thrift.ConvertedType.TIME_MICROS => typeof(TimeSpan), + Thrift.ConvertedType.TIMESTAMP_MICROS => typeof(DateTime), + Thrift.ConvertedType.TIMESTAMP_MILLIS => typeof(DateTime), + Thrift.ConvertedType.DECIMAL => typeof(decimal), + _ => typeof(long) + }, + Thrift.Type.INT64 => typeof(long), + + Thrift.Type.INT96 when options.TreatBigIntegersAsDates => typeof(DateTime), + Thrift.Type.INT96 => typeof(BigInteger), + Thrift.Type.FLOAT => typeof(float), + Thrift.Type.DOUBLE => typeof(double), + + Thrift.Type.BYTE_ARRAY when se.__isset.converted_type => se.Converted_type switch { + Thrift.ConvertedType.UTF8 => typeof(string), + Thrift.ConvertedType.DECIMAL => typeof(decimal), + _ => typeof(byte[]) + }, + Thrift.Type.BYTE_ARRAY => options.TreatByteArrayAsString ? typeof(string) : typeof(byte[]), + + Thrift.Type.FIXED_LEN_BYTE_ARRAY when se.__isset.converted_type => se.Converted_type switch { + Thrift.ConvertedType.DECIMAL => typeof(decimal), + Thrift.ConvertedType.INTERVAL => typeof(Interval), + _ => typeof(byte[]) + }, + Thrift.Type.FIXED_LEN_BYTE_ARRAY => options.TreatByteArrayAsString ? typeof(string) : typeof(byte[]), + + _ => null + }; + + if(st == null) + return false; + + if(st == typeof(DateTime)) { + df = GetDateTimeDataField(se); + } else { + // successful field built + df = new DataField(se.Name, st); + } + bool isNullable = se.Repetition_type != Thrift.FieldRepetitionType.REQUIRED; + bool isArray = se.Repetition_type == Thrift.FieldRepetitionType.REPEATED; + df.IsNullable = isNullable; + df.IsArray = isArray; + df.ThriftSchemaElement = se; + return true; + } + + private static DataField GetDateTimeDataField(SchemaElement se) { + switch(se.Converted_type) { case ConvertedType.TIMESTAMP_MILLIS: - if (se.Type == Type.INT64) + if(se.Type == Type.INT64) return new DateTimeDataField(se.Name, DateTimeFormat.DateAndTime); break; case ConvertedType.DATE: @@ -320,68 +260,6 @@ private static DataField GetDateTimeDataField(SchemaElement se) return new DateTimeDataField(se.Name, DateTimeFormat.Impala); } - /// - /// Adjust type-specific schema encodings that do not always follow generic use case - /// - /// - /// - private static void AdjustEncoding(DataField df, Thrift.SchemaElement tse) { - if(df.ClrType == typeof(DateTime)) { - if(df is DateTimeDataField dfDateTime) { - switch(dfDateTime.DateTimeFormat) { - case DateTimeFormat.DateAndTime: - tse.Type = Thrift.Type.INT64; - tse.Converted_type = Thrift.ConvertedType.TIMESTAMP_MILLIS; - break; - case DateTimeFormat.Date: - tse.Type = Thrift.Type.INT32; - tse.Converted_type = Thrift.ConvertedType.DATE; - break; - - //other cases are just default - } - } - } else if(df.ClrType == typeof(decimal)) { - if(df is DecimalDataField dfDecimal) { - if(dfDecimal.ForceByteArrayEncoding) - tse.Type = Thrift.Type.FIXED_LEN_BYTE_ARRAY; - else if(dfDecimal.Precision <= 9) - tse.Type = Thrift.Type.INT32; - else if(dfDecimal.Precision <= 18) - tse.Type = Thrift.Type.INT64; - else - tse.Type = Thrift.Type.FIXED_LEN_BYTE_ARRAY; - - tse.Precision = dfDecimal.Precision; - tse.Scale = dfDecimal.Scale; - tse.Type_length = BigDecimal.GetBufferSize(dfDecimal.Precision); - } else { - //set defaults - tse.Precision = DecimalFormatDefaults.DefaultPrecision; - tse.Scale = DecimalFormatDefaults.DefaultScale; - tse.Type_length = 16; - } - } else if(df.ClrType == typeof(Interval)) { - //set type length to 12 - tse.Type_length = 12; - } else if(df.ClrType == typeof(TimeSpan)) { - if(df is TimeSpanDataField dfTime) { - switch(dfTime.TimeSpanFormat) { - case TimeSpanFormat.MicroSeconds: - tse.Type = Thrift.Type.INT64; - tse.Converted_type = Thrift.ConvertedType.TIME_MICROS; - break; - case TimeSpanFormat.MilliSeconds: - tse.Type = Thrift.Type.INT32; - tse.Converted_type = Thrift.ConvertedType.TIME_MILLIS; - break; - - //other cases are just default - } - } - } - } - private static void Encode(ListField listField, Thrift.SchemaElement parent, IList container) { parent.Num_children += 1; @@ -449,19 +327,153 @@ private static void Encode(StructField structField, Thrift.SchemaElement parent, } } - public static void Encode(Field field, Thrift.SchemaElement parent, IList container) { - if(field.SchemaType == SchemaType.Data && field is DataField dataField) { - var tse = new Thrift.SchemaElement(field.Name); + public static Thrift.SchemaElement Encode(DataField field) { + SType st = field.ClrType; + var tse = new Thrift.SchemaElement(field.Name); + tse.LogicalType = new Thrift.LogicalType(); + + if(st == typeof(bool)) { // boolean + tse.Type = Thrift.Type.BOOLEAN; + } else if(st == typeof(byte) || st == typeof(sbyte) || // 32-bit numbers + st == typeof(short) || st == typeof(ushort) || + st == typeof(int) || st == typeof(uint)) { + + tse.Type = Thrift.Type.INT32; + sbyte bw = 0; + if(st == typeof(byte) || st == typeof(sbyte)) + bw = 8; + else if(st == typeof(short) || st == typeof(ushort)) + bw = 16; + else if(st == typeof(int) || st == typeof(uint)) + bw = 32; + bool signed = st == typeof(sbyte) || st == typeof(short) || st == typeof(int); + + tse.LogicalType.INTEGER = new Thrift.IntType { + BitWidth = bw, + IsSigned = signed + }; + tse.Converted_type = bw switch { + 8 => signed ? Thrift.ConvertedType.INT_8 : Thrift.ConvertedType.UINT_8, + 16 => signed ? Thrift.ConvertedType.INT_16 : Thrift.ConvertedType.UINT_16, + 32 => signed ? Thrift.ConvertedType.INT_32 : Thrift.ConvertedType.UINT_32, + _ => Thrift.ConvertedType.INT_32 + }; + } else if(st == typeof(long) || st == typeof(ulong)) { // 64-bit number + tse.Type = Thrift.Type.INT64; + tse.LogicalType.INTEGER = new Thrift.IntType { BitWidth = 64, IsSigned = st == typeof(long) }; + tse.Converted_type = st == typeof(long) ? Thrift.ConvertedType.INT_64 : Thrift.ConvertedType.UINT_64; + } else if(st == typeof(float)) { // float + tse.Type = Thrift.Type.FLOAT; + } else if(st == typeof(double)) { // double + tse.Type = Thrift.Type.DOUBLE; + } else if(st == typeof(BigInteger)) { // BigInteger + tse.Type = Thrift.Type.INT96; + } else if(st == typeof(string)) { // string + tse.Type = Thrift.Type.BYTE_ARRAY; + tse.LogicalType.STRING = new Thrift.StringType(); + tse.Converted_type = Thrift.ConvertedType.UTF8; + } else if(st == typeof(decimal)) { // decimal + + int precision; + int scale; + + if(field is DecimalDataField dfDecimal) { + if(dfDecimal.ForceByteArrayEncoding) + tse.Type = Thrift.Type.FIXED_LEN_BYTE_ARRAY; + else if(dfDecimal.Precision <= 9) + tse.Type = Thrift.Type.INT32; + else if(dfDecimal.Precision <= 18) + tse.Type = Thrift.Type.INT64; + else + tse.Type = Thrift.Type.FIXED_LEN_BYTE_ARRAY; - if(!_lt.FindTypeTuple(dataField.ClrType, out Thrift.Type thriftType, out Thrift.ConvertedType? convertedType)) { - throw new NotSupportedException($"could not find type tuple for {dataField.ClrType}"); + precision = dfDecimal.Precision; + scale = dfDecimal.Scale; + tse.Type_length = BigDecimal.GetBufferSize(dfDecimal.Precision); + } else { + //set defaults + tse.Type = Thrift.Type.FIXED_LEN_BYTE_ARRAY; + precision = DecimalFormatDefaults.DefaultPrecision; + scale = DecimalFormatDefaults.DefaultScale; + tse.Type_length = 16; } - tse.Type = thriftType; - if(convertedType != null) { - // be careful calling thrift setter as it sets other hidden flags - tse.Converted_type = convertedType.Value; + tse.LogicalType.DECIMAL = new Thrift.DecimalType { + Precision = precision, + Scale = scale + }; + tse.Precision = precision; + tse.Scale = scale; + } else if(st == typeof(byte[])) { // byte[] + tse.Type = Thrift.Type.BYTE_ARRAY; + } else if(st == typeof(DateTime)) { // DateTime + if(field is DateTimeDataField dfDateTime) { + switch(dfDateTime.DateTimeFormat) { + case DateTimeFormat.DateAndTime: + tse.Type = Thrift.Type.INT64; + tse.Converted_type = Thrift.ConvertedType.TIMESTAMP_MILLIS; + break; + case DateTimeFormat.Date: + tse.Type = Thrift.Type.INT32; + tse.Converted_type = Thrift.ConvertedType.DATE; + break; + default: + tse.Type = Thrift.Type.INT96; + break; + } + } else { + tse.Type = Thrift.Type.INT96; + } +#if NET6_0_OR_GREATER + } else if(st == typeof(DateOnly)) { // DateOnly + tse.Type = Thrift.Type.INT32; + tse.LogicalType.DATE = new Thrift.DateType(); + tse.Converted_type = Thrift.ConvertedType.DATE; +#endif + } else if(st == typeof(TimeSpan)) { // TimeSpan + if(field is TimeSpanDataField dfTime) { + switch(dfTime.TimeSpanFormat) { + case TimeSpanFormat.MilliSeconds: + tse.Type = Thrift.Type.INT32; + tse.LogicalType.TIME = new Thrift.TimeType { + IsAdjustedToUTC = true, + Unit = new Thrift.TimeUnit { MILLIS = new Thrift.MilliSeconds() } + }; + tse.Converted_type = Thrift.ConvertedType.TIME_MILLIS; + break; + case TimeSpanFormat.MicroSeconds: + tse.Type = Thrift.Type.INT64; + tse.LogicalType.TIME = new Thrift.TimeType { + IsAdjustedToUTC = true, + Unit = new Thrift.TimeUnit { MICROS = new Thrift.MicroSeconds() } + }; + tse.Converted_type = Thrift.ConvertedType.TIME_MICROS; + break; + default: + throw new NotImplementedException($"{dfTime.TimeSpanFormat} time format is not implemented"); + } + } else { + tse.Type = Thrift.Type.INT32; + tse.LogicalType.TIME = new Thrift.TimeType { + IsAdjustedToUTC = true, + Unit = new Thrift.TimeUnit { MILLIS = new Thrift.MilliSeconds() } + }; + tse.Converted_type = Thrift.ConvertedType.TIME_MILLIS; } + } else if(st == typeof(Interval)) { // Interval + tse.Type = Thrift.Type.FIXED_LEN_BYTE_ARRAY; + tse.Type_length = 12; + tse.Converted_type = Thrift.ConvertedType.INTERVAL; + } else { + throw new InvalidOperationException($"type {st} is not supported"); + } + + return tse; + } + + public static void Encode(Field field, Thrift.SchemaElement parent, IList container) { + if(field.SchemaType == SchemaType.Data && field is DataField dataField) { + Thrift.SchemaElement tse = Encode(dataField); bool isList = container.Count > 1 && container[container.Count - 2].Converted_type == Thrift.ConvertedType.LIST; @@ -470,8 +482,6 @@ public static void Encode(Field field, Thrift.SchemaElement parent, IList - _lt.FindTypeTuple(type, out thriftType, out convertedType); } } \ No newline at end of file diff --git a/src/Parquet/Extensions/OtherExtensions.cs b/src/Parquet/Extensions/OtherExtensions.cs index 3a92cf19..812ee11d 100644 --- a/src/Parquet/Extensions/OtherExtensions.cs +++ b/src/Parquet/Extensions/OtherExtensions.cs @@ -39,6 +39,13 @@ public static int ToUnixDays(this DateTime dto) { return (int)diff.TotalDays; } +#if NET6_0_OR_GREATER + public static int ToUnixDays(this DateOnly dto) { + TimeSpan diff = new DateTime(dto.Year, dto.Month, dto.Day) - UnixEpoch; + return (int)diff.TotalDays; + } +#endif + public static DateTime ToUtc(this DateTime dto) => dto.Kind == DateTimeKind.Unspecified ? DateTime.SpecifyKind(dto, DateTimeKind.Utc) diff --git a/src/Parquet/Extensions/SpanExtensions.cs b/src/Parquet/Extensions/SpanExtensions.cs index aba3bf2f..14d6bd7e 100644 --- a/src/Parquet/Extensions/SpanExtensions.cs +++ b/src/Parquet/Extensions/SpanExtensions.cs @@ -206,6 +206,19 @@ public static void MinMax(this ReadOnlySpan span, out DateTime min, ou } } +#if NET6_0_OR_GREATER + public static void MinMax(this ReadOnlySpan span, out DateOnly min, out DateOnly max) { + min = span.IsEmpty ? default(DateOnly) : span[0]; + max = min; + foreach(DateOnly i in span) { + if(i < min) + min = i; + if(i > max) + max = i; + } + } +#endif + public static void MinMax(this ReadOnlySpan span, out TimeSpan min, out TimeSpan max) { min = span.IsEmpty ? default(TimeSpan) : span[0]; max = min; diff --git a/src/Parquet/File/ThriftFooter.cs b/src/Parquet/File/ThriftFooter.cs index 10fb1b94..f9142dd5 100644 --- a/src/Parquet/File/ThriftFooter.cs +++ b/src/Parquet/File/ThriftFooter.cs @@ -33,7 +33,7 @@ internal static ParquetSchema Parse(params Thrift.SchemaElement[] elements) { return new ThriftFooter(new Thrift.FileMetaData { Schema = slst - }).CreateModelSchema(null); + }).CreateModelSchema(new ParquetOptions()); } public ThriftFooter(ParquetSchema schema, long totalRowCount) { @@ -160,7 +160,7 @@ public Thrift.PageHeader CreateDictionaryPage(int numValues) { #region [ Conversion to Model Schema ] - public ParquetSchema CreateModelSchema(ParquetOptions? formatOptions) { + public ParquetSchema CreateModelSchema(ParquetOptions formatOptions) { int si = 0; Thrift.SchemaElement tse = _fileMeta.Schema[si++]; var container = new List(); @@ -170,7 +170,7 @@ public ParquetSchema CreateModelSchema(ParquetOptions? formatOptions) { return new ParquetSchema(container); } - private void CreateModelSchema(FieldPath? path, IList container, int childCount, ref int si, ParquetOptions? formatOptions) { + private void CreateModelSchema(FieldPath? path, IList container, int childCount, ref int si, ParquetOptions formatOptions) { for(int i = 0; i < childCount && si < _fileMeta.Schema.Count; i++) { Field? se = SchemaEncoder.Decode(_fileMeta.Schema, formatOptions, ref si, out int ownedChildCount); if(se == null) diff --git a/src/Parquet/ParquetOptions.cs b/src/Parquet/ParquetOptions.cs index dafc1ca3..2ff5da5c 100644 --- a/src/Parquet/ParquetOptions.cs +++ b/src/Parquet/ParquetOptions.cs @@ -16,6 +16,14 @@ public class ParquetOptions { /// public bool TreatBigIntegersAsDates { get; set; } = true; +#if NET6_0_OR_GREATER + /// + /// When set to true, parquet dates will be deserialized as , otherwise + /// as with missing time part. + /// + public bool UseDateOnlyTypeForDates { get; set; } = false; +#endif + /// /// Whether to use dictionary encoding for string columns. Other column types are not supported. /// diff --git a/src/Parquet/Schema/TimeSpanDataField.cs b/src/Parquet/Schema/TimeSpanDataField.cs index faf3ef29..69a0e7de 100644 --- a/src/Parquet/Schema/TimeSpanDataField.cs +++ b/src/Parquet/Schema/TimeSpanDataField.cs @@ -16,9 +16,10 @@ public class TimeSpanDataField : DataField { /// The name. /// The format. /// - public TimeSpanDataField(string name, TimeSpanFormat format, bool isNullable = false) - : base(name, typeof(TimeSpan)) { - IsNullable = isNullable; + /// + /// + public TimeSpanDataField(string name, TimeSpanFormat format, bool? isNullable = null, bool? isArray = null, string? propertyName = null) + : base(name, typeof(TimeSpan), isNullable, isArray, propertyName) { TimeSpanFormat = format; } } diff --git a/src/Parquet/Serialization/Attributes/ParquetDecimalAttribute.cs b/src/Parquet/Serialization/Attributes/ParquetDecimalAttribute.cs new file mode 100644 index 00000000..4a6031ed --- /dev/null +++ b/src/Parquet/Serialization/Attributes/ParquetDecimalAttribute.cs @@ -0,0 +1,31 @@ +using System; + +namespace Parquet.Serialization.Attributes { + + /// + /// Annotates a property to allow customizing precision and scale + /// + [AttributeUsage(AttributeTargets.Property, AllowMultiple = false)] + public class ParquetDecimalAttribute : Attribute { + + /// + /// + /// + /// + /// + public ParquetDecimalAttribute(int precision, int scale) { + Precision = precision; + Scale = scale; + } + + /// + /// Precision + /// + public int Precision { get; } + + /// + /// Scale + /// + public int Scale { get; } + } +} diff --git a/src/Parquet/Serialization/Attributes/ParquetMicroSecondsTimeAttribute.cs b/src/Parquet/Serialization/Attributes/ParquetMicroSecondsTimeAttribute.cs new file mode 100644 index 00000000..63e81f9d --- /dev/null +++ b/src/Parquet/Serialization/Attributes/ParquetMicroSecondsTimeAttribute.cs @@ -0,0 +1,11 @@ +using System; + +namespace Parquet.Serialization.Attributes { + + /// + /// Specifies that field should be serialised with microseconds precision (not default milliseconds). + /// + [AttributeUsage(AttributeTargets.Property, AllowMultiple = false)] + public class ParquetMicroSecondsTimeAttribute : Attribute { + } +} diff --git a/src/Parquet/Serialization/Attributes/ParquetTimestampAttribute.cs b/src/Parquet/Serialization/Attributes/ParquetTimestampAttribute.cs new file mode 100644 index 00000000..5aee5d88 --- /dev/null +++ b/src/Parquet/Serialization/Attributes/ParquetTimestampAttribute.cs @@ -0,0 +1,12 @@ +using System; + +namespace Parquet.Serialization.Attributes { + + /// + /// Specifies that a property of type should be serialized as Parquet timestamp, which is internally + /// an int64 number. + /// + [AttributeUsage(AttributeTargets.Property, AllowMultiple = false)] + public class ParquetTimestampAttribute : Attribute { + } +} diff --git a/src/Parquet/Serialization/ParquetSerializer.cs b/src/Parquet/Serialization/ParquetSerializer.cs index 260aee74..ccbdb74b 100644 --- a/src/Parquet/Serialization/ParquetSerializer.cs +++ b/src/Parquet/Serialization/ParquetSerializer.cs @@ -62,7 +62,9 @@ public static async Task SerializeAsync(IEnumerable objectI } bool append = options != null && options.Append; - using(ParquetWriter writer = await ParquetWriter.CreateAsync(striper.Schema, destination, null, append, cancellationToken)) { + using(ParquetWriter writer = await ParquetWriter.CreateAsync(striper.Schema, destination, + options?.ParquetOptions, + append, cancellationToken)) { if(options != null) { writer.CompressionMethod = options.CompressionMethod; @@ -105,10 +107,12 @@ public static async Task SerializeAsync(IEnumerable objectI /// /// /// + /// /// /// /// public static async Task> DeserializeAsync(Stream source, + ParquetOptions? options = null, CancellationToken cancellationToken = default) where T : new() { @@ -123,7 +127,7 @@ public static async Task> DeserializeAsync(Stream source, var result = new List(); - using ParquetReader reader = await ParquetReader.CreateAsync(source); + using ParquetReader reader = await ParquetReader.CreateAsync(source, options, cancellationToken: cancellationToken); for(int rgi = 0; rgi < reader.RowGroupCount; rgi++) { using ParquetRowGroupReader rg = reader.OpenRowGroupReader(rgi); diff --git a/src/Parquet/Serialization/ParquetSerializerOptions.cs b/src/Parquet/Serialization/ParquetSerializerOptions.cs index f039cd2c..d40b9575 100644 --- a/src/Parquet/Serialization/ParquetSerializerOptions.cs +++ b/src/Parquet/Serialization/ParquetSerializerOptions.cs @@ -32,5 +32,10 @@ public class ParquetSerializerOptions { /// Custom row group size, if different from /// public int? RowGroupSize { get; set; } + + /// + /// Further customisations + /// + public ParquetOptions? ParquetOptions { get; set; } = new ParquetOptions(); } } diff --git a/src/Parquet/Serialization/TypeExtensions.cs b/src/Parquet/Serialization/TypeExtensions.cs index 522f86a6..7395c80c 100644 --- a/src/Parquet/Serialization/TypeExtensions.cs +++ b/src/Parquet/Serialization/TypeExtensions.cs @@ -4,8 +4,10 @@ using System.Linq; using System.Reflection; using System.Text.Json.Serialization; +using Parquet.Data; using Parquet.Encodings; using Parquet.Schema; +using Parquet.Serialization.Attributes; namespace Parquet.Serialization { @@ -65,14 +67,36 @@ private static List FindProperties(Type t, bool forWriting) { } private static Field ConstructDataField(string name, string propertyName, Type t, PropertyInfo? pi) { - var r = new DataField(name, t, propertyName: propertyName); + Field r; + + if(t == typeof(DateTime)) { + bool isTimestamp = pi?.GetCustomAttribute() != null; + r = new DateTimeDataField(name, + isTimestamp ? DateTimeFormat.DateAndTime : DateTimeFormat.Impala, + propertyName: propertyName); + } else if(t == typeof(TimeSpan)) { + r = new TimeSpanDataField(name, + pi?.GetCustomAttribute() == null + ? TimeSpanFormat.MilliSeconds + : TimeSpanFormat.MicroSeconds, + propertyName: propertyName); + } else if(t == typeof(decimal)) { + ParquetDecimalAttribute? ps = pi?.GetCustomAttribute(); + r = ps == null + ? new DecimalDataField(name, + DecimalFormatDefaults.DefaultPrecision, DecimalFormatDefaults.DefaultScale, + propertyName: propertyName) + : new DecimalDataField(name, ps.Precision, ps.Scale, propertyName: propertyName); + } else { + r = new DataField(name, t, propertyName: propertyName); + } + // legacy attribute parsing ParquetColumnAttribute? columnAttr = pi?.GetCustomAttribute(); - if(columnAttr != null) { if(columnAttr.UseListField) { - return new ListField(r.Name, r.ClrNullableIfHasNullsType, propertyName, - columnAttr.ListContainerName, columnAttr.ListElementName); + Type nt = new DataField(name, t).ClrNullableIfHasNullsType; + return new ListField(r.Name, nt, propertyName, columnAttr.ListContainerName, columnAttr.ListElementName); } if(t == typeof(TimeSpan)) @@ -182,6 +206,7 @@ private static Field MakeField(Type t, string columnName, string propertyName, .Select(p => MakeField(p, forWriting, makeArrays)) .Where(f => f != null) .Select(f => f!) + .OrderBy(f => f.Order) .ToArray(); if(fields.Length == 0)