On write InfluxDB gives status 204 without any content written #15

Closed
abossenbroek opened this issue Apr 22, 2019 · 8 comments
Labels: bug (Something isn't working)

abossenbroek commented Apr 22, 2019

I am trying to load data from a CSV into InfluxDB using chronicler-spark. The program completes without errors, but InfluxDB returns status 204 and no data is actually written. An example program that reproduces this behaviour on my laptop can be found below. This may be related to a bug in influxdb.

build.sbt
name := "sensor_data"

version := "0.1"

scalaVersion := "2.12.8"
lazy val chronicler: String = "0.4.6"

libraryDependencies ++= Seq(
  // You don’t *have to* use Spark, but in case you want to, we have added the dependency
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "org.apache.spark" %% "spark-sql" % "2.4.1",
  "com.github.fsanaulla" %% "chronicler-spark-ds"       % "0.2.8",
  "com.github.fsanaulla" %% "chronicler-ahc-io"         % chronicler,
  "com.github.fsanaulla" %% "chronicler-macros"         % chronicler,
  "com.github.fsanaulla" %% "chronicler-url-io"         % chronicler,
  "com.github.fsanaulla" %% "chronicler-url-management" % chronicler
)
Writing to InfluxDB
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxWriter}
import com.github.fsanaulla.chronicler.urlhttp.shared.InfluxConfig
import com.github.fsanaulla.chronicler.macros.annotations.{field, tag, timestamp}
import com.github.fsanaulla.chronicler.urlhttp.management.{InfluxMng, UrlManagementClient}
import org.apache.spark.sql


final case class MeteringEntry(
                                @tag sensorID: String,
                                @tag unit: Option[String],
                                @field value: Double,
                                @timestamp created: Long
                              )


object Main extends App {
  val spark: SparkSession =
    SparkSession
      .builder()
      .appName("Global temperature loader")
      .config("spark.master", "local[1]")
      .config("spark.driver.memory", "1600M")
      .config("spark.executor.memory", "1600M")
      .getOrCreate()

  import spark.implicits._

  val rawData: DataFrame  = spark.read
    .option("header", true)
    .option("sep", "|")
    .option("charset", "iso-8859-1")
    .csv("/tmp/toy_data.csv")
    .toDF(Seq("sensorID", "unit", "value", "created"): _*)
    .withColumn("value", 'value.cast(sql.types.DoubleType))
    .withColumn("created", 'created.cast(sql.types.LongType))

  val data = rawData.as[MeteringEntry]

  val credentials: InfluxCredentials = InfluxCredentials("influx", "influxx")
  implicit lazy val influxConf: InfluxConfig = InfluxConfig(host = "localhost", port = 8086,
    credentials = Some(credentials), gzipped = false, None)

  lazy val management: UrlManagementClient = InfluxMng(influxConf)

  management.dropDatabase("galaxy")
  management.createDatabase("galaxy")

  management.close()

  import com.github.fsanaulla.chronicler.spark.ds._

  implicit val wr: InfluxWriter[MeteringEntry] = new InfluxWriter[MeteringEntry] {
    def write(me: MeteringEntry): String = {
      val sb = StringBuilder.newBuilder

      def unitUnpacker(value: Option[String]): String = value match {
        case Some(s) => s
        case _ => ""
      }

      // Line protocol shape: <tags> <fields> <timestamp in epoch nanoseconds>
      sb.append("unit=")
        .append(unitUnpacker(me.unit))
        .append(" ")
        .append("level=")
        .append(me.value)
        .append(" ")
        .append(me.created)

      sb.toString()
    }
  }

  val sensorID = "meaningOfLife"

  data.filter(row => row.sensorID == sensorID).show(10)
  data.filter(row => row.sensorID == sensorID).saveToInfluxDB(dbName = "galaxy", measName = sensorID,
    onFailure = ex => {
      throw new Exception(ex)
    })

  spark.stop()
}

/tmp/toy_data.csv

meaningOfLife|zorg|442.401184|1451602860000000000
meaningOfLife|zorg|442.083191|1451602920000000000
meaningOfLife|zorg|442.161682|1451602980000000000
meaningOfLife|zorg|442.598999|1451603040000000000
meaningOfLife|zorg|442.052185|1451603100000000000
meaningOfLife|zorg|442.718689|1451603160000000000
meaningOfLife|zorg|442.338806|1451603220000000000
meaningOfLife|zorg|442.322815|1451603280000000000
meaningOfLife|zorg|442.588318|1451603340000000000
meaningOfLife|zorg|442.041687|1451603400000000000

docker-compose.yml

version: '3.3'
services:
  influxdb:
    image: influxdb:1.7
    restart: always
    hostname: influxdb
    environment:
      - INFLUXDB_ADMIN_ENABLED=true
      - INFLUXDB_DB=uv_sensors
      - INFLUXDB_ADMIN_USER=influx
      - INFLUXDB_ADMIN_PASSWORD=influxx
      - INFLUXDB_WRITE_USER=influx_user
      - INFLUXDB_WRITE_USER_PASSWORD=influxx_user
      - VIRTUAL_PORT=8086
    volumes:
      - ./volumes/influxdb:/var/lib/influxdb:rw
    ports:
      - "8086:8086"

  chronograf:
    image: chronograf:1.7
    restart: always
    hostname: chronograf
    environment:
      - VIRTUAL_PORT=8888
      - INFLUXDB_URL=http://influxdb:8086
      - INFLUXDB_USERNAME=influx
      - INFLUXDB_PASSWORD=influxx
    volumes:
      - ./volumes/chronograf:/var/lib/chronograf:rw
    depends_on:
      - influxdb
    ports:
      - "8888:8888"

Logs

When running with sbt run, I see the following in the Spark logs,

19/04/22 18:19:48 INFO ContextCleaner: Cleaned accumulator 12
+-------------+----+----------+-------------------+
|     sensorID|unit|     value|            created|
+-------------+----+----------+-------------------+
|meaningOfLife|zorg|442.083191|1451602920000000000|
|meaningOfLife|zorg|442.161682|1451602980000000000|
|meaningOfLife|zorg|442.598999|1451603040000000000|
|meaningOfLife|zorg|442.052185|1451603100000000000|
|meaningOfLife|zorg|442.718689|1451603160000000000|
|meaningOfLife|zorg|442.338806|1451603220000000000|
|meaningOfLife|zorg|442.322815|1451603280000000000|
|meaningOfLife|zorg|442.588318|1451603340000000000|
|meaningOfLife|zorg|442.041687|1451603400000000000|
+-------------+----+----------+-------------------+

and the following in the docker logs

influxdb_1    | ts=2019-04-22T17:19:48.026308Z lvl=info msg="Executing query" log_id=0Ew~~o7l000 service=query query="DROP DATABASE galaxy"
influxdb_1    | [httpd] 172.20.0.1 - inflx [22/Apr/2019:17:19:48 +0000] "GET /query?p=%5BREDACTED%5D&q=DROP+DATABASE+galaxy&u=inflx HTTP/1.1" 200 168 "-" "Java/1.8.0_212" d4848559-6522-11e9-82a8-000000000000 818
influxdb_1    | ts=2019-04-22T17:19:48.081766Z lvl=info msg="Executing query" log_id=0Ew~~o7l000 service=query query="CREATE DATABASE galaxy"
influxdb_1    | [httpd] 172.20.0.1 - inflx [22/Apr/2019:17:19:48 +0000] "GET /query?p=%5BREDACTED%5D&q=CREATE+DATABASE+galaxy&u=inflx HTTP/1.1" 200 170 "-" "Java/1.8.0_212" d48cfc4f-6522-11e9-82a9-000000000000 7946
influxdb_1    | [httpd] 172.20.0.1 - inflx [22/Apr/2019:17:19:49 +0000] "POST /write?db=galaxy&p=%5BREDACTED%5D&u=inflx HTTP/1.1" 204 0 "-" "Java/1.8.0_212" d51ca41b-6522-11e9-82aa-000000000000 163483

Diagnosis

To understand the issue better, I set a breakpoint at com.github.fsanaulla.chronicler.urlhttp.io.models.UrlWriter.scala:45
and stepped through the method. I can confirm the entity is properly formatted
and works when inserted manually via the Chronograf GUI.
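
For reference, a quick standalone check (a sketch reusing the case class and the InfluxWriter defined above) shows the line-protocol fragment the writer produces for the first CSV row:

// Sketch: print what the InfluxWriter above emits for one row.
val sample = MeteringEntry("meaningOfLife", Some("zorg"), 442.401184, 1451602860000000000L)
println(wr.write(sample)) // unit=zorg level=442.401184 1451602860000000000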

fsanaulla self-assigned this Apr 22, 2019
fsanaulla (Owner) commented Apr 22, 2019

Hello, @abossenbroek.

Thank you for the report.

As far as I know, InfluxDB uses status code 204 to indicate a successful insert (link).

So, am I right that you get a 204 but no data is inserted?
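
For illustration, a successful raw write also answers 204 with an empty body. A minimal sketch (standalone Scala using only java.net from the JDK; the database name and credentials are taken from the docker-compose file above):

// Sketch: POST one line-protocol point directly to the /write endpoint.
// A successful write returns 204 No Content, i.e. an empty response body.
import java.net.{HttpURLConnection, URL}

val url  = new URL("http://localhost:8086/write?db=galaxy&u=influx&p=influxx")
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setDoOutput(true)
conn.getOutputStream.write(
  "meaningOfLife,unit=zorg level=442.401184 1451602860000000000".getBytes("UTF-8"))
println(conn.getResponseCode) // 204 on success, even though nothing is returned
conn.disconnect()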

abossenbroek (Author)

Thanks for pointing out the doc, I wasn't aware of it, @fsanaulla. You are right: the example I provided does not lead to any data being inserted into the database. Remarkably, it works if I create a Dataset in Scala directly, without reading from a CSV.

abossenbroek changed the title from "On write InfluxDB gives status 204" to "On write InfluxDB gives status 204 without any content written" Apr 22, 2019
fsanaulla (Owner)

A few notes:

  • can you provide a file sample?
  • check your timestamp format; by default InfluxDB expects timestamps in nanosecond precision (see the sketch below).
  • did you try Structured Streaming for CSV? A connector for it already exists in this project.
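
A minimal sketch of the precision point (the value is chosen to match the CSV above):

// A timestamp sent in seconds or milliseconds is still read as nanoseconds
// and lands near 1970-01-01, so scale explicitly before writing.
val seconds = 1451602860L
val nanos   = seconds * 1000000000L // 1451602860000000000, as in the CSV above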

abossenbroek (Author) commented Apr 22, 2019

A few notes:

  • can you provide a file sample?

See the CSV section in my question.

  • check your timestamp format; by default InfluxDB expects timestamps in nanosecond precision.

I am aware of that; I took the data and wrote a bulk load myself, and it works. I also tried

  val data: Dataset[MeteringEntry] = Seq(
    MeteringEntry("meaningOfLife", Option("zorg"), 442.401184,1451602860000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.083191,1451602920000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.161682,1451602980000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.598999,1451603040000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.052185,1451603100000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.718689,1451603160000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.338806,1451603220000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.322815,1451603280000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.588318,1451603340000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.041687,1451603400000000000L)).toDS

and this works properly. Somehow only the CSV path doesn't work.

  • did you try Structured Streaming for CSV? A connector for it already exists in this project.

Not sure how to do this. Any pointers would be useful, thanks.

fsanaulla (Owner)

@abossenbroek I will look into it today.

About Structured Streaming:

  • take a look at it
  • see the example from the tests
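
As a rough sketch (plain Spark foreachBatch reusing the chronicler-spark-ds connector from above, not the dedicated structured-streaming module; the input directory and schema are hypothetical):

// Sketch: stream CSV files from a directory and write each micro-batch with
// saveToInfluxDB. Assumes the MeteringEntry case class, the implicit
// influxConf and InfluxWriter, and spark.implicits._ from above are in scope.
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._
import com.github.fsanaulla.chronicler.spark.ds._

val schema = StructType(Seq(
  StructField("sensorID", StringType),
  StructField("unit",     StringType),
  StructField("value",    DoubleType),
  StructField("created",  LongType)
))

val stream = spark.readStream
  .option("sep", "|")
  .schema(schema) // streaming CSV sources require an explicit schema
  .csv("/tmp/sensor_stream/") // hypothetical input directory
  .as[MeteringEntry]

// Defining the function separately avoids the foreachBatch overload
// ambiguity in Spark 2.4 with Scala 2.12.
val writeBatch: (Dataset[MeteringEntry], Long) => Unit = { (batch, _) =>
  batch.saveToInfluxDB(dbName = "galaxy", measName = "meaningOfLife",
    onFailure = ex => throw new Exception(ex))
}

stream.writeStream
  .foreachBatch(writeBatch)
  .start()
  .awaitTermination()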

fsanaulla added the bug label Apr 23, 2019
abossenbroek (Author)

Interestingly, adding .repartition(2) somehow resolves the issue. Tested with:

data.repartition(2).filter(row => row.sensorID == sensorID).saveToInfluxDB(dbName = "galaxy", measName = sensorID,
    onFailure = ex => {
      throw new Exception(ex)
    })

fsanaulla (Owner) commented Apr 23, 2019

@abossenbroek 0.2.9 should be released in a few hours. I think it should help. Let me know if it does.

fsanaulla (Owner)

Reopen if the error still occurs.
