[flink] add 1.12 ; remove 1.10 #9249

Merged: 1 commit, Jan 28, 2021

rmetzger
Contributor

@rmetzger rmetzger commented Dec 8, 2020

This might be a replacement for #9243

@dianfu
Contributor

dianfu commented Dec 9, 2020

cc @yosifkit @tianon

@yosifkit
Member

args by reference seems a bit weird, but is harmless. (since it is a global value, it could be accessed by the function if declared earlier.) No change necessary.


This inserted bash -c may be a non-intuitive interface. I'm not entirely certain it needs to change, just ensuring that it makes sense for users.

+    exec $(drop_privs_cmd) bash -c "${args[@]}"
$ # current
$ docker run -it flink native-k8s 'some-command --with --its arguments && other stuff'
$ # missing quotes would mean the arguments to "some-command" would be interpreted as arguments to "bash"
$ docker run -it flink native-k8s some-command --with --its arguments

$ # vs if you drop the bash -c
$ docker run -it flink native-k8s some-command --with --its arguments
$ docker run -it flink native-k8s bash -c 'some-command --with --its arguments && other stuff'
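
To make the difference concrete, here is a minimal sketch of how `bash -c` treats an argument array. The functions `run_with_bash_c` and `run_direct` are hypothetical stand-ins for the two variants being compared, not code from the entrypoint.

```bash
#!/usr/bin/env bash
# Hypothetical stand-in for the native-k8s branch: the first array element is
# the script for `bash -c`; any remaining elements become $0, $1, ...
run_with_bash_c() {
    local args=("$@")
    bash -c "${args[@]}"
}

# Hypothetical stand-in for the variant without bash -c: run the array as-is.
run_direct() {
    local args=("$@")
    "${args[@]}"
}

# Quoted: the whole command line is one script string, so both words print.
run_with_bash_c 'echo hello world'

# Unquoted: only "echo" is the script; "hello" becomes $0 and "world" becomes
# $1, so this prints just an empty line.
run_with_bash_c echo hello world

# Without bash -c, the unquoted form runs echo with both words as arguments.
run_direct echo hello world
```

In other words, with the `bash -c` wrapper the caller has to pass the entire command as a single quoted string; anything after it is silently turned into positional parameters.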

@rmetzger
Contributor Author

Thank you for your review! I agree that the bash -c invocation is potentially confusing. However, as far as I understand, the native-k8s command is not meant to be used directly by humans. It's used by Flink's Kubernetes integration, where a Java process generates the K8s deployments for starting Flink. This Java process also generates the arguments for launching the Docker containers. @wangyang0918 correct me if I'm wrong here.

@wangyang0918

@tianon @rmetzger Yes, the native-k8s command is not meant to be used directly. The Flink client generates the full command, which is then executed in docker-entrypoint.sh. The command and arguments may be a little complex and look like the following.

  Containers:
  flink-job-manager:
    Container ID:  docker://b02897603fdf7e74d94ffa928727ea594e6247e6cdf9adf9a393c957350fcda2
    Image:         registry.cn-beijing.aliyuncs.com/streamcompute/flink:flink-1.12.0
    Image ID:      docker-pullable://registry.cn-beijing.aliyuncs.com/streamcompute/flink@sha256:7e8985b0c1250f62c5b3e5fb9d13c5ef287241fc24af2a84902e37ebafd018ea
    Ports:         8081/TCP, 6123/TCP, 6124/TCP
    Host Ports:    0/TCP, 0/TCP, 0/TCP
    Command:
      /docker-entrypoint.sh
    Args:
      native-k8s
      $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx654311424 -Xms654311424 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=654311424b -D jobmanager.memory.jvm-overhead.max=201326592b
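
The second entry under Args is a single string that still contains unexpanded variables, which is why the entrypoint hands it to a shell. A minimal sketch of that effect follows; `RUNNER` is a hypothetical variable standing in for `$JAVA_HOME/bin/java`, and `via_shell` and `direct` are illustrative helpers, not part of the image.

```bash
#!/usr/bin/env bash
# RUNNER is a hypothetical stand-in for $JAVA_HOME/bin/java.
export RUNNER=echo

# The generated command arrives as ONE argument with an unexpanded variable.
generated='$RUNNER launching some.Main'

# bash -c re-parses the string: it expands $RUNNER and splits it into words.
via_shell() { bash -c "$1"; }

# Running the same string directly looks for a program whose name is the
# entire literal string, which does not exist.
direct() { "$1" 2>/dev/null; }

via_shell "$generated"
direct "$generated" || echo "direct execution failed: no such program"
```

This is only meant to show why a string of this shape cannot simply be passed to `exec` without a shell in between.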

@yosifkit
Member

Extra [script functions] that are only for a specific vendor platform (Amazon, Azure, Google Cloud, OpenShift, OpenStack, etc) breaks the whole "run anywhere" Docker experience. Images in Official-Images should be platform independent. It is up to image maintainers whether or not to adjust their images to better support the specific needs of a platform in a general way (e.g. docker-library/postgres#359 + docker-library/postgres#448).

- #7281 (comment)

It seems a little strange for the Flink client to generate a command that relies on interpreted env vars. Shouldn't it just generate some args for java and let the entrypoint run java with the correct classpath? This would be especially relevant if any arguments ever have a path that includes spaces. And what about non-Kubernetes users, would they not also want to use these calculated memory values and other custom flags? In other words, why have a just-for-Kubernetes part of the entrypoint?
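
The concern about paths with spaces can be illustrated with a small sketch. The path `/opt/my logs/jm.log` and the helper names are made up for the example; the point is only the difference between a flattened string that a shell re-parses and an argument array that is passed through untouched.

```bash
#!/usr/bin/env bash
# Print each received argument on its own line, wrapped in brackets.
show_args() { printf '[%s]\n' "$@"; }
export -f show_args   # make the helper visible to the child bash below

# One flattened string, re-parsed by the shell: the space splits the path
# into two separate arguments.
flattened() { bash -c 'show_args -Dlog.file=/opt/my logs/jm.log'; }

# An argument array passed through as-is: the path stays in one piece.
as_array() {
    local args=("-Dlog.file=/opt/my logs/jm.log")
    show_args "${args[@]}"
}

flattened
as_array
```

A generator that emits a flat command string would need to quote such values itself, whereas a list of separate args avoids the problem entirely.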

@wangyang0918

wangyang0918 commented Dec 15, 2020

@yosifkit Thanks for your comments and suggestions.

Just as you said, we could move the $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH into the docker-entrypoint.sh and only generate the jvm args in Flink. Then exec $(drop_privs_cmd) bash -c "${args[@]}" could be exec $(drop_privs_cmd) $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH "${args[@]}". And the native-k8s command should be renamed to generic-java.

Another alternative is to rename native-k8s to generic, which means we could run any command with the Flink environment already set (e.g. FLINK_CLASSPATH). I would also remove the bash -c wrapper. I prefer this option.

BTW, could we defer this improvement to the next release (Flink 1.12.1)? Both solutions require us to update the Flink source code, but we have already released Flink 1.12.0 and only one step is left (publishing the image to Docker Hub).
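
A rough sketch of the "generic" idea, under stated assumptions: `setup_flink_env` is a hypothetical helper that stands in for sourcing config.sh and building the classpath, and the classpath value is a placeholder. It is not the proposed implementation, only the shape of "set the environment, then run the arguments verbatim".

```bash
#!/usr/bin/env bash
# Hypothetical helper: stands in for sourcing config.sh and constructing
# the real classpath. The value below is a placeholder.
setup_flink_env() {
    export FLINK_CLASSPATH="/opt/flink/lib/*"
}

# Prepare the environment, then run the remaining arguments verbatim with
# no bash -c in between. The subshell keeps this sketch safe to source;
# a real entrypoint would exec in the main shell.
run_generic() {
    ( setup_flink_env; exec "$@" )
}

# Any command sees the prepared environment; a caller that wants shell
# features can still ask for them explicitly.
run_generic sh -c 'echo "classpath=$FLINK_CLASSPATH"'
```

With this shape, shell interpretation becomes the caller's explicit choice rather than something the entrypoint imposes.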

@rogaha

rogaha commented Dec 18, 2020

thanks @wangyang0918 for submitting the PR. @tianon thanks for keeping up with official images support.

@wangyang0918

wangyang0918 commented Dec 18, 2020

Hi @rogaha @tianon @yosifkit, do you think the new docker-entrypoint.sh in apache/flink-docker#49 makes sense? We will mark native-k8s as deprecated and do the preparatory work for pass-through mode in docker-entrypoint.sh. native-k8s will then be removed in the next Flink major release (1.13).
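
As a simplified illustration of that plan, the sketch below keeps a deprecated `native-k8s` branch next to a pass-through branch. The `dispatch` function and the warning text are assumptions made for this example; the actual script in apache/flink-docker may differ.

```bash
#!/usr/bin/env bash
# Hypothetical dispatcher: keep native-k8s working for existing clients,
# flag it as deprecated, and pass every other command through unchanged.
dispatch() {
    if [ "$1" = "native-k8s" ]; then
        shift
        # The warning text is an assumption, not taken from the real script.
        echo "WARNING: native-k8s is deprecated" >&2
        bash -c "$@"      # legacy behaviour: first argument is a shell string
    else
        "$@"              # pass-through mode: run the arguments as given
    fi
}

dispatch native-k8s 'echo legacy && echo path'
dispatch echo pass-through path
```

Old clients that still send `native-k8s` keep working, while new callers can rely on the pass-through branch alone.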

xintongsong added a commit to xintongsong/flink-docker that referenced this pull request Dec 23, 2020
Deprecate command `native-k8` and add preparation for pass-through mode.

This is to address the comments in the following docker-library/official-images PR.
docker-library/official-images#9249
xintongsong added a commit to apache/flink-docker that referenced this pull request Dec 23, 2020
Deprecate command `native-k8` and add preparation for pass-through mode.

This is to address the comments in the following docker-library/official-images PR.
docker-library/official-images#9249

This closes #50
@xintongsong
Contributor

Thanks all for the efforts so far.
Since @rmetzger is out of office, I opened #9345 as a replacement for this one.

@tianon
Member

tianon commented Jan 8, 2021

(Commenting here instead of on #9345 because this is where the previous discussion is and I'd really appreciate if we didn't splinter it.)

I've given this more thought, and figured out what's got me so confused/conflicted.

I see that there exist a lot of (useful looking) scripts in https://github.com/apache/flink/tree/2eaee8344bdcab961799127c7e72d3f4abe75726/flink-dist/src/main/flink-bin that I can only assume exist in this directory because they are part of Flink releases, and are how users are expected to use Flink directly (outside containers), correct?

My main concern (and confusion) here is that we have a separate custom script that's specific to the Dockerization, and it keeps growing, and it feels like it's really duplicating the efforts that exist there. If this had been implemented as part of those scripts and been part of the Flink release artifacts, we wouldn't have even noticed their addition, their usage would still fall under the "consistent interface" guidelines we are looking for, and we wouldn't still be sitting here with a PR that both adds and deprecates a feature in the same new release, which feels really off.

@zentol
Contributor

zentol commented Jan 8, 2021

@tianon Your assessment is somewhat correct.

All deployments of Flink make use of the flink-bin scripts in some form; for YARN or non-containerized setups, users directly use the contained scripts to create a cluster.

For Docker deployments, users work against the interface provided by the Docker entrypoint, which ultimately just wraps the flink-bin scripts and adds some Docker-specific logic.

Finally, the native kubernetes integration has users interact with the flink-bin scripts to issue the creation of a cluster, where Flink requests containers and generates a command to be executed within the container through the docker-entrypoint.sh.

In other words the Flink docker image is used in 2 ways; directly by users and in some essentially-internal fashion where we're just passing a command through.

Re deprecating a new feature: The annoying reality is that the 1.12.0 code expects the (now deprecated) command to exist, and we would like 1.12.0 clients to continue working with 1.12.1 images. (The used images are not hard-wired to a specific release.)

If I understand you correctly, then it would be fine for us to move everything in docker-entrypoint.sh into a new script within the distribution, and just call that script from docker-entrypoint.sh with all passed arguments. We had the same idea on our side, and would probably be fine with doing that starting from 1.13.
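
A minimal sketch of what such a thin entrypoint could look like. The script name `container-launcher.sh` is invented for this example, and the temporary directory only exists so the sketch can run outside an image; nothing here is taken from an actual Flink release.

```bash
#!/usr/bin/env bash
# Demo setup only: a throwaway "distribution" containing a launcher script.
# The name container-launcher.sh is invented for this sketch.
FLINK_HOME="$(mktemp -d)"
mkdir -p "$FLINK_HOME/bin"
cat > "$FLINK_HOME/bin/container-launcher.sh" <<'EOF'
#!/bin/sh
echo "launcher received: $*"
EOF
chmod +x "$FLINK_HOME/bin/container-launcher.sh"

# The thin entrypoint: no logic of its own, it forwards every argument.
# A real entrypoint would use exec so the launcher replaces the shell.
thin_entrypoint() {
    "$FLINK_HOME/bin/container-launcher.sh" "$@"
}

thin_entrypoint jobmanager --some-flag
```

All Docker-specific behaviour would then live in the release artifacts, and the reviewed image script shrinks to a single forwarding call.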

@tianon
Member

tianon commented Jan 8, 2021

Re deprecating a new feature: The annoying reality is that the 1.12.0 code expects the (now deprecated) command to exist, and we would like 1.12.0 clients to continue working with 1.12.1 images. (The used images are not hard-wired to a specific release.)

If I understand you correctly, then it would be fine for us to move everything in docker-entrypoint.sh into a new script within the distribution, and just call that script from docker-entrypoint.sh with all passed arguments. We had the same idea on our side, and would probably be fine with doing that starting from 1.13.

Yes, I'd love to see this (anything that decreases the delta between a "standard" Flink install and a "Docker" Flink install is a big win IMO) -- the part about this particular instance that feels so off is that this feature appears to have been worked on for months with the intention of adding this additional code to the bits of the image we review (which is one of the core purposes of this program), but we weren't looped into the conversation until it was already too late to discuss it, which is why we're having this conversation. 😬 😄

@wangyang0918

@tianon
We could make the docker-entrypoint.sh in the flink-docker project as thin as possible and move most of the functionality into the Flink distribution. Of course, the native K8s mode will have a dedicated entrypoint, kubernetes-entrypoint.sh, in the Flink distribution, which will export some environment variables for internal use. Then we will not need to do that in the common docker-entrypoint.sh. But I think we could only do this for the next major release (1.13).

How can we move the publishing of the Flink 1.12.* official images forward? Is marking the native-k8s command as deprecated enough? We have already done this [1].

[1]. https://github.com/apache/flink-docker/blob/master/1.12/scala_2.11-java8-debian/docker-entrypoint.sh#L22

@zentol
Contributor

zentol commented Jan 13, 2021

Of course, the native K8s mode will have a dedicated entrypoint, kubernetes-entrypoint.sh, in the Flink distribution.

This is TBD. There may be other ways to solve this, but let's not have this discussion here.


Tags: 1.10.2-scala_2.11, 1.10-scala_2.11
Tags: 1.12.0-scala_2.12-java8, 1.12-scala_2.12-java8, scala_2.12-java8, 1.12.0-scala_2.12, 1.12-scala_2.12, scala_2.12, 1.12.0-java8, 1.12-java8, java8, 1.12.0, 1.12, latest

What is the reason to allow java8 images?

Contributor

Java 8 is the lowest version required and the maven target version for Flink, and is AFAIK used by the majority of Flink developers and users.

@github-actions

Diff for 4bb82a9:
diff --git a/_bashbrew-cat b/_bashbrew-cat
index 2817ce3..c02207a 100644
--- a/_bashbrew-cat
+++ b/_bashbrew-cat
@@ -1,14 +1,6 @@
 Maintainers: Patrick Lucas <me@patricklucas.com> (@patricklucas), Ismaël Mejía <iemejia@gmail.com> (@iemejia)
 GitRepo: https://github.com/apache/flink-docker.git
 
-Tags: 1.10.2-scala_2.11, 1.10-scala_2.11
-GitCommit: 58a29fca7c6ff05999fad4371638d16335f7e93e
-Directory: 1.10/scala_2.11-debian
-
-Tags: 1.10.2-scala_2.12, 1.10-scala_2.12, 1.10.2, 1.10
-GitCommit: 58a29fca7c6ff05999fad4371638d16335f7e93e
-Directory: 1.10/scala_2.12-debian
-
 Tags: 1.11.3-scala_2.11-java8, 1.11-scala_2.11-java8, 1.11.3-scala_2.11, 1.11-scala_2.11
 GitCommit: 7035f03679b11352f2fdecd9f6a9bb0ec8bc2022
 Directory: 1.11/scala_2.11-java8-debian
@@ -24,3 +16,19 @@ Directory: 1.11/scala_2.12-java8-debian
 Tags: 1.11.3-scala_2.12-java11, 1.11-scala_2.12-java11, 1.11.3-java11, 1.11-java11
 GitCommit: 7035f03679b11352f2fdecd9f6a9bb0ec8bc2022
 Directory: 1.11/scala_2.12-java11-debian
+
+Tags: 1.12.0-scala_2.11-java8, 1.12-scala_2.11-java8, scala_2.11-java8, 1.12.0-scala_2.11, 1.12-scala_2.11, scala_2.11
+GitCommit: 4d7cb3068d670bef274975c9412d5e40317fba3f
+Directory: 1.12/scala_2.11-java8-debian
+
+Tags: 1.12.0-scala_2.11-java11, 1.12-scala_2.11-java11, scala_2.11-java11
+GitCommit: 4d7cb3068d670bef274975c9412d5e40317fba3f
+Directory: 1.12/scala_2.11-java11-debian
+
+Tags: 1.12.0-scala_2.12-java8, 1.12-scala_2.12-java8, scala_2.12-java8, 1.12.0-scala_2.12, 1.12-scala_2.12, scala_2.12, 1.12.0-java8, 1.12-java8, java8, 1.12.0, 1.12, latest
+GitCommit: 4d7cb3068d670bef274975c9412d5e40317fba3f
+Directory: 1.12/scala_2.12-java8-debian
+
+Tags: 1.12.0-scala_2.12-java11, 1.12-scala_2.12-java11, scala_2.12-java11, 1.12.0-java11, 1.12-java11, java11
+GitCommit: 4d7cb3068d670bef274975c9412d5e40317fba3f
+Directory: 1.12/scala_2.12-java11-debian
diff --git a/_bashbrew-list b/_bashbrew-list
index f4a0459..9fe01b8 100644
--- a/_bashbrew-list
+++ b/_bashbrew-list
@@ -1,9 +1,3 @@
-flink:1.10
-flink:1.10-scala_2.11
-flink:1.10-scala_2.12
-flink:1.10.2
-flink:1.10.2-scala_2.11
-flink:1.10.2-scala_2.12
 flink:1.11
 flink:1.11-java8
 flink:1.11-java11
@@ -22,3 +16,30 @@ flink:1.11.3-scala_2.11-java11
 flink:1.11.3-scala_2.12
 flink:1.11.3-scala_2.12-java8
 flink:1.11.3-scala_2.12-java11
+flink:1.12
+flink:1.12-java8
+flink:1.12-java11
+flink:1.12-scala_2.11
+flink:1.12-scala_2.11-java8
+flink:1.12-scala_2.11-java11
+flink:1.12-scala_2.12
+flink:1.12-scala_2.12-java8
+flink:1.12-scala_2.12-java11
+flink:1.12.0
+flink:1.12.0-java8
+flink:1.12.0-java11
+flink:1.12.0-scala_2.11
+flink:1.12.0-scala_2.11-java8
+flink:1.12.0-scala_2.11-java11
+flink:1.12.0-scala_2.12
+flink:1.12.0-scala_2.12-java8
+flink:1.12.0-scala_2.12-java11
+flink:java8
+flink:java11
+flink:latest
+flink:scala_2.11
+flink:scala_2.11-java8
+flink:scala_2.11-java11
+flink:scala_2.12
+flink:scala_2.12-java8
+flink:scala_2.12-java11
diff --git a/flink_1.11-java11/Dockerfile b/flink_java11/Dockerfile
similarity index 90%
copy from flink_1.11-java11/Dockerfile
copy to flink_java11/Dockerfile
index 8b27b61..1a6e48a 100644
--- a/flink_1.11-java11/Dockerfile
+++ b/flink_java11/Dockerfile
@@ -21,7 +21,7 @@ FROM openjdk:11-jre
 # Install dependencies
 RUN set -ex; \
   apt-get update; \
-  apt-get -y install libsnappy1v5 gettext-base; \
+  apt-get -y install libsnappy1v5 gettext-base libjemalloc-dev; \
   rm -rf /var/lib/apt/lists/*
 
 # Grab gosu for easy step-down from root
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.11.3/flink-1.11.3-bin-scala_2.12.tgz \
-    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.11.3/flink-1.11.3-bin-scala_2.12.tgz.asc \
-    GPG_KEY=F8E419AA0B60C28879E876859DFF40967ABFC5A4 \
+ENV FLINK_TGZ_URL=https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz \
+    FLINK_ASC_URL=https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz.asc \
+    GPG_KEY=43CE299BC305AFF8B912AA95183F6944D9839159 \
     CHECK_GPG=true
 
 # Prepare environment
diff --git a/flink_1.10/docker-entrypoint.sh b/flink_java11/docker-entrypoint.sh
similarity index 65%
copy from flink_1.10/docker-entrypoint.sh
copy to flink_java11/docker-entrypoint.sh
index 6084f06..275488e 100755
--- a/flink_1.10/docker-entrypoint.sh
+++ b/flink_java11/docker-entrypoint.sh
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/usr/bin/env bash
 
 ###############################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +19,9 @@
 ###############################################################################
 
 COMMAND_STANDALONE="standalone-job"
+# Deprecated, should be remove in Flink release 1.13
+COMMAND_NATIVE_KUBERNETES="native-k8s"
+COMMAND_HISTORY_SERVER="history-server"
 
 # If unspecified, the hostname of the container is taken as the JobManager address
 JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
@@ -73,17 +76,13 @@ set_config_option() {
   fi
 }
 
-set_common_options() {
+prepare_configuration() {
     set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
     set_config_option blob.server.port 6124
     set_config_option query.server.port 6125
-}
-
-prepare_job_manager_start() {
-    echo "Starting Job Manager"
-    copy_plugins_if_required
 
-    set_common_options
+    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-1}
+    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
 
     if [ -n "${FLINK_PROPERTIES}" ]; then
         echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
@@ -91,35 +90,65 @@ prepare_job_manager_start() {
     envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
 }
 
+maybe_enable_jemalloc() {
+    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
+        export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
+    fi
+}
+
+maybe_enable_jemalloc
+
+copy_plugins_if_required
+
+prepare_configuration
+
+args=("$@")
 if [ "$1" = "help" ]; then
-    echo "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|help)"
+    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
+    printf "    Or $(basename "$0") help\n\n"
+    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
     exit 0
 elif [ "$1" = "jobmanager" ]; then
-    shift 1
-    prepare_job_manager_start
+    args=("${args[@]:1}")
+
+    echo "Starting Job Manager"
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
 elif [ "$1" = ${COMMAND_STANDALONE} ]; then
-    shift 1
-    prepare_job_manager_start
+    args=("${args[@]:1}")
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
+    echo "Starting Job Manager"
+
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
+elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
+    args=("${args[@]:1}")
+
+    echo "Starting History Server"
+
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
 elif [ "$1" = "taskmanager" ]; then
-    shift 1
+    args=("${args[@]:1}")
+
     echo "Starting Task Manager"
-    copy_plugins_if_required
 
-    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
+elif [ "$1" = "$COMMAND_NATIVE_KUBERNETES" ]; then
+    args=("${args[@]:1}")
 
-    set_common_options
-    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
+    export _FLINK_HOME_DETERMINED=true
+    . $FLINK_HOME/bin/config.sh
+    export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
+    # Start commands for jobmanager and taskmanager are generated by Flink internally.
+    echo "Start command: ${args[@]}"
+    exec $(drop_privs_cmd) bash -c "${args[@]}"
+fi
 
-    if [ -n "${FLINK_PROPERTIES}" ]; then
-        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
-    fi
-    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
+args=("${args[@]}")
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "$@"
-fi
+# Set the Flink related environments
+export _FLINK_HOME_DETERMINED=true
+. $FLINK_HOME/bin/config.sh
+export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
 
-exec "$@"
+# Running command in pass-through mode
+exec $(drop_privs_cmd) "${args[@]}"
diff --git a/flink_1.10/Dockerfile b/flink_latest/Dockerfile
similarity index 90%
rename from flink_1.10/Dockerfile
rename to flink_latest/Dockerfile
index 29afb1a..48164fd 100644
--- a/flink_1.10/Dockerfile
+++ b/flink_latest/Dockerfile
@@ -21,7 +21,7 @@ FROM openjdk:8-jre
 # Install dependencies
 RUN set -ex; \
   apt-get update; \
-  apt-get -y install libsnappy1v5 gettext-base; \
+  apt-get -y install libsnappy1v5 gettext-base libjemalloc-dev; \
   rm -rf /var/lib/apt/lists/*
 
 # Grab gosu for easy step-down from root
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.10.2/flink-1.10.2-bin-scala_2.12.tgz \
-    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.10.2/flink-1.10.2-bin-scala_2.12.tgz.asc \
-    GPG_KEY=C63E230EFFF519A5BBF2C9AE6767487CD505859C \
+ENV FLINK_TGZ_URL=https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz \
+    FLINK_ASC_URL=https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz.asc \
+    GPG_KEY=43CE299BC305AFF8B912AA95183F6944D9839159 \
     CHECK_GPG=true
 
 # Prepare environment
diff --git a/flink_1.10-scala_2.11/docker-entrypoint.sh b/flink_latest/docker-entrypoint.sh
similarity index 65%
rename from flink_1.10-scala_2.11/docker-entrypoint.sh
rename to flink_latest/docker-entrypoint.sh
index 6084f06..275488e 100755
--- a/flink_1.10-scala_2.11/docker-entrypoint.sh
+++ b/flink_latest/docker-entrypoint.sh
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/usr/bin/env bash
 
 ###############################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +19,9 @@
 ###############################################################################
 
 COMMAND_STANDALONE="standalone-job"
+# Deprecated, should be remove in Flink release 1.13
+COMMAND_NATIVE_KUBERNETES="native-k8s"
+COMMAND_HISTORY_SERVER="history-server"
 
 # If unspecified, the hostname of the container is taken as the JobManager address
 JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
@@ -73,17 +76,13 @@ set_config_option() {
   fi
 }
 
-set_common_options() {
+prepare_configuration() {
     set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
     set_config_option blob.server.port 6124
     set_config_option query.server.port 6125
-}
-
-prepare_job_manager_start() {
-    echo "Starting Job Manager"
-    copy_plugins_if_required
 
-    set_common_options
+    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-1}
+    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
 
     if [ -n "${FLINK_PROPERTIES}" ]; then
         echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
@@ -91,35 +90,65 @@ prepare_job_manager_start() {
     envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
 }
 
+maybe_enable_jemalloc() {
+    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
+        export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
+    fi
+}
+
+maybe_enable_jemalloc
+
+copy_plugins_if_required
+
+prepare_configuration
+
+args=("$@")
 if [ "$1" = "help" ]; then
-    echo "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|help)"
+    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
+    printf "    Or $(basename "$0") help\n\n"
+    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
     exit 0
 elif [ "$1" = "jobmanager" ]; then
-    shift 1
-    prepare_job_manager_start
+    args=("${args[@]:1}")
+
+    echo "Starting Job Manager"
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
 elif [ "$1" = ${COMMAND_STANDALONE} ]; then
-    shift 1
-    prepare_job_manager_start
+    args=("${args[@]:1}")
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
+    echo "Starting Job Manager"
+
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
+elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
+    args=("${args[@]:1}")
+
+    echo "Starting History Server"
+
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
 elif [ "$1" = "taskmanager" ]; then
-    shift 1
+    args=("${args[@]:1}")
+
     echo "Starting Task Manager"
-    copy_plugins_if_required
 
-    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
+elif [ "$1" = "$COMMAND_NATIVE_KUBERNETES" ]; then
+    args=("${args[@]:1}")
 
-    set_common_options
-    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
+    export _FLINK_HOME_DETERMINED=true
+    . $FLINK_HOME/bin/config.sh
+    export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
+    # Start commands for jobmanager and taskmanager are generated by Flink internally.
+    echo "Start command: ${args[@]}"
+    exec $(drop_privs_cmd) bash -c "${args[@]}"
+fi
 
-    if [ -n "${FLINK_PROPERTIES}" ]; then
-        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
-    fi
-    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
+args=("${args[@]}")
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "$@"
-fi
+# Set the Flink related environments
+export _FLINK_HOME_DETERMINED=true
+. $FLINK_HOME/bin/config.sh
+export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
 
-exec "$@"
+# Running command in pass-through mode
+exec $(drop_privs_cmd) "${args[@]}"
diff --git a/flink_1.11-java11/Dockerfile b/flink_scala_2.11-java11/Dockerfile
similarity index 90%
copy from flink_1.11-java11/Dockerfile
copy to flink_scala_2.11-java11/Dockerfile
index 8b27b61..70310d6 100644
--- a/flink_1.11-java11/Dockerfile
+++ b/flink_scala_2.11-java11/Dockerfile
@@ -21,7 +21,7 @@ FROM openjdk:11-jre
 # Install dependencies
 RUN set -ex; \
   apt-get update; \
-  apt-get -y install libsnappy1v5 gettext-base; \
+  apt-get -y install libsnappy1v5 gettext-base libjemalloc-dev; \
   rm -rf /var/lib/apt/lists/*
 
 # Grab gosu for easy step-down from root
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.11.3/flink-1.11.3-bin-scala_2.12.tgz \
-    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.11.3/flink-1.11.3-bin-scala_2.12.tgz.asc \
-    GPG_KEY=F8E419AA0B60C28879E876859DFF40967ABFC5A4 \
+ENV FLINK_TGZ_URL=https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz \
+    FLINK_ASC_URL=https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz.asc \
+    GPG_KEY=43CE299BC305AFF8B912AA95183F6944D9839159 \
     CHECK_GPG=true
 
 # Prepare environment
diff --git a/flink_1.10/docker-entrypoint.sh b/flink_scala_2.11-java11/docker-entrypoint.sh
similarity index 65%
copy from flink_1.10/docker-entrypoint.sh
copy to flink_scala_2.11-java11/docker-entrypoint.sh
index 6084f06..275488e 100755
--- a/flink_1.10/docker-entrypoint.sh
+++ b/flink_scala_2.11-java11/docker-entrypoint.sh
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/usr/bin/env bash
 
 ###############################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +19,9 @@
 ###############################################################################
 
 COMMAND_STANDALONE="standalone-job"
+# Deprecated, should be remove in Flink release 1.13
+COMMAND_NATIVE_KUBERNETES="native-k8s"
+COMMAND_HISTORY_SERVER="history-server"
 
 # If unspecified, the hostname of the container is taken as the JobManager address
 JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
@@ -73,17 +76,13 @@ set_config_option() {
   fi
 }
 
-set_common_options() {
+prepare_configuration() {
     set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
     set_config_option blob.server.port 6124
     set_config_option query.server.port 6125
-}
-
-prepare_job_manager_start() {
-    echo "Starting Job Manager"
-    copy_plugins_if_required
 
-    set_common_options
+    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-1}
+    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
 
     if [ -n "${FLINK_PROPERTIES}" ]; then
         echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
@@ -91,35 +90,65 @@ prepare_job_manager_start() {
     envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
 }
 
+maybe_enable_jemalloc() {
+    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
+        export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
+    fi
+}
+
+maybe_enable_jemalloc
+
+copy_plugins_if_required
+
+prepare_configuration
+
+args=("$@")
 if [ "$1" = "help" ]; then
-    echo "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|help)"
+    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
+    printf "    Or $(basename "$0") help\n\n"
+    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
     exit 0
 elif [ "$1" = "jobmanager" ]; then
-    shift 1
-    prepare_job_manager_start
+    args=("${args[@]:1}")
+
+    echo "Starting Job Manager"
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
 elif [ "$1" = ${COMMAND_STANDALONE} ]; then
-    shift 1
-    prepare_job_manager_start
+    args=("${args[@]:1}")
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
+    echo "Starting Job Manager"
+
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
+elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
+    args=("${args[@]:1}")
+
+    echo "Starting History Server"
+
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
 elif [ "$1" = "taskmanager" ]; then
-    shift 1
+    args=("${args[@]:1}")
+
     echo "Starting Task Manager"
-    copy_plugins_if_required
 
-    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
+elif [ "$1" = "$COMMAND_NATIVE_KUBERNETES" ]; then
+    args=("${args[@]:1}")
 
-    set_common_options
-    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
+    export _FLINK_HOME_DETERMINED=true
+    . $FLINK_HOME/bin/config.sh
+    export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
+    # Start commands for jobmanager and taskmanager are generated by Flink internally.
+    echo "Start command: ${args[@]}"
+    exec $(drop_privs_cmd) bash -c "${args[@]}"
+fi
 
-    if [ -n "${FLINK_PROPERTIES}" ]; then
-        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
-    fi
-    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
+args=("${args[@]}")
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "$@"
-fi
+# Set the Flink related environments
+export _FLINK_HOME_DETERMINED=true
+. $FLINK_HOME/bin/config.sh
+export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
 
-exec "$@"
+# Running command in pass-through mode
+exec $(drop_privs_cmd) "${args[@]}"
diff --git a/flink_1.10-scala_2.11/Dockerfile b/flink_scala_2.11/Dockerfile
similarity index 90%
rename from flink_1.10-scala_2.11/Dockerfile
rename to flink_scala_2.11/Dockerfile
index 7c07b15..8c1291a 100644
--- a/flink_1.10-scala_2.11/Dockerfile
+++ b/flink_scala_2.11/Dockerfile
@@ -21,7 +21,7 @@ FROM openjdk:8-jre
 # Install dependencies
 RUN set -ex; \
   apt-get update; \
-  apt-get -y install libsnappy1v5 gettext-base; \
+  apt-get -y install libsnappy1v5 gettext-base libjemalloc-dev; \
   rm -rf /var/lib/apt/lists/*
 
 # Grab gosu for easy step-down from root
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.10.2/flink-1.10.2-bin-scala_2.11.tgz \
-    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.10.2/flink-1.10.2-bin-scala_2.11.tgz.asc \
-    GPG_KEY=C63E230EFFF519A5BBF2C9AE6767487CD505859C \
+ENV FLINK_TGZ_URL=https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz \
+    FLINK_ASC_URL=https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz.asc \
+    GPG_KEY=43CE299BC305AFF8B912AA95183F6944D9839159 \
     CHECK_GPG=true
 
 # Prepare environment
diff --git a/flink_1.10/docker-entrypoint.sh b/flink_scala_2.11/docker-entrypoint.sh
similarity index 65%
rename from flink_1.10/docker-entrypoint.sh
rename to flink_scala_2.11/docker-entrypoint.sh
index 6084f06..275488e 100755
--- a/flink_1.10/docker-entrypoint.sh
+++ b/flink_scala_2.11/docker-entrypoint.sh
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/usr/bin/env bash
 
 ###############################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +19,9 @@
 ###############################################################################
 
 COMMAND_STANDALONE="standalone-job"
+# Deprecated, should be remove in Flink release 1.13
+COMMAND_NATIVE_KUBERNETES="native-k8s"
+COMMAND_HISTORY_SERVER="history-server"
 
 # If unspecified, the hostname of the container is taken as the JobManager address
 JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
@@ -73,17 +76,13 @@ set_config_option() {
   fi
 }
 
-set_common_options() {
+prepare_configuration() {
     set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
     set_config_option blob.server.port 6124
     set_config_option query.server.port 6125
-}
-
-prepare_job_manager_start() {
-    echo "Starting Job Manager"
-    copy_plugins_if_required
 
-    set_common_options
+    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-1}
+    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
 
     if [ -n "${FLINK_PROPERTIES}" ]; then
         echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
@@ -91,35 +90,65 @@ prepare_job_manager_start() {
     envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
 }
 
+maybe_enable_jemalloc() {
+    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
+        export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
+    fi
+}
+
+maybe_enable_jemalloc
+
+copy_plugins_if_required
+
+prepare_configuration
+
+args=("$@")
 if [ "$1" = "help" ]; then
-    echo "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|help)"
+    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
+    printf "    Or $(basename "$0") help\n\n"
+    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
     exit 0
 elif [ "$1" = "jobmanager" ]; then
-    shift 1
-    prepare_job_manager_start
+    args=("${args[@]:1}")
+
+    echo "Starting Job Manager"
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
 elif [ "$1" = ${COMMAND_STANDALONE} ]; then
-    shift 1
-    prepare_job_manager_start
+    args=("${args[@]:1}")
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
+    echo "Starting Job Manager"
+
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
+elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
+    args=("${args[@]:1}")
+
+    echo "Starting History Server"
+
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
 elif [ "$1" = "taskmanager" ]; then
-    shift 1
+    args=("${args[@]:1}")
+
     echo "Starting Task Manager"
-    copy_plugins_if_required
 
-    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
+elif [ "$1" = "$COMMAND_NATIVE_KUBERNETES" ]; then
+    args=("${args[@]:1}")
 
-    set_common_options
-    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
+    export _FLINK_HOME_DETERMINED=true
+    . $FLINK_HOME/bin/config.sh
+    export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
+    # Start commands for jobmanager and taskmanager are generated by Flink internally.
+    echo "Start command: ${args[@]}"
+    exec $(drop_privs_cmd) bash -c "${args[@]}"
+fi
 
-    if [ -n "${FLINK_PROPERTIES}" ]; then
-        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
-    fi
-    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
+args=("${args[@]}")
 
-    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "$@"
-fi
+# Set the Flink related environments
+export _FLINK_HOME_DETERMINED=true
+. $FLINK_HOME/bin/config.sh
+export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
 
-exec "$@"
+# Running command in pass-through mode
+exec $(drop_privs_cmd) "${args[@]}"
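
Note on the `native-k8s` branch in the diff above: `bash -c "${args[@]}"` treats only the first array element as the command string, and any further elements become `$0`, `$1`, and so on for that string. This is the quoting concern raised earlier in the review. The sketch below illustrates that behavior in isolation; `run_like_native_k8s` is a hypothetical helper written for this illustration, not part of the PR, and it leaves out `drop_privs_cmd` and all Flink setup.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the PR): mimics the native-k8s branch's
# `bash -c "${args[@]}"` call without drop_privs_cmd or any Flink setup.
run_like_native_k8s() {
    local args=("$@")
    bash -c "${args[@]}"
}

# One quoted string: the whole string is the script that bash runs.
quoted_out=$(run_like_native_k8s 'echo one two')

# Unquoted words: only "echo" is the script; "one" becomes $0 and "two"
# becomes $1, so echo runs without arguments and prints an empty line.
unquoted_out=$(run_like_native_k8s echo one two)

# The extra words are still reachable as positional parameters of the script.
positional_out=$(run_like_native_k8s 'echo "$0 $1"' one two)

printf 'quoted:     [%s]\n' "$quoted_out"
printf 'unquoted:   [%s]\n' "$unquoted_out"
printf 'positional: [%s]\n' "$positional_out"
```

Since the start command is generated by Flink's Kubernetes integration as a single string, the first case is the one the entrypoint is expected to see; the second case only arises when a person invokes `native-k8s` by hand without quoting.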

@tianon
Member

tianon commented Jan 27, 2021

(Reviewing again given this was updated and I'm assuming it's ready? 😅)

Per apache/flink#14630 (comment), this jemalloc usage looks much better; thank you. Looking forward to improved/extracted k8s integration for 1.13 (https://issues.apache.org/jira/browse/FLINK-21128). 👍

(Happy to merge now following confirmation that this is indeed "ready" from your perspective)

@zentol
Contributor

zentol commented Jan 28, 2021

@tianon Yes, from our side it is ready to go 👍

@yosifkit merged commit 4037f9b into docker-library:master on Jan 28, 2021

@xintongsong
Contributor

@tianon @yosifkit @rogaha
Thank you all for helping us work this out. :)
