forked from linkedin/goavro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
union.go
418 lines (384 loc) · 15.3 KB
/
union.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
// Copyright [2019] LinkedIn Corp. Licensed under the Apache License, Version
// 2.0 (the "License"); you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
package goavro
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"sort"
)
// codecInfo holds the quick-lookup tables built once per union schema by
// makeCodecInfo. The union encoders and decoders use them to map between a
// member's position in the schema array, its full type name, and its Codec.
type codecInfo struct {
	// allowedTypes[i] is the full type name of the i-th union member. It is
	// position-indexed, so its order must match the schema array. It is also
	// printed in error messages listing the acceptable types.
	allowedTypes []string
	// codecFromIndex[i] is the Codec for the i-th union member.
	codecFromIndex []*Codec
	// codecFromName maps a member's full type name to its Codec.
	codecFromName map[string]*Codec
	// indexFromName maps a member's full type name to its position in the union.
	indexFromName map[string]int
}
// Union wraps datum in a single-entry map keyed by name, which is the form the
// union encoders require for non-nil values.
//
// When providing a value for an Avro union, the encoder accepts nil for a null
// value. Any non-nil value must be a map[string]interface{} holding exactly one
// key-value pair, where the key is the Avro type name and the value is the
// datum. Union builds that map for you. As a special case, a nil datum paired
// with the name "null" is returned as plain nil rather than being wrapped.
//
//	func ExampleUnion() {
//		codec, err := goavro.NewCodec(`["null","string","int"]`)
//		if err != nil {
//			fmt.Println(err)
//		}
//		buf, err := codec.TextualFromNative(nil, goavro.Union("string", "some string"))
//		if err != nil {
//			fmt.Println(err)
//		}
//		fmt.Println(string(buf))
//		// Output: {"string":"some string"}
//	}
func Union(name string, datum interface{}) interface{} {
	isBareNull := name == "null" && datum == nil
	if !isBareNull {
		return map[string]interface{}{name: datum}
	}
	return nil
}
// makeCodecInfo builds a Codec for every member schema of a union and indexes
// the results both by position and by full type name.
//
// It fails if any member schema cannot be built, or if two members resolve to
// the same full type name. The item numbers in these errors are 1-based.
func makeCodecInfo(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (codecInfo, error) {
	n := len(schemaArray)
	info := codecInfo{
		allowedTypes:   make([]string, n), // reported back when the encoder receives a datum of an unsupported type
		codecFromIndex: make([]*Codec, n),
		codecFromName:  make(map[string]*Codec, n),
		indexFromName:  make(map[string]int, n),
	}
	for idx := range schemaArray {
		member, err := buildCodec(st, enclosingNamespace, schemaArray[idx], cb)
		if err != nil {
			return codecInfo{}, fmt.Errorf("Union item %d ought to be valid Avro type: %s", idx+1, err)
		}
		memberName := member.typeName.fullName
		if _, seen := info.indexFromName[memberName]; seen {
			return codecInfo{}, fmt.Errorf("Union item %d ought to be unique type: %s", idx+1, member.typeName)
		}
		info.allowedTypes[idx] = memberName
		info.codecFromIndex[idx] = member
		info.codecFromName[memberName] = member
		info.indexFromName[memberName] = idx
	}
	return info, nil
}
// unionNativeFromBinary returns the binary decoder for a union.
//
// The input starts with the zero-based member index, encoded as an Avro long.
// The member's own binary encoding follows. A decoded nil is returned as-is.
// Any other value is wrapped by Union under the member's full type name.
func unionNativeFromBinary(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
	return func(buf []byte) (interface{}, []byte, error) {
		rawIndex, rest, err := longNativeFromBinary(buf)
		if err != nil {
			return nil, nil, err
		}
		index := rawIndex.(int64) // longNativeFromBinary always yields an int64
		memberCount := int64(len(cr.codecFromIndex))
		if !(index >= 0 && index < memberCount) {
			return nil, nil, fmt.Errorf("cannot decode binary union: index ought to be between 0 and %d; read index: %d", len(cr.codecFromIndex)-1, index)
		}
		value, rest, err := cr.codecFromIndex[index].nativeFromBinary(rest)
		if err != nil {
			return nil, nil, fmt.Errorf("cannot decode binary union item %d: %s", index+1, err)
		}
		if value == nil {
			// a nil datum is never wrapped in a map
			return nil, rest, nil
		}
		return Union(cr.allowedTypes[index], value), rest, nil
	}
}
// unionBinaryFromNative returns the binary encoder for a union.
//
// A nil datum is encoded as the index of the union's "null" member. It is an
// error if the union has no such member.
//
// Any other datum must be a map[string]interface{} with exactly one entry,
// whose key is a member's full type name. The encoder writes that member's
// index as an Avro long, then the entry's value encoded with the member's
// codec.
func unionBinaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
	return func(buf []byte, datum interface{}) ([]byte, error) {
		switch v := datum.(type) {
		case nil:
			index, ok := cr.indexFromName["null"]
			if !ok {
				return nil, fmt.Errorf("cannot encode binary union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
			}
			return longBinaryFromNative(buf, index)
		case map[string]interface{}:
			if len(v) != 1 {
				return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
			}
			// The map holds exactly one entry, so this loop body runs once.
			for key, value := range v {
				index, ok := cr.indexFromName[key]
				if !ok {
					return nil, fmt.Errorf("cannot encode binary union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
				}
				// Check the error instead of discarding it, so a failure to
				// write the index can never produce a half-written buffer.
				var err error
				if buf, err = longBinaryFromNative(buf, index); err != nil {
					return nil, fmt.Errorf("cannot encode binary union: %s", err)
				}
				return cr.codecFromIndex[index].binaryFromNative(buf, value)
			}
		}
		return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
	}
}
// unionNativeFromTextual returns the Avro JSON decoder for a union.
//
// If the union has a "null" member and the input begins with null, it yields
// nil. Otherwise, genericMapTextDecoder decodes the input using the members'
// codecs keyed by full type name. Its errors are prefixed with
// "cannot decode textual union".
func unionNativeFromTextual(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
	return func(buf []byte) (interface{}, []byte, error) {
		_, hasNull := cr.indexFromName["null"]
		if hasNull && bytes.HasPrefix(buf, []byte("null")) {
			return nil, buf[4:], nil
		}
		datum, rest, err := genericMapTextDecoder(buf, nil, cr.codecFromName)
		if err != nil {
			return nil, nil, fmt.Errorf("cannot decode textual union: %s", err)
		}
		return datum, rest, nil
	}
}
// unionTextualFromNative returns the Avro JSON encoder for a union.
//
// A nil datum is written as null when the union has a "null" member. Any other
// datum must be a map[string]interface{} with exactly one entry, whose key is
// a member's full type name. It is written as {"typeName":value}, with the
// value encoded by that member's codec.
func unionTextualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
	return func(buf []byte, datum interface{}) ([]byte, error) {
		if datum == nil {
			if _, hasNull := cr.indexFromName["null"]; !hasNull {
				return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
			}
			return append(buf, "null"...), nil
		}
		wrapped, isMap := datum.(map[string]interface{})
		if !isMap {
			return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
		}
		if len(wrapped) != 1 {
			return nil, fmt.Errorf("cannot encode textual union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
		}
		// Pull out the lone entry of the wrapper map.
		var memberName string
		var value interface{}
		for k, v := range wrapped {
			memberName, value = k, v
		}
		index, known := cr.indexFromName[memberName]
		if !known {
			return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
		}
		out, err := stringTextualFromNative(append(buf, '{'), memberName)
		if err != nil {
			return nil, fmt.Errorf("cannot encode textual union: %s", err)
		}
		out, err = cr.codecFromIndex[index].textualFromNative(append(out, ':'), value)
		if err != nil {
			return nil, fmt.Errorf("cannot encode textual union: %s", err)
		}
		return append(out, '}'), nil
	}
}
// textualJSONFromNativeAvro returns a standard-JSON encoder for a union.
//
// It accepts the same native forms as the Avro JSON encoder: nil, or a
// single-entry map keyed by a member's full type name. Unlike that encoder, it
// writes only the member value encoded by the member's codec, without the
// {"typeName": ...} wrapper. A nil datum is written as null, which requires a
// "null" member.
func textualJSONFromNativeAvro(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
	return func(buf []byte, datum interface{}) ([]byte, error) {
		if datum == nil {
			if _, hasNull := cr.indexFromName["null"]; hasNull {
				return append(buf, "null"...), nil
			}
			return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
		}
		wrapped, isMap := datum.(map[string]interface{})
		switch {
		case !isMap:
			return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
		case len(wrapped) != 1:
			return nil, fmt.Errorf("cannot encode textual union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
		}
		for memberName, value := range wrapped {
			index, known := cr.indexFromName[memberName]
			if !known {
				return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
			}
			out, err := cr.codecFromIndex[index].textualFromNative(buf, value)
			if err != nil {
				return nil, fmt.Errorf("cannot encode textual union: %s", err)
			}
			return out, nil
		}
		// Unreachable: wrapped is known to hold exactly one entry.
		return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
	}
}
// buildUnionCodec validates a union schema, builds its members' codecs, and
// assembles the union Codec shared by the three union flavors below.
//
// The binary decoder and encoder are always the standard Avro ones. The caller
// supplies the constructors for the textual decoder and encoder, which is the
// only way the flavors differ.
func buildUnionCodec(
	st map[string]*Codec,
	enclosingNamespace string,
	schemaArray []interface{},
	cb *codecBuilder,
	newNativeFromTextual func(*codecInfo) func([]byte) (interface{}, []byte, error),
	newTextualFromNative func(*codecInfo) func([]byte, interface{}) ([]byte, error),
) (*Codec, error) {
	if len(schemaArray) == 0 {
		return nil, errors.New("Union ought to have one or more members")
	}
	cr, err := makeCodecInfo(st, enclosingNamespace, schemaArray, cb)
	if err != nil {
		return nil, err
	}
	return &Codec{
		// NOTE: To support record field default values, union schema set to the
		// type name of first member
		// TODO: add/change to schemaCanonical below
		schemaOriginal:    cr.codecFromIndex[0].typeName.fullName,
		typeName:          &name{"union", nullNamespace},
		nativeFromBinary:  unionNativeFromBinary(&cr),
		binaryFromNative:  unionBinaryFromNative(&cr),
		nativeFromTextual: newNativeFromTextual(&cr),
		textualFromNative: newTextualFromNative(&cr),
	}, nil
}

// buildCodecForTypeDescribedBySlice builds the Codec for a union whose schema
// is given as a slice of member schemas.
//
// Its textual form is Avro JSON: every non-null union value is read and
// written wrapped as {"typeName": value}.
func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
	return buildUnionCodec(st, enclosingNamespace, schemaArray, cb, unionNativeFromTextual, unionTextualFromNative)
}

// buildCodecForTypeDescribedBySliceOneWayJSON builds a union Codec whose
// textual decoder accepts standard JSON. Its textual encoder still writes
// Avro JSON.
//
// The Avro specification requires the JSON encoding of a union to name the
// branch used. Every non-null union value is therefore written as a
// single-entry map such as {"string": "some string"}:
// https://avro.apache.org/docs/current/spec.html#json_encoding
//
// JSON in the wild differs in exactly that respect: it carries the bare value.
// To decode such JSON, the decoder peeks at the next JSON value, tries the
// union's member types against it, and wraps the first one that fits into
// the Avro JSON map the rest of the library expects.
//
// The conversion is one-way. Data decoded this way stays in its Avro JSON
// form, and the textual encoder writes Avro JSON, not standard JSON.
func buildCodecForTypeDescribedBySliceOneWayJSON(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
	return buildUnionCodec(st, enclosingNamespace, schemaArray, cb, nativeAvroFromTextualJSON, unionTextualFromNative)
}

// buildCodecForTypeDescribedBySliceTwoWayJSON builds a union Codec that both
// decodes and encodes standard JSON.
//
// The decoder behaves as in the one-way variant, guessing the member type from
// the bare value. The encoder writes only the member value, dropping the
// {"typeName": ...} wrapper.
func buildCodecForTypeDescribedBySliceTwoWayJSON(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
	return buildUnionCodec(st, enclosingNamespace, schemaArray, cb, nativeAvroFromTextualJSON, textualJSONFromNativeAvro)
}
// checkAll tries the union members named in allowedTypes, in that order, to
// decode the JSON value at the front of buf.
//
// It returns the first successful result wrapped in a single-entry map keyed
// by that member's full type name, together with the bytes following the
// decoded value. The "null" member is skipped; nativeAvroFromTextualJSON
// handles a JSON null itself before falling back to this search. Names with
// no codec in cr are skipped as well. If no member can decode buf, an error is
// returned along with the unconsumed buf.
func checkAll(allowedTypes []string, cr *codecInfo, buf []byte) (interface{}, []byte, error) {
	for _, memberName := range allowedTypes {
		if memberName == "null" {
			continue
		}
		theCodec, ok := cr.codecFromName[memberName]
		if !ok {
			continue
		}
		rv, rb, err := theCodec.NativeFromTextual(buf)
		if err != nil {
			continue // this member does not fit; try the next one
		}
		return map[string]interface{}{memberName: rv}, rb, nil
	}
	return nil, buf, fmt.Errorf("could not decode any json data in input %v", string(buf))
}

// nativeAvroFromTextualJSON returns a union decoder for standard JSON.
//
// In standard JSON a union value is the bare member value, not Avro JSON's
// {"typeName": value} wrapper. The decoder peeks at the next JSON value to
// choose the order in which member codecs are tried (see checkAll). It returns
// the first successful decode wrapped the way Avro JSON expects, or nil for a
// JSON null when the union has a "null" member.
func nativeAvroFromTextualJSON(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
	// Member names in lexical order, computed once on a private copy.
	// cr.allowedTypes is indexed by member position: the binary decoder labels
	// values with cr.allowedTypes[index]. It must therefore never be reordered
	// in place, and sorting it per call would also race when the codec is used
	// from several goroutines.
	sortedTypes := make([]string, len(cr.allowedTypes))
	copy(sortedTypes, cr.allowedTypes)
	sort.Strings(sortedTypes)

	return func(buf []byte) (interface{}, []byte, error) {
		// Decode the next JSON value only to learn its kind. The member codecs
		// re-parse buf themselves. json.Decoder stops after one complete
		// value, so any bytes that follow it in buf are not an error here.
		var m interface{}
		if err := json.NewDecoder(bytes.NewReader(buf)).Decode(&m); err != nil {
			return nil, buf, err
		}

		order := cr.allowedTypes // schema order unless the value's kind suggests otherwise
		switch m.(type) {
		case nil:
			// json.Decoder skips leading whitespace; do the same before
			// matching the literal.
			trimmed := bytes.TrimLeft(buf, " \t\r\n")
			if _, ok := cr.codecFromName["null"]; ok && bytes.HasPrefix(trimmed, []byte("null")) {
				return nil, trimmed[4:], nil
			}
		case float64:
			// encoding/json decodes every JSON number as float64, so the
			// candidates are double, float, int and long. Lexical order tries
			// them as double, float, int, long.
			order = sortedTypes
		case map[string]interface{}:
			// A JSON object could be a map or a record. Lexical order is kept
			// from the original intent of trying "map" first.
			// NOTE(review): this only puts "map" ahead of records whose full
			// names sort after it.
			order = sortedTypes
		}
		return checkAll(order, cr, buf)
	}
}