RMLStreamer generates RDF from files or data streams using RML. Unlike other RML implementations, it can handle large input files and continuous data streams, such as sensor data.
Documentation regarding the use of (custom) functions can be found here.
- Download RMLStreamer-<version>-standalone.jar from the latest release.
- Run it as
$ java -jar RMLStreamer-<version>-standalone.jar <commands and options>
See Basic commands (where you replace $FLINK_BIN run <path to RMLStreamer jar> with java -jar RMLStreamer-<version>-standalone.jar) and Complete RMLStreamer usage for examples, possible commands and options.
This runs the stand-alone version of RMLStreamer in a Docker container. This is a good way to quickly test things or run RMLStreamer on a single machine, but you don't have the features of a Flink cluster set-up (distributed, failover, checkpointing). If you need those features, see docker/README.md.
$ docker run -v $PWD:/data --rm rmlio/rmlstreamer toFile -m /data/mapping.ttl -o /data/output
This option builds RMLStreamer from source and puts that build into a Docker container ready to run. The main purpose is to have a one-time job image.
$ ./buildDocker.sh
If the build succeeds, you can invoke the image from the directory that contains your data and mappings, for example as follows (change the tag to the appropriate version):
$ docker run -v $PWD:/data --rm rmlstreamer:v2.5.1-SNAPSHOT toFile -m /data/mapping.ttl -o /data/output.ttl
There are more options for the script, if you want to use specific tags or push to Docker Hub:
$ ./buildDocker.sh -h
Build and push Docker images for RMLStreamer
buildDocker.sh [-h]
buildDocker.sh [-a][-n][-p][-u <username>][-v <version>]
options:
-a Build for platforms linux/arm64 and linux/amd64. Default: perform a standard 'docker build'
-h Print this help and exit.
-n Do NOT (re)build RMLStreamer before building the Docker image. This is risky because the Docker build needs a stand-alone version of RMLStreamer.
-u <username> Add an username name to the tag name as on Docker Hub, like <username>/rmlstreamer:<version>.
-p Push to Docker Hub repo. You must be logged in for this to succeed.
-v <version> Override the version in the tag name, like <username>/rmlstreamer:<version>. If not given, use the current version found in pom.xml.
If you want to get RMLStreamer up and running within 5 minutes using Docker, check out docker/README.md
If you want to deploy it yourself, read on.
If you want to develop, read these instructions.
RMLStreamer runs its jobs on Flink clusters. More information on installing Flink and getting started can be found here. At least a local cluster must be running before RMLStreamer can execute RML Mappings. Please note that this version works with Flink 1.14.5 with Scala 2.11 support, which can be downloaded here.
Download RMLStreamer-<version>.jar from the latest release.
In order to build a jar file that can be deployed on a Flink cluster, you need:
- a Java JDK >= 11 and <= 13 (We develop and test on JDK 11)
- Apache Maven 3 or higher
Clone or download and then build the code in this repository:
$ git clone https://github.com/RMLio/RMLStreamer.git
$ cd RMLStreamer
and then run:
$ mvn -DskipTests clean package
-DskipTests just builds and packages without running tests. If you want to run the tests, just omit this parameter.
clean cleans any cached builds before packaging. While not strictly necessary, it is considered good practice to do so.
The resulting RMLStreamer-<version>.jar, found in the target folder, can be deployed on a Flink cluster.
Note: To build a stand-alone RMLStreamer jar, add -P 'stand-alone' to the build command, e.g.:
$ mvn clean package -DskipTests -P 'stand-alone'
Note: If you want to update the version of RMLStreamer (e.g. when developing or releasing), run the script change-version.sh <your-new-version>. It updates the version in the relevant places in the repository.
This section assumes the use of a CLI. If you want to use Flink's web interface, check out this section in the Docker README.
Here we give examples for running RMLStreamer from the command line. We use FLINK_BIN to denote the Flink CLI tool, usually found in the bin directory of the Flink installation, e.g. /home/myuser/flink-1.14.0/bin/flink.
For Windows a flink.bat script is provided.
The general usage is:
$ FLINK_BIN run [Flink options] -c io.rml.framework.Main <path to RMLStreamer jar> [toFile|toKafka|toTCPSocket] [options]
FLINK_BIN | The path to the provided Flink CLI script.
Flink options | Options to the Flink run script. Example: -p 4 sets the parallelism to 4.
-c io.rml.framework.Main | This is the application class of RMLStreamer.
Path to RMLStreamer jar | The absolute path to the RMLStreamer jar file.
RMLStreamer options | The actual program arguments for RMLStreamer. See below for a full list.
# write output to file(s)
$FLINK_BIN run <path to RMLStreamer jar> toFile --mapping-file <path to mapping file> --output-path <path to output file>
# write output to a listening socket (only if logical source(s) are streams)
$FLINK_BIN run <path to RMLStreamer jar> toTCPSocket --output-socket <host:port>
# write output to kafka topic (only if logical source(s) are streams)
$FLINK_BIN run <path to RMLStreamer jar> toKafka --broker-list <host:port> --topic <topic name>
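To make the general usage pattern concrete, the following sketch assembles such a command line in Python. The helper name build_flink_command and all paths are hypothetical and only serve as illustration; the run subcommand, the -c io.rml.framework.Main argument and the RMLStreamer options are the ones described above.

```python
import shlex


def build_flink_command(flink_bin, jar, command, options, flink_options=()):
    """Assemble: FLINK_BIN run [Flink options] -c io.rml.framework.Main <jar> <command> [options]."""
    return [
        flink_bin, "run", *flink_options,
        "-c", "io.rml.framework.Main",
        jar, command, *options,
    ]


if __name__ == "__main__":
    # All paths below are placeholders, not real locations.
    cmd = build_flink_command(
        flink_bin="/home/myuser/flink-1.14.0/bin/flink",
        jar="/path/to/RMLStreamer.jar",
        command="toFile",
        options=["--mapping-file", "/path/to/mapping.ttl",
                 "--output-path", "/path/to/output.nq"],
        flink_options=["-p", "4"],
    )
    # Print the command; hand `cmd` to subprocess.run(cmd, check=True) to submit the job.
    print(shlex.join(cmd))
```

Building the command as a list rather than a single string avoids quoting problems when paths contain spaces.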
Usage: RMLStreamer [toFile|toKafka|toTCPSocket|toMQTT|noOutput] [options]
-j, --job-name <job name>
The name to assign to the job on the Flink cluster. Put some semantics in here ;)
-i, --base-iri <base IRI>
The base IRI as defined in the R2RML spec.
--disable-local-parallel
By default input records are spread over the available task slots within a task manager to optimise parallel processing,at the cost of losing the order of the records throughout the process. This option disables this behaviour to guarantee that the output order is the same as the input order.
-p, --parallelism <task slots>
Sets the maximum operator parallelism (~nr of task slots used)
-m, --mapping-file <RML mapping file>
REQUIRED. The path to an RML mapping file. The path must be accessible on the Flink cluster.
--json-ld Write the output as JSON-LD instead of N-Quads. An object contains all RDF generated from one input record. Note: this is slower than using the default N-Quads format.
--bulk Write all triples generated from one input record at once, instead of writing triples the moment they are generated.
--checkpoint-interval <time (ms)>
If given, Flink's checkpointing is enabled with the given interval. If not given, checkpointing is enabled when writing to a file (this is required to use the flink StreamingFileSink). Otherwise, checkpointing is disabled.
--auto-watermark-interval <time (ms)>
If given, Flink's watermarking will be generated periodically with the given interval. If not given, a default value of 50ms will be used.This option is only valid for DataStreams.
-f, --function-descriptions <function description location 1>,<function description location 2>...
An optional list of paths to function description files (in RDF using FnO). A path can be a file location or a URL.
Command: toFile [options]
Write output to file
Note: when the mapping consists only of stream triple maps, a StreamingFileSink is used. This sink will write the output to a part file at every checkpoint.
-o, --output-path <output file>
The path to an output file. Note: when a StreamingFileSink is used (the mapping consists only of stream triple maps), this path specifies a directory and optionally an extension. Part files will be written to the given directory and the given extension will be used for each part file.
Command: toKafka [options]
Write output to a Kafka topic
-b, --broker-list <host:port>[,<host:port>]...
A comma separated list of Kafka brokers.
-t, --topic <topic name>
The name of the Kafka topic to write output to.
--partition-id <id> EXPERIMENTAL. The partition id of kafka topic to which the output will be written to.
Command: toTCPSocket [options]
Write output to a TCP socket
-s, --output-socket <host:port>
The TCP socket to write to.
Command: toMQTT [options]
Write output to an MQTT topic
-b, --broker <host:port>
The MQTT broker.
-t, --topic <topic name>
The name of the MQTT topic to write output to.
Command: noOutput
Do everything, but discard output
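As noted for toFile, a StreamingFileSink writes part files into the given directory instead of a single output file. If one file is needed afterwards, the part files can be concatenated, since the default N-Quads output is line-based (this does not hold for --json-ld output). The sketch below is one possible way to do that; the function name merge_part_files is hypothetical, and the rule for skipping unfinished files (names starting with a dot or containing ".inprogress") is an assumption about how Flink names in-progress part files, so check it against your actual output directory.

```python
import tempfile
from pathlib import Path


def merge_part_files(output_dir, merged_file):
    """Concatenate finished part files under output_dir into merged_file.

    Returns the number of part files that were merged.
    """
    parts = sorted(
        p for p in Path(output_dir).rglob("*")  # rglob also covers sub-directories
        if p.is_file()
        and not p.name.startswith(".")          # assumption: hidden files are unfinished
        and ".inprogress" not in p.name         # assumption: in-progress marker
    )
    with open(merged_file, "wb") as out:
        for part in parts:
            data = part.read_bytes()
            out.write(data)
            if data and not data.endswith(b"\n"):
                out.write(b"\n")  # keep one statement per line across file boundaries
    return len(parts)


if __name__ == "__main__":
    # Demo with made-up data in temporary directories.
    src = tempfile.mkdtemp()
    Path(src, "part-0-0").write_text("<http://example.org/a> <http://example.org/p> \"1\" .\n")
    Path(src, "part-0-1").write_text("<http://example.org/b> <http://example.org/p> \"2\" .\n")
    target = Path(tempfile.mkdtemp(), "merged.nq")
    print(merge_part_files(src, target), "part files merged into", target)
```

Write the merged file outside the output directory, so that it is not picked up as a part file on a later run.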
The following RML Mapping is an example of how to define the generation of an RDF stream from a TCP input stream.
<#TripleMap>
    a rr:TriplesMap;
    rml:logicalSource [
        rml:source [
            rdf:type rmls:TCPSocketStream ;
            rmls:hostName "localhost";
            rmls:port "5005"
        ];
        rml:referenceFormulation ql:JSONPath;
    ];
    rr:subjectMap [
        rml:reference "$.id";
        rr:termType rr:IRI;
        rr:class skos:Concept
    ];
    rr:predicateObjectMap [
        rr:predicateMap [
            rr:constant dcterms:title;
            rr:termType rr:IRI
        ];
        rr:objectMap [
            rml:reference "$.id";
            rr:termType rr:Literal
        ]
    ].
The RML Mapping above can be executed as follows:
When streaming, both the input and the output of the RML Framework are TCP clients. Before running a stream mapping, an application must therefore be listening on the input and output ports. For testing purposes the following commands can be used:
$ nc -lk 5005 # This will start listening for input connections at port 5005
$ nc -lk 9000 # This will start listening for output connections at port 9000
# This is for testing purposes, your own application needs to start listening to the input and output ports.
Once the input and output ports are listened to by applications or by the above commands, the RML Mapping can be executed. RMLStreamer will open the input and output sockets so it can act upon data that will be written to the input socket.
$FLINK_BIN run <path to RMLStreamer jar> toTCPSocket -s localhost:9000 -m .../framework/src/main/resources/json_stream_data_mapping.ttl
# The -m parameter sets the mapping file location
# The -s parameter sets the output socket (host:port)
Whenever data is written to the socket (every data object needs to end with \n), this data will be processed by the RML Framework.
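Outside of testing with nc, your own application has to listen on the input port and write newline-terminated records to the connection that RMLStreamer opens. The following is a minimal sketch of such an application, assuming JSON input as in the mapping above; the function names and the sample records are made up for illustration.

```python
import json
import socket


def encode_record(record):
    """Serialize one record as a single line of JSON, terminated by a newline."""
    # json.dumps escapes newlines inside strings, so each record stays on one line.
    return (json.dumps(record) + "\n").encode("utf-8")


def serve_records(records, host="localhost", port=5005):
    """Listen on host:port and send all records to the first client that connects."""
    with socket.create_server((host, port)) as server:
        conn, _addr = server.accept()  # blocks until a client (e.g. RMLStreamer) connects
        with conn:
            for record in records:
                conn.sendall(encode_record(record))


if __name__ == "__main__":
    # Made-up sample data; the mapping above reads the "id" field via $.id.
    serve_records([
        {"id": "http://example.org/concept/1"},
        {"id": "http://example.org/concept/2"},
    ])
```

Start this script before submitting the job, so that the input port is already being listened to when RMLStreamer connects.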
The following logical source is an example of how to define the generation of an RDF stream from a Kafka input stream.
<#TripleMap>
    a rr:TriplesMap;
    rml:logicalSource [
        rml:source [
            rdf:type rmls:KafkaStream ;
            rmls:broker "localhost:9092" ;
            rmls:groupId "groupId";
            rmls:topic "topic";
        ];
        rml:referenceFormulation ql:JSONPath;
    ];
Note on using Kafka with Flink: As a consumer, the Flink Kafka client never subscribes to a topic; instead, it is assigned to a topic/partition (even if you declare it to be in a consumer group with the rmls:groupId predicate). It therefore does nothing with the concept of a "consumer group" except for committing offsets, so load is not spread across RMLStreamer jobs running in the same consumer group. Instead, each RMLStreamer job is assigned a partition.
This has some consequences:
- When you add multiple RMLStreamer jobs to a consumer group, and the topic they listen to has one partition, only one instance will get the input.
- If there are multiple partitions in the topic and multiple RMLStreamer jobs, it could be that two (or more) jobs are assigned a certain partition, resulting in duplicate output.
See also https://stackoverflow.com/questions/38639019/flink-kafka-consumer-groupid-not-working .
The only option for spreading load is to use multiple topics, and assign one RMLStreamer job to one topic.
<#TripleMap>
    a rr:TriplesMap;
    rml:logicalSource [
        rml:source "/home/wmaroy/github/rml-framework/akka-pipeline/src/main/resources/io/rml/framework/data/books_small.json";
        rml:referenceFormulation ql:JSONPath;
        rml:iterator "$.store.books"
    ];
    rr:subjectMap [
        rml:reference "id";
        rr:termType rr:IRI;
        rr:class skos:Concept
    ];
    rr:predicateObjectMap [
        rr:predicateMap [
            rr:constant dcterms:title;
            rr:termType rr:IRI
        ];
        rr:objectMap [
            rml:reference "id";
            rr:termType rr:Literal
        ]
    ] .
RMLStreamer supports relational databases as a logical source. JDBC is used to establish a connection and perform a query against a database. See the example mapping below.
<TriplesMap1>
    a rr:TriplesMap;
    rml:logicalSource [
        rml:source <#DB_source>;
        rr:sqlVersion rr:SQL2008;
        rr:tableName "country_info";
    ];
    rr:subjectMap [ rr:template "http://example.com/{Country Code}/{Name}" ];
    rr:predicateObjectMap [
        rr:predicate ex:name ;
        rr:objectMap [ rml:reference "Name" ]
    ] .

<#DB_source> a d2rq:Database;
    d2rq:jdbcDSN "CONNECTIONDSN";
    d2rq:jdbcDriver "org.postgresql.Driver";
    d2rq:username "postgres";
    d2rq:password "" .
Namespace: http://semweb.mmlab.be/ns/rmls#
The RML vocabulary has been extended with rmls to support streaming logical sources. The following classes/terms are currently used:
- rmls:[stream type]
  - rmls:TCPSocketStream specifies that the logical source will be a TCP socket stream.
  - rmls:FileStream specifies that the logical source will be a file stream (to be implemented).
  - rmls:KafkaStream specifies that the logical source will be a Kafka stream.
- rmls:hostName specifies the host name of the server from which data will be streamed.
- rmls:port specifies a port number for the stream mapper to connect to.
Example of a valid JSON logical source map using all possible terms:
rml:logicalSource [
    rml:source [
        rdf:type rmls:TCPSocketStream ;
        rmls:hostName "localhost";
        rmls:port "5005"
    ];
    rml:referenceFormulation ql:JSONPath;
];
RMLStreamer uses Flink's Log4j 2 system. It can be configured in $FLINK_HOME/conf/log4j.properties.
To adjust the log level for RMLStreamer specifically, add two lines like this:
logger.rmlstreamer.name = io.rml.framework
logger.rmlstreamer.level = DEBUG
RMLStreamer is benchmarked with this repo.
References (preprint)
[1] S. Min Oo, G. Haesendonck, B. De Meester, A. Dimou. RMLStreamer - an RDF stream generator from streaming heterogeneous data. The Semantic Web – ISWC 2022. Springer International Publishing, (2022)