From acc174d7ca2295ffb64f68339dbd6c74a2c53840 Mon Sep 17 00:00:00 2001 From: vegetableysm Date: Thu, 28 Dec 2023 19:06:37 +0800 Subject: [PATCH 1/2] Fix dependency conflict between tez and vineyard. Signed-off-by: vegetableysm --- java/hive/docker/dependency/images/Dockerfile | 18 +- .../docker/dependency/images/bootstrap.sh | 12 +- .../images/hadoop-config/log4j.properties | 321 ++++ .../images/hadoop-config/mapred-site.xml | 50 + .../images/hadoop-config/yarn-hosts-exclude | 0 .../images/hadoop-config/yarn-hosts-include | 3 + java/hive/pom.xml | 16 + .../com/google/common/base/CharMatcher.java | 1148 +++++++++++++ .../java/com/google/common/base/Objects.java | 236 +++ .../java/com/google/common/base/Splitter.java | 539 ++++++ .../concurrent/AbstractCheckedFuture.java | 117 ++ .../common/util/concurrent/CheckedFuture.java | 76 + .../common/util/concurrent/Futures.java | 1464 +++++++++++++++++ .../ListeningScheduledExecutorService.java | 40 + .../common/util/concurrent/MoreExecutors.java | 478 ++++++ 15 files changed, 4507 insertions(+), 11 deletions(-) create mode 100644 java/hive/docker/dependency/images/hadoop-config/log4j.properties create mode 100644 java/hive/docker/dependency/images/hadoop-config/mapred-site.xml create mode 100644 java/hive/docker/dependency/images/hadoop-config/yarn-hosts-exclude create mode 100644 java/hive/docker/dependency/images/hadoop-config/yarn-hosts-include create mode 100644 java/hive/src/main/java/com/google/common/base/CharMatcher.java create mode 100644 java/hive/src/main/java/com/google/common/base/Objects.java create mode 100644 java/hive/src/main/java/com/google/common/base/Splitter.java create mode 100644 java/hive/src/main/java/com/google/common/util/concurrent/AbstractCheckedFuture.java create mode 100644 java/hive/src/main/java/com/google/common/util/concurrent/CheckedFuture.java create mode 100644 java/hive/src/main/java/com/google/common/util/concurrent/Futures.java create mode 100644 java/hive/src/main/java/com/google/common/util/concurrent/ListeningScheduledExecutorService.java create mode 100644 java/hive/src/main/java/com/google/common/util/concurrent/MoreExecutors.java diff --git a/java/hive/docker/dependency/images/Dockerfile b/java/hive/docker/dependency/images/Dockerfile index a86ac6bc..785afc41 100755 --- a/java/hive/docker/dependency/images/Dockerfile +++ b/java/hive/docker/dependency/images/Dockerfile @@ -1,5 +1,11 @@ -FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/hadoop:v1 - +# FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/hadoop:v1 +FROM apache/hadoop:3.3.5 + +RUN mkdir -p /opt/apache/ +RUN mv /opt/hadoop/ /opt/apache/ +ENV HADOOP_HOME=/opt/apache/hadoop +ENV HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop +ENV HIVE_HOME=/opt/apache/hive COPY ./vineyard-hive-0.1-SNAPSHOT.jar ${HADOOP_HOME}/share/hadoop/common/ # prepare hdoop config @@ -19,14 +25,16 @@ COPY hive /opt/apache/hive/ COPY hive-config/ /hive-config COPY hive-config-distributed/ /hive-config-distributed +ENV PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${HIVE_HOME}/bin:${PATH} + COPY bootstrap.sh /opt/apache/ COPY mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar ${HIVE_HOME}/lib/ RUN sudo yum -y install unzip RUN wget http://www.vuln.cn/wp-content/uploads/2019/08/libstdc.so_.6.0.26.zip -q && unzip libstdc.so_.6.0.26.zip -RUN cp libstdc++.so.6.0.26 /usr/lib64 -RUN rm /usr/lib64/libstdc++.so.6 -RUN ln -s /usr/lib64/libstdc++.so.6.0.26 /usr/lib64/libstdc++.so.6 +RUN sudo cp libstdc++.so.6.0.26 /usr/lib64 +RUN sudo rm /usr/lib64/libstdc++.so.6 +RUN sudo ln -s /usr/lib64/libstdc++.so.6.0.26 /usr/lib64/libstdc++.so.6 RUN sudo yum -y install vim diff --git a/java/hive/docker/dependency/images/bootstrap.sh b/java/hive/docker/dependency/images/bootstrap.sh index 97b36ef4..b5e9c401 100755 --- a/java/hive/docker/dependency/images/bootstrap.sh +++ b/java/hive/docker/dependency/images/bootstrap.sh @@ -12,33 +12,33 @@ start_hdfs_namenode() { fi ${HADOOP_HOME}/bin/hdfs --loglevel INFO namenode - tail -f ${HADOOP_HOME}/logs/*namenode*.log + tail -f /var/log/hadoop/*namenode*.log } start_hdfs_datanode() { wait_for $1 $2 ${HADOOP_HOME}/bin/hdfs --loglevel INFO --daemon start datanode - tail -f ${HADOOP_HOME}/logs/*datanode*.log + tail -f /var/log/hadoop/*datanode*.log } start_yarn_resourcemanager() { ${HADOOP_HOME}/bin/yarn --loglevel INFO --daemon start resourcemanager - tail -f ${HADOOP_HOME}/logs/*resourcemanager*.log + tail -f /var/log/hadoop/*resourcemanager*.log } start_yarn_nodemanager() { wait_for $1 $2 ${HADOOP_HOME}/bin/yarn --loglevel INFO --daemon start nodemanager - tail -f ${HADOOP_HOME}/logs/*nodemanager*.log + tail -f /var/log/hadoop/*nodemanager*.log } start_yarn_proxyserver() { wait_for $1 $2 ${HADOOP_HOME}/bin/yarn --loglevel INFO --daemon start proxyserver - tail -f ${HADOOP_HOME}/logs/*proxyserver*.log + tail -f /var/log/hadoop/*proxyserver*.log } start_mr_historyserver() { @@ -46,7 +46,7 @@ start_mr_historyserver() { wait_for $1 $2 ${HADOOP_HOME}/bin/mapred --loglevel INFO --daemon start historyserver - tail -f ${HADOOP_HOME}/logs/*historyserver*.log + tail -f /var/log/hadoop/*historyserver*.log } start_hive_metastore() { diff --git a/java/hive/docker/dependency/images/hadoop-config/log4j.properties b/java/hive/docker/dependency/images/hadoop-config/log4j.properties new file mode 100644 index 00000000..52d2c1ff --- /dev/null +++ b/java/hive/docker/dependency/images/hadoop-config/log4j.properties @@ -0,0 +1,321 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Define some default values that can be overridden by system properties +hadoop.root.logger=INFO,console +hadoop.log.dir=. +hadoop.log.file=hadoop.log + +# Define the root logger to the system property "hadoop.root.logger". +log4j.rootLogger=${hadoop.root.logger}, EventCounter + +# Logging Threshold +log4j.threshold=ALL + +# Null Appender +log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender + +# +# Rolling File Appender - cap space usage at 5gb. +# +hadoop.log.maxfilesize=256MB +hadoop.log.maxbackupindex=20 +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file} + +log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize} +log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex} + +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +# Debugging Pattern format +#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + +# +# Daily Rolling File Appender +# + +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file} + +# Rollover at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +# Debugging Pattern format +#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + +# +# console +# Add "console" to rootlogger above if you want to use this +# + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n + +# +# TaskLog Appender +# +log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender + +log4j.appender.TLA.layout=org.apache.log4j.PatternLayout +log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# +# HDFS block state change log from block manager +# +# Uncomment the following to log normal block state change +# messages from BlockManager in NameNode. +#log4j.logger.BlockStateChange=DEBUG + +# +#Security appender +# +hadoop.security.logger=INFO,NullAppender +hadoop.security.log.maxfilesize=256MB +hadoop.security.log.maxbackupindex=20 +log4j.category.SecurityLogger=${hadoop.security.logger} +hadoop.security.log.file=SecurityAuth-${user.name}.audit +log4j.appender.RFAS=org.apache.log4j.RollingFileAppender +log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file} +log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout +log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +log4j.appender.RFAS.MaxFileSize=${hadoop.security.log.maxfilesize} +log4j.appender.RFAS.MaxBackupIndex=${hadoop.security.log.maxbackupindex} + +# +# Daily Rolling Security appender +# +log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file} +log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout +log4j.appender.DRFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +log4j.appender.DRFAS.DatePattern=.yyyy-MM-dd + +# +# hadoop configuration logging +# + +# Uncomment the following line to turn off configuration deprecation warnings. +# log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN + +# +# hdfs audit logging +# +hdfs.audit.logger=INFO,NullAppender +hdfs.audit.log.maxfilesize=256MB +hdfs.audit.log.maxbackupindex=20 +log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger} +log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false +log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender +log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log +log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout +log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n +log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize} +log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex} + +# +# NameNode metrics logging. +# The default is to retain two namenode-metrics.log files up to 64MB each. +# +namenode.metrics.logger=INFO,NullAppender +log4j.logger.NameNodeMetricsLog=${namenode.metrics.logger} +log4j.additivity.NameNodeMetricsLog=false +log4j.appender.NNMETRICSRFA=org.apache.log4j.RollingFileAppender +log4j.appender.NNMETRICSRFA.File=${hadoop.log.dir}/namenode-metrics.log +log4j.appender.NNMETRICSRFA.layout=org.apache.log4j.PatternLayout +log4j.appender.NNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n +log4j.appender.NNMETRICSRFA.MaxBackupIndex=1 +log4j.appender.NNMETRICSRFA.MaxFileSize=64MB + +# +# DataNode metrics logging. +# The default is to retain two datanode-metrics.log files up to 64MB each. +# +datanode.metrics.logger=INFO,NullAppender +log4j.logger.DataNodeMetricsLog=${datanode.metrics.logger} +log4j.additivity.DataNodeMetricsLog=false +log4j.appender.DNMETRICSRFA=org.apache.log4j.RollingFileAppender +log4j.appender.DNMETRICSRFA.File=${hadoop.log.dir}/datanode-metrics.log +log4j.appender.DNMETRICSRFA.layout=org.apache.log4j.PatternLayout +log4j.appender.DNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n +log4j.appender.DNMETRICSRFA.MaxBackupIndex=1 +log4j.appender.DNMETRICSRFA.MaxFileSize=64MB + +# Custom Logging levels + +#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG +#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG +#log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=DEBUG + + +# AWS SDK & S3A FileSystem +#log4j.logger.com.amazonaws=ERROR +log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR +#log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN + +# +# Event Counter Appender +# Sends counts of logging messages at different severity levels to Hadoop Metrics. +# +log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter + + +# +# shuffle connection log from shuffleHandler +# Uncomment the following line to enable logging of shuffle connections +# log4j.logger.org.apache.hadoop.mapred.ShuffleHandler.audit=DEBUG + +# +# Yarn ResourceManager Application Summary Log +# +# Set the ResourceManager summary log filename +yarn.server.resourcemanager.appsummary.log.file=rm-appsummary.log +# Set the ResourceManager summary log level and appender +yarn.server.resourcemanager.appsummary.logger=${hadoop.root.logger} +#yarn.server.resourcemanager.appsummary.logger=INFO,RMSUMMARY + +# To enable AppSummaryLogging for the RM, +# set yarn.server.resourcemanager.appsummary.logger to +# ,RMSUMMARY in hadoop-env.sh + +# Appender for ResourceManager Application Summary Log +# Requires the following properties to be set +# - hadoop.log.dir (Hadoop Log directory) +# - yarn.server.resourcemanager.appsummary.log.file (resource manager app summary log filename) +# - yarn.server.resourcemanager.appsummary.logger (resource manager app summary log level and appender) + +log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=${yarn.server.resourcemanager.appsummary.logger} +log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=false +log4j.appender.RMSUMMARY=org.apache.log4j.RollingFileAppender +log4j.appender.RMSUMMARY.File=${hadoop.log.dir}/${yarn.server.resourcemanager.appsummary.log.file} +log4j.appender.RMSUMMARY.MaxFileSize=256MB +log4j.appender.RMSUMMARY.MaxBackupIndex=20 +log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout +log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n + +# +# YARN ResourceManager audit logging +# +rm.audit.logger=INFO,NullAppender +rm.audit.log.maxfilesize=256MB +rm.audit.log.maxbackupindex=20 +log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger=${rm.audit.logger} +log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger=false +log4j.appender.RMAUDIT=org.apache.log4j.RollingFileAppender +log4j.appender.RMAUDIT.File=${hadoop.log.dir}/rm-audit.log +log4j.appender.RMAUDIT.layout=org.apache.log4j.PatternLayout +log4j.appender.RMAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n +log4j.appender.RMAUDIT.MaxFileSize=${rm.audit.log.maxfilesize} +log4j.appender.RMAUDIT.MaxBackupIndex=${rm.audit.log.maxbackupindex} + +# +# YARN NodeManager audit logging +# +nm.audit.logger=INFO,NullAppender +nm.audit.log.maxfilesize=256MB +nm.audit.log.maxbackupindex=20 +log4j.logger.org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger=${nm.audit.logger} +log4j.additivity.org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger=false +log4j.appender.NMAUDIT=org.apache.log4j.RollingFileAppender +log4j.appender.NMAUDIT.File=${hadoop.log.dir}/nm-audit.log +log4j.appender.NMAUDIT.layout=org.apache.log4j.PatternLayout +log4j.appender.NMAUDIT.layout.ConversionPattern=%d{ISO8601}%p %c{2}: %m%n +log4j.appender.NMAUDIT.MaxFileSize=${nm.audit.log.maxfilesize} +log4j.appender.NMAUDIT.MaxBackupIndex=${nm.audit.log.maxbackupindex} + +# HS audit log configs +#mapreduce.hs.audit.logger=INFO,HSAUDIT +#log4j.logger.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=${mapreduce.hs.audit.logger} +#log4j.additivity.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=false +#log4j.appender.HSAUDIT=org.apache.log4j.DailyRollingFileAppender +#log4j.appender.HSAUDIT.File=${hadoop.log.dir}/hs-audit.log +#log4j.appender.HSAUDIT.layout=org.apache.log4j.PatternLayout +#log4j.appender.HSAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n +#log4j.appender.HSAUDIT.DatePattern=.yyyy-MM-dd + +# Http Server Request Logs +#log4j.logger.http.requests.namenode=INFO,namenoderequestlog +#log4j.appender.namenoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender +#log4j.appender.namenoderequestlog.Filename=${hadoop.log.dir}/jetty-namenode-yyyy_mm_dd.log +#log4j.appender.namenoderequestlog.RetainDays=3 + +#log4j.logger.http.requests.datanode=INFO,datanoderequestlog +#log4j.appender.datanoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender +#log4j.appender.datanoderequestlog.Filename=${hadoop.log.dir}/jetty-datanode-yyyy_mm_dd.log +#log4j.appender.datanoderequestlog.RetainDays=3 + +#log4j.logger.http.requests.resourcemanager=INFO,resourcemanagerrequestlog +#log4j.appender.resourcemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender +#log4j.appender.resourcemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-resourcemanager-yyyy_mm_dd.log +#log4j.appender.resourcemanagerrequestlog.RetainDays=3 + +#log4j.logger.http.requests.jobhistory=INFO,jobhistoryrequestlog +#log4j.appender.jobhistoryrequestlog=org.apache.hadoop.http.HttpRequestLogAppender +#log4j.appender.jobhistoryrequestlog.Filename=${hadoop.log.dir}/jetty-jobhistory-yyyy_mm_dd.log +#log4j.appender.jobhistoryrequestlog.RetainDays=3 + +#log4j.logger.http.requests.nodemanager=INFO,nodemanagerrequestlog +#log4j.appender.nodemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender +#log4j.appender.nodemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-nodemanager-yyyy_mm_dd.log +#log4j.appender.nodemanagerrequestlog.RetainDays=3 + +# WebHdfs request log on datanodes +# Specify -Ddatanode.webhdfs.logger=INFO,HTTPDRFA on datanode startup to +# direct the log to a separate file. +#datanode.webhdfs.logger=INFO,console +#log4j.logger.datanode.webhdfs=${datanode.webhdfs.logger} +#log4j.appender.HTTPDRFA=org.apache.log4j.DailyRollingFileAppender +#log4j.appender.HTTPDRFA.File=${hadoop.log.dir}/hadoop-datanode-webhdfs.log +#log4j.appender.HTTPDRFA.layout=org.apache.log4j.PatternLayout +#log4j.appender.HTTPDRFA.layout.ConversionPattern=%d{ISO8601} %m%n +#log4j.appender.HTTPDRFA.DatePattern=.yyyy-MM-dd + + +# Appender for viewing information for errors and warnings +yarn.ewma.cleanupInterval=300 +yarn.ewma.messageAgeLimitSeconds=86400 +yarn.ewma.maxUniqueMessages=250 +log4j.appender.EWMA=org.apache.hadoop.yarn.util.Log4jWarningErrorMetricsAppender +log4j.appender.EWMA.cleanupInterval=${yarn.ewma.cleanupInterval} +log4j.appender.EWMA.messageAgeLimitSeconds=${yarn.ewma.messageAgeLimitSeconds} +log4j.appender.EWMA.maxUniqueMessages=${yarn.ewma.maxUniqueMessages} + +# +# Fair scheduler state dump +# +# Use following logger to dump the state to a separate file + +#log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.statedump=DEBUG,FSSTATEDUMP +#log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.statedump=false +#log4j.appender.FSSTATEDUMP=org.apache.log4j.RollingFileAppender +#log4j.appender.FSSTATEDUMP.File=${hadoop.log.dir}/fairscheduler-statedump.log +#log4j.appender.FSSTATEDUMP.layout=org.apache.log4j.PatternLayout +#log4j.appender.FSSTATEDUMP.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +#log4j.appender.FSSTATEDUMP.MaxFileSize=${hadoop.log.maxfilesize} +#log4j.appender.FSSTATEDUMP.MaxBackupIndex=${hadoop.log.maxbackupindex} + +# Log levels of third-party libraries +log4j.logger.org.apache.commons.beanutils=WARN diff --git a/java/hive/docker/dependency/images/hadoop-config/mapred-site.xml b/java/hive/docker/dependency/images/hadoop-config/mapred-site.xml new file mode 100644 index 00000000..52116911 --- /dev/null +++ b/java/hive/docker/dependency/images/hadoop-config/mapred-site.xml @@ -0,0 +1,50 @@ + + + + + + + + + mapreduce.framework.name + yarn + + + + mapreduce.jobhistory.address + hadoop-mr-historyserver:10020 + + + + mapreduce.jobhistory.webapp.address + hadoop-mr-historyserver:19888 + + + + yarn.app.mapreduce.am.env + HADOOP_MAPRED_HOME=${HADOOP_HOME} + + + + mapreduce.map.env + HADOOP_MAPRED_HOME=${HADOOP_HOME} + + + + mapreduce.reduce.env + HADOOP_MAPRED_HOME=${HADOOP_HOME} + + + diff --git a/java/hive/docker/dependency/images/hadoop-config/yarn-hosts-exclude b/java/hive/docker/dependency/images/hadoop-config/yarn-hosts-exclude new file mode 100644 index 00000000..e69de29b diff --git a/java/hive/docker/dependency/images/hadoop-config/yarn-hosts-include b/java/hive/docker/dependency/images/hadoop-config/yarn-hosts-include new file mode 100644 index 00000000..35d8ab21 --- /dev/null +++ b/java/hive/docker/dependency/images/hadoop-config/yarn-hosts-include @@ -0,0 +1,3 @@ +hadoop-yarn-nm-0 +hadoop-yarn-nm-1 +hadoop-yarn-nm-2 diff --git a/java/hive/pom.xml b/java/hive/pom.xml index ce064ee7..d2148698 100644 --- a/java/hive/pom.xml +++ b/java/hive/pom.xml @@ -186,6 +186,22 @@ com/google/common/jimfs/FileSystemState.class + + com.google.guava:guava + + ** + + + com/google/common/base/Objects.class + com/google/common/base/CharMatcher.class + com/google/common/base/Splitter.class + com/google/common/util/concurrent/AbstractCheckedFuture.class + com/google/common/util/concurrent/CheckedFuture.class + com/google/common/util/concurrent/Futures.class + com/google/common/util/concurrent/ListeningScheduledExecutorService.class + com/google/common/util/concurrent/MoreExecutors.class + + diff --git a/java/hive/src/main/java/com/google/common/base/CharMatcher.java b/java/hive/src/main/java/com/google/common/base/CharMatcher.java new file mode 100644 index 00000000..1bea5c8e --- /dev/null +++ b/java/hive/src/main/java/com/google/common/base/CharMatcher.java @@ -0,0 +1,1148 @@ +/* + * Copyright (C) 2008 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.common.base; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.GwtCompatible; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.annotation.CheckReturnValue; + +/** + * Determines a true or false value for any Java {@code char} value, just as {@link Predicate} does + * for any {@link Object}. Also offers basic text processing methods based on this function. + * Implementations are strongly encouraged to be side-effect-free and immutable. + * + *

Throughout the documentation of this class, the phrase "matching character" is used to mean + * "any character {@code c} for which {@code this.matches(c)} returns {@code true}". + * + *

Note: This class deals only with {@code char} values; it does not understand + * supplementary Unicode code points in the range {@code 0x10000} to {@code 0x10FFFF}. Such logical + * characters are encoded into a {@code String} using surrogate pairs, and a {@code CharMatcher} + * treats these just as two separate characters. + * + *

Example usages:

+ *   String trimmed = {@link #WHITESPACE WHITESPACE}.{@link #trimFrom trimFrom}(userInput);
+ *   if ({@link #ASCII ASCII}.{@link #matchesAllOf matchesAllOf}(s)) { ... }
+ * + * @author Kevin Bourrillion + * @since 1.0 + */ +@Beta // Possibly change from chars to code points; decide constants vs. methods +@GwtCompatible +public abstract class CharMatcher implements Predicate { + // Constants + + // Excludes 2000-2000a, which is handled as a range + private static final String BREAKING_WHITESPACE_CHARS = + "\t\n\013\f\r \u0085\u1680\u2028\u2029\u205f\u3000"; + + // Excludes 2007, which is handled as a gap in a pair of ranges + private static final String NON_BREAKING_WHITESPACE_CHARS = + "\u00a0\u180e\u202f"; + + /** + * Determines whether a character is whitespace according to the latest Unicode standard, as + * illustrated + * here. + * This is not the same definition used by other Java APIs. (See a + * comparison of several + * definitions of "whitespace".) + * + *

Note: as the Unicode definition evolves, we will modify this constant to keep it up + * to date. + */ + public static final CharMatcher WHITESPACE = + anyOf(BREAKING_WHITESPACE_CHARS + NON_BREAKING_WHITESPACE_CHARS) + .or(inRange('\u2000', '\u200a')) + .precomputed(); + + /** + * Determines whether a character is a breaking whitespace (that is, a whitespace which can be + * interpreted as a break between words for formatting purposes). See {@link #WHITESPACE} for a + * discussion of that term. + * + * @since 2.0 + */ + public static final CharMatcher BREAKING_WHITESPACE = + anyOf(BREAKING_WHITESPACE_CHARS) + .or(inRange('\u2000', '\u2006')) + .or(inRange('\u2008', '\u200a')) + .precomputed(); + + /** + * Determines whether a character is ASCII, meaning that its code point is less than 128. + */ + public static final CharMatcher ASCII = inRange('\0', '\u007f'); + + /** + * Determines whether a character is a digit according to + * Unicode. + */ + public static final CharMatcher DIGIT; + + static { + CharMatcher digit = inRange('0', '9'); + String zeroes = + "\u0660\u06f0\u07c0\u0966\u09e6\u0a66\u0ae6\u0b66\u0be6\u0c66" + + "\u0ce6\u0d66\u0e50\u0ed0\u0f20\u1040\u1090\u17e0\u1810\u1946" + + "\u19d0\u1b50\u1bb0\u1c40\u1c50\ua620\ua8d0\ua900\uaa50\uff10"; + for (char base : zeroes.toCharArray()) { + digit = digit.or(inRange(base, (char) (base + 9))); + } + DIGIT = digit.precomputed(); + } + + /** + * Determines whether a character is a digit according to {@link Character#isDigit(char) Java's + * definition}. If you only care to match ASCII digits, you can use {@code inRange('0', '9')}. + */ + public static final CharMatcher JAVA_DIGIT = new CharMatcher() { + @Override public boolean matches(char c) { + return Character.isDigit(c); + } + }; + + /** + * Determines whether a character is a letter according to {@link Character#isLetter(char) Java's + * definition}. If you only care to match letters of the Latin alphabet, you can use {@code + * inRange('a', 'z').or(inRange('A', 'Z'))}. + */ + public static final CharMatcher JAVA_LETTER = new CharMatcher() { + @Override public boolean matches(char c) { + return Character.isLetter(c); + } + }; + + /** + * Determines whether a character is a letter or digit according to {@link + * Character#isLetterOrDigit(char) Java's definition}. + */ + public static final CharMatcher JAVA_LETTER_OR_DIGIT = new CharMatcher() { + @Override public boolean matches(char c) { + return Character.isLetterOrDigit(c); + } + }; + + /** + * Determines whether a character is upper case according to {@link Character#isUpperCase(char) + * Java's definition}. + */ + public static final CharMatcher JAVA_UPPER_CASE = new CharMatcher() { + @Override public boolean matches(char c) { + return Character.isUpperCase(c); + } + }; + + /** + * Determines whether a character is lower case according to {@link Character#isLowerCase(char) + * Java's definition}. + */ + public static final CharMatcher JAVA_LOWER_CASE = new CharMatcher() { + @Override public boolean matches(char c) { + return Character.isLowerCase(c); + } + }; + + /** + * Determines whether a character is an ISO control character as specified by {@link + * Character#isISOControl(char)}. + */ + public static final CharMatcher JAVA_ISO_CONTROL = + inRange('\u0000', '\u001f').or(inRange('\u007f', '\u009f')); + + /** + * Determines whether a character is invisible; that is, if its Unicode category is any of + * SPACE_SEPARATOR, LINE_SEPARATOR, PARAGRAPH_SEPARATOR, CONTROL, FORMAT, SURROGATE, and + * PRIVATE_USE according to ICU4J. + */ + public static final CharMatcher INVISIBLE = inRange('\u0000', '\u0020') + .or(inRange('\u007f', '\u00a0')) + .or(is('\u00ad')) + .or(inRange('\u0600', '\u0603')) + .or(anyOf("\u06dd\u070f\u1680\u17b4\u17b5\u180e")) + .or(inRange('\u2000', '\u200f')) + .or(inRange('\u2028', '\u202f')) + .or(inRange('\u205f', '\u2064')) + .or(inRange('\u206a', '\u206f')) + .or(is('\u3000')) + .or(inRange('\ud800', '\uf8ff')) + .or(anyOf("\ufeff\ufff9\ufffa\ufffb")) + .precomputed(); + + /** + * Determines whether a character is single-width (not double-width). When in doubt, this matcher + * errs on the side of returning {@code false} (that is, it tends to assume a character is + * double-width). + * + *

Note: as the reference file evolves, we will modify this constant to keep it up to + * date. + */ + public static final CharMatcher SINGLE_WIDTH = inRange('\u0000', '\u04f9') + .or(is('\u05be')) + .or(inRange('\u05d0', '\u05ea')) + .or(is('\u05f3')) + .or(is('\u05f4')) + .or(inRange('\u0600', '\u06ff')) + .or(inRange('\u0750', '\u077f')) + .or(inRange('\u0e00', '\u0e7f')) + .or(inRange('\u1e00', '\u20af')) + .or(inRange('\u2100', '\u213a')) + .or(inRange('\ufb50', '\ufdff')) + .or(inRange('\ufe70', '\ufeff')) + .or(inRange('\uff61', '\uffdc')) + .precomputed(); + + /** Matches any character. */ + public static final CharMatcher ANY = + new CharMatcher() { + @Override public boolean matches(char c) { + return true; + } + + @Override public int indexIn(CharSequence sequence) { + return (sequence.length() == 0) ? -1 : 0; + } + + @Override public int indexIn(CharSequence sequence, int start) { + int length = sequence.length(); + Preconditions.checkPositionIndex(start, length); + return (start == length) ? -1 : start; + } + + @Override public int lastIndexIn(CharSequence sequence) { + return sequence.length() - 1; + } + + @Override public boolean matchesAllOf(CharSequence sequence) { + checkNotNull(sequence); + return true; + } + + @Override public boolean matchesNoneOf(CharSequence sequence) { + return sequence.length() == 0; + } + + @Override public String removeFrom(CharSequence sequence) { + checkNotNull(sequence); + return ""; + } + + @Override public String replaceFrom(CharSequence sequence, char replacement) { + char[] array = new char[sequence.length()]; + Arrays.fill(array, replacement); + return new String(array); + } + + @Override public String replaceFrom(CharSequence sequence, CharSequence replacement) { + StringBuilder retval = new StringBuilder(sequence.length() * replacement.length()); + for (int i = 0; i < sequence.length(); i++) { + retval.append(replacement); + } + return retval.toString(); + } + + @Override public String collapseFrom(CharSequence sequence, char replacement) { + return (sequence.length() == 0) ? "" : String.valueOf(replacement); + } + + @Override public String trimFrom(CharSequence sequence) { + checkNotNull(sequence); + return ""; + } + + @Override public int countIn(CharSequence sequence) { + return sequence.length(); + } + + @Override public CharMatcher and(CharMatcher other) { + return checkNotNull(other); + } + + @Override public CharMatcher or(CharMatcher other) { + checkNotNull(other); + return this; + } + + @Override public CharMatcher negate() { + return NONE; + } + + @Override public CharMatcher precomputed() { + return this; + } + }; + + /** Matches no characters. */ + public static final CharMatcher NONE = + new CharMatcher() { + @Override public boolean matches(char c) { + return false; + } + + @Override public int indexIn(CharSequence sequence) { + checkNotNull(sequence); + return -1; + } + + @Override public int indexIn(CharSequence sequence, int start) { + int length = sequence.length(); + Preconditions.checkPositionIndex(start, length); + return -1; + } + + @Override public int lastIndexIn(CharSequence sequence) { + checkNotNull(sequence); + return -1; + } + + @Override public boolean matchesAllOf(CharSequence sequence) { + return sequence.length() == 0; + } + + @Override public boolean matchesNoneOf(CharSequence sequence) { + checkNotNull(sequence); + return true; + } + + @Override public String removeFrom(CharSequence sequence) { + return sequence.toString(); + } + + @Override public String replaceFrom(CharSequence sequence, char replacement) { + return sequence.toString(); + } + + @Override public String replaceFrom(CharSequence sequence, CharSequence replacement) { + checkNotNull(replacement); + return sequence.toString(); + } + + @Override public String collapseFrom(CharSequence sequence, char replacement) { + return sequence.toString(); + } + + @Override public String trimFrom(CharSequence sequence) { + return sequence.toString(); + } + + @Override public int countIn(CharSequence sequence) { + checkNotNull(sequence); + return 0; + } + + @Override public CharMatcher and(CharMatcher other) { + checkNotNull(other); + return this; + } + + @Override public CharMatcher or(CharMatcher other) { + return checkNotNull(other); + } + + @Override public CharMatcher negate() { + return ANY; + } + + @Override void setBits(LookupTable table) {} + + @Override public CharMatcher precomputed() { + return this; + } + }; + + // Static factories + + /** + * Returns a {@code char} matcher that matches only one specified character. + */ + public static CharMatcher is(final char match) { + return new CharMatcher() { + @Override public boolean matches(char c) { + return c == match; + } + + @Override public String replaceFrom(CharSequence sequence, char replacement) { + return sequence.toString().replace(match, replacement); + } + + @Override public CharMatcher and(CharMatcher other) { + return other.matches(match) ? this : NONE; + } + + @Override public CharMatcher or(CharMatcher other) { + return other.matches(match) ? other : super.or(other); + } + + @Override public CharMatcher negate() { + return isNot(match); + } + + @Override void setBits(LookupTable table) { + table.set(match); + } + + @Override public CharMatcher precomputed() { + return this; + } + }; + } + + /** + * Returns a {@code char} matcher that matches any character except the one specified. + * + *

To negate another {@code CharMatcher}, use {@link #negate()}. + */ + public static CharMatcher isNot(final char match) { + return new CharMatcher() { + @Override public boolean matches(char c) { + return c != match; + } + + @Override public CharMatcher and(CharMatcher other) { + return other.matches(match) ? super.and(other) : other; + } + + @Override public CharMatcher or(CharMatcher other) { + return other.matches(match) ? ANY : this; + } + + @Override public CharMatcher negate() { + return is(match); + } + }; + } + + /** + * Returns a {@code char} matcher that matches any character present in the given character + * sequence. + */ + public static CharMatcher anyOf(final CharSequence sequence) { + switch (sequence.length()) { + case 0: + return NONE; + case 1: + return is(sequence.charAt(0)); + case 2: + final char match1 = sequence.charAt(0); + final char match2 = sequence.charAt(1); + return new CharMatcher() { + @Override public boolean matches(char c) { + return c == match1 || c == match2; + } + + @Override void setBits(LookupTable table) { + table.set(match1); + table.set(match2); + } + + @Override public CharMatcher precomputed() { + return this; + } + }; + } + + final char[] chars = sequence.toString().toCharArray(); + Arrays.sort(chars); // not worth collapsing duplicates + + return new CharMatcher() { + @Override public boolean matches(char c) { + return Arrays.binarySearch(chars, c) >= 0; + } + + @Override void setBits(LookupTable table) { + for (char c : chars) { + table.set(c); + } + } + }; + } + + /** + * Returns a {@code char} matcher that matches any character not present in the given character + * sequence. + */ + public static CharMatcher noneOf(CharSequence sequence) { + return anyOf(sequence).negate(); + } + + /** + * Returns a {@code char} matcher that matches any character in a given range (both endpoints are + * inclusive). For example, to match any lowercase letter of the English alphabet, use {@code + * CharMatcher.inRange('a', 'z')}. + * + * @throws IllegalArgumentException if {@code endInclusive < startInclusive} + */ + public static CharMatcher inRange(final char startInclusive, final char endInclusive) { + checkArgument(endInclusive >= startInclusive); + return new CharMatcher() { + @Override public boolean matches(char c) { + return startInclusive <= c && c <= endInclusive; + } + + @Override void setBits(LookupTable table) { + char c = startInclusive; + while (true) { + table.set(c); + if (c++ == endInclusive) { + break; + } + } + } + + @Override public CharMatcher precomputed() { + return this; + } + }; + } + + /** + * Returns a matcher with identical behavior to the given {@link Character}-based predicate, but + * which operates on primitive {@code char} instances instead. + */ + public static CharMatcher forPredicate(final Predicate predicate) { + checkNotNull(predicate); + if (predicate instanceof CharMatcher) { + return (CharMatcher) predicate; + } + return new CharMatcher() { + @Override public boolean matches(char c) { + return predicate.apply(c); + } + + @Override public boolean apply(Character character) { + return predicate.apply(checkNotNull(character)); + } + }; + } + + // Constructors + + /** + * Constructor for use by subclasses. + */ + protected CharMatcher() {} + + // Abstract methods + + /** Determines a true or false value for the given character. */ + public abstract boolean matches(char c); + + // Non-static factories + + /** + * Returns a matcher that matches any character not matched by this matcher. + */ + public CharMatcher negate() { + final CharMatcher original = this; + return new CharMatcher() { + @Override public boolean matches(char c) { + return !original.matches(c); + } + + @Override public boolean matchesAllOf(CharSequence sequence) { + return original.matchesNoneOf(sequence); + } + + @Override public boolean matchesNoneOf(CharSequence sequence) { + return original.matchesAllOf(sequence); + } + + @Override public int countIn(CharSequence sequence) { + return sequence.length() - original.countIn(sequence); + } + + @Override public CharMatcher negate() { + return original; + } + }; + } + + /** + * Returns a matcher that matches any character matched by both this matcher and {@code other}. + */ + public CharMatcher and(CharMatcher other) { + return new And(Arrays.asList(this, checkNotNull(other))); + } + + private static class And extends CharMatcher { + List components; + + And(List components) { + this.components = components; // Skip defensive copy (private) + } + + @Override public boolean matches(char c) { + for (CharMatcher matcher : components) { + if (!matcher.matches(c)) { + return false; + } + } + return true; + } + + @Override public CharMatcher and(CharMatcher other) { + List newComponents = new ArrayList(components); + newComponents.add(checkNotNull(other)); + return new And(newComponents); + } + } + + /** + * Returns a matcher that matches any character matched by either this matcher or {@code other}. + */ + public CharMatcher or(CharMatcher other) { + return new Or(Arrays.asList(this, checkNotNull(other))); + } + + private static class Or extends CharMatcher { + List components; + + Or(List components) { + this.components = components; // Skip defensive copy (private) + } + + @Override public boolean matches(char c) { + for (CharMatcher matcher : components) { + if (matcher.matches(c)) { + return true; + } + } + return false; + } + + @Override public CharMatcher or(CharMatcher other) { + List newComponents = new ArrayList(components); + newComponents.add(checkNotNull(other)); + return new Or(newComponents); + } + + @Override void setBits(LookupTable table) { + for (CharMatcher matcher : components) { + matcher.setBits(table); + } + } + } + + /** + * Returns a {@code char} matcher functionally equivalent to this one, but which may be faster to + * query than the original; your mileage may vary. Precomputation takes time and is likely to be + * worthwhile only if the precomputed matcher is queried many thousands of times. + * + *

This method has no effect (returns {@code this}) when called in GWT: it's unclear whether a + * precomputed matcher is faster, but it certainly consumes more memory, which doesn't seem like a + * worthwhile tradeoff in a browser. + */ + public CharMatcher precomputed() { + return Platform.precomputeCharMatcher(this); + } + + /** + * This is the actual implementation of {@link #precomputed}, but we bounce calls through a method + * on {@link Platform} so that we can have different behavior in GWT. + * + *

The default precomputation is to cache the configuration of the original matcher in an + * eight-kilobyte bit array. In some situations this produces a matcher which is faster to query + * than the original. + * + *

The default implementation creates a new bit array and passes it to {@link + * #setBits(LookupTable)}. + */ + CharMatcher precomputedInternal() { + final LookupTable table = new LookupTable(); + setBits(table); + + return new CharMatcher() { + @Override public boolean matches(char c) { + return table.get(c); + } + + // TODO(kevinb): make methods like negate() smart? + + @Override public CharMatcher precomputed() { + return this; + } + }; + } + + /** + * For use by implementors; sets the bit corresponding to each character ('\0' to '{@literal + * \}uFFFF') that matches this matcher in the given bit array, leaving all other bits untouched. + * + *

The default implementation loops over every possible character value, invoking {@link + * #matches} for each one. + */ + void setBits(LookupTable table) { + char c = Character.MIN_VALUE; + while (true) { + if (matches(c)) { + table.set(c); + } + if (c++ == Character.MAX_VALUE) { + break; + } + } + } + + /** + * A bit array with one bit per {@code char} value, used by {@link CharMatcher#precomputed}. + * + *

TODO(kevinb): possibly share a common BitArray class with BloomFilter and others... a + * simpler java.util.BitSet. + */ + private static final class LookupTable { + int[] data = new int[2048]; + + void set(char index) { + data[index >> 5] |= (1 << index); + } + + boolean get(char index) { + return (data[index >> 5] & (1 << index)) != 0; + } + } + + // Text processing routines + + /** + * Returns {@code true} if a character sequence contains at least one matching character. + * Equivalent to {@code !matchesNoneOf(sequence)}. + * + *

The default implementation iterates over the sequence, invoking {@link #matches} for each + * character, until this returns {@code true} or the end is reached. + * + * @param sequence the character sequence to examine, possibly empty + * @return {@code true} if this matcher matches at least one character in the sequence + * @since 8.0 + */ + public boolean matchesAnyOf(CharSequence sequence) { + return !matchesNoneOf(sequence); + } + + /** + * Returns {@code true} if a character sequence contains only matching characters. + * + *

The default implementation iterates over the sequence, invoking {@link #matches} for each + * character, until this returns {@code false} or the end is reached. + * + * @param sequence the character sequence to examine, possibly empty + * @return {@code true} if this matcher matches every character in the sequence, including when + * the sequence is empty + */ + public boolean matchesAllOf(CharSequence sequence) { + for (int i = sequence.length() - 1; i >= 0; i--) { + if (!matches(sequence.charAt(i))) { + return false; + } + } + return true; + } + + /** + * Returns {@code true} if a character sequence contains no matching characters. Equivalent to + * {@code !matchesAnyOf(sequence)}. + * + *

The default implementation iterates over the sequence, invoking {@link #matches} for each + * character, until this returns {@code false} or the end is reached. + * + * @param sequence the character sequence to examine, possibly empty + * @return {@code true} if this matcher matches every character in the sequence, including when + * the sequence is empty + */ + public boolean matchesNoneOf(CharSequence sequence) { + return indexIn(sequence) == -1; + } + + // TODO(kevinb): add matchesAnyOf() + + /** + * Returns the index of the first matching character in a character sequence, or {@code -1} if no + * matching character is present. + * + *

The default implementation iterates over the sequence in forward order calling {@link + * #matches} for each character. + * + * @param sequence the character sequence to examine from the beginning + * @return an index, or {@code -1} if no character matches + */ + public int indexIn(CharSequence sequence) { + int length = sequence.length(); + for (int i = 0; i < length; i++) { + if (matches(sequence.charAt(i))) { + return i; + } + } + return -1; + } + + /** + * Returns the index of the first matching character in a character sequence, starting from a + * given position, or {@code -1} if no character matches after that position. + * + *

The default implementation iterates over the sequence in forward order, beginning at {@code + * start}, calling {@link #matches} for each character. + * + * @param sequence the character sequence to examine + * @param start the first index to examine; must be nonnegative and no greater than {@code + * sequence.length()} + * @return the index of the first matching character, guaranteed to be no less than {@code start}, + * or {@code -1} if no character matches + * @throws IndexOutOfBoundsException if start is negative or greater than {@code + * sequence.length()} + */ + public int indexIn(CharSequence sequence, int start) { + int length = sequence.length(); + Preconditions.checkPositionIndex(start, length); + for (int i = start; i < length; i++) { + if (matches(sequence.charAt(i))) { + return i; + } + } + return -1; + } + + /** + * Returns the index of the last matching character in a character sequence, or {@code -1} if no + * matching character is present. + * + *

The default implementation iterates over the sequence in reverse order calling {@link + * #matches} for each character. + * + * @param sequence the character sequence to examine from the end + * @return an index, or {@code -1} if no character matches + */ + public int lastIndexIn(CharSequence sequence) { + for (int i = sequence.length() - 1; i >= 0; i--) { + if (matches(sequence.charAt(i))) { + return i; + } + } + return -1; + } + + /** + * Returns the number of matching characters found in a character sequence. + */ + public int countIn(CharSequence sequence) { + int count = 0; + for (int i = 0; i < sequence.length(); i++) { + if (matches(sequence.charAt(i))) { + count++; + } + } + return count; + } + + /** + * Returns a string containing all non-matching characters of a character sequence, in order. For + * example:

   {@code
+   *
+   *   CharMatcher.is('a').removeFrom("bazaar")}
+ * + * ... returns {@code "bzr"}. + */ + @CheckReturnValue + public String removeFrom(CharSequence sequence) { + String string = sequence.toString(); + int pos = indexIn(string); + if (pos == -1) { + return string; + } + + char[] chars = string.toCharArray(); + int spread = 1; + + // This unusual loop comes from extensive benchmarking + OUT: while (true) { + pos++; + while (true) { + if (pos == chars.length) { + break OUT; + } + if (matches(chars[pos])) { + break; + } + chars[pos - spread] = chars[pos]; + pos++; + } + spread++; + } + return new String(chars, 0, pos - spread); + } + + /** + * Returns a string containing all matching characters of a character sequence, in order. For + * example:
   {@code
+   *
+   *   CharMatcher.is('a').retainFrom("bazaar")}
+ * + * ... returns {@code "aaa"}. + */ + @CheckReturnValue + public String retainFrom(CharSequence sequence) { + return negate().removeFrom(sequence); + } + + /** + * Returns a string copy of the input character sequence, with each character that matches this + * matcher replaced by a given replacement character. For example:
   {@code
+   *
+   *   CharMatcher.is('a').replaceFrom("radar", 'o')}
+ * + * ... returns {@code "rodor"}. + * + *

The default implementation uses {@link #indexIn(CharSequence)} to find the first matching + * character, then iterates the remainder of the sequence calling {@link #matches(char)} for each + * character. + * + * @param sequence the character sequence to replace matching characters in + * @param replacement the character to append to the result string in place of each matching + * character in {@code sequence} + * @return the new string + */ + @CheckReturnValue + public String replaceFrom(CharSequence sequence, char replacement) { + String string = sequence.toString(); + int pos = indexIn(string); + if (pos == -1) { + return string; + } + char[] chars = string.toCharArray(); + chars[pos] = replacement; + for (int i = pos + 1; i < chars.length; i++) { + if (matches(chars[i])) { + chars[i] = replacement; + } + } + return new String(chars); + } + + /** + * Returns a string copy of the input character sequence, with each character that matches this + * matcher replaced by a given replacement sequence. For example:

   {@code
+   *
+   *   CharMatcher.is('a').replaceFrom("yaha", "oo")}
+ * + * ... returns {@code "yoohoo"}. + * + *

Note: If the replacement is a fixed string with only one character, you are better + * off calling {@link #replaceFrom(CharSequence, char)} directly. + * + * @param sequence the character sequence to replace matching characters in + * @param replacement the characters to append to the result string in place of each matching + * character in {@code sequence} + * @return the new string + */ + @CheckReturnValue + public String replaceFrom(CharSequence sequence, CharSequence replacement) { + int replacementLen = replacement.length(); + if (replacementLen == 0) { + return removeFrom(sequence); + } + if (replacementLen == 1) { + return replaceFrom(sequence, replacement.charAt(0)); + } + + String string = sequence.toString(); + int pos = indexIn(string); + if (pos == -1) { + return string; + } + + int len = string.length(); + StringBuilder buf = new StringBuilder((len * 3 / 2) + 16); + + int oldpos = 0; + do { + buf.append(string, oldpos, pos); + buf.append(replacement); + oldpos = pos + 1; + pos = indexIn(string, oldpos); + } while (pos != -1); + + buf.append(string, oldpos, len); + return buf.toString(); + } + + /** + * Returns a substring of the input character sequence that omits all characters this matcher + * matches from the beginning and from the end of the string. For example:

   {@code
+   *
+   *   CharMatcher.anyOf("ab").trimFrom("abacatbab")}
+ * + * ... returns {@code "cat"}. + * + *

Note that:

   {@code
+   *
+   *   CharMatcher.inRange('\0', ' ').trimFrom(str)}
+ * + * ... is equivalent to {@link String#trim()}. + */ + @CheckReturnValue + public String trimFrom(CharSequence sequence) { + int len = sequence.length(); + int first; + int last; + + for (first = 0; first < len; first++) { + if (!matches(sequence.charAt(first))) { + break; + } + } + for (last = len - 1; last > first; last--) { + if (!matches(sequence.charAt(last))) { + break; + } + } + + return sequence.subSequence(first, last + 1).toString(); + } + + /** + * Returns a substring of the input character sequence that omits all characters this matcher + * matches from the beginning of the string. For example:
 {@code
+   *
+   *   CharMatcher.anyOf("ab").trimLeadingFrom("abacatbab")}
+ * + * ... returns {@code "catbab"}. + */ + @CheckReturnValue + public String trimLeadingFrom(CharSequence sequence) { + int len = sequence.length(); + int first; + + for (first = 0; first < len; first++) { + if (!matches(sequence.charAt(first))) { + break; + } + } + + return sequence.subSequence(first, len).toString(); + } + + /** + * Returns a substring of the input character sequence that omits all characters this matcher + * matches from the end of the string. For example:
 {@code
+   *
+   *   CharMatcher.anyOf("ab").trimTrailingFrom("abacatbab")}
+ * + * ... returns {@code "abacat"}. + */ + @CheckReturnValue + public String trimTrailingFrom(CharSequence sequence) { + int len = sequence.length(); + int last; + + for (last = len - 1; last >= 0; last--) { + if (!matches(sequence.charAt(last))) { + break; + } + } + + return sequence.subSequence(0, last + 1).toString(); + } + + /** + * Returns a string copy of the input character sequence, with each group of consecutive + * characters that match this matcher replaced by a single replacement character. For example: + *
   {@code
+   *
+   *   CharMatcher.anyOf("eko").collapseFrom("bookkeeper", '-')}
+ * + * ... returns {@code "b-p-r"}. + * + *

The default implementation uses {@link #indexIn(CharSequence)} to find the first matching + * character, then iterates the remainder of the sequence calling {@link #matches(char)} for each + * character. + * + * @param sequence the character sequence to replace matching groups of characters in + * @param replacement the character to append to the result string in place of each group of + * matching characters in {@code sequence} + * @return the new string + */ + @CheckReturnValue + public String collapseFrom(CharSequence sequence, char replacement) { + int first = indexIn(sequence); + if (first == -1) { + return sequence.toString(); + } + + // TODO(kevinb): see if this implementation can be made faster + StringBuilder builder = new StringBuilder(sequence.length()) + .append(sequence.subSequence(0, first)) + .append(replacement); + boolean in = true; + for (int i = first + 1; i < sequence.length(); i++) { + char c = sequence.charAt(i); + if (apply(c)) { + if (!in) { + builder.append(replacement); + in = true; + } + } else { + builder.append(c); + in = false; + } + } + return builder.toString(); + } + + /** + * Collapses groups of matching characters exactly as {@link #collapseFrom} does, except that + * groups of matching characters at the start or end of the sequence are removed without + * replacement. + */ + @CheckReturnValue + public String trimAndCollapseFrom(CharSequence sequence, char replacement) { + int first = negate().indexIn(sequence); + if (first == -1) { + return ""; // everything matches. nothing's left. + } + StringBuilder builder = new StringBuilder(sequence.length()); + boolean inMatchingGroup = false; + for (int i = first; i < sequence.length(); i++) { + char c = sequence.charAt(i); + if (apply(c)) { + inMatchingGroup = true; + } else { + if (inMatchingGroup) { + builder.append(replacement); + inMatchingGroup = false; + } + builder.append(c); + } + } + return builder.toString(); + } + + // Predicate interface + + /** + * Returns {@code true} if this matcher matches the given character. + * + * @throws NullPointerException if {@code character} is null + */ + @Override public boolean apply(Character character) { + return matches(character); + } +} diff --git a/java/hive/src/main/java/com/google/common/base/Objects.java b/java/hive/src/main/java/com/google/common/base/Objects.java new file mode 100644 index 00000000..6e49f6d7 --- /dev/null +++ b/java/hive/src/main/java/com/google/common/base/Objects.java @@ -0,0 +1,236 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.common.base; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.GwtCompatible; + +import groovy.model.ValueHolder; + +import java.util.StringJoiner; +import java.util.Arrays; + +import javax.annotation.Nullable; + +/** + * Helper functions that can operate on any {@code Object}. + * + * @author Laurence Gonsalves + * @since 2.0 (imported from Google Collections Library) + */ +@GwtCompatible +public final class Objects { + private Objects() {} + + /** + * Determines whether two possibly-null objects are equal. Returns: + * + *

    + *
  • {@code true} if {@code a} and {@code b} are both null. + *
  • {@code true} if {@code a} and {@code b} are both non-null and they are + * equal according to {@link Object#equals(Object)}. + *
  • {@code false} in all other situations. + *
+ * + *

This assumes that any non-null objects passed to this function conform + * to the {@code equals()} contract. + */ + public static boolean equal(@Nullable Object a, @Nullable Object b) { + return a == b || (a != null && a.equals(b)); + } + + public static int hashCode(@Nullable Object... objects) { + return Arrays.hashCode(objects); + } + + /** + * Creates an instance of {@link ToStringHelper}. + * + *

This is helpful for implementing {@link Object#toString()}. + * Specification by example:

   {@code
+   *   // Returns "ClassName{}"
+   *   Objects.toStringHelper(this)
+   *       .toString();
+   *
+   *   // Returns "ClassName{x=1}"
+   *   Objects.toStringHelper(this)
+   *       .add("x", 1)
+   *       .toString();
+   *
+   *   // Returns "MyObject{x=1}"
+   *   Objects.toStringHelper("MyObject")
+   *       .add("x", 1)
+   *       .toString();
+   *
+   *   // Returns "ClassName{x=1, y=foo}"
+   *   Objects.toStringHelper(this)
+   *       .add("x", 1)
+   *       .add("y", "foo")
+   *       .toString();
+   *   }}
+ * + *

Note that in GWT, class names are often obfuscated. + * + * @param self the object to generate the string for (typically {@code this}), + * used only for its class name + * @since 2.0 + */ + public static ToStringHelper toStringHelper(Object self) { + return new ToStringHelper(simpleName(self.getClass())); + } + + /** + * {@link Class#getSimpleName()} is not GWT compatible yet, so we + * provide our own implementation. + */ + private static String simpleName(Class clazz) { + String name = clazz.getName(); + + // the nth anonymous class has a class name ending in "Outer$n" + // and local inner classes have names ending in "Outer.$1Inner" + name = name.replaceAll("\\$[0-9]+", "\\$"); + + // we want the name of the inner class all by its lonesome + int start = name.lastIndexOf('$'); + + // if this isn't an inner class, just find the start of the + // top level class name. + if (start == -1) { + start = name.lastIndexOf('.'); + } + return name.substring(start + 1); + } + + /** + * Support class for {@link Objects#toStringHelper}. + * + * @author Jason Lee + * @since 2.0 + */ + public static final class ToStringHelper { + private final StringBuilder builder; + private boolean needsSeparator = false; + + /** + * Use {@link Objects#toStringHelper(Object)} to create an instance. + */ + private ToStringHelper(String className) { + checkNotNull(className); + this.builder = new StringBuilder(32).append(className).append('{'); + } + + /** + * Returns a string in the format specified by {@link + * Objects#toStringHelper(Object)}. + */ + @Override public String toString() { + try { + return builder.append('}').toString(); + } finally { + // Slice off the closing brace in case there are additional calls to + // #add or #addValue. + builder.setLength(builder.length() - 1); + } + } + + public ToStringHelper add(String name, @Nullable Object value) { + checkNameAndAppend(name).append(value); + return this; + } + + /** + * Adds a name/value pair to the formatted output in {@code name=value} + * format. + * + * @since 11.0 (source-compatible since 2.0) + */ + public ToStringHelper add(String name, boolean value) { + checkNameAndAppend(name).append(value); + return this; + } + + /** + * Adds a name/value pair to the formatted output in {@code name=value} + * format. + * + * @since 11.0 (source-compatible since 2.0) + */ + public ToStringHelper add(String name, char value) { + checkNameAndAppend(name).append(value); + return this; + } + + /** + * Adds a name/value pair to the formatted output in {@code name=value} + * format. + * + * @since 11.0 (source-compatible since 2.0) + */ + public ToStringHelper add(String name, double value) { + checkNameAndAppend(name).append(value); + return this; + } + + /** + * Adds a name/value pair to the formatted output in {@code name=value} + * format. + * + * @since 11.0 (source-compatible since 2.0) + */ + public ToStringHelper add(String name, float value) { + checkNameAndAppend(name).append(value); + return this; + } + + /** + * Adds a name/value pair to the formatted output in {@code name=value} + * format. + * + * @since 11.0 (source-compatible since 2.0) + */ + public ToStringHelper add(String name, int value) { + checkNameAndAppend(name).append(value); + return this; + } + + /** + * Adds a name/value pair to the formatted output in {@code name=value} + * format. + * + * @since 11.0 (source-compatible since 2.0) + */ + public ToStringHelper add(String name, long value) { + checkNameAndAppend(name).append(value); + return this; + } + + private StringBuilder checkNameAndAppend(String name) { + checkNotNull(name); + return maybeAppendSeparator().append(name).append('='); + } + + private StringBuilder maybeAppendSeparator() { + if (needsSeparator) { + return builder.append(", "); + } else { + needsSeparator = true; + return builder; + } + } + } +} diff --git a/java/hive/src/main/java/com/google/common/base/Splitter.java b/java/hive/src/main/java/com/google/common/base/Splitter.java new file mode 100644 index 00000000..d3d62a12 --- /dev/null +++ b/java/hive/src/main/java/com/google/common/base/Splitter.java @@ -0,0 +1,539 @@ +/* + * Copyright (C) 2009 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.common.base; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.GwtCompatible; +import com.google.common.annotations.GwtIncompatible; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.annotation.CheckReturnValue; + +/** + * An object that divides strings (or other instances of {@code CharSequence}) + * into substrings, by recognizing a separator (a.k.a. "delimiter") + * which can be expressed as a single character, literal string, regular + * expression, {@code CharMatcher}, or by using a fixed substring length. This + * class provides the complementary functionality to {@link Joiner}. + * + *

Here is the most basic example of {@code Splitter} usage:

   {@code
+ *
+ *   Splitter.on(',').split("foo,bar")}
+ * + * This invocation returns an {@code Iterable} containing {@code "foo"} + * and {@code "bar"}, in that order. + * + *

By default {@code Splitter}'s behavior is very simplistic:

   {@code
+ *
+ *   Splitter.on(',').split("foo,,bar, quux")}
+ * + * This returns an iterable containing {@code ["foo", "", "bar", " quux"]}. + * Notice that the splitter does not assume that you want empty strings removed, + * or that you wish to trim whitespace. If you want features like these, simply + * ask for them:
 {@code
+ *
+ *   private static final Splitter MY_SPLITTER = Splitter.on(',')
+ *       .trimResults()
+ *       .omitEmptyStrings();}
+ * + * Now {@code MY_SPLITTER.split("foo, ,bar, quux,")} returns an iterable + * containing just {@code ["foo", "bar", "quux"]}. Note that the order in which + * the configuration methods are called is never significant; for instance, + * trimming is always applied first before checking for an empty result, + * regardless of the order in which the {@link #trimResults()} and + * {@link #omitEmptyStrings()} methods were invoked. + * + *

Warning: splitter instances are always immutable; a configuration + * method such as {@code omitEmptyStrings} has no effect on the instance it + * is invoked on! You must store and use the new splitter instance returned by + * the method. This makes splitters thread-safe, and safe to store as {@code + * static final} constants (as illustrated above).

   {@code
+ *
+ *   // Bad! Do not do this!
+ *   Splitter splitter = Splitter.on('/');
+ *   splitter.trimResults(); // does nothing!
+ *   return splitter.split("wrong / wrong / wrong");}
+ * + * The separator recognized by the splitter does not have to be a single + * literal character as in the examples above. See the methods {@link + * #on(String)}, {@link #on(Pattern)} and {@link #on(CharMatcher)} for examples + * of other ways to specify separators. + * + *

Note: this class does not mimic any of the quirky behaviors of + * similar JDK methods; for instance, it does not silently discard trailing + * separators, as does {@link String#split(String)}, nor does it have a default + * behavior of using five particular whitespace characters as separators, like + * {@link java.util.StringTokenizer}. + * + * @author Julien Silland + * @author Jesse Wilson + * @author Kevin Bourrillion + * @author Louis Wasserman + * @since 1.0 + */ +@GwtCompatible(emulated = true) +public final class Splitter { + private final CharMatcher trimmer; + private final boolean omitEmptyStrings; + private final Strategy strategy; + private final int limit; + + private Splitter(Strategy strategy) { + this(strategy, false, CharMatcher.NONE, Integer.MAX_VALUE); + } + + private Splitter(Strategy strategy, boolean omitEmptyStrings, + CharMatcher trimmer, int limit) { + this.strategy = strategy; + this.omitEmptyStrings = omitEmptyStrings; + this.trimmer = trimmer; + this.limit = limit; + } + + /** + * Returns a splitter that uses the given single-character separator. For + * example, {@code Splitter.on(',').split("foo,,bar")} returns an iterable + * containing {@code ["foo", "", "bar"]}. + * + * @param separator the character to recognize as a separator + * @return a splitter, with default settings, that recognizes that separator + */ + public static Splitter on(char separator) { + return on(CharMatcher.is(separator)); + } + + /** + * Returns a splitter that considers any single character matched by the + * given {@code CharMatcher} to be a separator. For example, {@code + * Splitter.on(CharMatcher.anyOf(";,")).split("foo,;bar,quux")} returns an + * iterable containing {@code ["foo", "", "bar", "quux"]}. + * + * @param separatorMatcher a {@link CharMatcher} that determines whether a + * character is a separator + * @return a splitter, with default settings, that uses this matcher + */ + public static Splitter on(final CharMatcher separatorMatcher) { + checkNotNull(separatorMatcher); + + return new Splitter(new Strategy() { + @Override public SplittingIterator iterator( + Splitter splitter, final CharSequence toSplit) { + return new SplittingIterator(splitter, toSplit) { + @Override int separatorStart(int start) { + return separatorMatcher.indexIn(toSplit, start); + } + + @Override int separatorEnd(int separatorPosition) { + return separatorPosition + 1; + } + }; + } + }); + } + + /** + * Returns a splitter that uses the given fixed string as a separator. For + * example, {@code Splitter.on(", ").split("foo, bar, baz,qux")} returns an + * iterable containing {@code ["foo", "bar", "baz,qux"]}. + * + * @param separator the literal, nonempty string to recognize as a separator + * @return a splitter, with default settings, that recognizes that separator + */ + public static Splitter on(final String separator) { + checkArgument(separator.length() != 0, + "The separator may not be the empty string."); + + return new Splitter(new Strategy() { + @Override public SplittingIterator iterator( + Splitter splitter, CharSequence toSplit) { + return new SplittingIterator(splitter, toSplit) { + @Override public int separatorStart(int start) { + int delimeterLength = separator.length(); + + positions: + for (int p = start, last = toSplit.length() - delimeterLength; + p <= last; p++) { + for (int i = 0; i < delimeterLength; i++) { + if (toSplit.charAt(i + p) != separator.charAt(i)) { + continue positions; + } + } + return p; + } + return -1; + } + + @Override public int separatorEnd(int separatorPosition) { + return separatorPosition + separator.length(); + } + }; + } + }); + } + + /** + * Returns a splitter that considers any subsequence matching {@code + * pattern} to be a separator. For example, {@code + * Splitter.on(Pattern.compile("\r?\n")).split(entireFile)} splits a string + * into lines whether it uses DOS-style or UNIX-style line terminators. + * + * @param separatorPattern the pattern that determines whether a subsequence + * is a separator. This pattern may not match the empty string. + * @return a splitter, with default settings, that uses this pattern + * @throws IllegalArgumentException if {@code separatorPattern} matches the + * empty string + */ + @GwtIncompatible("java.util.regex") + public static Splitter on(final Pattern separatorPattern) { + checkNotNull(separatorPattern); + checkArgument(!separatorPattern.matcher("").matches(), + "The pattern may not match the empty string: %s", separatorPattern); + + return new Splitter(new Strategy() { + @Override public SplittingIterator iterator( + final Splitter splitter, CharSequence toSplit) { + final Matcher matcher = separatorPattern.matcher(toSplit); + return new SplittingIterator(splitter, toSplit) { + @Override public int separatorStart(int start) { + return matcher.find(start) ? matcher.start() : -1; + } + + @Override public int separatorEnd(int separatorPosition) { + return matcher.end(); + } + }; + } + }); + } + + /** + * Returns a splitter that considers any subsequence matching a given + * pattern (regular expression) to be a separator. For example, {@code + * Splitter.onPattern("\r?\n").split(entireFile)} splits a string into lines + * whether it uses DOS-style or UNIX-style line terminators. This is + * equivalent to {@code Splitter.on(Pattern.compile(pattern))}. + * + * @param separatorPattern the pattern that determines whether a subsequence + * is a separator. This pattern may not match the empty string. + * @return a splitter, with default settings, that uses this pattern + * @throws java.util.regex.PatternSyntaxException if {@code separatorPattern} + * is a malformed expression + * @throws IllegalArgumentException if {@code separatorPattern} matches the + * empty string + */ + @GwtIncompatible("java.util.regex") + public static Splitter onPattern(String separatorPattern) { + return on(Pattern.compile(separatorPattern)); + } + + /** + * Returns a splitter that divides strings into pieces of the given length. + * For example, {@code Splitter.fixedLength(2).split("abcde")} returns an + * iterable containing {@code ["ab", "cd", "e"]}. The last piece can be + * smaller than {@code length} but will never be empty. + * + * @param length the desired length of pieces after splitting + * @return a splitter, with default settings, that can split into fixed sized + * pieces + */ + public static Splitter fixedLength(final int length) { + checkArgument(length > 0, "The length may not be less than 1"); + + return new Splitter(new Strategy() { + @Override public SplittingIterator iterator( + final Splitter splitter, CharSequence toSplit) { + return new SplittingIterator(splitter, toSplit) { + @Override public int separatorStart(int start) { + int nextChunkStart = start + length; + return (nextChunkStart < toSplit.length() ? nextChunkStart : -1); + } + + @Override public int separatorEnd(int separatorPosition) { + return separatorPosition; + } + }; + } + }); + } + + /** + * Returns a splitter that behaves equivalently to {@code this} splitter, but + * automatically omits empty strings from the results. For example, {@code + * Splitter.on(',').omitEmptyStrings().split(",a,,,b,c,,")} returns an + * iterable containing only {@code ["a", "b", "c"]}. + * + *

If either {@code trimResults} option is also specified when creating a + * splitter, that splitter always trims results first before checking for + * emptiness. So, for example, {@code + * Splitter.on(':').omitEmptyStrings().trimResults().split(": : : ")} returns + * an empty iterable. + * + *

Note that it is ordinarily not possible for {@link #split(CharSequence)} + * to return an empty iterable, but when using this option, it can (if the + * input sequence consists of nothing but separators). + * + * @return a splitter with the desired configuration + */ + @CheckReturnValue + public Splitter omitEmptyStrings() { + return new Splitter(strategy, true, trimmer, limit); + } + + /** + * Returns a splitter that behaves equivalently to {@code this} splitter but + * stops splitting after it reaches the limit. + * The limit defines the maximum number of items returned by the iterator. + * + *

For example, + * {@code Splitter.on(',').limit(3).split("a,b,c,d")} returns an iterable + * containing {@code ["a", "b", "c,d"]}. When omitting empty strings, the + * omitted strings do no count. Hence, + * {@code Splitter.on(',').limit(3).omitEmptyStrings().split("a,,,b,,,c,d")} + * returns an iterable containing {@code ["a", "b", "c,d"}. + * When trim is requested, all entries, including the last are trimmed. Hence + * {@code Splitter.on(',').limit(3).trimResults().split(" a , b , c , d ")} + * results in @{code ["a", "b", "c , d"]}. + * + * @param limit the maximum number of items returns + * @return a splitter with the desired configuration + * @since 9.0 + */ + @CheckReturnValue + public Splitter limit(int limit) { + checkArgument(limit > 0, "must be greater than zero: %s", limit); + return new Splitter(strategy, omitEmptyStrings, trimmer, limit); + } + + /** + * Returns a splitter that behaves equivalently to {@code this} splitter, but + * automatically removes leading and trailing {@linkplain + * CharMatcher#WHITESPACE whitespace} from each returned substring; equivalent + * to {@code trimResults(CharMatcher.WHITESPACE)}. For example, {@code + * Splitter.on(',').trimResults().split(" a, b ,c ")} returns an iterable + * containing {@code ["a", "b", "c"]}. + * + * @return a splitter with the desired configuration + */ + @CheckReturnValue + public Splitter trimResults() { + return trimResults(CharMatcher.WHITESPACE); + } + + /** + * Returns a splitter that behaves equivalently to {@code this} splitter, but + * removes all leading or trailing characters matching the given {@code + * CharMatcher} from each returned substring. For example, {@code + * Splitter.on(',').trimResults(CharMatcher.is('_')).split("_a ,_b_ ,c__")} + * returns an iterable containing {@code ["a ", "b_ ", "c"]}. + * + * @param trimmer a {@link CharMatcher} that determines whether a character + * should be removed from the beginning/end of a subsequence + * @return a splitter with the desired configuration + */ + // TODO(kevinb): throw if a trimmer was already specified! + @CheckReturnValue + public Splitter trimResults(CharMatcher trimmer) { + checkNotNull(trimmer); + return new Splitter(strategy, omitEmptyStrings, trimmer, limit); + } + + /** + * Splits {@code sequence} into string components and makes them available + * through an {@link Iterator}, which may be lazily evaluated. + * + * @param sequence the sequence of characters to split + * @return an iteration over the segments split from the parameter. + */ + public Iterable split(final CharSequence sequence) { + checkNotNull(sequence); + + return new Iterable() { + @Override public Iterator iterator() { + return spliterator1(sequence); + } + }; + } + + private Iterator spliterator1(CharSequence sequence) { + return strategy.iterator(this, sequence); + } + + /** + * Returns a {@code MapSplitter} which splits entries based on this splitter, + * and splits entries into keys and values using the specified separator. + * + * @since 10.0 + */ + @CheckReturnValue + @Beta + public MapSplitter withKeyValueSeparator(String separator) { + return withKeyValueSeparator(on(separator)); + } + + /** + * Returns a {@code MapSplitter} which splits entries based on this splitter, + * and splits entries into keys and values using the specified key-value + * splitter. + * + * @since 10.0 + */ + @CheckReturnValue + @Beta + public MapSplitter withKeyValueSeparator(Splitter keyValueSplitter) { + return new MapSplitter(this, keyValueSplitter); + } + + /** + * An object that splits strings into maps as {@code Splitter} splits + * iterables and lists. Like {@code Splitter}, it is thread-safe and + * immutable. + * + * @since 10.0 + */ + @Beta + public static final class MapSplitter { + private static final String INVALID_ENTRY_MESSAGE = + "Chunk [%s] is not a valid entry"; + private final Splitter outerSplitter; + private final Splitter entrySplitter; + + private MapSplitter(Splitter outerSplitter, Splitter entrySplitter) { + this.outerSplitter = outerSplitter; // only "this" is passed + this.entrySplitter = checkNotNull(entrySplitter); + } + + /** + * Splits {@code sequence} into substrings, splits each substring into + * an entry, and returns an unmodifiable map with each of the entries. For + * example, + * Splitter.on(';').trimResults().withKeyValueSeparator("=>") + * .split("a=>b ; c=>b") + * will return a mapping from {@code "a"} to {@code "b"} and + * {@code "c"} to {@code b}. + * + *

The returned map preserves the order of the entries from + * {@code sequence}. + * + * @throws IllegalArgumentException if the specified sequence does not split + * into valid map entries, or if there are duplicate keys + */ + public Map split(CharSequence sequence) { + Map map = new LinkedHashMap(); + for (String entry : outerSplitter.split(sequence)) { + Iterator entryFields = entrySplitter.spliterator1(entry); + + checkArgument(entryFields.hasNext(), INVALID_ENTRY_MESSAGE, entry); + String key = entryFields.next(); + checkArgument(!map.containsKey(key), "Duplicate key [%s] found.", key); + + checkArgument(entryFields.hasNext(), INVALID_ENTRY_MESSAGE, entry); + String value = entryFields.next(); + map.put(key, value); + + checkArgument(!entryFields.hasNext(), INVALID_ENTRY_MESSAGE, entry); + } + return Collections.unmodifiableMap(map); + } + } + + private interface Strategy { + Iterator iterator(Splitter splitter, CharSequence toSplit); + } + + private abstract static class SplittingIterator + extends AbstractIterator { + final CharSequence toSplit; + final CharMatcher trimmer; + final boolean omitEmptyStrings; + + /** + * Returns the first index in {@code toSplit} at or after {@code start} + * that contains the separator. + */ + abstract int separatorStart(int start); + + /** + * Returns the first index in {@code toSplit} after {@code + * separatorPosition} that does not contain a separator. This method is only + * invoked after a call to {@code separatorStart}. + */ + abstract int separatorEnd(int separatorPosition); + + int offset = 0; + int limit; + + protected SplittingIterator(Splitter splitter, CharSequence toSplit) { + this.trimmer = splitter.trimmer; + this.omitEmptyStrings = splitter.omitEmptyStrings; + this.limit = splitter.limit; + this.toSplit = toSplit; + } + + @Override protected String computeNext() { + while (offset != -1) { + int start = offset; + int end; + + int separatorPosition = separatorStart(offset); + if (separatorPosition == -1) { + end = toSplit.length(); + offset = -1; + } else { + end = separatorPosition; + offset = separatorEnd(separatorPosition); + } + + while (start < end && trimmer.matches(toSplit.charAt(start))) { + start++; + } + while (end > start && trimmer.matches(toSplit.charAt(end - 1))) { + end--; + } + + if (omitEmptyStrings && start == end) { + continue; + } + + if (limit == 1) { + // The limit has been reached, return the rest of the string as the + // final item. This is tested after empty string removal so that + // empty strings do not count towards the limit. + end = toSplit.length(); + offset = -1; + // Since we may have changed the end, we need to trim it again. + while (end > start && trimmer.matches(toSplit.charAt(end - 1))) { + end--; + } + } else { + limit--; + } + + return toSplit.subSequence(start, end).toString(); + } + return endOfData(); + } + } +} diff --git a/java/hive/src/main/java/com/google/common/util/concurrent/AbstractCheckedFuture.java b/java/hive/src/main/java/com/google/common/util/concurrent/AbstractCheckedFuture.java new file mode 100644 index 00000000..dfaf3435 --- /dev/null +++ b/java/hive/src/main/java/com/google/common/util/concurrent/AbstractCheckedFuture.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2008 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.common.util.concurrent; + +import com.google.common.annotations.Beta; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A delegating wrapper around a {@link ListenableFuture} that adds support for + * the {@link #checkedGet()} and {@link #checkedGet(long, TimeUnit)} methods. + * + * @author Sven Mawson + * @since 1.0 + */ +@Beta +public abstract class AbstractCheckedFuture + extends ForwardingListenableFuture.SimpleForwardingListenableFuture + implements CheckedFuture { + /** + * Constructs an {@code AbstractCheckedFuture} that wraps a delegate. + */ + protected AbstractCheckedFuture(ListenableFuture delegate) { + super(delegate); + } + + /** + * Translates from an {@link InterruptedException}, + * {@link CancellationException} or {@link ExecutionException} thrown by + * {@code get} to an exception of type {@code X} to be thrown by + * {@code checkedGet}. Subclasses must implement this method. + * + *

If {@code e} is an {@code InterruptedException}, the calling + * {@code checkedGet} method has already restored the interrupt after catching + * the exception. If an implementation of {@link #mapException(Exception)} + * wishes to swallow the interrupt, it can do so by calling + * {@link Thread#interrupted()}. + * + *

Subclasses may choose to throw, rather than return, a subclass of + * {@code RuntimeException} to allow creating a CheckedFuture that throws + * both checked and unchecked exceptions. + */ + protected abstract X mapException(Exception e); + + /** + * {@inheritDoc} + * + *

This implementation calls {@link #get()} and maps that method's standard + * exceptions to instances of type {@code X} using {@link #mapException}. + * + *

In addition, if {@code get} throws an {@link InterruptedException}, this + * implementation will set the current thread's interrupt status before + * calling {@code mapException}. + * + * @throws X if {@link #get()} throws an {@link InterruptedException}, + * {@link CancellationException}, or {@link ExecutionException} + */ + @Override + public V checkedGet() throws X { + try { + return get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw mapException(e); + } catch (CancellationException e) { + throw mapException(e); + } catch (ExecutionException e) { + throw mapException(e); + } + } + + /** + * {@inheritDoc} + * + *

This implementation calls {@link #get(long, TimeUnit)} and maps that + * method's standard exceptions (excluding {@link TimeoutException}, which is + * propagated) to instances of type {@code X} using {@link #mapException}. + * + *

In addition, if {@code get} throws an {@link InterruptedException}, this + * implementation will set the current thread's interrupt status before + * calling {@code mapException}. + * + * @throws X if {@link #get()} throws an {@link InterruptedException}, + * {@link CancellationException}, or {@link ExecutionException} + * @throws TimeoutException {@inheritDoc} + */ + @Override + public V checkedGet(long timeout, TimeUnit unit) throws TimeoutException, X { + try { + return get(timeout, unit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw mapException(e); + } catch (CancellationException e) { + throw mapException(e); + } catch (ExecutionException e) { + throw mapException(e); + } + } +} diff --git a/java/hive/src/main/java/com/google/common/util/concurrent/CheckedFuture.java b/java/hive/src/main/java/com/google/common/util/concurrent/CheckedFuture.java new file mode 100644 index 00000000..31504e2b --- /dev/null +++ b/java/hive/src/main/java/com/google/common/util/concurrent/CheckedFuture.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2008 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.common.util.concurrent; + +import com.google.common.annotations.Beta; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A {@code CheckedFuture} is a {@link ListenableFuture} that includes versions + * of the {@code get} methods that can throw a checked exception. This makes it + * easier to create a future that executes logic which can throw an exception. + * + *

A common implementation is {@link Futures#immediateCheckedFuture}. + * + *

Implementations of this interface must adapt the exceptions thrown by + * {@code Future#get()}: {@link CancellationException}, + * {@link ExecutionException} and {@link InterruptedException} into the type + * specified by the {@code E} type parameter. + * + *

This interface also extends the ListenableFuture interface to allow + * listeners to be added. This allows the future to be used as a normal + * {@link Future} or as an asynchronous callback mechanism as needed. This + * allows multiple callbacks to be registered for a particular task, and the + * future will guarantee execution of all listeners when the task completes. + * + *

For a simpler alternative to CheckedFuture, consider accessing Future + * values with {@link Futures#get(Future, Class) Futures.get()}. + * + * @author Sven Mawson + * @since 1.0 + */ +@Beta +public interface CheckedFuture + extends ListenableFuture { + + /** + * Exception checking version of {@link Future#get()} that will translate + * {@link InterruptedException}, {@link CancellationException} and + * {@link ExecutionException} into application-specific exceptions. + * + * @return the result of executing the future. + * @throws X on interruption, cancellation or execution exceptions. + */ + V checkedGet() throws X; + + /** + * Exception checking version of {@link Future#get(long, TimeUnit)} that will + * translate {@link InterruptedException}, {@link CancellationException} and + * {@link ExecutionException} into application-specific exceptions. On + * timeout this method throws a normal {@link TimeoutException}. + * + * @return the result of executing the future. + * @throws TimeoutException if retrieving the result timed out. + * @throws X on interruption, cancellation or execution exceptions. + */ + V checkedGet(long timeout, TimeUnit unit) throws TimeoutException, X; +} diff --git a/java/hive/src/main/java/com/google/common/util/concurrent/Futures.java b/java/hive/src/main/java/com/google/common/util/concurrent/Futures.java new file mode 100644 index 00000000..57a5ca89 --- /dev/null +++ b/java/hive/src/main/java/com/google/common/util/concurrent/Futures.java @@ -0,0 +1,1464 @@ +/* + * Copyright (C) 2006 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.common.util.concurrent; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; +import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; +import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; +import static java.lang.Thread.currentThread; +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import com.google.common.annotations.Beta; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Nullable; + +/** + * Static utility methods pertaining to the {@link Future} interface. + * + * @author Kevin Bourrillion + * @author Nishant Thakkar + * @author Sven Mawson + * @since 1.0 + */ +@Beta +public final class Futures { + private Futures() {} + + /** + * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} + * and a {@link Function} that maps from {@link Exception} instances into the + * appropriate checked type. + * + *

The given mapping function will be applied to an + * {@link InterruptedException}, a {@link CancellationException}, or an + * {@link ExecutionException} with the actual cause of the exception. + * See {@link Future#get()} for details on the exceptions thrown. + * + * @since 9.0 (source-compatible since 1.0) + */ + public static CheckedFuture makeChecked( + ListenableFuture future, Function mapper) { + return new MappingCheckedFuture(checkNotNull(future), mapper); + } + + /** + * Creates a {@code ListenableFuture} which has its value set immediately upon + * construction. The getters just return the value. This {@code Future} can't + * be canceled or timed out and its {@code isDone()} method always returns + * {@code true}. + */ + public static ListenableFuture immediateFuture(@Nullable V value) { + SettableFuture future = SettableFuture.create(); + future.set(value); + return future; + } + + /** + * Returns a {@code CheckedFuture} which has its value set immediately upon + * construction. + * + *

The returned {@code Future} can't be cancelled, and its {@code isDone()} + * method always returns {@code true}. Calling {@code get()} or {@code + * checkedGet()} will immediately return the provided value. + */ + public static CheckedFuture + immediateCheckedFuture(@Nullable V value) { + SettableFuture future = SettableFuture.create(); + future.set(value); + return Futures.makeChecked(future, new Function() { + @Override + public X apply(Exception e) { + throw new AssertionError("impossible"); + } + }); + } + + /** + * Returns a {@code ListenableFuture} which has an exception set immediately + * upon construction. + * + *

The returned {@code Future} can't be cancelled, and its {@code isDone()} + * method always returns {@code true}. Calling {@code get()} will immediately + * throw the provided {@code Throwable} wrapped in an {@code + * ExecutionException}. + * + * @throws Error if the throwable is an {@link Error}. + */ + public static ListenableFuture immediateFailedFuture( + Throwable throwable) { + checkNotNull(throwable); + SettableFuture future = SettableFuture.create(); + future.setException(throwable); + return future; + } + + /** + * Returns a {@code CheckedFuture} which has an exception set immediately upon + * construction. + * + *

The returned {@code Future} can't be cancelled, and its {@code isDone()} + * method always returns {@code true}. Calling {@code get()} will immediately + * throw the provided {@code Throwable} wrapped in an {@code + * ExecutionException}, and calling {@code checkedGet()} will throw the + * provided exception itself. + * + * @throws Error if the throwable is an {@link Error}. + */ + public static CheckedFuture + immediateFailedCheckedFuture(final X exception) { + checkNotNull(exception); + return makeChecked(Futures.immediateFailedFuture(exception), + new Function() { + @Override + public X apply(Exception e) { + return exception; + } + }); + } + + /** + *

Returns a new {@code ListenableFuture} whose result is asynchronously + * derived from the result of the given {@code Future}. More precisely, the + * returned {@code Future} takes its result from a {@code Future} produced by + * applying the given {@code Function} to the result of the original {@code + * Future}. Example: + * + *

   {@code
+   *   ListenableFuture rowKeyFuture = indexService.lookUp(query);
+   *   Function> queryFunction =
+   *       new Function>() {
+   *         public ListenableFuture apply(RowKey rowKey) {
+   *           return dataService.read(rowKey);
+   *         }
+   *       };
+   *   ListenableFuture queryFuture =
+   *       chain(rowKeyFuture, queryFunction);
+   * }
+ * + *

Note: This overload of {@code chain} is designed for cases in which the + * work of creating the derived future is fast and lightweight, as the method + * does not accept an {@code Executor} in which to perform the the work. For + * heavier derivations, this overload carries some caveats: First, the thread + * that the derivation runs in depends on whether the input {@code Future} is + * done at the time {@code chain} is called. In particular, if called late, + * {@code chain} will run the derivation in the thread that called {@code + * chain}. Second, derivations may run in an internal thread of the system + * responsible for the input {@code Future}, such as an RPC network thread. + * Finally, during the execution of a {@code sameThreadExecutor} {@code + * chain} function, all other registered but unexecuted listeners are + * prevented from running, even if those listeners are to run in other + * executors. + * + *

The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future and that of the future returned by the + * chain function. That is, if the returned {@code Future} is cancelled, it + * will attempt to cancel the other two, and if either of the other two is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. + * + * @param input The future to chain + * @param function A function to chain the results of the provided future + * to the results of the returned future. This will be run in the thread + * that notifies input it is complete. + * @return A future that holds result of the chain. + * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and + * use {@link #transform(ListenableFuture, AsyncFunction)}. This method is + * scheduled to be removed from Guava in Guava release 12.0. + */ + @Deprecated + public static ListenableFuture chain( + ListenableFuture input, + Function> function) { + return chain(input, function, MoreExecutors.sameThreadExecutor()); + } + + /** + *

Returns a new {@code ListenableFuture} whose result is asynchronously + * derived from the result of the given {@code Future}. More precisely, the + * returned {@code Future} takes its result from a {@code Future} produced by + * applying the given {@code Function} to the result of the original {@code + * Future}. Example: + * + *

   {@code
+   *   ListenableFuture rowKeyFuture = indexService.lookUp(query);
+   *   Function> queryFunction =
+   *       new Function>() {
+   *         public ListenableFuture apply(RowKey rowKey) {
+   *           return dataService.read(rowKey);
+   *         }
+   *       };
+   *   ListenableFuture queryFuture =
+   *       chain(rowKeyFuture, queryFunction, executor);
+   * }
+ * + *

The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future and that of the future returned by the + * chain function. That is, if the returned {@code Future} is cancelled, it + * will attempt to cancel the other two, and if either of the other two is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. + * + *

Note: For cases in which the work of creating the derived future is + * fast and lightweight, consider {@linkplain Futures#chain(ListenableFuture, + * Function) the other overload} or explicit use of {@code + * sameThreadExecutor}. For heavier derivations, this choice carries some + * caveats: First, the thread that the derivation runs in depends on whether + * the input {@code Future} is done at the time {@code chain} is called. In + * particular, if called late, {@code chain} will run the derivation in the + * thread that called {@code chain}. Second, derivations may run in an + * internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor} {@code chain} function, all other registered but + * unexecuted listeners are prevented from running, even if those listeners + * are to run in other executors. + * + * @param input The future to chain + * @param function A function to chain the results of the provided future + * to the results of the returned future. + * @param executor Executor to run the function in. + * @return A future that holds result of the chain. + * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and + * use {@link #transform(ListenableFuture, AsyncFunction, Executor)}. This + * method is scheduled to be removed from Guava in Guava release 12.0. + */ + @Deprecated + public static ListenableFuture chain(ListenableFuture input, + final Function> + function, + Executor executor) { + checkNotNull(function); + ChainingListenableFuture chain = + new ChainingListenableFuture(new AsyncFunction() { + @Override + /* + * All methods of ListenableFuture are covariant, and we don't expose + * the object anywhere that would allow it to be downcast. + */ + @SuppressWarnings("unchecked") + public ListenableFuture apply(I input) { + return (ListenableFuture) function.apply(input); + } + }, input); + input.addListener(chain, executor); + return chain; + } + + /** + * Returns a new {@code ListenableFuture} whose result is asynchronously + * derived from the result of the given {@code Future}. More precisely, the + * returned {@code Future} takes its result from a {@code Future} produced by + * applying the given {@code AsyncFunction} to the result of the original + * {@code Future}. Example: + * + *

   {@code
+   *   ListenableFuture rowKeyFuture = indexService.lookUp(query);
+   *   AsyncFunction queryFunction =
+   *       new AsyncFunction() {
+   *         public ListenableFuture apply(RowKey rowKey) {
+   *           return dataService.read(rowKey);
+   *         }
+   *       };
+   *   ListenableFuture queryFuture =
+   *       transform(rowKeyFuture, queryFunction);
+   * }
+ * + *

Note: This overload of {@code transform} is designed for cases in which + * the work of creating the derived {@code Future} is fast and lightweight, + * as the method does not accept an {@code Executor} in which to perform the + * the work. (The created {@code Future} itself need not complete quickly.) + * For heavier operations, this overload carries some caveats: First, the + * thread that {@code function.apply} runs in depends on whether the input + * {@code Future} is done at the time {@code transform} is called. In + * particular, if called late, {@code transform} will run the operation in + * the thread that called {@code transform}. Second, {@code function.apply} + * may run in an internal thread of the system responsible for the input + * {@code Future}, such as an RPC network thread. Finally, during the + * execution of a {@code sameThreadExecutor} {@code function.apply}, all + * other registered but unexecuted listeners are prevented from running, even + * if those listeners are to run in other executors. + * + *

The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future and that of the future returned by the + * function. That is, if the returned {@code Future} is cancelled, it will + * attempt to cancel the other two, and if either of the other two is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. + * + * @param input The future to transform + * @param function A function to transform the result of the input future + * to the result of the output future + * @return A future that holds result of the function (if the input succeeded) + * or the original input's failure (if not) + * @since 11.0 + */ + public static ListenableFuture transform(ListenableFuture input, + AsyncFunction function) { + return transform(input, function, MoreExecutors.sameThreadExecutor()); + } + + /** + * Returns a new {@code ListenableFuture} whose result is asynchronously + * derived from the result of the given {@code Future}. More precisely, the + * returned {@code Future} takes its result from a {@code Future} produced by + * applying the given {@code AsyncFunction} to the result of the original + * {@code Future}. Example: + * + *

   {@code
+   *   ListenableFuture rowKeyFuture = indexService.lookUp(query);
+   *   AsyncFunction queryFunction =
+   *       new AsyncFunction() {
+   *         public ListenableFuture apply(RowKey rowKey) {
+   *           return dataService.read(rowKey);
+   *         }
+   *       };
+   *   ListenableFuture queryFuture =
+   *       transform(rowKeyFuture, queryFunction, executor);
+   * }
+ * + *

The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future and that of the future returned by the + * chain function. That is, if the returned {@code Future} is cancelled, it + * will attempt to cancel the other two, and if either of the other two is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. + * + *

Note: For cases in which the work of creating the derived future is + * fast and lightweight, consider {@linkplain + * Futures#transform(ListenableFuture, Function) the other overload} or + * explicit use of {@code sameThreadExecutor}. For heavier derivations, this + * choice carries some caveats: First, the thread that {@code function.apply} + * runs in depends on whether the input {@code Future} is done at the time + * {@code transform} is called. In particular, if called late, {@code + * transform} will run the operation in the thread that called {@code + * transform}. Second, {@code function.apply} may run in an internal thread + * of the system responsible for the input {@code Future}, such as an RPC + * network thread. Finally, during the execution of a {@code + * sameThreadExecutor} {@code function.apply}, all other registered but + * unexecuted listeners are prevented from running, even if those listeners + * are to run in other executors. + * + * @param input The future to transform + * @param function A function to transform the result of the input future + * to the result of the output future + * @param executor Executor to run the function in. + * @return A future that holds result of the function (if the input succeeded) + * or the original input's failure (if not) + * @since 11.0 + */ + public static ListenableFuture transform(ListenableFuture input, + AsyncFunction function, + Executor executor) { + ChainingListenableFuture output = + new ChainingListenableFuture(function, input); + input.addListener(output, executor); + return output; + } + + /** + * Returns a new {@code ListenableFuture} whose result is the product of + * applying the given {@code Function} to the result of the given {@code + * Future}. Example: + * + *

   {@code
+   *   ListenableFuture queryFuture = ...;
+   *   Function> rowsFunction =
+   *       new Function>() {
+   *         public List apply(QueryResult queryResult) {
+   *           return queryResult.getRows();
+   *         }
+   *       };
+   *   ListenableFuture> rowsFuture =
+   *       transform(queryFuture, rowsFunction);
+   * }
+ * + *

Note: This overload of {@code transform} is designed for cases in which + * the transformation is fast and lightweight, as the method does not accept + * an {@code Executor} in which to perform the the work. For heavier + * transformations, this overload carries some caveats: First, the thread + * that the transformation runs in depends on whether the input {@code + * Future} is done at the time {@code transform} is called. In particular, if + * called late, {@code transform} will perform the transformation in the + * thread that called {@code transform}. Second, transformations may run in + * an internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor} transformation, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. + * + *

The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future. That is, if the returned {@code Future} + * is cancelled, it will attempt to cancel the input, and if the input is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. + * + *

An example use of this method is to convert a serializable object + * returned from an RPC into a POJO. + * + * @param future The future to transform + * @param function A Function to transform the results of the provided future + * to the results of the returned future. This will be run in the thread + * that notifies input it is complete. + * @return A future that holds result of the transformation. + * @since 9.0 (in 1.0 as {@code compose}) + */ + public static ListenableFuture transform(ListenableFuture future, + final Function function) { + return transform(future, function, MoreExecutors.sameThreadExecutor()); + } + + /** + * Returns a new {@code ListenableFuture} whose result is the product of + * applying the given {@code Function} to the result of the given {@code + * Future}. Example: + * + *

   {@code
+   *   ListenableFuture queryFuture = ...;
+   *   Function> rowsFunction =
+   *       new Function>() {
+   *         public List apply(QueryResult queryResult) {
+   *           return queryResult.getRows();
+   *         }
+   *       };
+   *   ListenableFuture> rowsFuture =
+   *       transform(queryFuture, rowsFunction, executor);
+   * }
+ * + *

The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future. That is, if the returned {@code Future} + * is cancelled, it will attempt to cancel the input, and if the input is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. + * + *

An example use of this method is to convert a serializable object + * returned from an RPC into a POJO. + * + *

Note: For cases in which the transformation is fast and lightweight, + * consider {@linkplain Futures#transform(ListenableFuture, Function) the + * other overload} or explicit use of {@link + * MoreExecutors#sameThreadExecutor}. For heavier transformations, this + * choice carries some caveats: First, the thread that the transformation + * runs in depends on whether the input {@code Future} is done at the time + * {@code transform} is called. In particular, if called late, {@code + * transform} will perform the transformation in the thread that called + * {@code transform}. Second, transformations may run in an internal thread + * of the system responsible for the input {@code Future}, such as an RPC + * network thread. Finally, during the execution of a {@code + * sameThreadExecutor} transformation, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. + * + * @param future The future to transform + * @param function A Function to transform the results of the provided future + * to the results of the returned future. + * @param executor Executor to run the function in. + * @return A future that holds result of the transformation. + * @since 9.0 (in 2.0 as {@code compose}) + */ + public static ListenableFuture transform(ListenableFuture future, + final Function function, Executor executor) { + checkNotNull(function); + Function> wrapperFunction + = new Function>() { + @Override public ListenableFuture apply(I input) { + O output = function.apply(input); + return immediateFuture(output); + } + }; + return chain(future, wrapperFunction, executor); + } + + /** + * Like {@link #transform(ListenableFuture, Function)} except that the + * transformation {@code function} is invoked on each call to + * {@link Future#get() get()} on the returned future. + * + *

The returned {@code Future} reflects the input's cancellation + * state directly, and any attempt to cancel the returned Future is likewise + * passed through to the input Future. + * + *

Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} + * only apply the timeout to the execution of the underlying {@code Future}, + * not to the execution of the transformation function. + * + *

The primary audience of this method is callers of {@code transform} + * who don't have a {@code ListenableFuture} available and + * do not mind repeated, lazy function evaluation. + * + * @param future The future to transform + * @param function A Function to transform the results of the provided future + * to the results of the returned future. + * @return A future that returns the result of the transformation. + * @since 10.0 + */ + @Beta + public static Future lazyTransform(final Future future, + final Function function) { + checkNotNull(future); + checkNotNull(function); + return new Future() { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return future.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public O get() throws InterruptedException, ExecutionException { + return applyTransformation(future.get()); + } + + @Override + public O get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return applyTransformation(future.get(timeout, unit)); + } + + private O applyTransformation(I input) throws ExecutionException { + try { + return function.apply(input); + } catch (Throwable t) { + throw new ExecutionException(t); + } + } + }; + } + + /** + * An implementation of {@code ListenableFuture} that also implements + * {@code Runnable} so that it can be used to nest ListenableFutures. + * Once the passed-in {@code ListenableFuture} is complete, it calls the + * passed-in {@code Function} to generate the result. + * + *

If the function throws any checked exceptions, they should be wrapped + * in a {@code UndeclaredThrowableException} so that this class can get + * access to the cause. + */ + private static class ChainingListenableFuture + extends AbstractFuture implements Runnable { + + private AsyncFunction function; + private ListenableFuture inputFuture; + private volatile ListenableFuture outputFuture; + private final BlockingQueue mayInterruptIfRunningChannel = + new LinkedBlockingQueue(1); + private final CountDownLatch outputCreated = new CountDownLatch(1); + + private ChainingListenableFuture( + AsyncFunction function, + ListenableFuture inputFuture) { + this.function = checkNotNull(function); + this.inputFuture = checkNotNull(inputFuture); + } + + /** + * Delegate the get() to the input and output futures, in case + * their implementations defer starting computation until their + * own get() is invoked. + */ + @Override + public O get() throws InterruptedException, ExecutionException { + if (!isDone()) { + // Invoking get on the inputFuture will ensure our own run() + // method below is invoked as a listener when inputFuture sets + // its value. Therefore when get() returns we should then see + // the outputFuture be created. + ListenableFuture inputFuture = this.inputFuture; + if (inputFuture != null) { + inputFuture.get(); + } + + // If our listener was scheduled to run on an executor we may + // need to wait for our listener to finish running before the + // outputFuture has been constructed by the function. + outputCreated.await(); + + // Like above with the inputFuture, we have a listener on + // the outputFuture that will set our own value when its + // value is set. Invoking get will ensure the output can + // complete and invoke our listener, so that we can later + // get the result. + ListenableFuture outputFuture = this.outputFuture; + if (outputFuture != null) { + outputFuture.get(); + } + } + return super.get(); + } + + /** + * Delegate the get() to the input and output futures, in case + * their implementations defer starting computation until their + * own get() is invoked. + */ + @Override + public O get(long timeout, TimeUnit unit) throws TimeoutException, + ExecutionException, InterruptedException { + if (!isDone()) { + // Use a single time unit so we can decrease remaining timeout + // as we wait for various phases to complete. + if (unit != NANOSECONDS) { + timeout = NANOSECONDS.convert(timeout, unit); + unit = NANOSECONDS; + } + + // Invoking get on the inputFuture will ensure our own run() + // method below is invoked as a listener when inputFuture sets + // its value. Therefore when get() returns we should then see + // the outputFuture be created. + ListenableFuture inputFuture = this.inputFuture; + if (inputFuture != null) { + long start = System.nanoTime(); + inputFuture.get(timeout, unit); + timeout -= Math.max(0, System.nanoTime() - start); + } + + // If our listener was scheduled to run on an executor we may + // need to wait for our listener to finish running before the + // outputFuture has been constructed by the function. + long start = System.nanoTime(); + if (!outputCreated.await(timeout, unit)) { + throw new TimeoutException(); + } + timeout -= Math.max(0, System.nanoTime() - start); + + // Like above with the inputFuture, we have a listener on + // the outputFuture that will set our own value when its + // value is set. Invoking get will ensure the output can + // complete and invoke our listener, so that we can later + // get the result. + ListenableFuture outputFuture = this.outputFuture; + if (outputFuture != null) { + outputFuture.get(timeout, unit); + } + } + return super.get(timeout, unit); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + /* + * Our additional cancellation work needs to occur even if + * !mayInterruptIfRunning, so we can't move it into interruptTask(). + */ + if (super.cancel(mayInterruptIfRunning)) { + // This should never block since only one thread is allowed to cancel + // this Future. + putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning); + cancel(inputFuture, mayInterruptIfRunning); + cancel(outputFuture, mayInterruptIfRunning); + return true; + } + return false; + } + + private void cancel(@Nullable Future future, + boolean mayInterruptIfRunning) { + if (future != null) { + future.cancel(mayInterruptIfRunning); + } + } + + @Override + public void run() { + try { + I sourceResult; + try { + sourceResult = getUninterruptibly(inputFuture); + } catch (CancellationException e) { + // Cancel this future and return. + // At this point, inputFuture is cancelled and outputFuture doesn't + // exist, so the value of mayInterruptIfRunning is irrelevant. + cancel(false); + return; + } catch (ExecutionException e) { + // Set the cause of the exception as this future's exception + setException(e.getCause()); + return; + } + + final ListenableFuture outputFuture = this.outputFuture = + function.apply(sourceResult); + if (isCancelled()) { + // Handles the case where cancel was called while the function was + // being applied. + // There is a gap in cancel(boolean) between calling sync.cancel() + // and storing the value of mayInterruptIfRunning, so this thread + // needs to block, waiting for that value. + outputFuture.cancel( + takeUninterruptibly(mayInterruptIfRunningChannel)); + this.outputFuture = null; + return; + } + outputFuture.addListener(new Runnable() { + @Override + public void run() { + try { + // Here it would have been nice to have had an + // UninterruptibleListenableFuture, but we don't want to start a + // combinatorial explosion of interfaces, so we have to make do. + set(getUninterruptibly(outputFuture)); + } catch (CancellationException e) { + // Cancel this future and return. + // At this point, inputFuture and outputFuture are done, so the + // value of mayInterruptIfRunning is irrelevant. + cancel(false); + return; + } catch (ExecutionException e) { + // Set the cause of the exception as this future's exception + setException(e.getCause()); + } finally { + // Don't pin inputs beyond completion + ChainingListenableFuture.this.outputFuture = null; + } + } + }, MoreExecutors.sameThreadExecutor()); + } catch (UndeclaredThrowableException e) { + // Set the cause of the exception as this future's exception + setException(e.getCause()); + } catch (Exception e) { + // This exception is irrelevant in this thread, but useful for the + // client + setException(e); + } catch (Error e) { + // Propagate errors up ASAP - our superclass will rethrow the error + setException(e); + } finally { + // Don't pin inputs beyond completion + function = null; + inputFuture = null; + // Allow our get routines to examine outputFuture now. + outputCreated.countDown(); + } + } + } + + /** + * Creates a new {@code ListenableFuture} whose value is a list containing the + * values of all its input futures, if all succeed. If any input fails, the + * returned future fails. + * + *

The list of results is in the same order as the input list. + * + *

Canceling this future does not cancel any of the component futures; + * however, if any of the provided futures fails or is canceled, this one is, + * too. + * + * @param futures futures to combine + * @return a future that provides a list of the results of the component + * futures + * @since 10.0 + */ + @Beta + public static ListenableFuture> allAsList( + ListenableFuture... futures) { + return new ListFuture(ImmutableList.copyOf(futures), true, + MoreExecutors.sameThreadExecutor()); + } + + /** + * Creates a new {@code ListenableFuture} whose value is a list containing the + * values of all its input futures, if all succeed. If any input fails, the + * returned future fails. + * + *

The list of results is in the same order as the input list. + * + *

Canceling this future does not cancel any of the component futures; + * however, if any of the provided futures fails or is canceled, this one is, + * too. + * + * @param futures futures to combine + * @return a future that provides a list of the results of the component + * futures + * @since 10.0 + */ + @Beta + public static ListenableFuture> allAsList( + Iterable> futures) { + return new ListFuture(ImmutableList.copyOf(futures), true, + MoreExecutors.sameThreadExecutor()); + } + + /** + * Creates a new {@code ListenableFuture} whose value is a list containing the + * values of all its successful input futures. The list of results is in the + * same order as the input list, and if any of the provided futures fails or + * is canceled, its corresponding position will contain {@code null} (which is + * indistinguishable from the future having a successful value of + * {@code null}). + * + * @param futures futures to combine + * @return a future that provides a list of the results of the component + * futures + * @since 10.0 + */ + @Beta + public static ListenableFuture> successfulAsList( + ListenableFuture... futures) { + return new ListFuture(ImmutableList.copyOf(futures), false, + MoreExecutors.sameThreadExecutor()); + } + + /** + * Creates a new {@code ListenableFuture} whose value is a list containing the + * values of all its successful input futures. The list of results is in the + * same order as the input list, and if any of the provided futures fails or + * is canceled, its corresponding position will contain {@code null} (which is + * indistinguishable from the future having a successful value of + * {@code null}). + * + * @param futures futures to combine + * @return a future that provides a list of the results of the component + * futures + * @since 10.0 + */ + @Beta + public static ListenableFuture> successfulAsList( + Iterable> futures) { + return new ListFuture(ImmutableList.copyOf(futures), false, + MoreExecutors.sameThreadExecutor()); + } + + /** + * Registers separate success and failure callbacks to be run when the {@code + * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() + * complete} or, if the computation is already complete, immediately. + * + *

There is no guaranteed ordering of execution of callbacks, but any + * callback added through this method is guaranteed to be called once the + * computation is complete. + * + * Example:

 {@code
+   * ListenableFuture future = ...;
+   * addCallback(future,
+   *     new FutureCallback {
+   *       public void onSuccess(QueryResult result) {
+   *         storeInCache(result);
+   *       }
+   *       public void onFailure(Throwable t) {
+   *         reportError(t);
+   *       }
+   *     });}
+ * + *

Note: This overload of {@code addCallback} is designed for cases in + * which the callack is fast and lightweight, as the method does not accept + * an {@code Executor} in which to perform the the work. For heavier + * callbacks, this overload carries some caveats: First, the thread that the + * callback runs in depends on whether the input {@code Future} is done at the + * time {@code addCallback} is called and on whether the input {@code Future} + * is ever cancelled. In particular, {@code addCallback} may execute the + * callback in the thread that calls {@code addCallback} or {@code + * Future.cancel}. Second, callbacks may run in an internal thread of the + * system responsible for the input {@code Future}, such as an RPC network + * thread. Finally, during the execution of a {@code sameThreadExecutor} + * callback, all other registered but unexecuted listeners are prevented from + * running, even if those listeners are to run in other executors. + * + *

For a more general interface to attach a completion listener to a + * {@code Future}, see {@link ListenableFuture#addListener addListener}. + * + * @param future The future attach the callback to. + * @param callback The callback to invoke when {@code future} is completed. + * @since 10.0 + */ + public static void addCallback(ListenableFuture future, + FutureCallback callback) { + addCallback(future, callback, MoreExecutors.sameThreadExecutor()); + } + + /** + * Registers separate success and failure callbacks to be run when the {@code + * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() + * complete} or, if the computation is already complete, immediately. + * + *

The callback is run in {@code executor}. + * There is no guaranteed ordering of execution of callbacks, but any + * callback added through this method is guaranteed to be called once the + * computation is complete. + * + * Example:

 {@code
+   * ListenableFuture future = ...;
+   * Executor e = ...
+   * addCallback(future, e,
+   *     new FutureCallback {
+   *       public void onSuccess(QueryResult result) {
+   *         storeInCache(result);
+   *       }
+   *       public void onFailure(Throwable t) {
+   *         reportError(t);
+   *       }
+   *     });}
+ * + * When the callback is fast and lightweight consider {@linkplain + * Futures#addCallback(ListenableFuture, FutureCallback) the other overload} + * or explicit use of {@link MoreExecutors#sameThreadExecutor + * sameThreadExecutor}. For heavier callbacks, this choice carries some + * caveats: First, the thread that the callback runs in depends on whether + * the input {@code Future} is done at the time {@code addCallback} is called + * and on whether the input {@code Future} is ever cancelled. In particular, + * {@code addCallback} may execute the callback in the thread that calls + * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in + * an internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor} callback, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. + * + *

For a more general interface to attach a completion listener to a + * {@code Future}, see {@link ListenableFuture#addListener addListener}. + * + * @param future The future attach the callback to. + * @param callback The callback to invoke when {@code future} is completed. + * @param executor The executor to run {@code callback} when the future + * completes. + * @since 10.0 + */ + public static void addCallback(final ListenableFuture future, + final FutureCallback callback, Executor executor) { + Preconditions.checkNotNull(callback); + Runnable callbackListener = new Runnable() { + @Override + public void run() { + try { + // TODO(user): (Before Guava release), validate that this + // is the thing for IE. + V value = getUninterruptibly(future); + callback.onSuccess(value); + } catch (ExecutionException e) { + callback.onFailure(e.getCause()); + } catch (RuntimeException e) { + callback.onFailure(e); + } catch (Error e) { + callback.onFailure(e); + } + } + }; + future.addListener(callbackListener, executor); + } + + /** + * Returns the result of {@link Future#get()}, converting most exceptions to a + * new instance of the given checked exception type. This reduces boilerplate + * for a common use of {@code Future} in which it is unnecessary to + * programmatically distinguish between exception types or to extract other + * information from the exception instance. + * + *

Exceptions from {@code Future.get} are treated as follows: + *

    + *
  • Any {@link ExecutionException} has its cause wrapped in an + * {@code X} if the cause is a checked exception, an {@link + * UncheckedExecutionException} if the cause is a {@code + * RuntimeException}, or an {@link ExecutionError} if the cause is an + * {@code Error}. + *
  • Any {@link InterruptedException} is wrapped in an {@code X} (after + * restoring the interrupt). + *
  • Any {@link CancellationException} is propagated untouched, as is any + * other {@link RuntimeException} (though {@code get} implementations are + * discouraged from throwing such exceptions). + *
+ * + * The overall principle is to continue to treat every checked exception as a + * checked exception, every unchecked exception as an unchecked exception, and + * every error as an error. In addition, the cause of any {@code + * ExecutionException} is wrapped in order to ensure that the new stack trace + * matches that of the current thread. + * + *

Instances of {@code exceptionClass} are created by choosing an arbitrary + * public constructor that accepts zero or more arguments, all of type {@code + * String} or {@code Throwable} (preferring constructors with at least one + * {@code String}) and calling the constructor via reflection. If the + * exception did not already have a cause, one is set by calling {@link + * Throwable#initCause(Throwable)} on it. If no such constructor exists, an + * {@code IllegalArgumentException} is thrown. + * + * @throws X if {@code get} throws any checked exception except for an {@code + * ExecutionException} whose cause is not itself a checked exception + * @throws UncheckedExecutionException if {@code get} throws an {@code + * ExecutionException} with a {@code RuntimeException} as its cause + * @throws ExecutionError if {@code get} throws an {@code ExecutionException} + * with an {@code Error} as its cause + * @throws CancellationException if {@code get} throws a {@code + * CancellationException} + * @throws IllegalArgumentException if {@code exceptionClass} extends {@code + * RuntimeException} or does not have a suitable constructor + * @since 10.0 + */ + @Beta + public static V get( + Future future, Class exceptionClass) throws X { + checkNotNull(future); + checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), + "Futures.get exception type (%s) must not be a RuntimeException", + exceptionClass); + try { + return future.get(); + } catch (InterruptedException e) { + currentThread().interrupt(); + throw newWithCause(exceptionClass, e); + } catch (ExecutionException e) { + wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); + throw new AssertionError(); + } + } + + /** + * Returns the result of {@link Future#get(long, TimeUnit)}, converting most + * exceptions to a new instance of the given checked exception type. This + * reduces boilerplate for a common use of {@code Future} in which it is + * unnecessary to programmatically distinguish between exception types or to + * extract other information from the exception instance. + * + *

Exceptions from {@code Future.get} are treated as follows: + *

    + *
  • Any {@link ExecutionException} has its cause wrapped in an + * {@code X} if the cause is a checked exception, an {@link + * UncheckedExecutionException} if the cause is a {@code + * RuntimeException}, or an {@link ExecutionError} if the cause is an + * {@code Error}. + *
  • Any {@link InterruptedException} is wrapped in an {@code X} (after + * restoring the interrupt). + *
  • Any {@link TimeoutException} is wrapped in an {@code X}. + *
  • Any {@link CancellationException} is propagated untouched, as is any + * other {@link RuntimeException} (though {@code get} implementations are + * discouraged from throwing such exceptions). + *
+ * + * The overall principle is to continue to treat every checked exception as a + * checked exception, every unchecked exception as an unchecked exception, and + * every error as an error. In addition, the cause of any {@code + * ExecutionException} is wrapped in order to ensure that the new stack trace + * matches that of the current thread. + * + *

Instances of {@code exceptionClass} are created by choosing an arbitrary + * public constructor that accepts zero or more arguments, all of type {@code + * String} or {@code Throwable} (preferring constructors with at least one + * {@code String}) and calling the constructor via reflection. If the + * exception did not already have a cause, one is set by calling {@link + * Throwable#initCause(Throwable)} on it. If no such constructor exists, an + * {@code IllegalArgumentException} is thrown. + * + * @throws X if {@code get} throws any checked exception except for an {@code + * ExecutionException} whose cause is not itself a checked exception + * @throws UncheckedExecutionException if {@code get} throws an {@code + * ExecutionException} with a {@code RuntimeException} as its cause + * @throws ExecutionError if {@code get} throws an {@code ExecutionException} + * with an {@code Error} as its cause + * @throws CancellationException if {@code get} throws a {@code + * CancellationException} + * @throws IllegalArgumentException if {@code exceptionClass} extends {@code + * RuntimeException} or does not have a suitable constructor + * @since 10.0 + */ + @Beta + public static V get( + Future future, long timeout, TimeUnit unit, Class exceptionClass) + throws X { + checkNotNull(future); + checkNotNull(unit); + checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), + "Futures.get exception type (%s) must not be a RuntimeException", + exceptionClass); + try { + return future.get(timeout, unit); + } catch (InterruptedException e) { + currentThread().interrupt(); + throw newWithCause(exceptionClass, e); + } catch (TimeoutException e) { + throw newWithCause(exceptionClass, e); + } catch (ExecutionException e) { + wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); + throw new AssertionError(); + } + } + + private static void wrapAndThrowExceptionOrError( + Throwable cause, Class exceptionClass) throws X { + if (cause instanceof Error) { + throw new ExecutionError((Error) cause); + } + if (cause instanceof RuntimeException) { + throw new UncheckedExecutionException(cause); + } + throw newWithCause(exceptionClass, cause); + } + + /** + * Returns the result of calling {@link Future#get()} uninterruptibly on a + * task known not to throw a checked exception. This makes {@code Future} more + * suitable for lightweight, fast-running tasks that, barring bugs in the + * code, will not fail. This gives it exception-handling behavior similar to + * that of {@code ForkJoinTask.join}. + * + *

Exceptions from {@code Future.get} are treated as follows: + *

    + *
  • Any {@link ExecutionException} has its cause wrapped in an + * {@link UncheckedExecutionException} (if the cause is an {@code + * Exception}) or {@link ExecutionError} (if the cause is an {@code + * Error}). + *
  • Any {@link InterruptedException} causes a retry of the {@code get} + * call. The interrupt is restored before {@code getUnchecked} returns. + *
  • Any {@link CancellationException} is propagated untouched. So is any + * other {@link RuntimeException} ({@code get} implementations are + * discouraged from throwing such exceptions). + *
+ * + * The overall principle is to eliminate all checked exceptions: to loop to + * avoid {@code InterruptedException}, to pass through {@code + * CancellationException}, and to wrap any exception from the underlying + * computation in an {@code UncheckedExecutionException} or {@code + * ExecutionError}. + * + *

For an uninterruptible {@code get} that preserves other exceptions, see + * {@link Uninterruptibles#getUninterruptibly(Future)}. + * + * @throws UncheckedExecutionException if {@code get} throws an {@code + * ExecutionException} with an {@code Exception} as its cause + * @throws ExecutionError if {@code get} throws an {@code ExecutionException} + * with an {@code Error} as its cause + * @throws CancellationException if {@code get} throws a {@code + * CancellationException} + * @since 10.0 + */ + @Beta + public static V getUnchecked(Future future) { + checkNotNull(future); + try { + return getUninterruptibly(future); + } catch (ExecutionException e) { + wrapAndThrowUnchecked(e.getCause()); + throw new AssertionError(); + } + } + + private static void wrapAndThrowUnchecked(Throwable cause) { + if (cause instanceof Error) { + throw new ExecutionError((Error) cause); + } + /* + * It's a non-Error, non-Exception Throwable. From my survey of such + * classes, I believe that most users intended to extend Exception, so we'll + * treat it like an Exception. + */ + throw new UncheckedExecutionException(cause); + } + + /* + * TODO(user): FutureChecker interface for these to be static methods on? If + * so, refer to it in the (static-method) Futures.get documentation + */ + + /* + * Arguably we don't need a timed getUnchecked because any operation slow + * enough to require a timeout is heavyweight enough to throw a checked + * exception and therefore be inappropriate to use with getUnchecked. Further, + * it's not clear that converting the checked TimeoutException to a + * RuntimeException -- especially to an UncheckedExecutionException, since it + * wasn't thrown by the computation -- makes sense, and if we don't convert + * it, the user still has to write a try-catch block. + * + * If you think you would use this method, let us know. + */ + + private static X newWithCause( + Class exceptionClass, Throwable cause) { + // getConstructors() guarantees this as long as we don't modify the array. + @SuppressWarnings("unchecked") + List> constructors = + (List) Arrays.asList(exceptionClass.getConstructors()); + for (Constructor constructor : preferringStrings(constructors)) { + @Nullable X instance = newFromConstructor(constructor, cause); + if (instance != null) { + if (instance.getCause() == null) { + instance.initCause(cause); + } + return instance; + } + } + throw new IllegalArgumentException( + "No appropriate constructor for exception of type " + exceptionClass + + " in response to chained exception", cause); + } + + private static List> + preferringStrings(List> constructors) { + return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); + } + + private static final Ordering> WITH_STRING_PARAM_FIRST = + Ordering.natural().onResultOf(new Function, Boolean>() { + @Override public Boolean apply(Constructor input) { + return asList(input.getParameterTypes()).contains(String.class); + } + }).reverse(); + + @Nullable private static X newFromConstructor( + Constructor constructor, Throwable cause) { + Class[] paramTypes = constructor.getParameterTypes(); + Object[] params = new Object[paramTypes.length]; + for (int i = 0; i < paramTypes.length; i++) { + Class paramType = paramTypes[i]; + if (paramType.equals(String.class)) { + params[i] = cause.toString(); + } else if (paramType.equals(Throwable.class)) { + params[i] = cause; + } else { + return null; + } + } + try { + return constructor.newInstance(params); + } catch (IllegalArgumentException e) { + return null; + } catch (InstantiationException e) { + return null; + } catch (IllegalAccessException e) { + return null; + } catch (InvocationTargetException e) { + return null; + } + } + + /** + * Class that implements {@link #allAsList} and {@link #successfulAsList}. + * The idea is to create a (null-filled) List and register a listener with + * each component future to fill out the value in the List when that future + * completes. + */ + private static class ListFuture extends AbstractFuture> { + ImmutableList> futures; + final boolean allMustSucceed; + final AtomicInteger remaining; + List values; + + /** + * Constructor. + * + * @param futures all the futures to build the list from + * @param allMustSucceed whether a single failure or cancellation should + * propagate to this future + * @param listenerExecutor used to run listeners on all the passed in + * futures. + */ + ListFuture( + final ImmutableList> futures, + final boolean allMustSucceed, final Executor listenerExecutor) { + this.futures = futures; + this.values = Lists.newArrayListWithCapacity(futures.size()); + this.allMustSucceed = allMustSucceed; + this.remaining = new AtomicInteger(futures.size()); + + init(listenerExecutor); + } + + private void init(final Executor listenerExecutor) { + // First, schedule cleanup to execute when the Future is done. + addListener(new Runnable() { + @Override + public void run() { + // By now the values array has either been set as the Future's value, + // or (in case of failure) is no longer useful. + ListFuture.this.values = null; + + // Let go of the memory held by other futures + ListFuture.this.futures = null; + } + }, MoreExecutors.sameThreadExecutor()); + + // Now begin the "real" initialization. + + // Corner case: List is empty. + if (futures.isEmpty()) { + set(Lists.newArrayList(values)); + return; + } + + // Populate the results list with null initially. + for (int i = 0; i < futures.size(); ++i) { + values.add(null); + } + + // Register a listener on each Future in the list to update + // the state of this future. + // Note that if all the futures on the list are done prior to completing + // this loop, the last call to addListener() will callback to + // setOneValue(), transitively call our cleanup listener, and set + // this.futures to null. + // We store a reference to futures to avoid the NPE. + ImmutableList> localFutures = futures; + for (int i = 0; i < localFutures.size(); i++) { + final ListenableFuture listenable = localFutures.get(i); + final int index = i; + listenable.addListener(new Runnable() { + @Override + public void run() { + setOneValue(index, listenable); + } + }, listenerExecutor); + } + } + + /** + * Sets the value at the given index to that of the given future. + */ + private void setOneValue(int index, Future future) { + List localValues = values; + if (isDone() || localValues == null) { + // Some other future failed or has been cancelled, causing this one to + // also be cancelled or have an exception set. This should only happen + // if allMustSucceed is true. + checkState(allMustSucceed, + "Future was done before all dependencies completed"); + return; + } + + try { + checkState(future.isDone(), + "Tried to set value from future which is not done"); + localValues.set(index, getUninterruptibly(future)); + } catch (CancellationException e) { + if (allMustSucceed) { + // Set ourselves as cancelled. Let the input futures keep running + // as some of them may be used elsewhere. + // (Currently we don't override interruptTask, so + // mayInterruptIfRunning==false isn't technically necessary.) + cancel(false); + } + } catch (ExecutionException e) { + if (allMustSucceed) { + // As soon as the first one fails, throw the exception up. + // The result of all other inputs is then ignored. + setException(e.getCause()); + } + } catch (RuntimeException e) { + if (allMustSucceed) { + setException(e); + } + } catch (Error e) { + // Propagate errors up ASAP - our superclass will rethrow the error + setException(e); + } finally { + int newRemaining = remaining.decrementAndGet(); + checkState(newRemaining >= 0, "Less than 0 remaining futures"); + if (newRemaining == 0) { + localValues = values; + if (localValues != null) { + set(Lists.newArrayList(localValues)); + } else { + checkState(isDone()); + } + } + } + } + + @Override + public List get() throws InterruptedException, ExecutionException { + callAllGets(); + + // This may still block in spite of the calls above, as the listeners may + // be scheduled for execution in other threads. + return super.get(); + } + + /** + * Calls the get method of all dependency futures to work around a bug in + * some ListenableFutures where the listeners aren't called until get() is + * called. + */ + private void callAllGets() throws InterruptedException { + List> oldFutures = futures; + if (oldFutures != null && !isDone()) { + for (ListenableFuture future : oldFutures) { + // We wait for a little while for the future, but if it's not done, + // we check that no other futures caused a cancellation or failure. + // This can introduce a delay of up to 10ms in reporting an exception. + while (!future.isDone()) { + try { + future.get(); + } catch (Error e) { + throw e; + } catch (InterruptedException e) { + throw e; + } catch (Throwable e) { + // ExecutionException / CancellationException / RuntimeException + if (allMustSucceed) { + return; + } else { + continue; + } + } + } + } + } + } + } + + /** + * A checked future that uses a function to map from exceptions to the + * appropriate checked type. + */ + private static class MappingCheckedFuture extends + AbstractCheckedFuture { + + final Function mapper; + + MappingCheckedFuture(ListenableFuture delegate, + Function mapper) { + super(delegate); + + this.mapper = checkNotNull(mapper); + } + + @Override + protected X mapException(Exception e) { + return mapper.apply(e); + } + } +} diff --git a/java/hive/src/main/java/com/google/common/util/concurrent/ListeningScheduledExecutorService.java b/java/hive/src/main/java/com/google/common/util/concurrent/ListeningScheduledExecutorService.java new file mode 100644 index 00000000..42dcdd24 --- /dev/null +++ b/java/hive/src/main/java/com/google/common/util/concurrent/ListeningScheduledExecutorService.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2011 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.common.util.concurrent; + +import com.google.common.annotations.Beta; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * A {@link ScheduledExecutorService} that returns {@link ListenableFuture} + * instances from its {@code ExecutorService} methods. Futures returned by the + * {@code schedule*} methods, by contrast, need not implement {@code + * ListenableFuture}. (To create an instance from an existing {@link + * ScheduledExecutorService}, call {@link + * MoreExecutors#listeningDecorator(ScheduledExecutorService)}. + * + *

TODO(cpovirk): make at least the one-time schedule() methods return a + * ListenableFuture, too? But then we'll need ListenableScheduledFuture... + * + * @author Chris Povirk + * @since 10.0 + */ +@Beta +public interface ListeningScheduledExecutorService + extends ScheduledExecutorService, ListeningExecutorService { +} diff --git a/java/hive/src/main/java/com/google/common/util/concurrent/MoreExecutors.java b/java/hive/src/main/java/com/google/common/util/concurrent/MoreExecutors.java new file mode 100644 index 00000000..915b96db --- /dev/null +++ b/java/hive/src/main/java/com/google/common/util/concurrent/MoreExecutors.java @@ -0,0 +1,478 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.common.util.concurrent; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.Beta; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link + * ExecutorService}, and {@link ThreadFactory}. + * + * @author Eric Fellheimer + * @author Kyle Littlefield + * @author Justin Mahoney + * @since 3.0 + */ +public final class MoreExecutors { + private MoreExecutors() {} + + /** + * Converts the given ThreadPoolExecutor into an ExecutorService that exits + * when the application is complete. It does so by using daemon threads and + * adding a shutdown hook to wait for their completion. + * + *

This is mainly for fixed thread pools. + * See {@link Executors#newFixedThreadPool(int)}. + * + * @param executor the executor to modify to make sure it exits when the + * application is finished + * @param terminationTimeout how long to wait for the executor to + * finish before terminating the JVM + * @param timeUnit unit of time for the time parameter + * @return an unmodifiable version of the input which will not hang the JVM + */ + @Beta + public static ExecutorService getExitingExecutorService( + ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { + executor.setThreadFactory(new ThreadFactoryBuilder() + .setDaemon(true) + .setThreadFactory(executor.getThreadFactory()) + .build()); + + ExecutorService service = Executors.unconfigurableExecutorService(executor); + + addDelayedShutdownHook(service, terminationTimeout, timeUnit); + + return service; + } + + /** + * Converts the given ScheduledThreadPoolExecutor into a + * ScheduledExecutorService that exits when the application is complete. It + * does so by using daemon threads and adding a shutdown hook to wait for + * their completion. + * + *

This is mainly for fixed thread pools. + * See {@link Executors#newScheduledThreadPool(int)}. + * + * @param executor the executor to modify to make sure it exits when the + * application is finished + * @param terminationTimeout how long to wait for the executor to + * finish before terminating the JVM + * @param timeUnit unit of time for the time parameter + * @return an unmodifiable version of the input which will not hang the JVM + */ + @Beta + public static ScheduledExecutorService getExitingScheduledExecutorService( + ScheduledThreadPoolExecutor executor, long terminationTimeout, + TimeUnit timeUnit) { + executor.setThreadFactory(new ThreadFactoryBuilder() + .setDaemon(true) + .setThreadFactory(executor.getThreadFactory()) + .build()); + + ScheduledExecutorService service = + Executors.unconfigurableScheduledExecutorService(executor); + + addDelayedShutdownHook(service, terminationTimeout, timeUnit); + + return service; + } + + /** + * Add a shutdown hook to wait for thread completion in the given + * {@link ExecutorService service}. This is useful if the given service uses + * daemon threads, and we want to keep the JVM from exiting immediately on + * shutdown, instead giving these daemon threads a chance to terminate + * normally. + * @param service ExecutorService which uses daemon threads + * @param terminationTimeout how long to wait for the executor to finish + * before terminating the JVM + * @param timeUnit unit of time for the time parameter + */ + @Beta + public static void addDelayedShutdownHook( + final ExecutorService service, final long terminationTimeout, + final TimeUnit timeUnit) { + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + try { + // We'd like to log progress and failures that may arise in the + // following code, but unfortunately the behavior of logging + // is undefined in shutdown hooks. + // This is because the logging code installs a shutdown hook of its + // own. See Cleaner class inside {@link LogManager}. + service.shutdown(); + service.awaitTermination(terminationTimeout, timeUnit); + } catch (InterruptedException ignored) { + // We're shutting down anyway, so just ignore. + } + } + })); + } + + /** + * Converts the given ThreadPoolExecutor into an ExecutorService that exits + * when the application is complete. It does so by using daemon threads and + * adding a shutdown hook to wait for their completion. + * + *

This method waits 120 seconds before continuing with JVM termination, + * even if the executor has not finished its work. + * + *

This is mainly for fixed thread pools. + * See {@link Executors#newFixedThreadPool(int)}. + * + * @param executor the executor to modify to make sure it exits when the + * application is finished + * @return an unmodifiable version of the input which will not hang the JVM + */ + @Beta + public static ExecutorService getExitingExecutorService( + ThreadPoolExecutor executor) { + return getExitingExecutorService(executor, 120, TimeUnit.SECONDS); + } + + /** + * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that + * exits when the application is complete. It does so by using daemon threads + * and adding a shutdown hook to wait for their completion. + * + *

This method waits 120 seconds before continuing with JVM termination, + * even if the executor has not finished its work. + * + *

This is mainly for fixed thread pools. + * See {@link Executors#newScheduledThreadPool(int)}. + * + * @param executor the executor to modify to make sure it exits when the + * application is finished + * @return an unmodifiable version of the input which will not hang the JVM + */ + @Beta + public static ScheduledExecutorService getExitingScheduledExecutorService( + ScheduledThreadPoolExecutor executor) { + return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS); + } + + /** + * Creates an executor service that runs each task in the thread + * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy} This + * applies both to individually submitted tasks and to collections of tasks + * submitted via {@code invokeAll} or {@code invokeAny}. In the latter case, + * tasks will run serially on the calling thread. Tasks are run to + * completion before a {@code Future} is returned to the caller (unless the + * executor has been shutdown). + * + *

Although all tasks are immediately executed in the thread that + * submitted the task, this {@code ExecutorService} imposes a small + * locking overhead on each task submission in order to implement shutdown + * and termination behavior. + * + *

The implementation deviates from the {@code ExecutorService} + * specification with regards to the {@code shutdownNow} method. First, + * "best-effort" with regards to canceling running tasks is implemented + * as "no-effort". No interrupts or other attempts are made to stop + * threads executing tasks. Second, the returned list will always be empty, + * as any submitted task is considered to have started execution. + * This applies also to tasks given to {@code invokeAll} or {@code invokeAny} + * which are pending serial execution, even the subset of the tasks that + * have not yet started execution. It is unclear from the + * {@code ExecutorService} specification if these should be included, and + * it's much easier to implement the interpretation that they not be. + * Finally, a call to {@code shutdown} or {@code shutdownNow} may result + * in concurrent calls to {@code invokeAll/invokeAny} throwing + * RejectedExecutionException, although a subset of the tasks may already + * have been executed. + * + * @since 10.0 (mostly source-compatible since 3.0) + */ + public static ListeningExecutorService sameThreadExecutor() { + return new SameThreadExecutorService(); + } + + // See sameThreadExecutor javadoc for behavioral notes. + private static class SameThreadExecutorService + extends AbstractListeningExecutorService { + /** + * Lock used whenever accessing the state variables + * (runningTasks, shutdown, terminationCondition) of the executor + */ + private final Lock lock = new ReentrantLock(); + + /** Signaled after the executor is shutdown and running tasks are done */ + private final Condition termination = lock.newCondition(); + + /* + * Conceptually, these two variables describe the executor being in + * one of three states: + * - Active: shutdown == false + * - Shutdown: runningTasks > 0 and shutdown == true + * - Terminated: runningTasks == 0 and shutdown == true + */ + private int runningTasks = 0; + private boolean shutdown = false; + + @Override + public void execute(Runnable command) { + startTask(); + try { + command.run(); + } finally { + endTask(); + } + } + + @Override + public boolean isShutdown() { + lock.lock(); + try { + return shutdown; + } finally { + lock.unlock(); + } + } + + @Override + public void shutdown() { + lock.lock(); + try { + shutdown = true; + } finally { + lock.unlock(); + } + } + + // See sameThreadExecutor javadoc for unusual behavior of this method. + @Override + public List shutdownNow() { + shutdown(); + return Collections.emptyList(); + } + + @Override + public boolean isTerminated() { + lock.lock(); + try { + return shutdown && runningTasks == 0; + } finally { + lock.unlock(); + } + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lock(); + try { + for (;;) { + if (isTerminated()) { + return true; + } else if (nanos <= 0) { + return false; + } else { + nanos = termination.awaitNanos(nanos); + } + } + } finally { + lock.unlock(); + } + } + + /** + * Checks if the executor has been shut down and increments the running + * task count. + * + * @throws RejectedExecutionException if the executor has been previously + * shutdown + */ + private void startTask() { + lock.lock(); + try { + if (isShutdown()) { + throw new RejectedExecutionException("Executor already shutdown"); + } + runningTasks++; + } finally { + lock.unlock(); + } + } + + /** + * Decrements the running task count. + */ + private void endTask() { + lock.lock(); + try { + runningTasks--; + if (isTerminated()) { + termination.signalAll(); + } + } finally { + lock.unlock(); + } + } + } + + /** + * Creates an {@link ExecutorService} whose {@code submit} and {@code + * invokeAll} methods submit {@link ListenableFutureTask} instances to the + * given delegate executor. Those methods, as well as {@code execute} and + * {@code invokeAny}, are implemented in terms of calls to {@code + * delegate.execute}. All other methods are forwarded unchanged to the + * delegate. This implies that the returned {@code ListeningExecutorService} + * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code + * invokeAny} methods, so any special handling of tasks must be implemented in + * the delegate's {@code execute} method or by wrapping the returned {@code + * ListeningExecutorService}. + * + *

If the delegate executor was already an instance of {@code + * ListeningExecutorService}, it is returned untouched, and the rest of this + * documentation does not apply. + * + * @since 10.0 + */ + public static ListeningExecutorService listeningDecorator( + ExecutorService delegate) { + return (delegate instanceof ListeningExecutorService) + ? (ListeningExecutorService) delegate + : (delegate instanceof ScheduledExecutorService) + ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate) + : new ListeningDecorator(delegate); + } + + /** + * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code + * invokeAll} methods submit {@link ListenableFutureTask} instances to the + * given delegate executor. Those methods, as well as {@code execute} and + * {@code invokeAny}, are implemented in terms of calls to {@code + * delegate.execute}. All other methods are forwarded unchanged to the + * delegate. This implies that the returned {@code + * SchedulingListeningExecutorService} never calls the delegate's {@code + * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special + * handling of tasks must be implemented in the delegate's {@code execute} + * method or by wrapping the returned {@code + * SchedulingListeningExecutorService}. + * + *

If the delegate executor was already an instance of {@code + * ListeningScheduledExecutorService}, it is returned untouched, and the rest + * of this documentation does not apply. + * + * @since 10.0 + */ + public static ListeningScheduledExecutorService listeningDecorator( + ScheduledExecutorService delegate) { + return (delegate instanceof ListeningScheduledExecutorService) + ? (ListeningScheduledExecutorService) delegate + : new ScheduledListeningDecorator(delegate); + } + + private static class ListeningDecorator + extends AbstractListeningExecutorService { + final ExecutorService delegate; + + ListeningDecorator(ExecutorService delegate) { + this.delegate = checkNotNull(delegate); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public void execute(Runnable command) { + delegate.execute(command); + } + } + + private static class ScheduledListeningDecorator + extends ListeningDecorator implements ListeningScheduledExecutorService { + final ScheduledExecutorService delegate; + + ScheduledListeningDecorator(ScheduledExecutorService delegate) { + super(delegate); + this.delegate = checkNotNull(delegate); + } + + @Override + public ScheduledFuture schedule( + Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule( + Callable callable, long delay, TimeUnit unit) { + return delegate.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate.scheduleWithFixedDelay( + command, initialDelay, delay, unit); + } + } +} From 1e8fcbf69b5f101eb52547a60ddeae513b08494a Mon Sep 17 00:00:00 2001 From: vegetableysm Date: Fri, 29 Dec 2023 14:40:22 +0800 Subject: [PATCH 2/2] Clean dockerfile and update readme. Signed-off-by: vegetableysm --- java/hive/docker/README.rst | 20 ++++---------- java/hive/docker/build.sh | 2 +- java/hive/docker/dependency/images/Dockerfile | 27 +++++++++---------- java/hive/docker/docker-compose.yaml | 2 ++ 4 files changed, 20 insertions(+), 31 deletions(-) diff --git a/java/hive/docker/README.rst b/java/hive/docker/README.rst index 611ce598..1e26814e 100644 --- a/java/hive/docker/README.rst +++ b/java/hive/docker/README.rst @@ -12,7 +12,7 @@ Build Hive Docker Image with Hadoop ### Build docker images ```bash - cd v6d/java/hive/distributed + cd v6d/java/hive/docker ./build.sh ``` @@ -23,15 +23,15 @@ Build Hive Docker Image with Hadoop ### Start sql server for hive metastore ```bash - cd v6d/java/hive/distributed/docker/mysql + cd v6d/java/hive/docker/dependency/mysql docker-compose -f mysql-compose.yaml up -d # You can change the password in mysql-compose.yaml and hive-site.xml ``` ### Run hadoop & hive docker images ```bash - cd v6d/java/hive/distributed/docker - docker-compose -f docker-compose.yaml up -d + cd v6d/java/hive/docker + docker-compose -f docker-compose-distributed.yaml up -d ``` ### Prepare tez jars @@ -53,14 +53,6 @@ Build Hive Docker Image with Hadoop docker exec -it hive-hiveserver2 beeline -u "jdbc:hive2://hive-hiveserver2:10000" -n root ``` -```sql - -- in beeline - drop table test_hive1; - create table test_hive1(field int); - insert into table test_hive1 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10); - select * from test_hive1; -``` - Using vineyard as storage ----------------- @@ -100,9 +92,7 @@ Using vineyard as storage ```sql -- in beeline drop table test_vineyard; - create table test_vineyard(field int) - stored as Vineyard - location "vineyard:///user/hive_remote/warehouse/test_vineyard"; + create table test_vineyard(field int); insert into table test_vineyard values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10); select * from test_vineyard; ``` \ No newline at end of file diff --git a/java/hive/docker/build.sh b/java/hive/docker/build.sh index b866d2a2..10e40b8c 100755 --- a/java/hive/docker/build.sh +++ b/java/hive/docker/build.sh @@ -42,7 +42,7 @@ else fi fi -cp -R ./dependency/* "$WORK_DIR/" +cp -R ./dependency/images/ "$WORK_DIR/" cp ../target/vineyard-hive-0.1-SNAPSHOT.jar "$WORK_DIR/images/" tar -xzf "$WORK_DIR/apache-hive-$HIVE_VERSION-bin.tar.gz" -C "$WORK_DIR/" diff --git a/java/hive/docker/dependency/images/Dockerfile b/java/hive/docker/dependency/images/Dockerfile index 785afc41..6307eb4b 100755 --- a/java/hive/docker/dependency/images/Dockerfile +++ b/java/hive/docker/dependency/images/Dockerfile @@ -1,11 +1,10 @@ -# FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/hadoop:v1 FROM apache/hadoop:3.3.5 -RUN mkdir -p /opt/apache/ -RUN mv /opt/hadoop/ /opt/apache/ -ENV HADOOP_HOME=/opt/apache/hadoop -ENV HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop -ENV HIVE_HOME=/opt/apache/hive +RUN mkdir -p /opt/apache/; \ + mv /opt/hadoop/ /opt/apache/ +ENV HADOOP_HOME=/opt/apache/hadoop \ + HADOOP_CONF_DIR=/opt/apache/hadoop/etc/hadoop \ + HIVE_HOME=/opt/apache/hive COPY ./vineyard-hive-0.1-SNAPSHOT.jar ${HADOOP_HOME}/share/hadoop/common/ # prepare hdoop config @@ -30,12 +29,10 @@ ENV PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${HIVE_HOME}/bin:${PATH} COPY bootstrap.sh /opt/apache/ COPY mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar ${HIVE_HOME}/lib/ -RUN sudo yum -y install unzip -RUN wget http://www.vuln.cn/wp-content/uploads/2019/08/libstdc.so_.6.0.26.zip -q && unzip libstdc.so_.6.0.26.zip -RUN sudo cp libstdc++.so.6.0.26 /usr/lib64 -RUN sudo rm /usr/lib64/libstdc++.so.6 -RUN sudo ln -s /usr/lib64/libstdc++.so.6.0.26 /usr/lib64/libstdc++.so.6 - -RUN sudo yum -y install vim - -#RUN yum -y install which +RUN sudo yum -y install unzip; \ + wget http://www.vuln.cn/wp-content/uploads/2019/08/libstdc.so_.6.0.26.zip -q && unzip libstdc.so_.6.0.26.zip; \ + sudo cp libstdc++.so.6.0.26 /usr/lib64; \ + sudo rm /usr/lib64/libstdc++.so.6; \ + sudo ln -s /usr/lib64/libstdc++.so.6.0.26 /usr/lib64/libstdc++.so.6; \ + sudo yum -y install vim; \ + rm libstdc.so_.6.0.26.zip libstdc++.so.6.0.26; diff --git a/java/hive/docker/docker-compose.yaml b/java/hive/docker/docker-compose.yaml index c6ce90ef..c1c3ba54 100644 --- a/java/hive/docker/docker-compose.yaml +++ b/java/hive/docker/docker-compose.yaml @@ -3,6 +3,7 @@ version: '3.5' services: metastore: image: apache/hadoop_hive:v1 + user: "root:root" restart: unless-stopped container_name: metastore hostname: metastore @@ -29,6 +30,7 @@ services: hive: image: apache/hadoop_hive:v1 + user: "root:root" depends_on: - metastore restart: unless-stopped