diff --git a/.gitignore b/.gitignore
index dc020f2..40f73a7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 .idea/
 spark/
+spark
 integration-test/target/
 *.class
 *.log
diff --git a/e2e/runner.sh b/e2e/runner.sh
index aded856..44a50f8 100755
--- a/e2e/runner.sh
+++ b/e2e/runner.sh
@@ -90,21 +90,31 @@
 git checkout -B $BRANCH origin/$BRANCH
 TAG=$(git rev-parse HEAD | cut -c -6)
 echo "Spark distribution built at SHA $TAG"
 
+FILE_SERVER_IMAGE="$IMAGE_REPO/spark-examples-file-server:$TAG"
+FILE_SERVER_BUILD_DIR="$TEST_ROOT/integration-test/docker-file-server"
+rm -rf $FILE_SERVER_BUILD_DIR/jars
+mkdir -p $FILE_SERVER_BUILD_DIR/jars
+cp $SPARK_REPO_ROOT/dist/examples/jars/spark-examples*.jar $FILE_SERVER_BUILD_DIR/jars/.
 cd $SPARK_REPO_ROOT/dist
 if [[ $DEPLOY_MODE == cloud ]] ;
 then
+  docker build -t $FILE_SERVER_IMAGE "$TEST_ROOT/integration-test/docker-file-server"
   ./sbin/build-push-docker-images.sh -r $IMAGE_REPO -t $TAG build
   if [[ $IMAGE_REPO == gcr.io* ]] ;
   then
     gcloud docker -- push $IMAGE_REPO/spark-driver:$TAG && \
     gcloud docker -- push $IMAGE_REPO/spark-executor:$TAG && \
     gcloud docker -- push $IMAGE_REPO/spark-init:$TAG
+    gcloud docker -- push $FILE_SERVER_IMAGE
   else
     ./sbin/build-push-docker-images.sh -r $IMAGE_REPO -t $TAG push
+    docker push $FILE_SERVER_IMAGE
   fi
 else
   # -m option for minikube.
+  eval $(minikube docker-env)
+  docker build -t $FILE_SERVER_IMAGE "$TEST_ROOT/integration-test/docker-file-server"
   ./sbin/build-push-docker-images.sh -m -r $IMAGE_REPO -t $TAG build
 fi
 
@@ -112,6 +122,7 @@ cd $TEST_ROOT/integration-test
 $SPARK_REPO_ROOT/build/mvn clean -Ddownload.plugin.skip=true integration-test \
   -Dspark-distro-tgz=$SPARK_REPO_ROOT/*.tgz \
   -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://$MASTER \
+  -Dspark.docker.test.fileServerImage=$FILE_SERVER_IMAGE \
   -Dspark.docker.test.driverImage=$IMAGE_REPO/spark-driver:$TAG \
   -Dspark.docker.test.executorImage=$IMAGE_REPO/spark-executor:$TAG \
   -Dspark.docker.test.initContainerImage=$IMAGE_REPO/spark-init:$TAG" || :
diff --git a/integration-test/docker-file-server/.gitignore b/integration-test/docker-file-server/.gitignore
new file mode 100644
index 0000000..2723de6
--- /dev/null
+++ b/integration-test/docker-file-server/.gitignore
@@ -0,0 +1 @@
+jars
diff --git a/integration-test/docker-file-server/Dockerfile b/integration-test/docker-file-server/Dockerfile
new file mode 100644
index 0000000..537748d
--- /dev/null
+++ b/integration-test/docker-file-server/Dockerfile
@@ -0,0 +1,4 @@
+FROM nginx:alpine
+
+COPY jars /opt/spark/jars
+COPY nginx.conf /etc/nginx/nginx.conf
diff --git a/integration-test/docker-file-server/nginx.conf b/integration-test/docker-file-server/nginx.conf
new file mode 100644
index 0000000..ce0a45f
--- /dev/null
+++ b/integration-test/docker-file-server/nginx.conf
@@ -0,0 +1,34 @@
+user nginx;
+worker_processes 1;
+
+error_log /var/log/nginx/error.log warn;
+pid /var/run/nginx.pid;
+
+events {
+    worker_connections 1024;
+}
+
+http {
+    server {
+        root /opt/spark/jars;
+        location /ping {
+            return 200 'pong';
+            add_header Content-Type text/plain;
+        }
+    }
+    include /etc/nginx/mime.types;
+    default_type application/octet-stream;
+
+    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
+                    '$status $body_bytes_sent "$http_referer" '
+                    '"$http_user_agent" "$http_x_forwarded_for"';
+
+    access_log /var/log/nginx/access.log main;
+
+    sendfile on;
+    #tcp_nopush on;
+
+    keepalive_timeout 65;
+
+    #gzip on;
+}
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index bf48318..20de4cc 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -135,7 +135,7 @@
               <executable>/bin/sh</executable>
               <arguments>
                 <argument>-c</argument>
-                <argument>rm -rf spark-distro; mkdir spark-distro-tmp; cd spark-distro-tmp; tar xfz ${spark-distro-tgz}; mv * ../spark-distro; cd ..; rm -rf spark-distro-tmp</argument>
+                <argument>rm -rf spark-distro; mkdir spark-distro-tmp; cd spark-distro-tmp; tar xfz ${spark-distro-tgz}; mv * ../spark-distro; cd ..; rm -rf spark-distro-tmp; rm -rf docker-file-server/jars; mkdir -p docker-file-server/jars; cp spark-distro/examples/jars/spark-examples*.jar docker-file-server/jars/.</argument>
               </arguments>
             </configuration>
           </execution>
diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 91d40a8..c4660d9 100644
--- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -17,12 +17,12 @@
 package org.apache.spark.deploy.k8s.integrationtest
 
 import java.io.File
+import java.net.URI
 import java.nio.file.Paths
 import java.util.UUID
 import java.util.regex.Pattern
 
 import scala.collection.JavaConverters._
-
 import com.google.common.io.PatternFilenameFilter
 import io.fabric8.kubernetes.api.model.{Container, Pod}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
@@ -39,6 +39,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
   private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
   private var kubernetesTestComponents: KubernetesTestComponents = _
   private var sparkAppConf: SparkAppConf = _
+  private var remoteExamplesJarUri: URI = _
   private val driverImage = System.getProperty(
     "spark.docker.test.driverImage",
     "spark-driver:latest")
@@ -50,7 +51,6 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
     "spark.docker.test.initContainerImage",
     "spark-init:latest")
 
-
   override def beforeAll(): Unit = {
     testBackend.initialize()
     kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
@@ -67,6 +67,8 @@
       .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
       .set("spark.kubernetes.executor.label.spark-app-locator", APP_LOCATOR_LABEL)
     kubernetesTestComponents.createNamespace()
+    remoteExamplesJarUri = SparkExamplesFileServerRunner
+      .launchServerAndGetUriForExamplesJar(kubernetesTestComponents)
   }
 
   after {
@@ -97,6 +99,11 @@
     runSparkPiAndVerifyCompletion(appArgs = Array("5"))
   }
 
+  test("Run SparkPi using the remote example jar.") {
+    sparkAppConf.set("spark.kubernetes.initContainer.image", initContainerImage)
+    runSparkPiAndVerifyCompletion(appResource = remoteExamplesJarUri.toString)
+  }
+
   test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") {
     sparkAppConf
       .set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi")
@@ -163,8 +170,8 @@
 
     createTestSecret()
 
-    runSparkPageRankAndVerifyCompletion(
-      appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE),
+    runSparkPiAndVerifyCompletion(
+      appResource = remoteExamplesJarUri.toString,
       driverPodChecker = (driverPod: Pod) => {
         doBasicDriverPodCheck(driverPod)
         checkTestSecret(driverPod, withInitContainer = true)
@@ -188,7 +195,6 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
       driverPodChecker,
       executorPodChecker)
   }
-
   private def runSparkPageRankAndVerifyCompletion(
       appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkExamplesFileServerRunner.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkExamplesFileServerRunner.scala
new file mode 100644
index 0000000..fa96b1f
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkExamplesFileServerRunner.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net.{URI, URL}
+import java.nio.file.Paths
+import java.util.UUID
+
+import io.fabric8.kubernetes.api.model.{Endpoints, Pod, Service}
+import org.apache.http.client.utils.URIBuilder
+
+private[spark] object SparkExamplesFileServerRunner {
+
+  private val fileServerImage = System.getProperty(
+    "spark.docker.test.fileServerImage", "spark-examples-file-server:latest")
+  private val fileServerExampleJarsDir = Paths.get("docker-file-server", "jars")
+  require(
+    fileServerExampleJarsDir
+      .toFile
+      .listFiles()
+      .exists(file => file.getName.startsWith("spark-examples")),
+    s"No spark-examples jar found in $fileServerExampleJarsDir.")
+  require(
+    fileServerExampleJarsDir
+      .toFile
+      .listFiles()
+      .count(file => file.getName.startsWith("spark-examples")) == 1,
+    s"Multiple spark-examples jars found in $fileServerExampleJarsDir.")
+  private val fileServerExampleJar = Paths.get("docker-file-server", "jars")
+    .toFile
+    .listFiles()
+    .filter(file => file.getName.startsWith("spark-examples"))(0)
+    .getName
+  private val fileServerPodLocatorLabelKey = "fileServerLocator"
+  private val fileServerPodLocatorLabelValue = UUID.randomUUID().toString.replaceAll("-", "")
+  private val fileServerName = "spark-examples-file-server"
+
+  def launchServerAndGetUriForExamplesJar(
+      kubernetesTestComponents: KubernetesTestComponents): URI = {
+    val podReadinessWatcher = new SparkReadinessWatcher[Pod]
+    Utils.tryWithResource(
+      kubernetesTestComponents
+        .kubernetesClient
+        .pods()
+        .withName(fileServerName)
+        .watch(podReadinessWatcher)) { _ =>
+      kubernetesTestComponents.kubernetesClient.pods().createNew()
+        .withNewMetadata()
+        .withName(fileServerName)
+        .addToLabels(fileServerPodLocatorLabelKey, fileServerPodLocatorLabelValue)
+        .endMetadata()
+        .withNewSpec()
+        .addNewContainer()
+        .withName("main")
+        .withImage(fileServerImage)
+        .withImagePullPolicy("Never")
+        .withNewReadinessProbe()
+        .withNewHttpGet()
+        .withNewPort(80)
.withPath("/ping") + .endHttpGet() + .endReadinessProbe() + .endContainer() + .endSpec() + .done() + podReadinessWatcher.waitUntilReady() + } + val endpointsReadinessWatcher = new SparkReadinessWatcher[Endpoints] + Utils.tryWithResource( + kubernetesTestComponents + .kubernetesClient + .endpoints() + .withName(fileServerName) + .watch(endpointsReadinessWatcher)) { _ => + kubernetesTestComponents.kubernetesClient.services().createNew() + .withNewMetadata() + .withName(fileServerName) + .endMetadata() + .withNewSpec() + .addToSelector(fileServerPodLocatorLabelKey, fileServerPodLocatorLabelValue) + .addNewPort() + .withName("file-server-port") + .withNewTargetPort(80) + .withPort(80) + .endPort() + .withType("NodePort") + .endSpec() + .done() + endpointsReadinessWatcher.waitUntilReady() + } + val resolvedNodePort = kubernetesTestComponents + .kubernetesClient + .services() + .withName(fileServerName) + .get() + .getSpec + .getPorts + .get(0) + .getNodePort + val masterHostname = URI.create(kubernetesTestComponents.clientConfig.getMasterUrl).getHost + new URIBuilder() + .setHost(masterHostname) + .setPort(resolvedNodePort) + .setScheme("http") + .setPath(s"/$fileServerExampleJar") + .build() + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala index 9cce325..0df0ad0 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -43,6 +43,8 @@ private[spark] class SparkDockerImageBuilder private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", throw new IllegalStateException("DOCKER_HOST env not found.")) + private val FILE_SERVER_BUILD_PATH = Paths.get("docker-file-server") + private val originalDockerUri = URI.create(dockerHost) private val httpsDockerUri = new URIBuilder() .setHost(originalDockerUri.getHost) @@ -68,6 +70,10 @@ private[spark] class SparkDockerImageBuilder buildImage("spark-driver", DRIVER_DOCKER_FILE) buildImage("spark-executor", EXECUTOR_DOCKER_FILE) buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) + dockerClient.build( + FILE_SERVER_BUILD_PATH, + "spark-examples-file-server", + new LoggingBuildHandler()) } private def buildImage(