Skip to content

Commit

Permalink
[Improvement][connector-spark-email] Refactored config parameter cons…
Browse files Browse the repository at this point in the history
…tants and default values (#1707)
  • Loading branch information
mans2singh authored Apr 18, 2022
1 parent 0e73105 commit d316190
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.spark.email

/**
* Configurations for Email connector
*/
object Config extends Serializable {
/**
* Email server host
*/
val HOST = "host"

/**
* Email host port
*/
val PORT = "port"

/**
* User or sender password
*/
val PASSWORD = "password"

/**
* Email sender
*/
val FROM = "from"

/**
* Email recepients
*/
val TO = "to"

/**
* Default rows to limit
*/
val DEFAULT_LIMIT = 100000

/**
* Number of rows to include
*/
val LIMIT = "limit"

/**
* Email subject
*/
val SUBJECT = "subject"

/**
* Email content text format
*/
val BODY_TEXT = "bodyText"

/**
* Email content html format
*/
val BODY_HTML = "bodyHtml"

/**
* Email BCC
*/
val BCC = "bcc"

/**
* Email CC
*/
val CC = "cc"

/**
* Whether to use ssl
*/
val USE_SSL = "use_ssl"

/**
* Whether to use tls
*/
val USE_TLS = "use_tls"


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@
package org.apache.seatunnel.spark.email.sink

import java.io.ByteArrayOutputStream

import scala.collection.JavaConverters._

import com.norbitltd.spoiwo.model.Workbook
import com.norbitltd.spoiwo.natures.xlsx.Model2XlsxConversions._
import com.typesafe.config.ConfigFactory
import org.apache.poi.xssf.usermodel.XSSFWorkbook
import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult}
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.seatunnel.spark.email.Config.{BCC, BODY_HTML, BODY_TEXT, CC, DEFAULT_LIMIT, FROM, HOST, LIMIT, PASSWORD, PORT, SUBJECT, TO, USE_SSL, USE_TLS}
import org.apache.spark.sql.{Dataset, Row}
import play.api.libs.mailer.{Attachment, AttachmentData, Email, SMTPConfiguration, SMTPMailer}

Expand All @@ -36,7 +35,7 @@ class Email extends SparkBatchSink {

// Get xlsx file's byte array
val headerRow = Some(data.schema.fields.map(_.name).toSeq)
val limitCount = if (config.hasPath("limit")) config.getInt("limit") else 100000
val limitCount = if (config.hasPath(LIMIT)) config.getInt(LIMIT) else DEFAULT_LIMIT
val dataRows = data.limit(limitCount)
.toLocalIterator()
.asScala
Expand All @@ -55,22 +54,22 @@ class Email extends SparkBatchSink {
xlsxBytes,
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")

val subject = if (config.hasPath("subject")) config.getString("subject") else "(No subject)"
val subject = if (config.hasPath(SUBJECT)) config.getString(SUBJECT) else "(No subject)"
// Who sent the mail
val from = config.getString("from")
val from = config.getString(FROM)
// Who receive mail
val to = config.getString("to").split(",")
val to = config.getString(TO).split(",")

val bodyText = if (config.hasPath("bodyText")) Some(config.getString("bodyText")) else None
val bodyText = if (config.hasPath(BODY_TEXT)) Some(config.getString(BODY_TEXT)) else None
// Hypertext content, If bodyHtml is set, then bodeText will not take effect.
val bodyHtml = if (config.hasPath("bodyHtml")) Some(config.getString("bodyHtml")) else None
val bodyHtml = if (config.hasPath(BODY_HTML)) Some(config.getString(BODY_HTML)) else None

val cc =
if (config.hasPath("cc")) config.getString("cc").split(",").map(_.trim()).filter(_.nonEmpty)
if (config.hasPath(CC)) config.getString(CC).split(",").map(_.trim()).filter(_.nonEmpty)
else Array[String]()
val bcc =
if (config.hasPath("bcc")) {
config.getString("bcc").split(",").map(_.trim()).filter(_.nonEmpty)
if (config.hasPath(BCC)) {
config.getString(BCC).split(",").map(_.trim()).filter(_.nonEmpty)
} else {
Array[String]()
}
Expand All @@ -86,18 +85,18 @@ class Email extends SparkBatchSink {
bcc = bcc)

// Mailbox server settings, used to send mail
val host = config.getString("host")
val port = config.getInt("port")
val password = config.getString("password")
val ssl = if (config.hasPath("use_ssl")) config.getBoolean("use_ssl") else false
val tls = if (config.hasPath("use_tls")) config.getBoolean("use_tls") else false
val host = config.getString(HOST)
val port = config.getInt(PORT)
val password = config.getString(PASSWORD)
val ssl = if (config.hasPath(USE_SSL)) config.getBoolean(USE_SSL) else false
val tls = if (config.hasPath(USE_TLS)) config.getBoolean(USE_TLS) else false

val mailer: SMTPMailer = createMailer(host, port, from, password, ssl, tls)
val result: String = mailer.send(email)
}

override def checkConfig(): CheckResult = {
CheckConfigUtil.checkAllExists(config, "from", "to", "host", "port", "password")
CheckConfigUtil.checkAllExists(config, FROM, TO, HOST, PORT, PASSWORD)
}

def createMailer(
Expand Down

0 comments on commit d316190

Please sign in to comment.