From d316190f7602d86b3a16558386d62a84d11a1ead Mon Sep 17 00:00:00 2001 From: mans2singh Date: Mon, 18 Apr 2022 05:00:38 -0400 Subject: [PATCH] [Improvement][connector-spark-email] Refactored config parameter constants and default values (#1707) --- .../apache/seatunnel/spark/email/Config.scala | 94 +++++++++++++++++++ .../seatunnel/spark/email/sink/Email.scala | 33 ++++--- 2 files changed, 110 insertions(+), 17 deletions(-) create mode 100644 seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/Config.scala diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/Config.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/Config.scala new file mode 100644 index 00000000000..1f8329960e5 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/Config.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.spark.email + +/** + * Configurations for Email connector + */ +object Config extends Serializable { + /** + * Email server host + */ + val HOST = "host" + + /** + * Email host port + */ + val PORT = "port" + + /** + * User or sender password + */ + val PASSWORD = "password" + + /** + * Email sender + */ + val FROM = "from" + + /** + * Email recepients + */ + val TO = "to" + + /** + * Default rows to limit + */ + val DEFAULT_LIMIT = 100000 + + /** + * Number of rows to include + */ + val LIMIT = "limit" + + /** + * Email subject + */ + val SUBJECT = "subject" + + /** + * Email content text format + */ + val BODY_TEXT = "bodyText" + + /** + * Email content html format + */ + val BODY_HTML = "bodyHtml" + + /** + * Email BCC + */ + val BCC = "bcc" + + /** + * Email CC + */ + val CC = "cc" + + /** + * Whether to use ssl + */ + val USE_SSL = "use_ssl" + + /** + * Whether to use tls + */ + val USE_TLS = "use_tls" + + +} diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala index 69ac5e35061..a012bffd1b5 100644 --- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala +++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala @@ -17,9 +17,7 @@ package org.apache.seatunnel.spark.email.sink import java.io.ByteArrayOutputStream - import scala.collection.JavaConverters._ - import com.norbitltd.spoiwo.model.Workbook import com.norbitltd.spoiwo.natures.xlsx.Model2XlsxConversions._ import com.typesafe.config.ConfigFactory @@ -27,6 +25,7 @@ import org.apache.poi.xssf.usermodel.XSSFWorkbook import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult} import org.apache.seatunnel.spark.SparkEnvironment import org.apache.seatunnel.spark.batch.SparkBatchSink +import org.apache.seatunnel.spark.email.Config.{BCC, BODY_HTML, BODY_TEXT, CC, DEFAULT_LIMIT, FROM, HOST, LIMIT, PASSWORD, PORT, SUBJECT, TO, USE_SSL, USE_TLS} import org.apache.spark.sql.{Dataset, Row} import play.api.libs.mailer.{Attachment, AttachmentData, Email, SMTPConfiguration, SMTPMailer} @@ -36,7 +35,7 @@ class Email extends SparkBatchSink { // Get xlsx file's byte array val headerRow = Some(data.schema.fields.map(_.name).toSeq) - val limitCount = if (config.hasPath("limit")) config.getInt("limit") else 100000 + val limitCount = if (config.hasPath(LIMIT)) config.getInt(LIMIT) else DEFAULT_LIMIT val dataRows = data.limit(limitCount) .toLocalIterator() .asScala @@ -55,22 +54,22 @@ class Email extends SparkBatchSink { xlsxBytes, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet") - val subject = if (config.hasPath("subject")) config.getString("subject") else "(No subject)" + val subject = if (config.hasPath(SUBJECT)) config.getString(SUBJECT) else "(No subject)" // Who sent the mail - val from = config.getString("from") + val from = config.getString(FROM) // Who receive mail - val to = config.getString("to").split(",") + val to = config.getString(TO).split(",") - val bodyText = if (config.hasPath("bodyText")) Some(config.getString("bodyText")) else None + val bodyText = if (config.hasPath(BODY_TEXT)) Some(config.getString(BODY_TEXT)) else None // Hypertext content, If bodyHtml is set, then bodeText will not take effect. - val bodyHtml = if (config.hasPath("bodyHtml")) Some(config.getString("bodyHtml")) else None + val bodyHtml = if (config.hasPath(BODY_HTML)) Some(config.getString(BODY_HTML)) else None val cc = - if (config.hasPath("cc")) config.getString("cc").split(",").map(_.trim()).filter(_.nonEmpty) + if (config.hasPath(CC)) config.getString(CC).split(",").map(_.trim()).filter(_.nonEmpty) else Array[String]() val bcc = - if (config.hasPath("bcc")) { - config.getString("bcc").split(",").map(_.trim()).filter(_.nonEmpty) + if (config.hasPath(BCC)) { + config.getString(BCC).split(",").map(_.trim()).filter(_.nonEmpty) } else { Array[String]() } @@ -86,18 +85,18 @@ class Email extends SparkBatchSink { bcc = bcc) // Mailbox server settings, used to send mail - val host = config.getString("host") - val port = config.getInt("port") - val password = config.getString("password") - val ssl = if (config.hasPath("use_ssl")) config.getBoolean("use_ssl") else false - val tls = if (config.hasPath("use_tls")) config.getBoolean("use_tls") else false + val host = config.getString(HOST) + val port = config.getInt(PORT) + val password = config.getString(PASSWORD) + val ssl = if (config.hasPath(USE_SSL)) config.getBoolean(USE_SSL) else false + val tls = if (config.hasPath(USE_TLS)) config.getBoolean(USE_TLS) else false val mailer: SMTPMailer = createMailer(host, port, from, password, ssl, tls) val result: String = mailer.send(email) } override def checkConfig(): CheckResult = { - CheckConfigUtil.checkAllExists(config, "from", "to", "host", "port", "password") + CheckConfigUtil.checkAllExists(config, FROM, TO, HOST, PORT, PASSWORD) } def createMailer(