Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dima-vm committed Dec 2, 2019
1 parent f81845e commit 2c8cb62
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 46 deletions.
4 changes: 3 additions & 1 deletion presto-victoriametrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ CREATE TABLE metrics (
Then query:
```
SELECT * FROM metrics
AND JSON_EXTRACT(CAST (name AS JSON), '$.__name__') = JSON '"vm_blocks"'
WHERE REGEXP_LIKE(name, '__name__="<search_for_this_name>"')
AND REGEXP_LIKE(name, 'foo="<search_for_this_foo_value>"')
AND REGEXP_LIKE(name, 'foo="<search_for_this_foo_value>"')
AND timestamp > NOW() - INTERVAL '10 hours'
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
* limitations under the License.
*/
package com.victoriametrics.presto.inject

import com.google.inject.Binder
import com.google.inject.Module
import com.google.inject.Provides
import com.google.inject.Scopes
import com.victoriametrics.presto.VmConnector
import okhttp3.OkHttpClient

class VmModule : Module {
override fun configure(binder: Binder) {
binder.bind<VmConnector>(VmConnector::class.java).`in`(Scopes.SINGLETON)
}

@Provides
fun provideHttpClient(): OkHttpClient {
return OkHttpClient.Builder().build()
}
}
//
// import com.google.inject.Binder
// import com.google.inject.Module
// import com.google.inject.Provides
// import com.google.inject.Scopes
// import com.victoriametrics.presto.VmConnector
// import okhttp3.OkHttpClient
//
// class VmModule : Module {
// override fun configure(binder: Binder) {
// binder.bind<VmConnector>(VmConnector::class.java).`in`(Scopes.SINGLETON)
// }
//
// @Provides
// fun provideHttpClient(): OkHttpClient {
// return OkHttpClient.Builder().build()
// }
// }
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,71 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonConfiguration


@Serializable
// @UseExperimental(kotlinx.serialization.ImplicitReflectionSerializer::class)
data class ExportResponseLine(
@Required val metric: Metric,
@Required val values: DoubleArray,
@Required val timestamps: LongArray
@Required val metric: LinkedHashMap<String, String>,
@Required val values: DoubleArray,
@Required val timestamps: LongArray
) {
companion object {
private val json = Json(JsonConfiguration.Stable)
private val serializer = serializer()

fun deserialize(content: String): ExportResponseLine {
// val mapper = ObjectMapper()
// val tree: ObjectNode = mapper.readTree(content) as ObjectNode
// val node: ArrayNode = tree.get("timestamps") as ArrayNode
// node.
val jsonElement = json.parseJson(content)
return json.parse(serializer, content)
}
}

init {
require(timestamps.size == values.size) { "Sizes aren't equal: ${timestamps.size} vs ${values.size}" }
}
//
// class MetricDeserializer @JvmOverloads constructor(vc: Class<*>? = null) : StdDeserializer<Metric>(vc) {
// @Throws(IOException::class, JsonProcessingException::class)
// override fun deserialize(jp: JsonParser, ctxt: DeserializationContext): Metric {
// val node: JsonNode = jp.codec.readTree(jp)
// jp.currentToken.isStructStart
// assert(jp.isExpectedStartObjectToken) {jp.currentToken}
// jp.nextToken()
// jp.currentToken.asCharArray()
// // val id = (node["id"] as IntNode).numberValue() as Int
// // val itemName = node["itemName"].asText()
// // val userId = (node["createdBy"] as IntNode).numberValue() as Int
// return Metric()
// }
// }

//
// @JsonDeserialize(using = MetricDeserializer::class)
// @Serializable
// data class Metric(val fullName: String) {
// @Serializer(forClass = Metric::class)
// companion object : KSerializer<Metric> {
// override val descriptor: SerialDescriptor = StringDescriptor.withName("Metric")
//
// override fun serialize(encoder: Encoder, obj: Metric) {
// encoder.encodeString(obj.fullName)
// }
//
// override fun deserialize(decoder: Decoder): Metric {
// return Metric(decoder.decode())
// }
// }
// }

@Serializable
data class Metric(
@SerialName("__name__")
val name: String,
val job: String,
val type: String,
val instance: String
data class Metric2(
@SerialName("__name__")
val name: String,
val job: String,
val type: String,
val instance: String
) {
override fun toString(): String {
// TODO: return original string from export, don't parse its JSON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package com.victoriametrics.presto.model
import com.facebook.presto.spi.ColumnMetadata
import com.facebook.presto.spi.SchemaTableName
import com.facebook.presto.spi.type.DoubleType
import com.facebook.presto.spi.type.RowType
import com.facebook.presto.spi.type.TimestampType
import com.facebook.presto.spi.type.VarcharType

Expand All @@ -28,7 +27,9 @@ object VmSchema {
// TODO: proper uint32 type http://prestodb.github.io/docs/current/develop/types.html
// ColumnMetadata("account_id", BigintType.BIGINT),
// ColumnMetadata("project_id", BigintType.BIGINT),
ColumnMetadata("name", RowType.from(listOf(RowType.field("__name__", VarcharType.VARCHAR)))),
ColumnMetadata("name", VarcharType.VARCHAR),
// ColumnMetadata("name", MapType(VarcharType.VARCHAR, VarcharType.VARCHAR)),
// ColumnMetadata("name", RowType.from(listOf(RowType.field("__name__", VarcharType.VARCHAR)))),
ColumnMetadata("timestamp", TimestampType.TIMESTAMP),
ColumnMetadata("value", DoubleType.DOUBLE)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@
package com.victoriametrics.presto

import com.facebook.airlift.log.Logger
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
import com.victoriametrics.presto.model.ExportResponseLine
import org.assertj.core.api.Assertions.assertThat
import org.testng.annotations.Test
import java.io.IOException
import java.nio.charset.StandardCharsets

internal class ExportResponseLineTest {
class ExportResponseLineTest {
private val log = Logger.get(this::class.java)!!
/**
* `http://0.0.0.0:8428/api/v1/export?match={__name__=~"vm_blocks"}`
Expand All @@ -34,11 +41,43 @@ internal class ExportResponseLineTest {
log.debug("Parsing line {}", i)
val response = ExportResponseLine.deserialize(line)

assertThat(response.metric.name).isEqualTo("vm_blocks")
// assertThat(response.metricfullName).isEqualTo("""{"__name__":"vm_blocks","job":"vm","type":"indexdb","instance":"victoriametrics:8428"}""")
assertThat(response.values.size).isEqualTo(6)
assertThat(response.timestamps.size).isEqualTo(6)
}
}

@Test
fun testParse() {
val resource = javaClass.getResource("export.response.ndjson")!!
val content = resource.readText(StandardCharsets.UTF_8)
val json: String = content.lines()[0]
val mapper = ObjectMapper()
val read: ExportResponseLine3 = mapper.readValue(json, ExportResponseLine3::class.java)
}

data class ExportResponseLine3(
val metric: Metric3,
val values: DoubleArray,
val timestamps: LongArray
)

class Metric3Deserializer @JvmOverloads constructor(vc: Class<*>? = null) : StdDeserializer<Metric3>(vc) {
@Throws(IOException::class, JsonProcessingException::class)
override fun deserialize(jp: JsonParser, ctxt: DeserializationContext): Metric3 {
// val node: JsonNode = jp.codec.readTree(jp)
jp.currentToken.isStructStart
assert(jp.isExpectedStartObjectToken) { jp.currentToken }
jp.nextToken()
val fullName = String(jp.currentToken.asCharArray())
// val id = (node["id"] as IntNode).numberValue() as Int
// val itemName = node["itemName"].asText()
// val userId = (node["createdBy"] as IntNode).numberValue() as Int
return Metric3(fullName)
}
}


@JsonDeserialize(using = Metric3Deserializer::class)
data class Metric3(val fullName: String)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class QueryBuilderTest {
@Test
fun testGreaterThan() {
val range = Range.greaterThan(TimestampType.TIMESTAMP, 1572151210000L)
val url = test(range)
val url = test(range)[0]

assertThat(url.queryParameter("start")).isEqualTo("1572151210.001")
assertThat(url.queryParameter("end")).isNull()
Expand All @@ -43,7 +43,7 @@ class QueryBuilderTest {
@Test
fun testGreaterThanOrEqual() {
val range = Range.greaterThanOrEqual(TimestampType.TIMESTAMP, 1572151210000L)
val url = test(range)
val url = test(range)[0]

assertThat(url.queryParameter("start")).isEqualTo("1572151210.000")
assertThat(url.queryParameter("end")).isNull()
Expand All @@ -55,7 +55,7 @@ class QueryBuilderTest {
@Test
fun testLessThan() {
val range = Range.lessThan(TimestampType.TIMESTAMP, 1572151210000L)
val url = test(range)
val url = test(range)[0]

assertThat(url.queryParameter("start")).isNull()
assertThat(url.queryParameter("end")).isEqualTo("1572151209.999")
Expand All @@ -67,7 +67,7 @@ class QueryBuilderTest {
@Test
fun testLessThanOrEqual() {
val range = Range.lessThanOrEqual(TimestampType.TIMESTAMP, 1572151210000L)
val url = test(range)
val url = test(range)[0]

assertThat(url.queryParameter("start")).isNull()
assertThat(url.queryParameter("end")).isEqualTo("1572151210.000")
Expand All @@ -79,7 +79,7 @@ class QueryBuilderTest {
@Test
fun testBetween() {
val range = Range.range(TimestampType.TIMESTAMP, 1572151210646L, true, 1572154810000L, true)
val url = test(range)
val url = test(range)[0]

assertThat(url.queryParameter("start")).isEqualTo("1572151210.646")
assertThat(url.queryParameter("end")).isEqualTo("1572154810.000")
Expand All @@ -90,15 +90,17 @@ class QueryBuilderTest {
*/
@Test
fun testNotBetween() {
val range1 = Range.lessThan(TimestampType.TIMESTAMP, 1572151210646L)
val range2 = Range.greaterThan(TimestampType.TIMESTAMP, 1572151210646L)
val url = test(range1, range2)

assertThat(url.queryParameter("start")).isNull()
assertThat(url.queryParameter("end")).isNull()
val range1 = Range.lessThan(TimestampType.TIMESTAMP, 1572151210000L)
val range2 = Range.greaterThan(TimestampType.TIMESTAMP, 1572151210999L)
val urls = test(range1, range2)

assertThat(urls[0].queryParameter("start")).isNull()
assertThat(urls[0].queryParameter("end")).isEqualTo("1572151210.000")
assertThat(urls[1].queryParameter("start")).isEqualTo("1572151210.999")
assertThat(urls[1].queryParameter("end")).isNull()
}

private fun test(range: Range, vararg ranges: Range): HttpUrl {
private fun test(range: Range, vararg ranges: Range): List<HttpUrl> {
val timestampHandle: ColumnHandle = VmColumnHandle("timestamp")

val values = ValueSet.ofRanges(range, *ranges)
Expand All @@ -112,6 +114,6 @@ class QueryBuilderTest {
val constraint = TupleDomain.withColumnDomains(map)

val unit = QueryBuilder()
return unit.build(constraint)[0]
return unit.build(constraint)
}
}

0 comments on commit 2c8cb62

Please sign in to comment.