modify caraml-store-sdk to enable maxcompute as data source type
vinoth-gojek committed Jan 16, 2025
1 parent 84cc988 commit f7c55f6
Showing 8 changed files with 266 additions and 93 deletions.
@@ -57,6 +57,11 @@ message DataSource {
SparkOverride spark_override = 2;
}

// Defines options for DataSource that sources features from a MaxCompute query
message MaxComputeOptions {
string table_ref = 1;
}

// Defines options for DataSource that sources features from Kafka messages.
// Each message should be a Protobuf that can be decoded with the generated
// Java Protobuf class at the given class path
@@ -79,5 +84,6 @@ message DataSource {
FileOptions file_options = 11;
BigQueryOptions bigquery_options = 12;
KafkaOptions kafka_options = 13;
MaxComputeOptions maxcompute_options = 14;
}
}
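
As an illustration (not part of this commit), the new field can be populated from Python through the regenerated bindings; the table reference is a placeholder:

from feast.core.DataSource_pb2 import DataSource as DataSourceProto

# Build a DataSource proto that points at a MaxCompute table.
# "my_project.my_table" is a hypothetical table reference.
source = DataSourceProto(
    type=DataSourceProto.BATCH_MAXCOMPUTE,
    event_timestamp_column="event_timestamp",
    maxcompute_options=DataSourceProto.MaxComputeOptions(table_ref="my_project.my_table"),
)
assert source.maxcompute_options.table_ref == "my_project.my_table"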
@@ -8,6 +8,7 @@
import dev.caraml.store.protobuf.core.DataFormatProto.StreamFormat;
import dev.caraml.store.protobuf.core.DataSourceProto;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.BigQueryOptions;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.MaxComputeOptions;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.FileOptions;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.KafkaOptions;
import dev.caraml.store.protobuf.core.DataSourceProto.DataSource.SourceType;
@@ -86,6 +87,9 @@ public static DataSource fromProto(DataSourceProto.DataSource spec) {
populateDatasourceConfigMapWithSparkOverride(
dataSourceConfigMap, spec.getBigqueryOptions().getSparkOverride());
}
case BATCH_MAXCOMPUTE -> {
dataSourceConfigMap.put("table_ref", spec.getMaxcomputeOptions().getTableRef());
}
case STREAM_KAFKA -> {
dataSourceConfigMap.put("bootstrap_servers", spec.getKafkaOptions().getBootstrapServers());
dataSourceConfigMap.put(
@@ -167,6 +171,11 @@ public DataSourceProto.DataSource toProto() {
parseDatasourceConfigMapToSparkOverride(dataSourceConfigMap));
spec.setBigqueryOptions(bigQueryOptions.build());
}
case BATCH_MAXCOMPUTE -> {
MaxComputeOptions.Builder maxComputeOptions = MaxComputeOptions.newBuilder();
maxComputeOptions.setTableRef(dataSourceConfigMap.get("table_ref"));
spec.setMaxcomputeOptions(maxComputeOptions.build());
}
case STREAM_KAFKA -> {
KafkaOptions.Builder kafkaOptions = KafkaOptions.newBuilder();
kafkaOptions.setBootstrapServers(dataSourceConfigMap.get("bootstrap_servers"));
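
The two cases above round-trip a BATCH_MAXCOMPUTE spec through the registry's flat datasource config map; a sketch of the stored shape, with a placeholder value:

# What fromProto stores and toProto reads back for a BATCH_MAXCOMPUTE source:
data_source_config_map = {"table_ref": "my_project.my_table"}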
@@ -17,6 +17,7 @@ public static void validate(DataSource spec) {
}
case BATCH_BIGQUERY -> Matchers.checkValidBigQueryTableRef(
spec.getBigqueryOptions().getTableRef(), "FeatureTable");
case BATCH_MAXCOMPUTE -> {} // validation for table_ref to be added
case STREAM_KAFKA -> {
StreamFormat.FormatCase messageFormat =
spec.getKafkaOptions().getMessageFormat().getFormatCase();
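
The BATCH_MAXCOMPUTE case is deliberately a no-op for now. A minimal sketch of the shape a table_ref check could take, in Python for brevity (the accepted project.table / project.schema.table format is an assumption, not confirmed MaxCompute syntax):

import re

# Hypothetical pattern: two or three dot-separated word-character identifiers.
_MAXCOMPUTE_TABLE_REF = re.compile(r"^\w+\.\w+(?:\.\w+)?$")

def check_valid_maxcompute_table_ref(table_ref: str, resource: str) -> None:
    # Mirrors the spirit of Matchers.checkValidBigQueryTableRef.
    if not _MAXCOMPUTE_TABLE_REF.match(table_ref):
        raise ValueError(f"invalid MaxCompute table ref '{table_ref}' in {resource}")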
240 changes: 163 additions & 77 deletions caraml-store-sdk/go/protos/feast/core/DataSource.pb.go

Large diffs are not rendered by default.

26 changes: 14 additions & 12 deletions caraml-store-sdk/python/feast/core/DataSource_pb2.py

Some generated files are not rendered by default.

11 changes: 9 additions & 2 deletions caraml-store-sdk/python/feast/core/DataSource_pb2.pyi
@@ -9,7 +9,7 @@ from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Opti
DESCRIPTOR: _descriptor.FileDescriptor

class DataSource(_message.Message):
__slots__ = ["bigquery_options", "created_timestamp_column", "date_partition_column", "event_timestamp_column", "field_mapping", "file_options", "kafka_options", "type"]
__slots__ = ["bigquery_options", "created_timestamp_column", "date_partition_column", "event_timestamp_column", "field_mapping", "file_options", "kafka_options", "maxcompute_options", "type"]
class SourceType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
class BigQueryOptions(_message.Message):
@@ -46,6 +46,11 @@ class DataSource(_message.Message):
spark_override: _SparkOverride_pb2.SparkOverride
topic: str
def __init__(self, bootstrap_servers: _Optional[str] = ..., topic: _Optional[str] = ..., message_format: _Optional[_Union[_DataFormat_pb2.StreamFormat, _Mapping]] = ..., spark_override: _Optional[_Union[_SparkOverride_pb2.SparkOverride, _Mapping]] = ...) -> None: ...
class MaxComputeOptions(_message.Message):
__slots__ = ["table_ref"]
TABLE_REF_FIELD_NUMBER: _ClassVar[int]
table_ref: str
def __init__(self, table_ref: _Optional[str] = ...) -> None: ...
BATCH_BIGQUERY: DataSource.SourceType
BATCH_FILE: DataSource.SourceType
BATCH_MAXCOMPUTE: DataSource.SourceType
@@ -57,6 +62,7 @@
FILE_OPTIONS_FIELD_NUMBER: _ClassVar[int]
INVALID: DataSource.SourceType
KAFKA_OPTIONS_FIELD_NUMBER: _ClassVar[int]
MAXCOMPUTE_OPTIONS_FIELD_NUMBER: _ClassVar[int]
STREAM_KAFKA: DataSource.SourceType
TYPE_FIELD_NUMBER: _ClassVar[int]
bigquery_options: DataSource.BigQueryOptions
@@ -66,5 +72,6 @@
field_mapping: _containers.ScalarMap[str, str]
file_options: DataSource.FileOptions
kafka_options: DataSource.KafkaOptions
maxcompute_options: DataSource.MaxComputeOptions
type: DataSource.SourceType
def __init__(self, type: _Optional[_Union[DataSource.SourceType, str]] = ..., field_mapping: _Optional[_Mapping[str, str]] = ..., event_timestamp_column: _Optional[str] = ..., date_partition_column: _Optional[str] = ..., created_timestamp_column: _Optional[str] = ..., file_options: _Optional[_Union[DataSource.FileOptions, _Mapping]] = ..., bigquery_options: _Optional[_Union[DataSource.BigQueryOptions, _Mapping]] = ..., kafka_options: _Optional[_Union[DataSource.KafkaOptions, _Mapping]] = ...) -> None: ...
def __init__(self, type: _Optional[_Union[DataSource.SourceType, str]] = ..., field_mapping: _Optional[_Mapping[str, str]] = ..., event_timestamp_column: _Optional[str] = ..., date_partition_column: _Optional[str] = ..., created_timestamp_column: _Optional[str] = ..., file_options: _Optional[_Union[DataSource.FileOptions, _Mapping]] = ..., bigquery_options: _Optional[_Union[DataSource.BigQueryOptions, _Mapping]] = ..., kafka_options: _Optional[_Union[DataSource.KafkaOptions, _Mapping]] = ..., maxcompute_options: _Optional[_Union[DataSource.MaxComputeOptions, _Mapping]] = ...) -> None: ...
63 changes: 62 additions & 1 deletion caraml-store-sdk/python/feast/data_source.py
@@ -15,6 +15,7 @@ class SourceType(enum.Enum):
UNKNOWN = 0
BATCH_FILE = 1
BATCH_BIGQUERY = 2
BATCH_MAXCOMPUTE = 3


@dataclass
@@ -129,6 +130,38 @@ def to_proto(self) -> DataSourceProto.BigQueryOptions:
spark_override=self.spark_override.to_proto() if self.spark_override else None,
)

@dataclass
class MaxComputeOptions:
"""
DataSource MaxCompute options used to source features from a MaxCompute query
"""
table_ref: str

@classmethod
def from_proto(cls, maxcompute_options_proto: DataSourceProto.MaxComputeOptions):
"""
Creates a MaxComputeOptions from its protobuf representation
Args:
maxcompute_options_proto: A protobuf representation of a DataSource's MaxCompute options
Returns:
Returns a MaxComputeOptions object based on the maxcompute_options protobuf
"""

return cls(
table_ref=maxcompute_options_proto.table_ref,
)

def to_proto(self) -> DataSourceProto.MaxComputeOptions:
"""
Converts a MaxComputeOptions object to its protobuf representation.
Returns:
MaxComputeOptionsProto protobuf
"""

return DataSourceProto.MaxComputeOptions(
table_ref=self.table_ref,
)
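
A quick round-trip of the new options class, assuming this data_source module is imported and using a placeholder table:

options = MaxComputeOptions(table_ref="my_project.my_table")
assert MaxComputeOptions.from_proto(options.to_proto()).table_ref == "my_project.my_table"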


@dataclass
class FileSource:
@@ -171,6 +204,26 @@ def to_proto(self) -> DataSourceProto:
date_partition_column=self.date_partition_column,
)

@dataclass
class MaxComputeSource:
event_timestamp_column: str
table_ref: str
created_timestamp_column: str = ""
field_mapping: Optional[Dict[str, str]] = None
date_partition_column: str = ""
spark_override: Optional[SparkOverride] = None

def to_proto(self) -> DataSourceProto:
return DataSourceProto(
type=DataSourceProto.BATCH_MAXCOMPUTE,
field_mapping=self.field_mapping,
maxcompute_options=MaxComputeOptions(self.table_ref).to_proto(),
event_timestamp_column=self.event_timestamp_column,
created_timestamp_column=self.created_timestamp_column,
date_partition_column=self.date_partition_column,
)
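
Note that spark_override is accepted on the dataclass but, unlike BigQuerySource, is not carried into the proto, since MaxComputeOptions only defines table_ref. A minimal usage sketch with placeholder names:

batch_source = MaxComputeSource(
    event_timestamp_column="event_timestamp",
    table_ref="my_project.my_table",
)
assert batch_source.to_proto().type == DataSourceProto.BATCH_MAXCOMPUTE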



@dataclass
class KafkaOptions:
@@ -237,7 +290,7 @@ def to_proto(self) -> DataSourceProto:
)


def new_batch_source_from_proto(data_source: DataSourceProto) -> Union[BigQuerySource, FileSource]:
def new_batch_source_from_proto(data_source: DataSourceProto) -> Union[BigQuerySource, FileSource, MaxComputeSource]:
"""
Convert data source config in FeatureTable spec proto to one of the data source model.
"""
@@ -261,6 +314,14 @@ def new_batch_source_from_proto(data_source: DataSourceProto) -> Union[BigQueryS
date_partition_column=data_source.date_partition_column,
spark_override=SparkOverride.from_proto(data_source.bigquery_options.spark_override),
)
elif data_source.maxcompute_options.table_ref:
return MaxComputeSource(
field_mapping=dict(data_source.field_mapping),
table_ref=data_source.maxcompute_options.table_ref,
event_timestamp_column=data_source.event_timestamp_column,
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
)
else:
raise ValueError("Could not identify the source type being added")
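
Dispatch keys off maxcompute_options.table_ref being non-empty, so a serialized MaxComputeSource parses back to the same model type; for illustration (placeholder value):

proto = MaxComputeSource(
    event_timestamp_column="event_timestamp",
    table_ref="my_project.my_table",
).to_proto()
assert isinstance(new_batch_source_from_proto(proto), MaxComputeSource)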

3 changes: 2 additions & 1 deletion caraml-store-sdk/python/feast/feature_table.py
@@ -11,6 +11,7 @@
from feast.data_source import (
BigQuerySource,
FileSource,
MaxComputeSource,
KafkaSource, new_batch_source_from_proto, new_stream_source_from_proto,
)
from feast.feature import Feature
@@ -29,7 +30,7 @@ def __init__(
name: str,
entities: List[str],
features: List[Feature],
batch_source: Optional[Union[BigQuerySource, FileSource]] = None,
batch_source: Optional[Union[BigQuerySource, FileSource, MaxComputeSource]] = None,
stream_source: Optional[KafkaSource] = None,
max_age: Optional[Duration] = None,
labels: Optional[MutableMapping[str, str]] = None,
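
With the widened batch_source union, a feature table can now be declared against MaxCompute. A sketch, with placeholder names and features elided to avoid assuming the Feature constructor:

from feast.data_source import MaxComputeSource
from feast.feature_table import FeatureTable

driver_stats = FeatureTable(
    name="driver_stats",
    entities=["driver_id"],
    features=[],  # feast.feature.Feature instances would go here
    batch_source=MaxComputeSource(
        event_timestamp_column="event_timestamp",
        table_ref="my_project.driver_stats",
    ),
)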
