The client acts in a synchronous manner when BatchProcessor#queue is full. #688

Open
simarpreets opened this issue Jul 11, 2020 · 7 comments

Comments

@simarpreets
Contributor

I went through the BatchProcessor implementation and noticed that when the batch processor lags behind and cannot clear the queue, InfluxDB.write(Point p) becomes a blocking call even when the client has been configured to write asynchronously.
This happens because the queue instance is a LinkedBlockingQueue and put(E e) is used for enqueueing, which blocks the thread trying to enqueue.
IMO this behaviour is very misleading: it can block all application threads that are trying to write whenever the write latencies on the InfluxDB server increase.
I am using influxdb-java:2.15.
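
For reference, the blocking behaviour comes straight from the JDK queue semantics. A minimal standalone sketch (plain java.util.concurrent, not influxdb-java code):

import java.util.concurrent.LinkedBlockingQueue;

public class QueueSemanticsDemo {

    public static void main(String[] args) throws InterruptedException {
        // Tiny capacity so the queue fills up immediately.
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        queue.put("first");                        // succeeds, queue is now full

        boolean accepted = queue.offer("second");  // returns false right away, caller is never blocked
        System.out.println("offer accepted: " + accepted);

        // queue.put("second");                    // would block this thread until a consumer drains the queue
    }
}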
Below is a sample JUnit test that simulates this.



import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.junit.Test;

import java.io.IOException;

public class InfluxTest {

    InfluxDB influxDB;

    @Test
    public void testConnection() {

        connect();

        // With batching enabled, every write below should return immediately.
        // Once the queue fills up, write() starts blocking on the slow flush.
        for (int i = 0; i < 5000; ++i) {

            long before = System.currentTimeMillis();
            influxDB.write(Point.measurement("test").addField("a-field", 0).build());
            long after = System.currentTimeMillis();
            System.out.println(String.format("Write %d took %d ms", i, after - before));
        }

    }

    private void connect() {

        String influxDbConnectionURL = "http://localhost:8086";

        // The interceptor adds a 10 s delay to every HTTP call, simulating a slow InfluxDB server.
        influxDB = InfluxDBFactory.connect(influxDbConnectionURL, new OkHttpClient.Builder().addInterceptor(new Interceptor() {
            @Override
            public Response intercept(Chain chain) throws IOException {
                try {
                    Thread.sleep(10000);
                    System.out.println("interacted with influx");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return chain.proceed(chain.request());
            }
        }));

        influxDB.query(new Query("CREATE DATABASE " + '"' + "test" + '"'));
        influxDB.setDatabase("test");

        // Batching with default options (batch size 1000) and a no-op exception handler.
        influxDB.enableBatch(BatchOptions.DEFAULTS.exceptionHandler(
            (failedPoints, throwable) -> {
            })
        );

    }

}



The default batch size is 1000, so the 2000th write call will be blocked for ~10000 ms.

@majst01
Collaborator

majst01 commented Jul 11, 2020

Correct, but what should the desired behavior be instead?

@simarpreets
Contributor Author

It should be configurable so that one can opt for queue.offer as well, so that application threads are never blocked and the integration with InfluxDB is truly asynchronous.
In web-server applications there may be scenarios where you want to send events to InfluxDB on every request. In those cases, higher latencies on the InfluxDB server side can adversely affect application performance and availability.
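
Roughly, the enqueue path could look something like this if it were configurable. This is only a sketch; the class, field, and method names here are hypothetical and not existing influxdb-java API.

import java.util.concurrent.LinkedBlockingQueue;
import org.influxdb.dto.Point;

// Sketch only: dropOnQueueFull is a hypothetical option, not part of influxdb-java 2.15.
class EnqueueSketch {

    private final LinkedBlockingQueue<Point> queue = new LinkedBlockingQueue<>(1000);
    private final boolean dropOnQueueFull;

    EnqueueSketch(boolean dropOnQueueFull) {
        this.dropOnQueueFull = dropOnQueueFull;
    }

    void enqueue(Point point) throws InterruptedException {
        if (dropOnQueueFull) {
            // offer() never blocks; if the queue is full the point is simply dropped.
            queue.offer(point);
        } else {
            // Current behaviour: put() blocks the writing thread until the queue has room.
            queue.put(point);
        }
    }
}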

@majst01
Collaborator

majst01 commented Jul 13, 2020

But what should happen if the user has configured it not to block?
If you are able to write a PR, that would be awesome.

@simarpreets
Contributor Author

simarpreets commented Jul 13, 2020

But what should happen if the user has configured it not to block?

IMO, those events should not be added to the queue and will therefore be dropped.
But a provision can be added to plug in a custom handler for these scenarios: where one can afford to lose data, one can just log the failure or record it in a metrics collector, while others can handle it differently.
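
For illustration, the handler could be as small as this. The interface name and wiring are hypothetical; nothing like it exists in influxdb-java today.

import java.util.Collection;
import org.influxdb.dto.Point;

// Sketch of a hook for points rejected because the batch queue was full.
interface DroppedPointsHandler {
    void onDropped(Collection<Point> droppedPoints);
}

// One application might just log and move on ...
class LoggingDroppedPointsHandler implements DroppedPointsHandler {
    @Override
    public void onDropped(Collection<Point> droppedPoints) {
        System.err.println("Dropped " + droppedPoints.size() + " points: batch queue was full");
    }
}
// ... another might feed a counter in its metrics system instead.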

Also, let me see if I am able to raise a PR.

@simarpreets
Contributor Author

@majst01 raised a PR for this: #689

@simarpreets
Contributor Author

@majst01 when will we do a release that includes #689?

@majst01
Collaborator

majst01 commented Jul 24, 2020

I have to ask the InfluxDB employees, because they own the Maven repo.
