[feature]:support spark connector sink data to doris #6256
Conversation
val buffer = ListBuffer[String]()
partition.foreach(row => {
  val rowString = row.toSeq.mkString("\t")
Do we need to consider null values here?
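The concern is that `row.toSeq.mkString("\t")` renders a SQL NULL as the literal string "null", while Doris Stream Load's tab-separated input expects `\N` for null by default. A minimal sketch of the idea (class and method names are hypothetical, not from the PR):

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class RowToTsv {
    // Doris Stream Load treats "\N" as NULL in tab-separated input.
    private static final String NULL_MARKER = "\\N";

    // Join row fields with tabs, mapping nulls to the marker instead
    // of letting them render as the literal string "null".
    public static String toTsv(Object... fields) {
        return Arrays.stream(fields)
                .map(f -> f == null ? NULL_MARKER : f.toString())
                .collect(Collectors.joining("\t"));
    }
}
```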
- It is recommended to add a configurable number of retries for failed loads.
- It is recommended to obtain the list of surviving BE nodes through FE, then connect directly to a BE to execute Stream Load, selecting the node by round-robin or another strategy.
- It is recommended to support two strategies for triggering Stream Load: by record count, as you have implemented so far, and by time interval.
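The two trigger strategies suggested above (record count and time interval) could be combined in a single check after each buffered row. A sketch, with all names and defaults hypothetical rather than taken from the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Flush a buffered batch when it reaches a row-count threshold OR
// when a time interval has elapsed since the last flush.
public class BatchFlusher {
    private final int maxRows;
    private final long maxIntervalMs;
    private final Consumer<List<String>> sink;  // e.g. one Stream Load per batch
    private final List<String> buffer = new ArrayList<>();
    private long lastFlush = System.currentTimeMillis();

    public BatchFlusher(int maxRows, long maxIntervalMs, Consumer<List<String>> sink) {
        this.maxRows = maxRows;
        this.maxIntervalMs = maxIntervalMs;
        this.sink = sink;
    }

    public void add(String row) {
        buffer.add(row);
        if (buffer.size() >= maxRows
                || System.currentTimeMillis() - lastFlush >= maxIntervalMs) {
            flush();
        }
    }

    // Also called once at the end of the partition to drain the tail.
    public void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlush = System.currentTimeMillis();
    }
}
```

The count check catches high-throughput partitions; the interval check bounds latency on slow ones.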
}
public void load(String value) throws StreamLoadException {
    LoadResponse loadResponse = loadBatch(value);
It is recommended to add a maximum number of retries for failed loads, to avoid failures caused by short-term network jitter.
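A bounded-retry wrapper along these lines would cover that case; this is only a sketch, and the class name, retry count, and backoff are assumptions:

```java
import java.util.concurrent.Callable;

// Retry a failed load up to maxRetries times with a short backoff,
// to ride out transient network jitter; rethrow the last error once
// retries are exhausted.
public class RetryingLoader {
    public static <T> T withRetry(int maxRetries, long backoffMs, Callable<T> attempt)
            throws Exception {
        int tries = 0;
        while (true) {
            try {
                return attempt.call();
            } catch (Exception e) {
                tries++;
                if (tries > maxRetries) {
                    throw e;  // exhausted retries: surface the last error
                }
                Thread.sleep(backoffMs);
            }
        }
    }
}
```

The `load` method above could wrap its `loadBatch` call in such a helper.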
private LoadResponse loadBatch(String value) {
    Calendar calendar = Calendar.getInstance();
    String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
It is recommended to use a spark_connector_ prefix for the label name, which makes these loads easy to distinguish.
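With that prefix, the label construction quoted in the diff might look as follows. This is a sketch: the class name is hypothetical, and the UUID suffix (keeping labels unique, mirroring the trailing `%s` in the original format string) is an assumption.

```java
import java.util.Calendar;
import java.util.UUID;

// Build a Stream Load label with a spark_connector_ prefix instead of
// audit_, so loads issued by the connector are easy to identify.
public class LabelBuilder {
    public static String buildLabel() {
        Calendar c = Calendar.getInstance();
        return String.format("spark_connector_%04d%02d%02d_%02d%02d%02d_%s",
                c.get(Calendar.YEAR), c.get(Calendar.MONTH) + 1,
                c.get(Calendar.DAY_OF_MONTH), c.get(Calendar.HOUR_OF_DAY),
                c.get(Calendar.MINUTE), c.get(Calendar.SECOND),
                UUID.randomUUID().toString().replace("-", ""));
    }
}
```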
package org.apache.doris.spark.sql
object DorisOptions {
  val beHostPort = "beHostPort"
The IP address of FE should be configured here instead. The list of surviving BE nodes can be obtained through FE, and Stream Load can then connect directly to a BE (chosen by round-robin or another strategy) to execute the load, avoiding pressure on FE.
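Round-robin selection over a FE-provided node list could be sketched like this. All names here are hypothetical; in the real connector the backend list would be fetched (and periodically refreshed) from FE rather than passed in as a constant.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Rotate through the surviving BE nodes reported by FE, so Stream
// Load requests go straight to BEs instead of all routing through FE.
public class RoundRobinBeSelector {
    private final List<String> backends;
    private final AtomicLong counter = new AtomicLong();

    public RoundRobinBeSelector(List<String> backends) {
        if (backends.isEmpty()) {
            throw new IllegalArgumentException("no surviving BE nodes");
        }
        this.backends = backends;
    }

    public String next() {
        int idx = (int) (counter.getAndIncrement() % backends.size());
        return backends.get(idx);
    }
}
```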
LGTM
support spark connector write dataframe to doris
Proposed changes
support spark connector write dataframe to doris
Types of changes
What types of changes does your code introduce to Doris?
Put an x in the boxes that apply.
Checklist
Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.
Further comments