[WIP] Add support for MaxCompute - read MaxCompute tables in Spark jobs #131

Open. Wants to merge 17 commits into base: DAT-3394_add_MAXCOMPUTE_enum
Binary file added buildSrc/.DS_Store
Binary file not shown.
Binary file added caraml-store-spark/.DS_Store
Binary file not shown.
7 changes: 6 additions & 1 deletion caraml-store-spark/build.gradle
@@ -59,6 +59,11 @@ dependencies {
testImplementation 'com.github.tomakehurst:wiremock-jre8:2.26.3'
testImplementation "com.dimafeng:testcontainers-scala-kafka_$scalaVersion:0.40.12"
testRuntimeOnly 'com.vladsch.flexmark:flexmark-all:0.35.10'
implementation files('./prebuilt-jars/custom-dialect.jar')
compileOnly('com.aliyun.odps:odps-jdbc:3.8.2') {
exclude group: 'org.antlr', module: 'antlr4-runtime'
}

}
application {
mainClassName = 'dev.caraml.spark.IngestionJob'
@@ -81,7 +86,7 @@ def containerRegistry = System.getenv('DOCKER_REGISTRY')
docker {
dependsOn shadowJar
dockerfile file('docker/Dockerfile')
files shadowJar.outputs, "$rootDir/caraml-store-pyspark/scripts"
files shadowJar.outputs, "$rootDir/caraml-store-pyspark/scripts", "$rootDir/caraml-store-spark/prebuilt-jars/custom-dialect.jar"
copySpec.with {
from("$rootDir/caraml-store-pyspark") {
include 'templates/**'
6 changes: 4 additions & 2 deletions caraml-store-spark/docker/Dockerfile
@@ -2,16 +2,18 @@ FROM --platform=linux/amd64 apache/spark-py:v3.1.3

ARG GCS_CONNECTOR_VERSION=2.2.5
ARG BQ_CONNECTOR_VERSION=0.27.1
ARG ODPC_JDBC_CONNECTOR=3.8.2

USER root
ADD https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-${GCS_CONNECTOR_VERSION}.jar /opt/spark/jars
ADD https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/${BQ_CONNECTOR_VERSION}/spark-bigquery-with-dependencies_2.12-${BQ_CONNECTOR_VERSION}.jar /opt/spark/jars
ADD https://github.com/aliyun/aliyun-odps-jdbc/releases/download/v${ODPC_JDBC_CONNECTOR}/odps-jdbc-${ODPC_JDBC_CONNECTOR}-jar-with-dependencies.jar /opt/spark/jars

RUN pip install Jinja2==3.1.2
# For logging to /dev/termination-log
RUN mkdir -p /dev

ADD caraml-spark-application-with-dependencies.jar /opt/spark/jars
ADD custom-dialect.jar /opt/spark/jars
ADD templates /opt/spark/work-dir/
ADD historical_feature_retrieval_job.py /opt/spark/work-dir

ADD historical_feature_retrieval_job.py /opt/spark/work-dir
@@ -3,6 +3,7 @@ package dev.caraml.spark
import dev.caraml.spark.metrics.IngestionPipelineMetrics
import dev.caraml.spark.sources.bq.BigQueryReader
import dev.caraml.spark.sources.file.FileReader
import dev.caraml.spark.sources.maxCompute.MaxComputeReader
import dev.caraml.spark.validation.RowValidator
import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkEnv
@@ -44,6 +45,13 @@ object BatchPipeline extends BasePipeline {
config.startTime,
config.endTime
)
case source: MaxComputeSource =>
MaxComputeReader.createBatchSource(
sparkSession,
source,
config.startTime,
config.endTime
)
}

val projection =
@@ -36,9 +36,11 @@ object IngestionJob {
case (_, x) => x
}
.extract[Sources] match {
case Sources(file: Some[FileSource], _, _) => c.copy(source = file.get)
case Sources(_, bq: Some[BQSource], _) => c.copy(source = bq.get)
case Sources(_, _, kafka: Some[KafkaSource]) => c.copy(source = kafka.get)
case Sources(file: Some[FileSource], _, _, _) => c.copy(source = file.get)
case Sources(_, bq: Some[BQSource], _, _) => c.copy(source = bq.get)
case Sources(_, _, kafka: Some[KafkaSource], _) => c.copy(source = kafka.get)
case Sources(_, _, _, maxCompute: Some[MaxComputeSource]) =>
c.copy(source = maxCompute.get)
}
})
.required()
@@ -83,6 +83,16 @@ case class BQSource(
override val datePartitionColumn: Option[String] = None
) extends BatchSource

case class MaxComputeSource(
project: String,
dataset: String,
table: String,
override val fieldMapping: Map[String, String],
override val eventTimestampColumn: String,
override val createdTimestampColumn: Option[String] = None,
override val datePartitionColumn: Option[String] = None
) extends BatchSource

case class KafkaSource(
bootstrapServers: String,
topic: String,
@@ -96,7 +106,8 @@ case class KafkaSource(
case class Sources(
file: Option[FileSource] = None,
bq: Option[BQSource] = None,
kafka: Option[KafkaSource] = None
kafka: Option[KafkaSource] = None,
maxCompute: Option[MaxComputeSource] = None
)

case class Field(name: String, `type`: ValueType.Enum)
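
For context, here is a minimal sketch of how the new maxCompute slot in Sources is meant to be populated and dispatched, mirroring the four-slot pattern match added to IngestionJob above. The object name and the project, dataset, table, and field-mapping values are hypothetical placeholders, not taken from this PR.

import dev.caraml.spark.{MaxComputeSource, Sources}

object MaxComputeSourceExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical source spec; only the shape matters here.
    val sources = Sources(
      maxCompute = Some(
        MaxComputeSource(
          project              = "example_project",
          dataset              = "example_dataset",
          table                = "driver_features",
          fieldMapping         = Map("drv_id" -> "driver_id"),
          eventTimestampColumn = "event_timestamp"
        )
      )
    )

    // Same idea as the IngestionJob match: exactly one of the four slots should be set.
    sources match {
      case Sources(_, _, _, Some(maxCompute)) => println(s"batch source: $maxCompute")
      case other                              => println(s"unexpected source spec: $other")
    }
  }
}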
@@ -0,0 +1,21 @@
package dev.caraml.spark.sources.maxCompute

import org.apache.spark.sql.jdbc.JdbcDialect

class CustomDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = {
println("can handle? this one")
println(url.startsWith("jdbc:odps"), url)
url.startsWith("jdbc:odps")
}

override def quoteIdentifier(colName: String): String = {
println("inside quote identifier", colName, s"$colName")
s"$colName"
}

override def getSchemaQuery(table: String): String = {
println("getschemaquery", table)
table
}
}
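
CustomDialect above leans on println debugging, returns identifiers unquoted, and makes getSchemaQuery return the table expression itself, so Spark's schema probe ends up running the full query. For comparison, a minimal sketch of the same dialect without the debug output, quoting identifiers with backticks (an assumption about what MaxCompute SQL accepts, not something this PR confirms) and keeping Spark's cheap WHERE 1=0 probe; the class name is illustrative only.

package dev.caraml.spark.sources.maxCompute

import org.apache.spark.sql.jdbc.JdbcDialect

// Sketch only: the quoting and schema-probe behaviour are assumptions, not verified against ODPS.
class OdpsDialectSketch extends JdbcDialect {
  // Claim only ODPS JDBC URLs so other JDBC sources keep their default dialects.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:odps")

  // Assumes backtick-quoted identifiers are accepted; drop the quoting if they are not.
  override def quoteIdentifier(colName: String): String = s"`$colName`"

  // Probe the schema without pulling rows, matching Spark's default behaviour.
  override def getSchemaQuery(table: String): String = s"SELECT * FROM $table WHERE 1=0"
}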
@@ -0,0 +1,57 @@
package dev.caraml.spark.sources.maxCompute

import dev.caraml.spark.MaxComputeSource
import org.joda.time.DateTime
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{SQLContext}
import org.apache.spark.sql.jdbc.JdbcDialects

object MaxComputeReader {
def createBatchSource(
sparkSession: SparkSession,
source: MaxComputeSource,
start: DateTime,
end: DateTime
): DataFrame = {
val maxComputeAccessID = sys.env("CARAML_SPARK_MAXCOMPUTE_ACCESS_ID")
val maxComputeAccessKey = sys.env("CARAML_SPARK_MAXCOMPUTE_ACCESS_KEY")
val maxComputeJDBCConnectionURL =
"jdbc:odps:https://service.ap-southeast-5.maxcompute.aliyun.com/api/?project=%s&interactiveMode=True&enableLimit=False" format source.project

val sqlQuery = "(select * from `%s.%s` where to_millis(%s) > %d and to_millis(%s) < %d)" format (
source.dataset, source.table, source.eventTimestampColumn, start.getMillis, source.eventTimestampColumn, end.getMillis
)

// val sqlQuery = "(select * from `%s.%s`)" format (
// source.dataset, source.table
// )
println("query is", sqlQuery)

println(JdbcDialects.get("jdbc:odps:https://service.ap-southeast-5.maxcompute.aliyun.com/api/?project=%s&interactiveMode=True&enableLimit=False" format source.project))
val customDialect = new CustomDialect()
JdbcDialects.registerDialect(customDialect)
println("custom dialect registered")
println(JdbcDialects.get("jdbc:odps:https://service.ap-southeast-5.maxcompute.aliyun.com/api/?project=%s&interactiveMode=True&enableLimit=False" format source.project))

val data = sparkSession.read
.format("jdbc")
.option("url", maxComputeJDBCConnectionURL)
      // Omitting queryTimeout causes the query to fail; the value it is set to does not appear to matter
.option("queryTimeout", 5000)
.option("dbtable", sqlQuery)
// ,option("query", sqlQuery)
.option("user", maxComputeAccessID)
.option("password", maxComputeAccessKey)
.load()

println(data)
println(data.toDF().show(3))
println(data.toDF().count())

// data.toDF().registerTempTable("temp_table")
// val valres = sparkSession.sql("select * from temp_table")
//
// println("result from query", valres)
data.toDF()
}
}
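
To see the reader end to end, a minimal sketch of invoking createBatchSource from a local SparkSession. It assumes CARAML_SPARK_MAXCOMPUTE_ACCESS_ID and CARAML_SPARK_MAXCOMPUTE_ACCESS_KEY are exported and the ODPS JDBC driver is on the runtime classpath (build.gradle marks it compileOnly, and the Dockerfile ships the jar-with-dependencies); the object name and all project, dataset, and table values are hypothetical placeholders.

import dev.caraml.spark.MaxComputeSource
import dev.caraml.spark.sources.maxCompute.MaxComputeReader
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime

object MaxComputeReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("maxcompute-read-example")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical source definition; values are placeholders.
    val source = MaxComputeSource(
      project              = "example_project",
      dataset              = "example_dataset",
      table                = "example_table",
      fieldMapping         = Map.empty,
      eventTimestampColumn = "event_timestamp"
    )

    // Read the last 24 hours of rows, using the same start/end window the pipeline passes in.
    val df = MaxComputeReader.createBatchSource(spark, source, DateTime.now().minusDays(1), DateTime.now())
    df.printSchema()
    df.show(5)

    spark.stop()
  }
}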