Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flink_tranquility send event to druid fail #312

Open
johncai0 opened this issue Oct 20, 2020 · 2 comments
Open

flink_tranquility send event to druid fail #312

johncai0 opened this issue Oct 20, 2020 · 2 comments

Comments

@johncai0
Copy link

johncai0 commented Oct 20, 2020

The flink program executes normally, but cannot write to druid(Source cannot be queried in druid). What is wrong?

version:
<scala.binary.version>2.11</scala.binary.version>
<kafka.version>2.2.0</kafka.version>
<flink.version>1.10.0</flink.version>

flin_tranquility version:

io.druid
tranquility-flink_2.11
0.8.3

apache druid version: 0.19.0

flink code:
event case class:
case class Event(id: Int, name: String, age: Int, ts: Long)

SimpleEventBeamFactory.scala
`class SimpleEventBeamFactory(conf: ConfigureUtil) extends BeamFactory[Event] {

lazy val makeBeam: Beam[Event] = {

// Tranquility uses ZooKeeper (through Curator framework) for coordination.
val curator = CuratorFrameworkFactory.newClient(
  conf.getDruidZKHost(),
  new BoundedExponentialBackoffRetry(100, 3000, 5)
)
curator.start()

val dimensions = IndexedSeq("id", "name", "age")
val aggregators = Seq(new LongSumAggregatorFactory("ts", "ts_sum"))
val isRollup = true

// Expects simpleEvent.timestamp to return a Joda DateTime object.
DruidBeams
  .builder((simpleEvent: Event) => new DateTime(simpleEvent.ts))
  .curator(curator)
  .discoveryPath(conf.getDruidDiscoveryPath())
  .location(DruidLocation.create(conf.getDruidIndexService(), "john_flink"))
  .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities. MINUTE,isRollup))
  .tuning(
    ClusteredBeamTuning(
      segmentGranularity = Granularity.HOUR,
      windowPeriod = new Period("PT10M"),
      partitions = 1,
      replicants = 1
    )
  )
  .buildBeam()

}
}`

config:
application.name: dsp-report
profiles.active: @profileActive@
druid.zk.host: 192.168.200.78:2181
druid.index.service: druid/overlord
druid.discovery.path: /druid/discovery

input event data:
1,john1,18,1603179767000
2,john2,19,1603179768000
3,john3,11,1603179769000
2,john2,19,1603179770000
2,john2,19,1603179771000
2,john2,19,1603179772000
2,john2,19,1603617127000
2,john2,19,1603617127400
2,john2,19,1603617127500
2,john2,19,1603617127600
1,john1,18,1603789927000
2,john2,19,1603789927000
3,john3,11,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000
2,john2,19,1603789927000

@johncai0 johncai0 changed the title flin_tranquility send event to druid fail flink_tranquility send event to druid fail Oct 20, 2020
@fpompermaier
Copy link

From what I see in the https://github.com/druid-io/tranquility/blob/master/build.sbt Flink version is very old (1.0.3). Is Tranquillity supposed to work also with newer versions?

@johncai0
Copy link
Author

It can be solved by customizing the source, and finally passed the test.
But "Tranquillity " does not support Exactly-once, so it was later abandoned.
In the business, flink-"kafka-"druid was finally used

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants