Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support http to kafka proxy using openapi.yaml and asyncapi.yaml #810

Merged
merged 183 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
183 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
akrambek Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
akrambek Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
9fe7a91
Merge branch 'aklivity:develop' into develop
akrambek Dec 11, 2023
7e3d237
Merge branch 'aklivity:develop' into develop
akrambek Dec 12, 2023
33c4411
Merge branch 'aklivity:develop' into develop
akrambek Dec 13, 2023
fe9e318
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
d8b5e5c
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
ebca7ef
Merge branch 'aklivity:develop' into develop
akrambek Dec 18, 2023
5e3e059
Merge branch 'aklivity:develop' into develop
akrambek Dec 22, 2023
ee71db9
Merge branch 'aklivity:develop' into develop
akrambek Dec 24, 2023
0b7a15a
Merge branch 'aklivity:develop' into develop
akrambek Dec 25, 2023
be13489
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
95df84c
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
3ebdbf5
Merge branch 'aklivity:develop' into develop
akrambek Dec 28, 2023
24ad9e1
Merge branch 'aklivity:develop' into develop
akrambek Dec 30, 2023
6d21fec
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
368a0a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
7069f1a
Merge branch 'aklivity:develop' into develop
akrambek Jan 2, 2024
09b7041
Merge branch 'aklivity:develop' into develop
akrambek Jan 3, 2024
98f1faa
Merge branch 'aklivity:develop' into develop
akrambek Jan 4, 2024
371391a
Merge branch 'aklivity:develop' into develop
akrambek Jan 5, 2024
c6a0882
Merge branch 'aklivity:develop' into develop
akrambek Jan 8, 2024
f99f009
Merge branch 'aklivity:develop' into develop
akrambek Jan 9, 2024
a110b68
Merge branch 'aklivity:develop' into develop
akrambek Jan 11, 2024
80c4625
Merge branch 'aklivity:develop' into develop
akrambek Jan 16, 2024
6617e20
Merge branch 'aklivity:develop' into develop
akrambek Jan 19, 2024
dea9f53
Merge branch 'aklivity:develop' into develop
akrambek Jan 20, 2024
b74db57
Merge branch 'aklivity:develop' into develop
akrambek Jan 23, 2024
65aad28
skeleton
bmaidics Jan 25, 2024
33be2a0
checkpoint
bmaidics Jan 26, 2024
a71c350
checkpoint
bmaidics Jan 30, 2024
4617b54
Merge branch 'aklivity:develop' into develop
akrambek Jan 30, 2024
db6997f
checkpoint
bmaidics Jan 30, 2024
b3b421d
Merge branch 'aklivity:develop' into develop
akrambek Jan 31, 2024
73d64b1
Merge branch 'aklivity:develop' into develop
akrambek Feb 1, 2024
7bb546e
Merge branch 'aklivity:develop' into develop
akrambek Feb 2, 2024
0574abe
checkpoint
bmaidics Feb 3, 2024
63136cd
checkpoint
bmaidics Feb 5, 2024
0833f4d
checkpoint
bmaidics Feb 6, 2024
ae38add
Checkpoint
bmaidics Feb 7, 2024
e8f93fc
check
bmaidics Feb 8, 2024
4984c85
Checkpoint
bmaidics Feb 8, 2024
046583b
some fixes
bmaidics Feb 8, 2024
b1c7901
Merge branch 'aklivity:develop' into develop
akrambek Feb 8, 2024
ffd9900
Merge remote-tracking branch 'upstream/develop' into asyncapi_mqtt
bmaidics Feb 9, 2024
a366947
checkpoint
bmaidics Feb 12, 2024
54aea97
Address review items
bmaidics Feb 12, 2024
51f039c
Merge remote-tracking branch 'upstream/develop' into asyncapi_mqtt
bmaidics Feb 12, 2024
ee96b7d
remove files
bmaidics Feb 12, 2024
949df2f
Merge branch 'aklivity:develop' into develop
akrambek Feb 13, 2024
504af3e
Reviews
bmaidics Feb 13, 2024
3ca1ffb
Add dependency to docker
bmaidics Feb 13, 2024
084c7ff
Fix
bmaidics Feb 13, 2024
24a2897
Fixes
bmaidics Feb 14, 2024
ca946b8
Merge branch 'aklivity:develop' into develop
akrambek Feb 14, 2024
f7b2b50
Fix pom
bmaidics Feb 15, 2024
d13162d
Add asyncapi schema validation
bmaidics Feb 16, 2024
45949d7
first commit
bmaidics Feb 13, 2024
4055367
checkpoint
bmaidics Feb 14, 2024
ab63936
REVERT BEFORE MERGE: Akram's HTTP changes
bmaidics Feb 14, 2024
6641539
checkpoint
bmaidics Feb 14, 2024
f7a1168
REVERT THIS AS WELL
bmaidics Feb 14, 2024
b4c005e
checkpoint
bmaidics Feb 15, 2024
932d6de
Fix tests
bmaidics Feb 15, 2024
b6bc905
Test http
bmaidics Feb 15, 2024
079c781
Add schema, builders
bmaidics Feb 16, 2024
87d8ae6
Merge remote-tracking branch 'upstream/develop' into asyncapi_mqtt
bmaidics Feb 16, 2024
e484222
first commit
bmaidics Feb 13, 2024
607ea1a
checkpoint
bmaidics Feb 14, 2024
cc72a7a
REVERT BEFORE MERGE: Akram's HTTP changes
bmaidics Feb 14, 2024
6f3b371
checkpoint
bmaidics Feb 14, 2024
3f9c72f
REVERT THIS AS WELL
bmaidics Feb 14, 2024
3ad70d6
checkpoint
bmaidics Feb 15, 2024
8a8aae2
Fix tests
bmaidics Feb 15, 2024
12f7619
Test http
bmaidics Feb 15, 2024
ab2b1ca
Client done?
bmaidics Feb 19, 2024
d196b99
Start proxy work
bmaidics Feb 19, 2024
fdad0c5
checkpoint
bmaidics Feb 20, 2024
049e595
Review items
bmaidics Feb 20, 2024
3267a92
Merge branch 'asyncapi_mqtt' into http_asyncapi
bmaidics Feb 20, 2024
0a2fb4e
Merge fixes
bmaidics Feb 20, 2024
94d6296
Merge branch 'http_asyncapi' into kafka_asyncapi
bmaidics Feb 20, 2024
f0280d2
checkpoint
bmaidics Feb 20, 2024
7d05a57
Merge remote-tracking branch 'upstream/develop' into asyncapi_mqtt
bmaidics Feb 21, 2024
4d59803
Use binding.qvault
bmaidics Feb 21, 2024
dd71f47
Merge branch 'asyncapi_mqtt' into http_asyncapi
bmaidics Feb 21, 2024
18acd92
Review fixes
bmaidics Feb 21, 2024
9056e04
Add schema, builders
bmaidics Feb 16, 2024
3df787a
Client done?
bmaidics Feb 19, 2024
8ca3a72
Checkpoint
bmaidics Feb 21, 2024
f9dcd75
Merge branch 'aklivity:develop' into develop
akrambek Feb 21, 2024
b79279e
feedbacks
bmaidics Feb 22, 2024
d99bb51
Merge remote-tracking branch 'upstream/develop' into http_asyncapi
bmaidics Feb 22, 2024
285f1d3
Merge branch 'http_asyncapi' into kafka_client_asyncapi
bmaidics Feb 22, 2024
d694e5b
Fix
bmaidics Feb 22, 2024
251dc7a
Merge branch 'kafka_client_asyncapi' into kafka_asyncapi
bmaidics Feb 22, 2024
e1e5e75
Merge branch 'aklivity:develop' into develop
akrambek Feb 22, 2024
df4ea30
check
bmaidics Feb 23, 2024
8d9d1db
check2
bmaidics Feb 23, 2024
5f50549
Merge branch 'aklivity:develop' into develop
akrambek Feb 23, 2024
b03b610
check
bmaidics Feb 26, 2024
f5d5f63
Dsign correct
bmaidics Feb 26, 2024
5c9faed
Merge remote-tracking branch 'upstream/develop' into kafka_asyncapi
bmaidics Feb 27, 2024
dd4844e
checkpoint
bmaidics Feb 27, 2024
cf22f32
Tests are passing
bmaidics Feb 27, 2024
b4daae9
Fixes
bmaidics Feb 27, 2024
727ff82
fix
bmaidics Feb 27, 2024
194da54
Merge remote-tracking branch 'upstream/develop' into kafka_asyncapi
bmaidics Feb 28, 2024
32725be
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
09f5195
Add tests
bmaidics Feb 28, 2024
e695537
Add bootstrap configs
bmaidics Feb 28, 2024
83b145a
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
096303f
wip
akrambek Feb 3, 2024
c713f50
wip
akrambek Feb 3, 2024
538bc88
WIP
akrambek Feb 4, 2024
70e56cd
WIP
akrambek Feb 5, 2024
b7fc711
WIP
akrambek Feb 5, 2024
a30a121
WIP
akrambek Feb 6, 2024
7bb0cce
Refactor
akrambek Feb 6, 2024
866b5d7
WIP
akrambek Feb 6, 2024
952cc5b
WIP
akrambek Feb 7, 2024
aa13f7c
WIP
akrambek Feb 7, 2024
6f57ce7
WIP
akrambek Feb 7, 2024
c8604ac
WIP
akrambek Feb 8, 2024
35ce514
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
450d77b
Revert back the change
akrambek Feb 8, 2024
22eee59
WIP
akrambek Feb 8, 2024
dee3e26
tls and authorization support
akrambek Feb 8, 2024
24a7baf
Support guard and tls
akrambek Feb 10, 2024
4e24b99
Apply feedback from PR
akrambek Feb 12, 2024
92d9ce7
WIP
akrambek Feb 12, 2024
17c4068
Checkpoint
akrambek Feb 13, 2024
c7829b6
WIP
akrambek Feb 13, 2024
6603e2d
Remove view from opens
akrambek Feb 13, 2024
cccaf8a
Address PR feedbacks
akrambek Feb 13, 2024
95802cf
Fix typo
akrambek Feb 13, 2024
cbfb1db
WIP
akrambek Feb 21, 2024
491b214
WIP
akrambek Feb 22, 2024
381ea41
parent 491b214bb7b83eb9186b26bbea6dd8370fdc971f
akrambek Feb 22, 2024
6df61e6
Merge branch 'develop' into feature/742
akrambek Feb 28, 2024
eb15b14
Revert back change
akrambek Feb 28, 2024
2331069
User parser
akrambek Feb 28, 2024
2724fa3
WIP
akrambek Feb 29, 2024
b60cb26
WIP
akrambek Feb 29, 2024
cbfc080
WIP
akrambek Feb 29, 2024
7b36c26
Revert back changes
akrambek Feb 29, 2024
23d84db
Fix typo
akrambek Feb 29, 2024
ee3266f
Revert "Fix typo"
akrambek Feb 29, 2024
0d30e96
Revert "Fix typo"
akrambek Feb 29, 2024
92fb495
WIP
akrambek Feb 29, 2024
5876c70
Remove unnecessary dependency
akrambek Feb 29, 2024
26c9abf
Fix number
akrambek Feb 29, 2024
b9ec42e
Update copyright
akrambek Feb 29, 2024
1b78750
update headers
akrambek Feb 29, 2024
75e7709
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
b81a140
Merge branch 'develop' into feature/742
akrambek Feb 29, 2024
1f0050a
Remove extra line
akrambek Feb 29, 2024
57d1d93
Revert back change
akrambek Feb 29, 2024
0691041
Remove extra line
akrambek Feb 29, 2024
e953a3d
Fix apiID
akrambek Feb 29, 2024
6f1ba51
Fix apiId
akrambek Feb 29, 2024
d4c3117
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
346869b
Merge branch 'develop' into feature/742
akrambek Feb 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cloud/docker-image/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>binding-openapi-asyncapi</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>binding-echo</artifactId>
Expand Down
1 change: 1 addition & 0 deletions cloud/docker-image/src/main/docker/zpm.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"io.aklivity.zilla:binding-mqtt",
"io.aklivity.zilla:binding-mqtt-kafka",
"io.aklivity.zilla:binding-openapi",
"io.aklivity.zilla:binding-openapi-asyncapi",
"io.aklivity.zilla:binding-proxy",
"io.aklivity.zilla:binding-sse",
"io.aklivity.zilla:binding-sse-kafka",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,34 @@ defaultContentType: application/json
channels:
pets:
address: /pets
messages:
pet:
$ref: '#/components/messages/pet'
showPetById:
address: /pets/{id}
messages:
pet:
$ref: '#/components/messages/pet'

operations:
createPet:
action: send
bindings:
http:
type: request
method: POST
channel:
$ref: '#/channels/pets'
listPets:
action: receive
bindings:
http:
type: request
method: GET
channel:
$ref: '#/channels/pets'
getPets:
action: receive
bindings:
http:
type: request
method: GET
query:
type: object
Expand All @@ -68,3 +71,26 @@ components:
correlationIds:
petsCorrelationId:
location: '$message.header#/idempotency-key'
schemas:
petPayload:
type: object
properties:
id:
type: integer
minimum: 0
description: Pet id.
name:
type: string
description: Pet name.
tag:
type: string
description: Tag.
messages:
pet:
name: Pet
title: Pet
summary: >-
Inform about Pet.
contentType: application/json
payload:
$ref: '#/components/schemas/petPayload'
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ connect "zilla://streams/asyncapi0"

write zilla:begin.ext ${asyncapi:beginEx()
.typeId(zilla:id("asyncapi"))
.apiId(3833148448)
.apiId(759838734)
.operationId("createPet")
.extension(http:beginEx()
.typeId(zilla:id("http"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ accepted

read zilla:begin.ext ${asyncapi:matchBeginEx()
.typeId(zilla:id("asyncapi"))
.apiId(3833148448)
.apiId(759838734)
.operationId("createPet")
.extension(http:beginEx()
.typeId(zilla:id("http"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void shouldMatchAsyncapiBeginExtensionOnly() throws Exception
{
BytesMatcher matcher = AsyncapiFunctions.matchBeginEx()
.typeId(0x00)
.apiId(1L)
.extension(new byte[] {1})
.build();

Expand All @@ -71,6 +72,7 @@ public void shouldMatchAsyncapiBeginExtensionOnly() throws Exception

new AsyncapiBeginExFW.Builder().wrap(new UnsafeBuffer(byteBuf), 0, byteBuf.capacity())
.typeId(0x00)
.apiId(1L)
.extension(new OctetsFW.Builder().wrap(writeBuffer, 0, 1).set(new byte[] {1}).build())
.build();

Expand All @@ -82,7 +84,7 @@ public void shouldMatchAsyncapiBeginExtension() throws Exception
{
BytesMatcher matcher = AsyncapiFunctions.matchBeginEx()
.typeId(0x00)
.apiId(1)
.apiId(1L)
.operationId("operationId")
.extension(new byte[] {1})
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.asyncapi.config;

import static java.util.Collections.unmodifiableMap;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.io.InputStream;
import java.io.StringReader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;
import jakarta.json.bind.Jsonb;
import jakarta.json.bind.JsonbBuilder;

import org.agrona.collections.Object2ObjectHashMap;
import org.leadpony.justify.api.JsonSchema;
import org.leadpony.justify.api.JsonValidationService;
import org.leadpony.justify.api.ProblemHandler;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.AsyncapiBinding;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.engine.config.ConfigException;

public class AsyncapiParser
{
private final Map<String, JsonSchema> schemas;

public AsyncapiParser()
{
Map<String, JsonSchema> schemas = new Object2ObjectHashMap<>();
schemas.put("2.6.0", schema("2.6.0"));
schemas.put("3.0.0", schema("3.0.0"));
this.schemas = unmodifiableMap(schemas);
}

public Asyncapi parse(
String asyncapiText)
{
Asyncapi asyncapi = null;

List<Exception> errors = new LinkedList<>();

try
{
String asyncApiVersion = detectAsyncApiVersion(asyncapiText);

JsonValidationService service = JsonValidationService.newInstance();
ProblemHandler handler = service.createProblemPrinter(msg -> errors.add(new ConfigException(msg)));
JsonSchema schema = schemas.get(asyncApiVersion);

service.createReader(new StringReader(asyncapiText), schema, handler).read();

Jsonb jsonb = JsonbBuilder.create();

asyncapi = jsonb.fromJson(asyncapiText, Asyncapi.class);
}
catch (Exception ex)
{
errors.add(ex);
}

if (!errors.isEmpty())
{
Exception ex = errors.remove(0);
errors.forEach(ex::addSuppressed);
rethrowUnchecked(ex);
}

return asyncapi;
}

private JsonSchema schema(
String version)
{
InputStream schemaInput = null;

if (version.startsWith("2.6"))
{
schemaInput = AsyncapiBinding.class.getResourceAsStream("schema/asyncapi.2.6.schema.json");
}
else if (version.startsWith("3.0"))
{
schemaInput = AsyncapiBinding.class.getResourceAsStream("schema/asyncapi.3.0.schema.json");
}

JsonValidationService service = JsonValidationService.newInstance();

return service.createSchemaReaderFactoryBuilder()
.withSpecVersionDetection(true)
.build()
.createSchemaReader(schemaInput)
.read();
}

private String detectAsyncApiVersion(
String openapiText)
{
try (JsonReader reader = Json.createReader(new StringReader(openapiText)))
{
JsonObject json = reader.readObject();
if (json.containsKey("asyncapi"))
{
return json.getString("asyncapi");
}
else
{
throw new IllegalArgumentException("Unable to determine AsyncApi version.");
}
}
catch (Exception e)
{
throw new RuntimeException("Error reading AsyncApi document.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ public <C> BindingConfigBuilder<C> injectMqttKafkaRoutes(
{
break inject;
}

final AsyncapiOperation whenOperation = mqttAsyncapi.operations.get(condition.operationId);
final AsyncapiChannelView channel = AsyncapiChannelView.of(mqttAsyncapi.channels, whenOperation.channel);
final MqttKafkaConditionKind kind = whenOperation.action.equals(ASYNCAPI_SEND_ACTION_NAME) ?
Expand Down
Loading