Skip to content

40CoderPlus/flink-sql-gateway-client

Repository files navigation

Notice

Default Open API specification version is v2. See in Flink SQL Gateway REST API

Some Changes in Open API v1 specification,Open API v2 specification

  1. session_handle in path parameter change to type UUID;
  2. operation_handle in path parameter change to type UUID;

Build

Use OpenAPI Generator to generation client SDKs.

  • Version V2
gradle clean
gradle openApiGenerate
gradle build -x test
  • Version V1
gradle clean
gradle buildV1SDK
gradle build -x test

Example

DefaultApi api = new DefaultApi(new ApiClient());
OpenSessionResponseBody response = api.openSession(new OpenSessionRequestBody()
    .putPropertiesItem("execution.target", "yarn-session")
    .putPropertiesItem("flink.hadoop.yarn.resourcemanager.ha.enabled", "true")
    .putPropertiesItem("flink.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2")
    .putPropertiesItem("flink.hadoop.yarn.resourcemanager.hostname.rm1", "yarn01")
    .putPropertiesItem("flink.hadoop.yarn.resourcemanager.hostname.rm2", "yarn01")
    .putPropertiesItem("flink.hadoop.yarn.resourcemanager.cluster-id", "yarn-cluster")
    .putPropertiesItem(
            "flink.hadoop.yarn.client.failover-proxy-provider",
            "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider")
    .putPropertiesItem("yarn.application.id", "application_1667789375191_XXXX"));
System.out.println(response.getSessionHandle());
    
ExecuteStatementResponseBody executeStatementResponseBody = api.executeStatement(
    UUID.fromString(response.getSessionHandle()),
    new ExecuteStatementRequestBody()
            .statement("select 1")
            .putExecutionConfigItem("pipeline.name", "Flink SQL Gateway SDK Example"));
System.out.println(executeStatementResponseBody.getOperationHandle());

See more in FlinkSqlGatewayExample.

Easy Builder

Build Flink SQl Gateway Rest API in fluent api.

Build session request body

SessionRequestBuilder.yarn()
    .sessionName("ha yarn session")
    .applicationId("application_1667789375191_XXXX")
    .savepoint("hdfs://fortycoderplus/flink/savepoints/savepoint-c12453-134defccc7c1")
    .ha()
        .clusterId("fortycoderplus")
        .rmIds("yarn1,yarn2")
        .hostnameX("yarn1", "yarn1.fortycoderplus")
        .hostnameX("yarn2", "yarn2.fortycoderplus")
        .failoverProxyProvider()
        .webappAddressX("yarn1", "http://yarn1.fortycoderplus:8080")
        .webappAddressX("yarn2", "http://yarn2.fortycoderplus:8080")

See more example in SessionRequestBuilderTest

Build execute statement request body

OperationRequestBuilder.builder()
    .pipelineName("test")
    .streaming()
        .maxConcurrentCheckpoints(1)
        .storage()
        .backend()
        .noExternalizedCheckpoints()
    .statement("select * from kafka_table")
    .build();

See more example in OperationRequestBuilderTest

Warning

See issue FLINK-29881.

FetchResultsResponseBody incompatible with the true response of Flink SQL Gateway Fetch results of Operation API /sessions/{session_handle}/operations/{operation_handle}/result/{token}.