From 92f5783be6245a14d06c513e979861502659396a Mon Sep 17 00:00:00 2001 From: Srikumar Venugopal Date: Fri, 17 Jan 2020 10:03:19 +0000 Subject: [PATCH] Hive Metastore Integration --- .gitignore | 7 +- examples/hive/k8s/Dockerfile | 21 ++ examples/hive/k8s/Dockerfile.hiveserver | 19 + examples/hive/k8s/Dockerfile.metastore | 20 ++ examples/hive/k8s/Makefile | 123 +++++++ examples/hive/k8s/README.md | 39 ++ examples/hive/k8s/conf/hive-site.tmpl | 98 +++++ .../hive/k8s/conf/metastore-log4j2.properties | 64 ++++ examples/hive/k8s/conf/metastore-site.tmpl | 124 +++++++ .../hive/k8s/deploy/cluster/postgres.yaml | 51 +++ examples/hive/k8s/deploy/database-secret.yaml | 12 + examples/hive/k8s/deploy/database.yaml | 51 +++ examples/hive/k8s/deploy/hivemetastore.yaml | 40 +++ examples/hive/k8s/deploy/hiveserver.yaml | 45 +++ .../hive/k8s/deploy/postgres-volumes.yaml | 14 + examples/hive/k8s/deploy/s3-secret.tmpl | 11 + examples/hive/k8s/deploy_hive.sh | 8 + examples/hive/k8s/hiveserver-entrypoint.sh | 3 + examples/hive/k8s/metastore-entrypoint.sh | 12 + examples/hive/k8s/test-hive.sh | 10 + examples/hive/sampleapp/README.md | 43 +++ examples/hive/sampleapp/bookdataset.yaml | 15 + examples/hive/sampleapp/books.csv | 11 + examples/hive/sampleapp/run_sample.sh | 90 +++++ examples/hive/sampleapp/sample.hql | 5 + examples/hive/sampleapp/samplepod.yaml | 13 + src/dataset-operator/deploy/operator.yaml | 4 +- src/dataset-operator/go.mod | 7 +- src/dataset-operator/go.sum | 20 ++ .../pkg/admissioncontroller/mutate.go | 111 ++++-- .../pkg/apis/com/v1alpha1/dataset_types.go | 2 +- .../controller/dataset/dataset_controller.go | 340 ++++++++++++++++-- .../controller/dataset/metastore_client.go | 146 ++++++++ 33 files changed, 1522 insertions(+), 57 deletions(-) create mode 100644 examples/hive/k8s/Dockerfile create mode 100644 examples/hive/k8s/Dockerfile.hiveserver create mode 100644 examples/hive/k8s/Dockerfile.metastore create mode 100644 examples/hive/k8s/Makefile create mode 100644 examples/hive/k8s/README.md create mode 100644 examples/hive/k8s/conf/hive-site.tmpl create mode 100644 examples/hive/k8s/conf/metastore-log4j2.properties create mode 100644 examples/hive/k8s/conf/metastore-site.tmpl create mode 100644 examples/hive/k8s/deploy/cluster/postgres.yaml create mode 100644 examples/hive/k8s/deploy/database-secret.yaml create mode 100644 examples/hive/k8s/deploy/database.yaml create mode 100644 examples/hive/k8s/deploy/hivemetastore.yaml create mode 100644 examples/hive/k8s/deploy/hiveserver.yaml create mode 100644 examples/hive/k8s/deploy/postgres-volumes.yaml create mode 100644 examples/hive/k8s/deploy/s3-secret.tmpl create mode 100644 examples/hive/k8s/deploy_hive.sh create mode 100755 examples/hive/k8s/hiveserver-entrypoint.sh create mode 100755 examples/hive/k8s/metastore-entrypoint.sh create mode 100755 examples/hive/k8s/test-hive.sh create mode 100644 examples/hive/sampleapp/README.md create mode 100644 examples/hive/sampleapp/bookdataset.yaml create mode 100644 examples/hive/sampleapp/books.csv create mode 100755 examples/hive/sampleapp/run_sample.sh create mode 100644 examples/hive/sampleapp/sample.hql create mode 100644 examples/hive/sampleapp/samplepod.yaml create mode 100644 src/dataset-operator/pkg/controller/dataset/metastore_client.go diff --git a/.gitignore b/.gitignore index b526d80d..eca0e86a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,7 @@ +.DS_Store _tmp -.idea \ No newline at end of file +.idea +*.sw[po] +*Dockerfile.csi-driver-nfs +*Dockerfile.csi-s3 +*s3-secret.yaml diff 
--git a/examples/hive/k8s/Dockerfile b/examples/hive/k8s/Dockerfile new file mode 100644 index 00000000..bffd0d89 --- /dev/null +++ b/examples/hive/k8s/Dockerfile @@ -0,0 +1,21 @@ +FROM registry.access.redhat.com/ubi7/ubi + +WORKDIR /opt + +ENV JAVA_HOME=/usr/lib/jvm/jre/ +ENV HADOOP_HOME=/opt/hadoop-3.1.2 +ENV HIVE_HOME=/opt/apache-hive-3.1.2-bin + +RUN yum update --disableplugin=subscription-manager -y && rm -rf /var/cache/yum && \ + yum install --disableplugin=subscription-manager java-1.8.0-openjdk-headless -y && \ + yum install --disableplugin=subscription-manager postgresql-devel -y + +RUN curl -L https://archive.apache.org/dist/hadoop/core/hadoop-3.1.2/hadoop-3.1.2.tar.gz | tar zxf - && \ + curl -L https://www-us.apache.org/dist/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz | tar zxf - + +RUN curl -L https://jdbc.postgresql.org/download/postgresql-42.2.8.jar > ${HIVE_HOME}/lib/postgresql-42.2.8.jar && \ + curl -L https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.2/hadoop-aws-3.1.2.jar > ${HADOOP_HOME}/lib/hadoop-aws-3.1.2.jar && \ + curl -L https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.671/aws-java-sdk-core-1.11.671.jar > ${HADOOP_HOME}/lib/aws-java-sdk-core-1.11.671.jar && \ + curl -L https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.671/aws-java-sdk-s3-1.11.671.jar > ${HADOOP_HOME}/lib/aws-java-sdk-s3-1.11.671.jar && \ + curl -L https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.671/aws-java-sdk-dynamodb-1.11.671.jar > ${HADOOP_HOME}/lib/aws-java-sdk-dynamodb-1.11.671.jar && \ + cp -v ${HADOOP_HOME}/lib/*aws*.jar ${HIVE_HOME}/lib/ diff --git a/examples/hive/k8s/Dockerfile.hiveserver b/examples/hive/k8s/Dockerfile.hiveserver new file mode 100644 index 00000000..f2816cc1 --- /dev/null +++ b/examples/hive/k8s/Dockerfile.hiveserver @@ -0,0 +1,19 @@ +FROM dlf-hive-base:latest + +ENV HIVE_HOME=/opt/apache-hive-3.1.2-bin + +COPY conf/hive-site.xml ${HIVE_HOME}/conf/ +COPY hiveserver-entrypoint.sh ${HIVE_HOME}/entrypoint.sh + +RUN groupadd -r hive --gid=1000 && \ + useradd -r -g hive --uid=1000 -d ${HIVE_HOME} hive && \ + chown hive:hive -R ${HIVE_HOME} && \ + mkdir /tmp/hive && \ + chmod -R a+w /tmp/hive/ + +USER hive +WORKDIR $HIVE_HOME +EXPOSE 10001 10002 + +ENTRYPOINT ["./entrypoint.sh"] +CMD ["bin/hive","--service","hiveserver2"] diff --git a/examples/hive/k8s/Dockerfile.metastore b/examples/hive/k8s/Dockerfile.metastore new file mode 100644 index 00000000..1f1fbdfc --- /dev/null +++ b/examples/hive/k8s/Dockerfile.metastore @@ -0,0 +1,20 @@ +FROM dlf-hive-base:latest + +ENV METASTORE_HOME=/opt/apache-hive-3.1.2-bin + +COPY conf/metastore-site.xml ${METASTORE_HOME}/conf/hive-site.xml +COPY conf/metastore-log4j2.properties ${METASTORE_HOME}/conf/metastore-log4j2.properties +COPY metastore-entrypoint.sh ${METASTORE_HOME}/entrypoint.sh + +RUN groupadd -r hive --gid=1000 && \ + useradd -r -g hive --uid=1000 -d ${METASTORE_HOME} hive && \ + chown hive:hive -R ${METASTORE_HOME} && \ + mkdir /tmp/hive && \ + chmod -R a+w /tmp/hive/ + +USER hive +WORKDIR $METASTORE_HOME +EXPOSE 9083 + +ENTRYPOINT ["./entrypoint.sh"] +CMD ["bin/hive","--service","metastore"] diff --git a/examples/hive/k8s/Makefile b/examples/hive/k8s/Makefile new file mode 100644 index 00000000..62d1dee6 --- /dev/null +++ b/examples/hive/k8s/Makefile @@ -0,0 +1,123 @@ +SHELL=/bin/bash + +DATASET_OPERATOR_NAMESPACE := default + +DOCKER_REGISTRY_COMPONENTS := the_registry_to_use_for_components +DOCKER_REGISTRY_SECRET := your_already_installed_secrets + 
+HIVE_FILE_PATH := $(shell pwd) + +HIVE_BASE_DOCKERFILE := $(HIVE_FILE_PATH)/Dockerfile +HIVE_BASE_IMAGE := dlf-hive-base +HIVE_BASE_TAG := latest +HIVE_BASE_IMAGE := $(HIVE_BASE_IMAGE):$(HIVE_BASE_TAG) + +HIVESERVER_IMAGE := hive-server +HIVESERVER_TAG := latest +HIVESERVER_IMAGE := $(DOCKER_REGISTRY_COMPONENTS)/$(HIVESERVER_IMAGE) +HIVESERVER_IMAGE := $(HIVESERVER_IMAGE):$(HIVESERVER_TAG) +HIVESERVER_DOCKERFILE := $(HIVE_FILE_PATH)/Dockerfile.hiveserver + +HIVEMETASTORE_IMAGE := hive-metastore +HIVEMETASTORE_TAG := latest +HIVEMETASTORE_IMAGE := $(DOCKER_REGISTRY_COMPONENTS)/$(HIVEMETASTORE_IMAGE) +HIVEMETASTORE_IMAGE := $(HIVEMETASTORE_IMAGE):$(HIVEMETASTORE_TAG) +HIVEMETASTORE_DOCKERFILE := $(HIVE_FILE_PATH)/Dockerfile.metastore + +MAKE_ENV += DATASET_OPERATOR_NAMESPACE +MAKE_ENV += DOCKER_REGISTRY_SECRET +MAKE_ENV += HIVESERVER_IMAGE +MAKE_ENV += HIVEMETASTORE_IMAGE + +SHELL_EXPORT := $(foreach v,$(MAKE_ENV),$(v)='$($(v))' ) + +#K8S_FILES += $(shell find $(HIVE_FILE_PATH)/deploy -maxdepth 1 -name '*.yaml') +define load_containers_minikube + @mkdir -p _tmp ;\ + docker save $(1) | gzip > _tmp/$(2).tar.gz ;\ + eval $$(minikube docker-env) ;\ + docker load < _tmp/$(2).tar.gz ;\ + rm -rf _tmp/$(2).tar.gz +endef + +define build-images + $(info Building $(1)) + docker build -t $(1) -f $(2) $(3) +endef + +define deploy-k8s + $(info Deploying $(1)) + @$(SHELL_EXPORT) envsubst < $(HIVE_FILE_PATH)/deploy/$(1).yaml | kubectl apply -n $(DATASET_OPERATOR_NAMESPACE) -f - + @sleep 30s; kubectl wait --namespace $(DATASET_OPERATOR_NAMESPACE) --for condition=ready pods -l app=$(1) --timeout=90s > /dev/null 2>&1 +endef + +define undeploy-k8s + $(info Undeploying $(1)) + @$(SHELL_EXPORT) envsubst < $(HIVE_FILE_PATH)/deploy/$(1).yaml | kubectl delete -n $(DATASET_OPERATOR_NAMESPACE) --ignore-not-found --wait -f - +endef + +noobaa-env: +ifeq ($(origin S3_ENDPOINT),undefined) +$(info Getting connection parameters from Noobaa) +ifeq ($(origin NOOBAA_HOME),environment) +NOOBAA_HOME := ${NOOBAA_HOME} +else +$(error NOOBAA_HOME not found or unset) +endif +S3_ENDPOINT := $(shell minikube service s3 --url | head -n1) +AWS_ACCESS_KEY_ID := $(shell $(NOOBAA_HOME)/noobaa status 2>/dev/null | grep AWS_ACCESS_KEY_ID | awk -F ": " '{print $$2}') +AWS_SECRET_ACCESS_KEY := $(shell $(NOOBAA_HOME)/noobaa status 2>/dev/null | grep AWS_SECRET_ACCESS_KEY | awk -F ": " '{print $$2}') +else ifeq ($(origin S3_ENDPOINT),environment) +$(info Getting connection parameters from env) +S3_ENDPOINT := ${S3_ENDPOINT} +AWS_ACCESS_KEY_ID := ${AWS_ACCESS_KEY_ID} +AWS_SECRET_ACCESS_KEY := ${AWS_SECRET_ACCESS_KEY} +endif + +conf/hive-site.xml: noobaa-env + @sed -e "s|\$${S3_ENDPOINT}|$(S3_ENDPOINT)|g" conf/hive-site.tmpl > conf/hive-site.xml + +conf/metastore-site.xml: noobaa-env + @sed -e "s|\$${S3_ENDPOINT}|$(S3_ENDPOINT)|g" conf/metastore-site.tmpl > conf/metastore-site.xml + +deploy/s3-secret.yaml: noobaa-env + @sed -e "s|\$${AWS_ACCESS_KEY_ID}|$(AWS_ACCESS_KEY_ID)|g" \ + -e "s|\$${AWS_SECRET_ACCESS_KEY}|$(AWS_SECRET_ACCESS_KEY)|g" \ + deploy/s3-secret.tmpl > deploy/s3-secret.yaml + +build-images: conf/hive-site.xml conf/metastore-site.xml + $(call build-images,$(HIVE_BASE_IMAGE),$(HIVE_BASE_DOCKERFILE),$(HIVE_FILE_PATH)) + $(call build-images,$(HIVEMETASTORE_IMAGE),$(HIVEMETASTORE_DOCKERFILE),$(HIVE_FILE_PATH)) + $(call build-images,$(HIVESERVER_IMAGE),$(HIVESERVER_DOCKERFILE),$(HIVE_FILE_PATH)) + +push-images: build-images + @docker push $(HIVESERVER_IMAGE) ;\ + docker push $(HIVEMETASTORE_IMAGE) + +minikube-load-containers: 
build-images
+	$(call load_containers_minikube,$(HIVEMETASTORE_IMAGE),hivemetastore)
+	$(call load_containers_minikube,$(HIVESERVER_IMAGE),hiveserver)
+
+deploy-secret: deploy/s3-secret.yaml
+	@kubectl apply -n $(DATASET_OPERATOR_NAMESPACE) -f deploy/s3-secret.yaml; \
+	kubectl apply -n $(DATASET_OPERATOR_NAMESPACE) -f deploy/database-secret.yaml
+
+deploy-database: deploy-secret
+	$(call deploy-k8s,database)
+
+deploy-hivemetastore: minikube-load-containers deploy-database
+	$(call deploy-k8s,hivemetastore)
+
+deploy-hive: deploy-hivemetastore
+	$(call deploy-k8s,hiveserver)
+
+undeploy-hive:
+	$(call undeploy-k8s,hiveserver)
+	$(call undeploy-k8s,hivemetastore)
+	$(call undeploy-k8s,database)
+	$(call undeploy-k8s,database-secret)
+	$(call undeploy-k8s,s3-secret)
+
+minikube-install: deploy-hive
+
+minikube-uninstall: undeploy-hive
diff --git a/examples/hive/k8s/README.md b/examples/hive/k8s/README.md
new file mode 100644
index 00000000..d00d3bd1
--- /dev/null
+++ b/examples/hive/k8s/README.md
@@ -0,0 +1,39 @@
+# Installing Hive in Kubernetes
+
+In this series of steps, we will install Hive in Kubernetes to provide a
+metadata catalog that the framework can query to create datasets. Hive
+should be installed in the same namespace as the rest of DLF. This
+guide assumes Minikube as the target Kubernetes cluster, but it is applicable to any Kubernetes/OpenShift infrastructure.
+
+## Initial steps
+
+First, some configuration. The Object Storage integration of Hive requires that the endpoint be provided at initial configuration time.
+If you are using the Noobaa install described in the main installation guide, all you have to do is export the directory where Noobaa is installed.
+
+```
+$ unset S3_ENDPOINT
+$ export NOOBAA_HOME=path/to/Noobaa/directory
+```
+If you are using a different Object Storage service, set these environment variables instead:
+
+```
+$ export S3_ENDPOINT=http://<Object Storage endpoint>
+$ export AWS_ACCESS_KEY_ID="<access key for Object Storage>"
+$ export AWS_SECRET_ACCESS_KEY="<secret access key for Object Storage>"
+```
+Then, examine `Makefile` in `examples/hive/k8s` and add values for `DATASET_OPERATOR_NAMESPACE`, `DOCKER_REGISTRY_COMPONENTS`, and `DOCKER_REGISTRY_SECRET`. Please ensure that these values are the same as those used for installing DLF.
+
+Now go ahead and complete the install:
+```
+$ make minikube-install
+```
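+Under the hood, each `deploy-*` target renders the matching template in `deploy/` with `envsubst` and pipes the result to `kubectl apply` (see the `deploy-k8s` definition in the Makefile). As a rough sketch, with illustrative placeholder values, deploying the metastore amounts to:
+```
+$ DATASET_OPERATOR_NAMESPACE=default HIVEMETASTORE_IMAGE=myregistry/hive-metastore:latest \
+    envsubst < deploy/hivemetastore.yaml | kubectl apply -n default -f -
+```
+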
+Test your installation with `test-hive.sh`. Examine the script in an editor and change the values of the namespace and repository variables.
+```
+$ ./test-hive.sh
+```
+If the output is
+```
+HTTP/1.1 200 OK
+```
+then you can try the URL provided in a browser and verify that the Hive landing page is displayed correctly.
diff --git a/examples/hive/k8s/conf/hive-site.tmpl b/examples/hive/k8s/conf/hive-site.tmpl
new file mode 100644
index 00000000..a1ddecaa
--- /dev/null
+++ b/examples/hive/k8s/conf/hive-site.tmpl
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<configuration>
+  <property>
+    <name>metastore.thrift.uris</name>
+    <value>thrift://hivemetastore:9083</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.schema.verification</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.warehouse.dir</name>
+    <value>file:///tmp</value>
+  </property>
+
+  <property>
+    <name>fs.default.name</name>
+    <value>file:///tmp</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.impl</name>
+    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.endpoint</name>
+    <value>${S3_ENDPOINT}</value>
+  </property>
+
+  <property>
+    <name>hive.exec.scratchdir</name>
+    <value>/tmp/hive</value>
+  </property>
+
+  <property>
+    <name>hive.server2.transport.mode</name>
+    <value>http</value>
+  </property>
+
+  <property>
+    <name>hive.server2.thrift.http.port</name>
+    <value>10001</value>
+  </property>
+
+  <property>
+    <name>hive.server2.thrift.http.path</name>
+    <value>cliservice</value>
+  </property>
+
+  <property>
+    <name>hive.server2.thrift.http.min.worker.threads</name>
+    <value>5</value>
+  </property>
+
+  <property>
+    <name>hive.server2.thrift.http.max.worker.threads</name>
+    <value>500</value>
+  </property>
+
+  <property>
+    <name>hive.server2.logging.operation.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.server2.logging.operation.level</name>
+    <value>PERFORMANCE</value>
+  </property>
+
+  <property>
+    <name>mapred.input.dir.recursive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.mapred.supports.subdirectories</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.server2.active.passive.ha.enable</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.execution.engine</name>
+    <value>tez</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.event.db.notification.api.auth</name>
+    <value>false</value>
+  </property>
+
+</configuration>
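These settings run HiveServer2 in HTTP transport mode on port 10001 under the path `cliservice`, so a JDBC client connects with a URL of the following shape (the `hiveserver` host name assumes the in-cluster Service defined later in this patch; `run_sample.sh` uses the same form with a NodePort address):
```
beeline -u "jdbc:hive2://hiveserver:10001/;transportMode=http;httpPath=/cliservice"
```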
diff --git a/examples/hive/k8s/conf/metastore-log4j2.properties b/examples/hive/k8s/conf/metastore-log4j2.properties
new file mode 100644
index 00000000..6e49f4df
--- /dev/null
+++ b/examples/hive/k8s/conf/metastore-log4j2.properties
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+status = INFO
+name = HiveLog4j2
+packages = org.apache.hadoop.hive.ql.log
+
+# list of properties
+property.hive.log.level = INFO
+property.hive.root.logger = console
+property.hive.log.dir = ${sys:java.io.tmpdir}/${sys:user.name}
+property.hive.log.file = hive.log
+property.hive.perflogger.log.level = INFO
+
+# list of all appenders
+appenders = console
+
+# console appender
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n
+
+# list of all loggers
+loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, PerfLogger
+
+logger.NIOServerCnxn.name = org.apache.zookeeper.server.NIOServerCnxn
+logger.NIOServerCnxn.level = WARN
+
+logger.ClientCnxnSocketNIO.name = org.apache.zookeeper.ClientCnxnSocketNIO
+logger.ClientCnxnSocketNIO.level = WARN
+
+logger.DataNucleus.name = DataNucleus
+logger.DataNucleus.level = ERROR
+
+logger.Datastore.name = Datastore
+logger.Datastore.level = ERROR
+
+logger.JPOX.name = JPOX
+logger.JPOX.level = ERROR
+
+logger.PerfLogger.name = org.apache.hadoop.hive.ql.log.PerfLogger
+logger.PerfLogger.level = ${sys:hive.perflogger.log.level}
+
+# root logger
+rootLogger.level = ${sys:hive.log.level}
+rootLogger.appenderRefs = root
+rootLogger.appenderRef.root.ref = ${sys:hive.root.logger}
+
+
diff --git a/examples/hive/k8s/conf/metastore-site.tmpl b/examples/hive/k8s/conf/metastore-site.tmpl
new file mode 100644
index 00000000..55cfc590
--- /dev/null
+++ b/examples/hive/k8s/conf/metastore-site.tmpl
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+
+<configuration>
+  <property>
+    <name>hive.metastore.schema.verification</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.warehouse.dir</name>
+    <value>file:///tmp</value>
+  </property>
+
+  <property>
+    <name>fs.default.name</name>
+    <value>file:///tmp</value>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionURL</name>
+    <value>jdbc:postgresql://metastoredb:5432/hive_metastore</value>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionDriverName</name>
+    <value>org.postgresql.Driver</value>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionUserName</name>
+    <value>postgres</value>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionPassword</name>
+    <value>sekritpasswd</value>
+  </property>
+
+  <property>
+    <name>datanucleus.autoCreateSchema</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hive.exec.scratchdir</name>
+    <value>/tmp/hive</value>
+  </property>
+
+  <property>
+    <name>hive.server2.transport.mode</name>
+    <value>http</value>
+  </property>
+
+  <property>
+    <name>hive.server2.thrift.http.port</name>
+    <value>10001</value>
+  </property>
+
+  <property>
+    <name>hive.server2.thrift.http.path</name>
+    <value>cliservice</value>
+  </property>
+
+  <property>
+    <name>hive.server2.thrift.http.min.worker.threads</name>
+    <value>5</value>
+  </property>
+
+  <property>
+    <name>hive.server2.thrift.http.max.worker.threads</name>
+    <value>500</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.impl</name>
+    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.endpoint</name>
+    <value>${S3_ENDPOINT}</value>
+  </property>
+
+  <property>
+    <name>hive.server2.logging.operation.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.server2.logging.operation.level</name>
+    <value>PERFORMANCE</value>
+  </property>
+
+  <property>
+    <name>hive.server2.webui.show.graph</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.server2.webui.explain.output</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.server2.webui.show.stats</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>mapred.input.dir.recursive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.mapred.supports.subdirectories</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.event.db.notification.api.auth</name>
+    <value>false</value>
+  </property>
+
+</configuration>
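The metastore reaches PostgreSQL through the `metastoredb` Service defined in the manifests that follow, and `metastore-entrypoint.sh` (later in this patch) initialises the `hive_metastore` schema with `schematool` on first start. To verify that the schema exists, a quick check along these lines should work (assuming the database StatefulSet's pod is named `database-0`):
```
kubectl exec -it database-0 -- psql -U postgres -d hive_metastore -c '\dt'
```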
diff --git a/examples/hive/k8s/deploy/cluster/postgres.yaml b/examples/hive/k8s/deploy/cluster/postgres.yaml
new file mode 100644
index 00000000..487a404f
--- /dev/null
+++ b/examples/hive/k8s/deploy/cluster/postgres.yaml
@@ -0,0 +1,51 @@
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: metastoredb
+  namespace: drlmcdlf
+spec:
+  type: "LoadBalancer"
+  ports:
+  - port: 5432
+  selector:
+    app: database
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: database
+  namespace: drlmcdlf
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: database
+  serviceName: "metastoredb"
+  template:
+    metadata:
+      labels:
+        app: database
+    spec:
+      containers:
+      - name: postgres
+        image: postgres:10
+        imagePullPolicy: "IfNotPresent"
+        ports:
+        - containerPort: 5432
+        envFrom:
+        - configMapRef:
+            name: postgres-config
+        volumeMounts:
+        - mountPath: /var/lib/postgresql/data
+          name: postgresdb
+  volumeClaimTemplates:
+  - metadata:
+      name: postgresdb
+    spec:
+      storageClassName: "managed-nfs-storage"
+      accessModes:
+      - ReadWriteOnce
+      resources:
+        requests:
+          storage: 10Gi
diff --git a/examples/hive/k8s/deploy/database-secret.yaml b/examples/hive/k8s/deploy/database-secret.yaml
new file mode 100644
index 00000000..a4e33775
--- /dev/null
+++ b/examples/hive/k8s/deploy/database-secret.yaml
@@ -0,0 +1,12 @@
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: postgres-secret
+  labels:
+    app: postgres
+type: Opaque
+stringData:
+  POSTGRES_DB: hive_metastore
+  POSTGRES_USER: postgres
+  POSTGRES_PASSWORD: sekritpasswd
diff --git a/examples/hive/k8s/deploy/database.yaml b/examples/hive/k8s/deploy/database.yaml
new file mode 100644
index 00000000..3745798e
--- /dev/null
+++ b/examples/hive/k8s/deploy/database.yaml
@@ -0,0 +1,51 @@
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: metastoredb
+spec:
+  type: "LoadBalancer"
+  ports:
+  - port: 5432
+  selector:
+    app: database
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: database
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: database
+  serviceName: "metastoredb"
+  template:
+    metadata:
+      labels:
+        app: database
+    spec:
+      containers:
+      - name: postgres
+        image: postgres:10
+        imagePullPolicy: "IfNotPresent"
+        ports:
+        - containerPort: 5432
+        envFrom:
+        - secretRef:
+            name: postgres-secret
+        volumeMounts:
+        - mountPath: /var/lib/postgresql/data
+          name: postgresdb
+  volumeClaimTemplates:
+  - metadata:
+      name: postgresdb
+    spec:
+      accessModes:
+      - ReadWriteOnce
+      resources:
+        requests:
+          storage: 5Gi
+      selector:
+        matchLabels:
+          type: local
diff --git a/examples/hive/k8s/deploy/hivemetastore.yaml b/examples/hive/k8s/deploy/hivemetastore.yaml
new file mode 100644
index 00000000..a165e194
--- /dev/null
+++ b/examples/hive/k8s/deploy/hivemetastore.yaml
@@ -0,0 +1,40 @@
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: hivemetastore
+spec:
+  type: "NodePort"
+  selector:
+    app: hivemetastore
+  ports:
+  - port: 9083
+    targetPort: 9083
+    name: metastore
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: hivemetastore
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: hivemetastore
+  template:
+    metadata:
+      labels:
+        app: hivemetastore
+    spec:
+      containers:
+      - name: hivemetastore-container
+        image: ${HIVEMETASTORE_IMAGE}
+        imagePullPolicy: "IfNotPresent"
+        envFrom:
+        - secretRef:
+            name: s3-secret
+        ports:
+        - containerPort: 9083
+          name: metastore
+      imagePullSecrets:
+        - name: ${DOCKER_REGISTRY_SECRET}
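The `metastore` port name above matters: when a `Dataset` of type `CatalogEntry` omits `catalogURI`, `processRemoteDataset` (later in this patch) looks up the `hivemetastore` Service in the dataset's namespace and picks the port named `metastore`. A dataset can also pin the catalog explicitly; a sketch, assuming the `host:port` form that `parseCatalogUri` parses:
```
apiVersion: com.ie.ibm.hpsys/v1alpha1
kind: Dataset
metadata:
  name: bookds
spec:
  remote:
    type: "CatalogEntry"
    catalogURI: "hivemetastore:9083"
    table: "default/books"
    endpoint: "${S3_ENDPOINT}"
```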
diff --git a/examples/hive/k8s/deploy/hiveserver.yaml b/examples/hive/k8s/deploy/hiveserver.yaml
new file mode 100644
index 00000000..2e4f2655
--- /dev/null
+++ b/examples/hive/k8s/deploy/hiveserver.yaml
@@ -0,0 +1,45 @@
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: hiveserver
+spec:
+  type: "LoadBalancer"
+  selector:
+    app: hiveserver
+  ports:
+  - port: 80
+    targetPort: 10002
+    name: web-ui
+  - port: 10001
+    targetPort: 10001
+    name: cliservice
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: hiveserver
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: hiveserver
+  template:
+    metadata:
+      labels:
+        app: hiveserver
+    spec:
+      containers:
+      - name: hiveserver-container
+        image: ${HIVESERVER_IMAGE}
+        imagePullPolicy: "IfNotPresent"
+        envFrom:
+        - secretRef:
+            name: s3-secret
+        ports:
+        - containerPort: 10001
+          name: cliservice
+        - containerPort: 10002
+          name: web-ui
+      imagePullSecrets:
+        - name: ${DOCKER_REGISTRY_SECRET}
diff --git a/examples/hive/k8s/deploy/postgres-volumes.yaml b/examples/hive/k8s/deploy/postgres-volumes.yaml
new file mode 100644
index 00000000..9111bb98
--- /dev/null
+++ b/examples/hive/k8s/deploy/postgres-volumes.yaml
@@ -0,0 +1,14 @@
+---
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: hms-pv
+  labels:
+    type: local
+spec:
+  accessModes:
+  - ReadWriteOnce
+  capacity:
+    storage: 5Gi
+  hostPath:
+    path: "/var/data"
diff --git a/examples/hive/k8s/deploy/s3-secret.tmpl b/examples/hive/k8s/deploy/s3-secret.tmpl
new file mode 100644
index 00000000..c0fdb178
--- /dev/null
+++ b/examples/hive/k8s/deploy/s3-secret.tmpl
@@ -0,0 +1,11 @@
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: s3-secret
+type: Opaque
+stringData:
+  # Hadoop AWS looks for these environment variables
+  # while setting up the Hive connector
+  AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
+  AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
diff --git a/examples/hive/k8s/deploy_hive.sh b/examples/hive/k8s/deploy_hive.sh
new file mode 100644
index 00000000..27df01a2
--- /dev/null
+++ b/examples/hive/k8s/deploy_hive.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+NOOBAA_HOME="/full/path/to/noobaa/installation"
+
+export S3_ENDPOINT=$(minikube service s3 --url | head -n1)
+
+
+envsubst < conf/hive-site.tmpl | tee conf/hive-site.xml
diff --git a/examples/hive/k8s/hiveserver-entrypoint.sh b/examples/hive/k8s/hiveserver-entrypoint.sh
new file mode 100755
index 00000000..5fc44481
--- /dev/null
+++ b/examples/hive/k8s/hiveserver-entrypoint.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+exec "$@"
diff --git a/examples/hive/k8s/metastore-entrypoint.sh b/examples/hive/k8s/metastore-entrypoint.sh
new file mode 100755
index 00000000..f9eab851
--- /dev/null
+++ b/examples/hive/k8s/metastore-entrypoint.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+bin/hive --service schemaTool -dbType postgres -info 2> /dev/null
+
+if [ $? -ne 0 ]; then
+    bin/hive --service schemaTool -initSchema -dbType postgres -verbose
+fi
+
+exec "$@"
+
+
+
diff --git a/examples/hive/k8s/test-hive.sh b/examples/hive/k8s/test-hive.sh
new file mode 100755
index 00000000..a9011c97
--- /dev/null
+++ b/examples/hive/k8s/test-hive.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+DATASET_OPERATOR_NAMESPACE=default
+
+HIVE_WEB_PORT=`kubectl get svc hiveserver -n ${DATASET_OPERATOR_NAMESPACE} -o jsonpath='{.spec.ports[?(@.name=="web-ui")].nodePort}'`
+HIVE_WEB_IP=`minikube service hiveserver --url -n ${DATASET_OPERATOR_NAMESPACE} | awk -F':' -v port="$HIVE_WEB_PORT" '{if ($3 == port) print $2}' - | cut -d / -f 3`
+
+echo "You can open http://${HIVE_WEB_IP}:${HIVE_WEB_PORT} in a browser."
+echo "Testing using curl"
+curl -sSIf http://${HIVE_WEB_IP}:${HIVE_WEB_PORT} | head -n1
diff --git a/examples/hive/sampleapp/README.md b/examples/hive/sampleapp/README.md
new file mode 100644
index 00000000..7c80c35e
--- /dev/null
+++ b/examples/hive/sampleapp/README.md
@@ -0,0 +1,43 @@
+## Example Application
+
+### On Minikube with Noobaa
+0. Examine the script `run_sample.sh` in the directory `examples/hive/sampleapp`.
+If you are using Noobaa as your Object Storage provider as per the main installation, set the value of `NOOBAA_HOME`:
+```
+$ unset S3_ENDPOINT
+$ export NOOBAA_HOME="path/to/noobaa/installation"
+```
+If you are using any other S3 provider, please make sure that the values of `S3_ENDPOINT`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` are set.
+Then, examine `run_sample.sh`, change `DOCKER_REGISTRY_COMPONENTS` to the registry used for installing Hive, and execute the script:
+```
+$ ./run_sample.sh
+```
+This will create a dataset called `bookds`, mount it inside a pod named `sampleapp` (defined in `samplepod.yaml`), and display the CSV file.
+
+### Environment Injection of Dataset Information
+
+1. Edit the sample pod YAML file by changing this line
+
+```
+9   dataset.0.useas: "mount"
+```
+
+to
+```
+9   dataset.0.useas: "configmap"
+```
+
+2. Delete the old pod and recreate it
+
+```
+$ kubectl delete pod sampleapp
+$ kubectl create -f samplepod.yaml
+```
+
+3. Check the environment variables in the pod
+
+```
+$ kubectl exec -it sampleapp env | grep -i bookds
+```
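+
+The mutating webhook injects the dataset's ConfigMap and companion Secret into the pod with the dataset name as an environment prefix, so the output should contain variables along these lines (values here are illustrative):
+```
+bookds_table=default/books
+bookds_numBuckets=1
+bookds_endpoint=http://<your S3 endpoint>
+bookds_accessKeyID=<access key>
+```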
diff --git a/examples/hive/sampleapp/bookdataset.yaml b/examples/hive/sampleapp/bookdataset.yaml
new file mode 100644
index 00000000..89fc5ec6
--- /dev/null
+++ b/examples/hive/sampleapp/bookdataset.yaml
@@ -0,0 +1,15 @@
+---
+apiVersion: com.ie.ibm.hpsys/v1alpha1
+kind: Dataset
+metadata:
+  name: bookds
+  namespace: default
+spec:
+  remote:
+    type: "CatalogEntry"
+    table: "default/books"
+    endpoint: "${S3_ENDPOINT}"
+    accessKeyID: "${AWS_ACCESS_KEY_ID}"
+    secretAccessKey: "${AWS_SECRET_ACCESS_KEY}"
+    region: "eu-de"
+    mountAllowed: "true"
diff --git a/examples/hive/sampleapp/books.csv b/examples/hive/sampleapp/books.csv
new file mode 100644
index 00000000..6eba3e24
--- /dev/null
+++ b/examples/hive/sampleapp/books.csv
@@ -0,0 +1,11 @@
+id, title, author, year, isbn
+1, Nineteen Eighty-Four, George Orwell, 1949, 451524934
+2, To Kill a Mockingbird, Harper Lee, 1960, 61120081
+3, The Kite Runner, Khaled Hosseini, 2003, 1594480001
+4, The Fellowship of the Ring, J.R.R. Tolkien, 1954, 618346252
+5, The Lovely Bones, Alice Sebold, 2002, 316166685
+6, The Hitchhiker's Guide to the Galaxy, Douglas Adams, 1979, 345391802
+7, The Shining, Stephen King, 1977, 450040186
+8, Ender's Game, Orson Scott Card, 1985, 812550706
+9, Sense and Sensibility, Jane Austen, 1811, 141439661
+10, The Picture of Dorian Gray, Oscar Wilde, 1891, 375751513
\ No newline at end of file
diff --git a/examples/hive/sampleapp/run_sample.sh b/examples/hive/sampleapp/run_sample.sh
new file mode 100755
index 00000000..06b08132
--- /dev/null
+++ b/examples/hive/sampleapp/run_sample.sh
@@ -0,0 +1,90 @@
+#!/bin/bash
+
+
+DATASET_OPERATOR_NAMESPACE=default
+DOCKER_REGISTRY_COMPONENTS=the_registry_to_use_for_components
+HIVESERVER_IMAGE="hive-server:latest"
+
+function check_env(){
+    echo "Checking if S3 connection variables are available"
+    if [[ -z "$S3_ENDPOINT" ]]; then
+        echo "Using Noobaa for connection credentials"
+        if [[ -z "$NOOBAA_HOME" ]]; then
+            echo "Noobaa install cannot be found"
+            exit 1
+        fi
+        export S3_ENDPOINT=$(minikube service s3 --url | head -n1)
+        export AWS_ACCESS_KEY_ID=$(${NOOBAA_HOME}/noobaa status 2>/dev/null | grep AWS_ACCESS_KEY_ID | awk -F ": " '{print $2}')
+        export AWS_SECRET_ACCESS_KEY=$(${NOOBAA_HOME}/noobaa status 2>/dev/null | grep AWS_SECRET_ACCESS_KEY | awk -F ": " '{print $2}')
+    fi
+}
+
+function populate_hive(){
+    echo "Populating Hive with the book table"
+    HIVE_CLI_PORT=`kubectl get svc hiveserver -n ${DATASET_OPERATOR_NAMESPACE} -o jsonpath='{.spec.ports[?(@.name=="cliservice")].nodePort}'`
+    HIVE_CLI_IP=`minikube service hiveserver --url -n ${DATASET_OPERATOR_NAMESPACE} | awk -F':' -v port="$HIVE_CLI_PORT" '{if ($3 == port) print $2}' - | cut -d / -f 3`
+    docker run -v ${PWD}:/sampleapp -it --network host ${DOCKER_REGISTRY_COMPONENTS}/${HIVESERVER_IMAGE} bin/beeline -u "jdbc:hive2://$HIVE_CLI_IP:$HIVE_CLI_PORT/;transportMode=http;httpPath=/cliservice" -f /sampleapp/sample.hql
+    if [ $? -eq 0 ]
+    then
+        echo "Hive successfully populated"
+    fi
+
+}
+
+
+function build_awscli_image(){
+    echo "Building image for S3 commands"
+    docker build -f ${NOOBAA_HOME}/Dockerfile-awscli-alpine -t awscli-alpine . > /dev/null 2>&1
+
+    if [ $? -eq 0 ]
+    then
+        echo "AWS image successfully built"
+    fi
+}
+
+function create_s3_dataset(){
+    echo "Creating S3 bucket and uploading data"
+    docker run --rm --network host \
+        -e AWS_ACCESS_KEY_ID \
+        -e AWS_SECRET_ACCESS_KEY \
+        awscli-alpine \
+        aws --endpoint ${S3_ENDPOINT} \
+        s3 mb s3://book-test
+
+    if [ $? -eq 0 ]
+    then
+        echo "Bucket book-test successfully created"
+    fi
+
+    docker run --rm --network host \
+        -e AWS_ACCESS_KEY_ID \
+        -e AWS_SECRET_ACCESS_KEY \
+        -v ${PWD}:/sampleapp \
+        awscli-alpine \
+        aws --endpoint ${S3_ENDPOINT} \
+        s3 cp /sampleapp/books.csv s3://book-test/
+
+    if [ $?
-eq 0 ] + then + echo "books.csv successfully uploaded" + fi +} + +function create_book_dataset(){ + echo "Creating the Book dataset object" + envsubst < bookdataset.yaml | kubectl apply -f - + kubectl apply -f samplepod.yaml + kubectl wait --for=condition=ready pods --all > /dev/null 2>&1 +} + +function test_book_dataset(){ + echo "Checking if the Book dataset is available" + kubectl exec -it sampleapp cat /mnt/datasets/bookds/books.csv +} + +check_env +build_awscli_image +create_s3_dataset +populate_hive +create_book_dataset +test_book_dataset diff --git a/examples/hive/sampleapp/sample.hql b/examples/hive/sampleapp/sample.hql new file mode 100644 index 00000000..897cb6ca --- /dev/null +++ b/examples/hive/sampleapp/sample.hql @@ -0,0 +1,5 @@ +DROP TABLE books; +CREATE EXTERNAL TABLE books (id int, title STRING, author STRING, year INT, isbn STRING) +ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' +LOCATION 's3a://book-test/' +tblproperties ("skip.header.line.count"="1"); diff --git a/examples/hive/sampleapp/samplepod.yaml b/examples/hive/sampleapp/samplepod.yaml new file mode 100644 index 00000000..688b2192 --- /dev/null +++ b/examples/hive/sampleapp/samplepod.yaml @@ -0,0 +1,13 @@ +--- +apiVersion: v1 +kind: Pod +metadata: + name: sampleapp + namespace: default + labels: + dataset.0.id: "bookds" + dataset.0.useas: "mount" +spec: + containers: + - name: nginx + image: nginx diff --git a/src/dataset-operator/deploy/operator.yaml b/src/dataset-operator/deploy/operator.yaml index b9328df0..d85b56a5 100644 --- a/src/dataset-operator/deploy/operator.yaml +++ b/src/dataset-operator/deploy/operator.yaml @@ -18,7 +18,7 @@ spec: # Replace this with the built image name image: ${DATASET_OPERATOR_IMAGE} command: - - dataset-operator + - dataset-operator imagePullPolicy: IfNotPresent ports: - containerPort: 8443 @@ -52,4 +52,4 @@ spec: name: dataset-operator ports: - port: 443 - targetPort: webhook-api \ No newline at end of file + targetPort: webhook-api diff --git a/src/dataset-operator/go.mod b/src/dataset-operator/go.mod index 3872d3a6..20766d40 100644 --- a/src/dataset-operator/go.mod +++ b/src/dataset-operator/go.mod @@ -1,14 +1,15 @@ module dataset-operator require ( - github.com/NYTimes/gziphandler v1.0.1 // indirect + github.com/akolb1/gometastore v0.0.0-20181012003105-a6ffbeed8c1a github.com/go-openapi/spec v0.19.0 - github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/operator-framework/operator-sdk v0.0.0-20190802125515-bc5cbd4b9481 github.com/spf13/pflag v1.0.3 + github.com/srikumar003/gometastore v0.0.0-20200114133707-fed410b64f57 // indirect k8s.io/api v0.0.0-20190612125737-db0771252981 k8s.io/apimachinery v0.0.0-20190612125636-6a5db36e93ad k8s.io/client-go v11.0.0+incompatible + k8s.io/klog v0.3.1 k8s.io/kube-openapi v0.0.0-20190603182131-db7b694dc208 sigs.k8s.io/controller-runtime v0.1.12 sigs.k8s.io/controller-tools v0.1.10 @@ -31,4 +32,4 @@ replace ( replace github.com/operator-framework/operator-sdk => github.com/operator-framework/operator-sdk v0.10.0 -replace git.apache.org/thrift.git => github.com/apache/thrift v0.12.0 +go 1.13 diff --git a/src/dataset-operator/go.sum b/src/dataset-operator/go.sum index d4cb67b0..0ea797c5 100644 --- a/src/dataset-operator/go.sum +++ b/src/dataset-operator/go.sum @@ -7,10 +7,14 @@ cloud.google.com/go v0.37.2 h1:4y4L7BdHenTfZL0HervofNTHh9Ad6mNX72cQvl+5eH0= cloud.google.com/go v0.37.2/go.mod h1:H8IAquKe2L30IxoupDgqTaQvKSwF/c8prYHynGIWQbA= contrib.go.opencensus.io/exporter/ocagent v0.4.11 
h1:Zwy9skaqR2igcEfSVYDuAsbpa33N0RPtnYTHEe2whPI= contrib.go.opencensus.io/exporter/ocagent v0.4.11/go.mod h1:7ihiYRbdcVfW4m4wlXi9WRPdv79C0fStcjNlyE6ek9s= +git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= +git.apache.org/thrift.git v0.12.0 h1:CMxsZlAmxKs+VAZMlDDL0wXciMblJcutQbEe3A9CYUM= +git.apache.org/thrift.git v0.12.0/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-autorest v11.1.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest v11.7.0+incompatible h1:gzma19dc9ejB75D90E5S+/wXouzpZyA+CV+/MJPSD/k= github.com/Azure/go-autorest v11.7.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/MakeNowJust/heredoc v0.0.0-20171113091838-e9091a26100e/go.mod h1:64YHyfSL2R96J44Nlwm39UHepQbyR5q10x7iYa1ks2E= github.com/Masterminds/goutils v1.1.0/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= @@ -27,9 +31,12 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/akolb1/gometastore v0.0.0-20181012003105-a6ffbeed8c1a h1:MvGfv2bAaMGOX0KgFA7F+rlHTYyGhaX2LZug83qwz04= +github.com/akolb1/gometastore v0.0.0-20181012003105-a6ffbeed8c1a/go.mod h1:mdTAY/BIdq55FihYydBqwcXSd7QHl7oGSa+W7T/L88c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= +github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/appscode/jsonpatch v0.0.0-20190108182946-7c0e3b262f30 h1:Kn3rqvbUFqSepE2OqVu0Pn1CbDw9IuMlONapol0zuwk= github.com/appscode/jsonpatch v0.0.0-20190108182946-7c0e3b262f30/go.mod h1:4AJxUpXUhv4N+ziTvIcWWXgeorXpxPZOfk9HdEVr96M= @@ -89,6 +96,7 @@ github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= @@ -127,6 +135,7 @@ github.com/go-openapi/validate v0.17.2/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+ github.com/go-openapi/validate 
v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gobuffalo/envy v1.6.5/go.mod h1:N+GkhhZ/93bGZc6ZKhJLP6+m+tCNPKwgSpH9kaifseQ= +github.com/gobuffalo/envy v1.6.15 h1:OsV5vOpHYUpP7ZLS6sem1y40/lNX1BZj+ynMiRi21lQ= github.com/gobuffalo/envy v1.6.15/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v0.0.0-20170330071051-c0656edd0d9e/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -205,6 +214,7 @@ github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI= github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -231,6 +241,7 @@ github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190403194419-1ea4449da983 h1:wL11wNW7dhKIcRCHSm4sHKPWz0tt4mwBsVodG7+Xyqg= github.com/mailru/easyjson v0.0.0-20190403194419-1ea4449da983/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/markbates/inflect v1.0.4 h1:5fh1gzTFhfae06u3hzHYO9xe3l3v3nW5Pwt3naLTP5g= github.com/markbates/inflect v1.0.4/go.mod h1:1fR9+pO2KHEO9ZRtto13gDwwZaAKstQzferVeWqbgNs= github.com/martinlindhe/base36 v0.0.0-20180729042928-5cda0030da17/go.mod h1:+AtEs8xrBpCeYgSLoY/aJ6Wf37jtBuR0s35750M27+8= github.com/mattbaird/jsonpatch v0.0.0-20171005235357-81af80346b1a/go.mod h1:M1qoD/MqPgTZIk0EWKB38wE28ACRfVcn+cU08jyArI0= @@ -312,6 +323,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.3.0 h1:RR9dF3JtopPvtkroDZuVD7qquD0bnHlKSqaQhgwt8yk= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v0.0.0-20151117072312-300106c228d5/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sclevine/spec v1.0.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U= @@ -322,6 +334,7 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc= github.com/spf13/afero v1.2.2/go.mod 
h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= @@ -331,6 +344,8 @@ github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzu github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/srikumar003/gometastore v0.0.0-20200114133707-fed410b64f57 h1:L3jOkCUZm0QGG6pbT84psxptb2K6oj9L98HLl1hlKQQ= +github.com/srikumar003/gometastore v0.0.0-20200114133707-fed410b64f57/go.mod h1:dNX/4QJaZIpby/HdAh7r8B9cxbqkSV5CCukCo4nKxFg= github.com/stevvooe/resumable v0.0.0-20180830230917-22b14a53ba50/go.mod h1:1pdIZTAHUz+HDKDVZ++5xg/duPlhKAIzw9qy42CWYp4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -446,6 +461,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190213015956-f7e1b50d2251/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190408170212-12dd9f86f350 h1:0USRhKWpISljvJE8egltEaoJb+VD0IUA4eOH6W1yss8= golang.org/x/tools v0.0.0-20190408170212-12dd9f86f350/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= @@ -457,6 +473,7 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -500,6 +517,7 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/api v0.0.0-20190222213804-5cb15d344471 h1:MzQGt8qWQCR+39kbYRd0uQqsvSidpYqJLFeWiJ9l4OE= k8s.io/api v0.0.0-20190222213804-5cb15d344471/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA= +k8s.io/apiextensions-apiserver v0.0.0-20190228180357-d002e88f6236 h1:JfFtjaElBIgYKCWEtYQkcNrTpW+lMO4GJy8NP6SVQmM= k8s.io/apiextensions-apiserver v0.0.0-20190228180357-d002e88f6236/go.mod h1:IxkesAMoaCRoLrPJdZNZUQp9NfZnzqaVzLhb2VEQzXE= k8s.io/apimachinery v0.0.0-20190221213512-86fb29eff628 h1:UYfHH+KEF88OTg+GojQUwFTNxbxwmoktLwutUzR0GPg= k8s.io/apimachinery v0.0.0-20190221213512-86fb29eff628/go.mod 
h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0= @@ -512,6 +530,7 @@ k8s.io/code-generator v0.0.0-20181203235156-f8cba74510f3/go.mod h1:MYiN+ZJZ9HkET k8s.io/gengo v0.0.0-20181106084056-51747d6e00da/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20181113154421-fd15ee9cc2f7/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= +k8s.io/gengo v0.0.0-20190327210449-e17681d19d3a h1:QoHVuRquf80YZ+/bovwxoMO3Q/A3nt3yTgS0/0nejuk= k8s.io/gengo v0.0.0-20190327210449-e17681d19d3a/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/helm v2.13.1+incompatible/go.mod h1:LZzlS4LQBHfciFOurYBFkCMTaZ0D1l+p0teMg7TSULI= k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= @@ -532,6 +551,7 @@ k8s.io/kubernetes v1.11.8-beta.0.0.20190124204751-3a10094374f2/go.mod h1:ocZa8+6 k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= sigs.k8s.io/controller-runtime v0.1.12 h1:ovDq28E64PeY1yR+6H7DthakIC09soiDCrKvfP2tPYo= sigs.k8s.io/controller-runtime v0.1.12/go.mod h1:HFAYoOh6XMV+jKF1UjFwrknPbowfyHEHHRdJMf2jMX8= +sigs.k8s.io/controller-tools v0.1.11-0.20190411181648-9d55346c2bde h1:ZkaHf5rNYzIB6CB82keKMQNv7xxkqT0ylOBdfJPfi+k= sigs.k8s.io/controller-tools v0.1.11-0.20190411181648-9d55346c2bde/go.mod h1:ATWLRP3WGxuAN9HcT2LaKHReXIH+EZGzRuMHuxjXfhQ= sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI= sigs.k8s.io/testing_frameworks v0.1.1 h1:cP2l8fkA3O9vekpy5Ks8mmA0NW/F7yBdXf8brkWhVrs= diff --git a/src/dataset-operator/pkg/admissioncontroller/mutate.go b/src/dataset-operator/pkg/admissioncontroller/mutate.go index e1b85a37..6dbee578 100644 --- a/src/dataset-operator/pkg/admissioncontroller/mutate.go +++ b/src/dataset-operator/pkg/admissioncontroller/mutate.go @@ -46,12 +46,12 @@ func Mutate(body []byte) ([]byte, error) { datasetInfo := map[string]map[string]string{} for k, v := range pod.Labels { - fmt.Printf("key[%s] value[%s]\n", k, v) - if(strings.HasPrefix(k, prefixLabels)){ + log.Printf("key[%s] value[%s]\n", k, v) + if strings.HasPrefix(k, prefixLabels) { datasetNameArray := strings.Split(k, ".") - datasetId := strings.Join([]string{datasetNameArray[0],datasetNameArray[1]},".") - if _, ok := datasetInfo[datasetId]; ok==false { - datasetInfo[datasetId]=map[string]string{datasetNameArray[2]:v} + datasetId := strings.Join([]string{datasetNameArray[0], datasetNameArray[1]}, ".") + if _, ok := datasetInfo[datasetId]; ok == false { + datasetInfo[datasetId] = map[string]string{datasetNameArray[2]: v} } else { datasetInfo[datasetId][datasetNameArray[2]] = v } @@ -70,50 +70,113 @@ func Mutate(body []byte) ([]byte, error) { existing_volumes_id := len(pod.Spec.Volumes) datasets_tomount := []string{} + configs_toinject := []string{} p := []map[string]interface{}{} for k, v := range datasetInfo { - fmt.Printf("key[%s] value[%s]\n", k, v) - if(v["useas"]=="mount"){ + log.Printf("key[%s] value[%s]\n", k, v) + + //TODO: currently, the useas and dataset types are not cross-checked + //e.g. useas configmap is not applicable to NFS shares. + //There may be future dataset backends (e.g. SQL queries) that may + //not be able to be mounted. 
This logic needs to be revisited
+		switch v["useas"] {
+		case "mount":
 			patch := map[string]interface{}{
-				"op": "add",
-				"path": "/spec/volumes/"+fmt.Sprint(existing_volumes_id),
-				"value": map[string]interface{}{
-					"name": v["id"],
-					"persistentVolumeClaim": map[string]string{"claimName": v["id"]},
-				},
+				"op":   "add",
+				"path": "/spec/volumes/" + fmt.Sprint(existing_volumes_id),
+			}
+			patch["value"] = map[string]interface{}{
+				"name":                  v["id"],
+				"persistentVolumeClaim": map[string]string{"claimName": v["id"]},
 			}
 			datasets_tomount = append(datasets_tomount, v["id"])
 			p = append(p, patch)
-			existing_volumes_id+=1
+			existing_volumes_id += 1
+		case "configmap":
+			//by default, we will mount a config map inside the containers.
+			configs_toinject = append(configs_toinject, v["id"])
+		default:
+			//this is an error
+			log.Printf("Error: The useas for this dataset is not recognized")
 		}
+
 	}
 
 	containers := pod.Spec.Containers
-	for container_idx,container := range containers{
+	for container_idx, container := range containers {
 		mounts := container.VolumeMounts
 		mount_names := []string{}
-		for _, mount := range mounts{
+		for _, mount := range mounts {
 			mount_name := mount.Name
-			mount_names = append(mount_names,mount_name)
+			mount_names = append(mount_names, mount_name)
 		}
 		mount_idx := len(mounts)
-		for _, dataset_tomount := range datasets_tomount{
-			exists, _ := in_array(dataset_tomount,mount_names)
-			if(exists == false){
+
+		for _, dataset_tomount := range datasets_tomount {
+			//TODO: Check if the dataset reference exists in the API server
+			exists, _ := in_array(dataset_tomount, mount_names)
+			if exists == false {
 				patch := map[string]interface{}{
-					"op": "add",
-					"path": "/spec/containers/"+fmt.Sprint(container_idx)+"/volumeMounts/"+fmt.Sprint(mount_idx),
+					"op":   "add",
+					"path": "/spec/containers/" + fmt.Sprint(container_idx) + "/volumeMounts/" + fmt.Sprint(mount_idx),
 					"value": map[string]interface{}{
-						"name": dataset_tomount,
-						"mountPath": "/mnt/datasets/"+dataset_tomount,
+						"name":      dataset_tomount,
+						"mountPath": "/mnt/datasets/" + dataset_tomount,
 					},
 				}
 				p = append(p, patch)
 				mount_idx += 1
 			}
 		}
+
+		var values []interface{}
+		for _, config_toinject := range configs_toinject {
+			//TODO: Check if the configmap reference exists in the API server
+
+			configmap_ref := map[string]interface{}{
+				"prefix": config_toinject + "_",
+				"configMapRef": map[string]interface{}{
+					"name": config_toinject,
+				},
+			}
+			// We also have to inject the companion secret. We are using the convention followed
+			// in the controller where the names of the configmap and the secret are the same.
+			secret_ref := map[string]interface{}{
+				"prefix": config_toinject + "_",
+				"secretRef": map[string]interface{}{
+					"name": config_toinject,
+				},
+			}
+
+			values = append(values, configmap_ref)
+			values = append(values, secret_ref)
+		}
+
+		if container.EnvFrom == nil || len(container.EnvFrom) == 0 {
+			// In this case, the envFrom path does not exist in the PodSpec. We are creating
+			// (initialising) this path with an array of configMapRef (RFC 6902)
+			log.Printf("envFrom is not present in container %d; initialising it", container_idx)
+			patch := map[string]interface{}{
+				"op":    "add",
+				"path":  "/spec/containers/" + fmt.Sprint(container_idx) + "/envFrom",
+				"value": values,
+			}
+			p = append(p, patch)
+		} else {
+			// In this case, the envFrom path does exist in the PodSpec.
So, we just append to + // the existing array (Notice the path value) + for _, val := range values { + patch := map[string]interface{}{ + "op": "add", + "path": "/spec/containers/" + fmt.Sprint(container_idx) + "/envFrom/-", + "value": val, + } + p = append(p, patch) + } + } + } resp.Patch, err = json.Marshal(p) diff --git a/src/dataset-operator/pkg/apis/com/v1alpha1/dataset_types.go b/src/dataset-operator/pkg/apis/com/v1alpha1/dataset_types.go index f844f4c2..320c1078 100644 --- a/src/dataset-operator/pkg/apis/com/v1alpha1/dataset_types.go +++ b/src/dataset-operator/pkg/apis/com/v1alpha1/dataset_types.go @@ -14,7 +14,7 @@ type DatasetSpec struct { // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html // Conf map[string]string `json:"conf,omitempty"` - Local map[string]string `json:"local,omitempty"` + Local map[string]string `json:"local,omitempty"` Remote map[string]string `json:"remote,omitempty"` } diff --git a/src/dataset-operator/pkg/controller/dataset/dataset_controller.go b/src/dataset-operator/pkg/controller/dataset/dataset_controller.go index fd78a664..b01c916a 100644 --- a/src/dataset-operator/pkg/controller/dataset/dataset_controller.go +++ b/src/dataset-operator/pkg/controller/dataset/dataset_controller.go @@ -2,6 +2,8 @@ package dataset import ( "context" + "strconv" + comv1alpha1 "dataset-operator/pkg/apis/com/v1alpha1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -31,10 +33,10 @@ var log = logf.Log.WithName("controller_dataset") * Each function in the table should respect the following signature: * processLocalDatasetXYZ func(*comv1alpha1.Dataset, *ReconcileDataset) (reconcile.Result, error) */ -var datasetLocalProcessTable = map[string]func(*comv1alpha1.Dataset, - *ReconcileDataset) (reconcile.Result, error) { - "COS": processLocalDatasetCOS, - "NFS": processLocalDatasetNFS, +var datasetLocalProcessTable = map[string]func(*comv1alpha1.Dataset, + *ReconcileDataset) (reconcile.Result, error){ + "COS": processLocalDatasetCOS, + "NFS": processLocalDatasetNFS, } // Add creates a new Dataset Controller and adds it to the Manager. The Manager will set fields on the Controller @@ -97,37 +99,45 @@ func (r *ReconcileDataset) Reconcile(request reconcile.Request) (reconcile.Resul reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name) reqLogger.Info("Reconciling Dataset") + result := reconcile.Result{} + var err error = nil + // Fetch the Dataset instance instance := &comv1alpha1.Dataset{} - err := r.client.Get(context.TODO(), request.NamespacedName, instance) + err = r.client.Get(context.TODO(), request.NamespacedName, instance) if err != nil { if errors.IsNotFound(err) { // Request object not found, could have been deleted after reconcile request. // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers. // Return and don't requeue reqLogger.Info("Dataset is not found") - return reconcile.Result{}, nil + err = nil } // Error reading the object - requeue the request. 
-		return reconcile.Result{}, err
+		return result, err
 	}
+
+	reqLogger.Info("All good, proceed")
 
 	if instance.Spec.Local != nil {
 		datasetType := instance.Spec.Local["type"]
 		if f, ok := datasetLocalProcessTable[datasetType]; ok {
-			result, err := f(instance, r)
-			if err != nil {
-				return result, err
-			}
+			result, err = f(instance, r)
 		} else {
-			reqLogger.Info("Dataset type %s not supported", datasetType)
-			err := errors.NewBadRequest("Dataset type not supported")
-			return reconcile.Result{}, err
+			reqLogger.Info("Dataset type not supported", "type", datasetType)
+			err = errors.NewBadRequest("Dataset type not supported")
 		}
+	} else if instance.Spec.Remote != nil {
+		result, err = processRemoteDataset(instance, r)
+		if err != nil {
+			reqLogger.Error(err, "Could not process remote dataset entry", "name", instance.Name)
+			err = errors.NewBadRequest("Could not process remote dataset")
+		}
+	} else {
+		reqLogger.Info("Unknown spec entry")
+		err = errors.NewBadRequest("Dataset type not supported")
 	}
 
-	return reconcile.Result{}, nil
+	return result, err
 }
 
 func processLocalDatasetCOS(cr *comv1alpha1.Dataset, rc *ReconcileDataset) (reconcile.Result, error) {
@@ -150,6 +160,18 @@ func processLocalDatasetCOS(cr *comv1alpha1.Dataset, rc *ReconcileDataset) (reco
 	labels := map[string]string{
 		"dataset": cr.Name,
 	}
+
+	configData := map[string]string{
+		"endpoint": endpoint,
+		"bucket":   bucket,
+		"region":   region,
+	}
+
+	if _, err := createConfigMapforDataset(configData, cr, rc); err != nil {
+		processLocalDatasetLogger.Error(err, "Could not create ConfigMap for dataset", "Dataset.Name", cr.Name)
+		return reconcile.Result{}, err
+	}
+
 	secretObj := &corev1.Secret{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: cr.Name,
@@ -231,27 +253,27 @@ func processLocalDatasetNFS(cr *comv1alpha1.Dataset, rc *ReconcileDataset) (reco
 	csiVolumeHandle := "data-id"
 	csiVolumeAttributes := map[string]string{
 		"server": server,
-		"share": share,
+		"share":  share,
 	}
 	pvSource := &corev1.CSIPersistentVolumeSource{
-		Driver: csiDriverName,
-		VolumeHandle: csiVolumeHandle,
+		Driver:           csiDriverName,
+		VolumeHandle:     csiVolumeHandle,
 		VolumeAttributes: csiVolumeAttributes,
 	}
 
-	newPV := &corev1.PersistentVolume {
+	newPV := &corev1.PersistentVolume{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      cr.Name,
 			Namespace: cr.Namespace,
 			Labels:    labels,
 		},
-		Spec: corev1.PersistentVolumeSpec {
+		Spec: corev1.PersistentVolumeSpec{
 			StorageClassName: storageClassName,
-			AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
+			AccessModes:      []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
 			Capacity: corev1.ResourceList{
 				corev1.ResourceStorage: resource.MustParse("5Gi"), //TODO: use proper size
 			},
-			PersistentVolumeSource: corev1.PersistentVolumeSource {
+			PersistentVolumeSource: corev1.PersistentVolumeSource{
 				CSI: pvSource,
 			},
 		},
@@ -288,7 +310,7 @@ func processLocalDatasetNFS(cr *comv1alpha1.Dataset, rc *ReconcileDataset) (reco
 			},
 		},
 		StorageClassName: &storageClassName,
-		VolumeName: cr.Name,
+		VolumeName:       cr.Name,
 	},
 }
@@ -304,9 +326,275 @@ func processLocalDatasetNFS(cr *comv1alpha1.Dataset, rc *ReconcileDataset) (reco
 		if err != nil {
 			return reconcile.Result{}, err
 		}
+	}
+	return reconcile.Result{}, nil
+}
+
+func processRemoteDataset(cr *comv1alpha1.Dataset, rc *ReconcileDataset) (reconcile.Result, error) {
+
+	processRemoteDatasetLogger := log.WithValues("Dataset.Namespace", cr.Namespace, "Dataset.Name", cr.Name, "Method", "processRemoteDataset")
+	result := reconcile.Result{}
+	var err error = nil
+
+	entryType := cr.Spec.Remote["type"]
+	switch entryType {
+	case
"CatalogEntry": + var catalogHost string + var catalogPort int + + catalogUri, ok := cr.Spec.Remote["catalogURI"] + if !ok { + processRemoteDatasetLogger.Info("no catalogURI provided in dataset spec.. now looking up cluster services", "Dataset.Name", cr.Name) + // Looking hivemetastore service endpoint in the cluster + //svcList := &corev1.ServiceList{} + + // Note: Below logic searches for a particular service implementation which requires a Fieldindexer to be + // initialised along with the controller. To make it easier, we are going to directly get the service + // endpoind using types.NamespacedName + //processRemoteDatasetLogger.Info("CatalogURI not provided, looking up catalog endpoint in the cluster") + //svcListOpts := client.MatchingField("metadata.name", "hivemetastore") + //svcListOpts = svcListOpts.InNamespace(cr.Namespace) + //err := rc.client.List(context.TODO(), svcListOpts, svcList) + + //We are only looking for the hivemetastore endpoint in the same namespace as the dataset being created + //TODO: Change this to be across all namespaces + catalogSvcName := "hivemetastore" + catalogSvcNamespace := cr.Namespace + svc := &corev1.Service{} + err = rc.client.Get(context.TODO(), types.NamespacedName{Name: catalogSvcName, Namespace: catalogSvcNamespace}, svc) + + if err != nil { + processRemoteDatasetLogger.Error(err, "Could not obtain any catalogs in the current cluster") + err = errors.NewBadRequest("no catalogURI provided") + return result, err + } else { + processRemoteDatasetLogger.Info("Endpoint", "name", svc) + catalogHost = svc.Spec.ClusterIP + for _, port := range svc.Spec.Ports { + processRemoteDatasetLogger.Info("Port", "name", svc) + if port.Name == "metastore" || + port.Name == "" { + catalogPort = int(port.Port) + } + } + + if catalogHost == "" { + processRemoteDatasetLogger.Error(nil, "no catalogURI provided.. 
+func processRemoteDataset(cr *comv1alpha1.Dataset, rc *ReconcileDataset) (reconcile.Result, error) {
+
+	processRemoteDatasetLogger := log.WithValues("Dataset.Namespace", cr.Namespace, "Dataset.Name", cr.Name, "Method", "processRemoteDataset")
+	result := reconcile.Result{}
+	var err error = nil
+
+	entryType := cr.Spec.Remote["type"]
+	switch entryType {
+	case "CatalogEntry":
+		var catalogHost string
+		var catalogPort int
+
+		catalogUri, ok := cr.Spec.Remote["catalogURI"]
+		if !ok {
+			processRemoteDatasetLogger.Info("no catalogURI provided in dataset spec; looking up cluster services", "Dataset.Name", cr.Name)
+			// Look up the hivemetastore service endpoint in the cluster
+			//svcList := &corev1.ServiceList{}
+
+			// Note: The commented-out logic below searches for a particular service implementation, which requires
+			// a FieldIndexer to be initialised along with the controller. To make it easier, we directly get the
+			// service endpoint using types.NamespacedName
+			//processRemoteDatasetLogger.Info("CatalogURI not provided, looking up catalog endpoint in the cluster")
+			//svcListOpts := client.MatchingField("metadata.name", "hivemetastore")
+			//svcListOpts = svcListOpts.InNamespace(cr.Namespace)
+			//err := rc.client.List(context.TODO(), svcListOpts, svcList)
+
+			//We are only looking for the hivemetastore endpoint in the same namespace as the dataset being created
+			//TODO: Change this to be across all namespaces
+			catalogSvcName := "hivemetastore"
+			catalogSvcNamespace := cr.Namespace
+			svc := &corev1.Service{}
+			err = rc.client.Get(context.TODO(), types.NamespacedName{Name: catalogSvcName, Namespace: catalogSvcNamespace}, svc)
+			if err != nil {
+				processRemoteDatasetLogger.Error(err, "Could not obtain any catalogs in the current cluster")
+				err = errors.NewBadRequest("no catalogURI provided")
+				return result, err
+			}
+
+			processRemoteDatasetLogger.Info("Found catalog service", "service", svc.Name)
+			catalogHost = svc.Spec.ClusterIP
+			for _, port := range svc.Spec.Ports {
+				processRemoteDatasetLogger.Info("Inspecting service port", "port", port.Name)
+				if port.Name == "metastore" || port.Name == "" {
+					catalogPort = int(port.Port)
+				}
+			}
+
+			if catalogHost == "" {
+				processRemoteDatasetLogger.Error(nil, "No catalog address found; cannot instantiate dataset")
+				err = errors.NewBadRequest("Catalog address was not found")
+				return result, err
+			}
+		} else {
+			catalogHost, catalogPort, err = parseCatalogUri(catalogUri)
+			if err != nil {
+				processRemoteDatasetLogger.Error(err, "Could not parse CatalogUri", "catalogURI", catalogUri)
+				err = errors.NewBadRequest("CatalogUri in the wrong format")
+				return result, err
+			}
+		}
+
+		//TODO: We expect this to change
+		table, ok := cr.Spec.Remote["table"]
+		if !ok {
+			processRemoteDatasetLogger.Error(nil, "no table provided for lookup")
+			err = errors.NewBadRequest("no table provided for lookup")
+			return result, err
+		}
+		//TODO: We expect that we'll get the endpoints differently
+		endpoint, ok := cr.Spec.Remote["endpoint"]
+		if !ok {
+			processRemoteDatasetLogger.Error(nil, "no endpoints provided for s3 buckets")
+			err = errors.NewBadRequest("no endpoints provided for s3 buckets")
+			return result, err
+		}
+
+		var mountAllowed bool
+		mountAllowedValue, ok := cr.Spec.Remote["mountAllowed"]
+		if !ok {
+			processRemoteDatasetLogger.Info("mountAllowed not set; defaulting to false")
+			mountAllowed = false
+		} else {
+			var parseError error
+			mountAllowed, parseError = strconv.ParseBool(mountAllowedValue)
+			if parseError != nil {
+				mountAllowed = false
+			}
+		}
+
+		var buckets []string
+		buckets, err = processCatalogEntry(catalogHost, catalogPort, table)
+		if err != nil {
+			processRemoteDatasetLogger.Error(err, "Error in querying metastore", "catalogHost", catalogHost, "table", table)
+			err = errors.NewBadRequest("Could not query catalog at address: " + catalogHost)
+			return result, err
+		} else if len(buckets) == 0 {
+			processRemoteDatasetLogger.Error(nil, "0 records obtained from the catalog", "catalogHost", catalogHost, "table", table)
+			err = errors.NewBadRequest("No records obtained from the catalog")
+			return result, err
+		}
+
+		bucketData := make(map[string]string)
+		for i, bkt := range buckets {
+			bucketData["bucket."+strconv.Itoa(i)] = bkt
+		}
+
+		// Create the ConfigMap in any case, as we do not know whether a pod will mount the bucket or opt for
+		// environment variable injection
+		processRemoteDatasetLogger.Info("Creating a ConfigMap for the bucket data obtained for dataset", "datasetName", cr.Name, "tableName", table)
+		bucketData["catalogHost"] = catalogHost
+		bucketData["catalogPort"] = strconv.Itoa(catalogPort)
+		bucketData["table"] = table
+		bucketData["numBuckets"] = strconv.Itoa(len(buckets))
+
+		if _, err := createConfigMapforDataset(bucketData, cr, rc); err != nil {
+			processRemoteDatasetLogger.Error(err, "Could not create ConfigMap for dataset", "dataset", cr.Name)
+			return reconcile.Result{}, errors.NewServiceUnavailable("Unable to initialise dataset")
+		}
+
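To make the data contract concrete: for a table that resolves to two buckets, the ConfigMap written by createConfigMapforDataset would carry entries shaped like this (values illustrative):

    // Expected shape of the ConfigMap .data produced above.
    expectedData := map[string]string{
    	"bucket.0":    "bucket-part-0",
    	"bucket.1":    "bucket-part-1",
    	"catalogHost": "10.0.0.12",
    	"catalogPort": "9083",
    	"table":       "default/books",
    	"numBuckets":  "2",
    }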
+		processRemoteDatasetLogger.Info("Creating a Secret for the bucket data obtained for table", "tableName", table)
+		labels := map[string]string{
+			"dataset": cr.Name,
+		}
+		// We are creating this as a secret as it contains the access key and secret access key for Obj. Storage access
+		// We need this secret for both mounting and for environment variable injection
+		secretData := make(map[string]string)
+		secretData["endpoint"] = endpoint
+		secretData["accessKeyID"] = cr.Spec.Remote["accessKeyID"]
+		secretData["secretAccessKey"] = cr.Spec.Remote["secretAccessKey"]
+		secretData["region"] = cr.Spec.Remote["region"]
+		// To reduce duplication, we are entering the bucket information in this secret. This is so that csi-s3
+		// can mount the right bucket in the pod. Be aware that this secret needs to be created for each
+		// bucket that has to be mounted in the pod.
+		// Currently, we are only supporting mounting a single bucket inside the pod.
+		secretData["bucket"] = bucketData["bucket.0"]
+
+		secretObj := &corev1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      cr.Name,
+				Namespace: cr.Namespace,
+				Labels:    labels,
+			},
+			StringData: secretData,
+		}
+
+		if err := controllerutil.SetControllerReference(cr, secretObj, rc.scheme); err != nil {
+			processRemoteDatasetLogger.Error(err, "Could not set controller reference for secret", "name", cr.Name)
+			return reconcile.Result{}, err
+		}
+
+		found := &corev1.Secret{}
+		err = rc.client.Get(context.TODO(), types.NamespacedName{Name: secretObj.Name, Namespace: secretObj.Namespace}, found)
+		if err != nil && errors.IsNotFound(err) {
+			processRemoteDatasetLogger.Info("Creating new secret", "Secret.Namespace", secretObj.Namespace, "Secret.Name", secretObj.Name)
+			err = rc.client.Create(context.TODO(), secretObj)
+			if err != nil {
+				return reconcile.Result{}, err
+			}
+			// Secret created successfully - don't requeue
+		} else if err != nil {
+			return reconcile.Result{}, err
+		}
+
+		//Supporting only a single location for the time being
+		if mountAllowed {
+			processRemoteDatasetLogger.Info("Creating a PVC for a single bucket", "bucketName", bucketData["bucket.0"])
+
+			if _, err := createPVCforObjectStorage(cr, rc); err != nil {
+				processRemoteDatasetLogger.Error(err, "Mounting of object storage bucket failed", "bucketName", bucketData["bucket.0"])
+				return reconcile.Result{}, errors.NewServiceUnavailable("Unable to initialise dataset")
+			}
+		}
+
+	default:
+		err := errors.NewBadRequest("Unsupported dataset entry type")
+		return result, err
+	}
+
+	return result, nil
+}
+
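When mountAllowed is true, the PVC created via createPVCforObjectStorage (defined below) shares the Dataset's name, so a consuming pod can mount the first bucket roughly as follows (a hedged sketch; the mount path is an arbitrary choice):

    // Hedged sketch: mounting the dataset-backed PVC in a pod spec.
    vol := corev1.Volume{
    	Name: "dataset",
    	VolumeSource: corev1.VolumeSource{
    		PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
    			ClaimName: "example-dataset", // equals the Dataset name
    		},
    	},
    }
    mount := corev1.VolumeMount{Name: "dataset", MountPath: "/mnt/dataset"}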
+func createConfigMapforDataset(configMapData map[string]string, cr *comv1alpha1.Dataset, rc *ReconcileDataset) (reconcile.Result, error) {
+
+	createConfigMapLogger := log.WithValues("Dataset.Namespace", cr.Namespace, "Dataset.Name", cr.Name, "Method", "createConfigMapforDataset")
+	result := reconcile.Result{}
+	var err error = nil
+
+	labels := map[string]string{
+		"dataset": cr.Name,
+	}
+
+	configMapObject := &corev1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      cr.Name,
+			Namespace: cr.Namespace,
+			Labels:    labels,
+		},
+		Data: configMapData,
+	}
+
+	if err = controllerutil.SetControllerReference(cr, configMapObject, rc.scheme); err != nil {
+		return result, err
+	}
+
+	foundConfigMap := &corev1.ConfigMap{}
+	err = rc.client.Get(context.TODO(), types.NamespacedName{Name: configMapObject.Name, Namespace: configMapObject.Namespace}, foundConfigMap)
+	if err != nil && errors.IsNotFound(err) {
+		createConfigMapLogger.Info("Creating new configMap", "configMap.namespace",
+			configMapObject.Namespace, "configMap.Name", configMapObject.Name)
+		err = rc.client.Create(context.TODO(), configMapObject)
+		if err != nil {
+			return result, err
+		}
 	} else if err != nil {
-		return reconcile.Result{}, err
+		return result, err
 	}
-	return reconcile.Result{}, nil
-}
\ No newline at end of file
+	return result, err
+}
+
+func createPVCforObjectStorage(cr *comv1alpha1.Dataset, rc *ReconcileDataset) (reconcile.Result, error) {
+
+	createPVCLogger := log.WithValues("Dataset.Namespace", cr.Namespace, "Dataset.Name", cr.Name, "Method", "createPVCforObjectStorage")
+	result := reconcile.Result{}
+	var err error = nil
+
+	storageClassName := "csi-s3"
+
+	labels := map[string]string{
+		"dataset": cr.Name,
+	}
+
+	newPVC := &corev1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      cr.Name,
+			Namespace: cr.Namespace,
+			Labels:    labels,
+		},
+		Spec: corev1.PersistentVolumeClaimSpec{
+			AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
+			Resources: corev1.ResourceRequirements{
+				Requests: corev1.ResourceList{
+					corev1.ResourceStorage: resource.MustParse("5Gi"),
+				},
+			},
+			StorageClassName: &storageClassName,
+		},
+	}
+
+	if err = controllerutil.SetControllerReference(cr, newPVC, rc.scheme); err != nil {
+		return result, err
+	}
+
+	foundPVC := &corev1.PersistentVolumeClaim{}
+	err = rc.client.Get(context.TODO(), types.NamespacedName{Name: newPVC.Name, Namespace: newPVC.Namespace}, foundPVC)
+	if err != nil && errors.IsNotFound(err) {
+		// PVC not found - create it (a returned error will requeue)
+		createPVCLogger.Info("Creating new pvc", "PVC.Namespace", newPVC.Namespace, "PVC.Name", newPVC.Name)
+		err = rc.client.Create(context.TODO(), newPVC)
+	}
+
+	return result, err
+}
diff --git a/src/dataset-operator/pkg/controller/dataset/metastore_client.go b/src/dataset-operator/pkg/controller/dataset/metastore_client.go
new file mode 100644
index 00000000..f867efbd
--- /dev/null
+++ b/src/dataset-operator/pkg/controller/dataset/metastore_client.go
@@ -0,0 +1,146 @@
+package dataset
+
+import (
+	"errors"
+	"net/url"
+	"strconv"
+	"strings"
+
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+
+	"github.com/akolb1/gometastore/hmsclient"
+)
+
+var catalogLogger = logf.Log.WithName("metastore_client")
+
+func parseCatalogUri(catalogUri string) (string, int, error) {
+	catHostPort := strings.Split(catalogUri, ":")
+
+	//if no port is given, assume the standard Hive Metastore port of 9083
+	var catHost string
+	var catPort int
+	if len(catHostPort) == 1 {
+		catHost = catHostPort[0]
+		catPort = 9083
+	} else if len(catHostPort) == 2 {
+		catHost = catHostPort[0]
+		port, convErr := strconv.Atoi(catHostPort[1])
+		if convErr != nil {
+			catalogLogger.Error(convErr, "CatalogURI port is not numeric")
+			return "", 0, k8serrors.NewBadRequest("CatalogURI port is not numeric")
+		}
+		catPort = port
+	} else {
+		catalogLogger.Error(nil, "CatalogURI cannot be parsed")
+		return "", 0, k8serrors.NewInternalError(errors.New("CatalogURI cannot be parsed"))
+	}
+
+	return catHost, catPort, nil
+}
+
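A quick check of parseCatalogUri's contract, written as a hedged test sketch (assumes a metastore_client_test.go in the same package, with "testing" imported):

    func TestParseCatalogUri(t *testing.T) {
    	cases := []struct {
    		in   string
    		host string
    		port int
    	}{
    		{"metastore.example.com", "metastore.example.com", 9083}, // port defaults to 9083
    		{"metastore.example.com:9084", "metastore.example.com", 9084},
    	}
    	for _, c := range cases {
    		host, port, err := parseCatalogUri(c.in)
    		if err != nil || host != c.host || port != c.port {
    			t.Errorf("parseCatalogUri(%q) = (%q, %d, %v), want (%q, %d, nil)",
    				c.in, host, port, err, c.host, c.port)
    		}
    	}
    }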
+func processCatalogEntry(catHost string, catPort int, table string) ([]string, error) {
+
+	catalogLogger.Info("Connecting to catalog", "host", catHost, "port", catPort)
+
+	hiveclient, err := hmsclient.Open(catHost, catPort)
+	if err != nil {
+		catalogLogger.Error(err, "could not open connection to metastore")
+		return nil, k8serrors.NewInternalError(errors.New("Cannot connect to the metastore : " + catHost))
+	}
+	defer hiveclient.Close()
+
+	var dbName, tableName string
+	//We are assuming that the table entry will be in the form <database>/<table>
+	catDBTable := strings.Split(table, "/")
+	//If there is no / in the table input, we'll assume that the database name is 'default'
+	if len(catDBTable) == 1 {
+		dbName = "default"
+		tableName = catDBTable[0]
+	} else if len(catDBTable) == 2 {
+		dbName = catDBTable[0]
+		tableName = catDBTable[1]
+	} else {
+		catalogLogger.Error(nil, "Table name cannot be parsed")
+		return nil, k8serrors.NewBadRequest("Table name is in an incorrect format")
+	}
+
+	catalogLogger.Info("looking up " + dbName + "/" + tableName)
+
+	hiveTbl, err := hiveclient.GetTable(dbName, tableName)
+	if err != nil || hiveTbl == nil {
+		catalogLogger.Error(err, "Table could not be found in the catalog")
+		return nil, k8serrors.NewBadRequest("Table could not be found in the catalog")
+	}
+
+	catalogLogger.Info("finding partitions for " + dbName + "/" + tableName)
+
+	//256 is a magic number: the maximum number of partition names to fetch from the table
+	tblPartNames, err := hiveclient.GetPartitionNames(dbName, tableName, 256)
+	if err != nil {
+		//Something could go wrong here that we cannot diagnose further
+		catalogLogger.Error(err, "Could not look up partition names for "+dbName+"/"+tableName)
+		//Let's return a nil for now
+		return nil, k8serrors.NewInternalError(errors.New("Table partitions could not be retrieved"))
+	}
+
+	var locations []string
+	if len(tblPartNames) == 0 {
+		catalogLogger.Info("Table " + dbName + "/" + tableName + " is not partitioned")
+		//Get the location from the table properties, if present; hiveTbl was fetched above.
+		loc, err := url.Parse(hiveTbl.GetSd().GetLocation())
+
+		catalogLogger.Info("Got a location", "location", loc)
+
+		var proto, bucket string
+		if err != nil {
+			proto = ""
+			bucket = hiveTbl.GetSd().GetLocation()
+		} else {
+			proto = loc.Scheme
+		}
+		if strings.HasPrefix(proto, "s3") {
+			bucket = getBucketFromS3Uri(loc)
+			locations = append(locations, bucket)
+		}
+	} else {
+		tablePartitions, err := hiveclient.GetPartitionsByNames(dbName, tableName, tblPartNames)
+		if err != nil {
+			//Something really went wrong
+			catalogLogger.Error(err, "Could not look up partition info for "+dbName+"/"+tableName)
+			return nil, k8serrors.NewBadRequest("Could not look up partition info")
+		}
+		for _, part := range tablePartitions {
+			loc, err := url.Parse(part.GetSd().GetLocation())
+			catalogLogger.Info("Got a location", "location", loc)
+
+			var proto, bucket string
+			if err != nil {
+				proto = ""
+				bucket = part.GetSd().GetLocation()
+			} else {
+				proto = loc.Scheme
+				bucket = getBucketFromS3Uri(loc)
+			}
+			if strings.HasPrefix(proto, "s3") {
+				locations = append(locations, bucket)
+			}
+		}
+	}
+
+	return locations, nil
+}
+
+func getBucketFromS3Uri(loc *url.URL) string {
+
+	bucket := strings.Join([]string{loc.Host, loc.Path}, "")
+	if strings.HasSuffix(bucket, "/") {
+		bucket = strings.TrimRight(bucket, "/")
+	}
+	return bucket
+}
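Worked example of the helper above: for a partition location such as s3a://mybucket/tables/books/, url.Parse yields Host "mybucket" and Path "/tables/books/", so joining and trimming produces "mybucket/tables/books". A minimal sketch (fmt and net/url assumed imported; the URI is illustrative):

    loc, err := url.Parse("s3a://mybucket/tables/books/")
    if err == nil {
    	fmt.Println(getBucketFromS3Uri(loc)) // prints: mybucket/tables/books
    }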