diff --git a/online/src/main/scala/ai/chronon/online/Fetcher.scala b/online/src/main/scala/ai/chronon/online/Fetcher.scala index e7e490d0a..d8d77f16f 100644 --- a/online/src/main/scala/ai/chronon/online/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/Fetcher.scala @@ -160,10 +160,8 @@ class Fetcher(val kvStore: KVStore, val joinName = joinConf.metaData.nameToFilePath val keySchema = StructType(s"${joinName.sanitize}_key", keyFields.toArray) - val keyCodec = AvroCodec.of(AvroConversions.fromChrononSchema(keySchema).toString) val baseValueSchema = StructType(s"${joinName.sanitize}_value", valueFields.toArray) - val baseValueCodec = AvroCodec.of(AvroConversions.fromChrononSchema(baseValueSchema).toString) - val joinCodec = JoinCodec(joinConf, keySchema, baseValueSchema, keyCodec, baseValueCodec) + val joinCodec = JoinCodec(joinConf, keySchema, baseValueSchema) logControlEvent(joinCodec) joinCodec } @@ -310,6 +308,12 @@ class Fetcher(val kvStore: KVStore, val loggingTry: Try[Unit] = joinCodecTry.map(codec => { val metaData = codec.conf.join.metaData val samplePercent = if (metaData.isSetSamplePercent) metaData.getSamplePercent else 0 + + // Exit early if sample percent is 0 + if (samplePercent == 0) { + return Response(resp.request, Success(resp.derivedValues)) + } + val keyBytes = encode(codec.keySchema, codec.keyCodec, resp.request.keys, cast = true) val hash = if (samplePercent > 0) { diff --git a/online/src/main/scala/ai/chronon/online/JoinCodec.scala b/online/src/main/scala/ai/chronon/online/JoinCodec.scala index 9cf3ea2fd..ab1867ce1 100644 --- a/online/src/main/scala/ai/chronon/online/JoinCodec.scala +++ b/online/src/main/scala/ai/chronon/online/JoinCodec.scala @@ -30,12 +30,7 @@ import ai.chronon.online.OnlineDerivationUtil.{ timeFields } -case class JoinCodec(conf: JoinOps, - keySchema: StructType, - baseValueSchema: StructType, - keyCodec: AvroCodec, - baseValueCodec: AvroCodec) - extends Serializable { +case class JoinCodec(conf: JoinOps, keySchema: StructType, baseValueSchema: StructType) extends Serializable { @transient lazy val valueSchema: StructType = { val fields = if (conf.join == null || conf.join.derivations == null || baseValueSchema.fields.isEmpty) { @@ -65,7 +60,8 @@ case class JoinCodec(conf: JoinOps, @transient lazy val renameOnlyDeriveFunc: (Map[String, Any], Map[String, Any]) => Map[String, Any] = buildRenameOnlyDerivationFunction(conf.derivationsScala) - @transient lazy val valueCodec: AvroCodec = AvroCodec.of(AvroConversions.fromChrononSchema(valueSchema).toString) + def valueCodec: AvroCodec = AvroCodec.of(AvroConversions.fromChrononSchema(valueSchema).toString) + def keyCodec: AvroCodec = AvroCodec.of(AvroConversions.fromChrononSchema(keySchema).toString) /* * Get the serialized string repr. of the logging schema.