Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Zeta] Fix worker node metrics acquisition #7862

Merged
merged 8 commits into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

hazelcast:
cluster-name: seatunnel
network:
rest-api:
enabled: true
endpoint-groups:
CLUSTER_WRITE:
enabled: true
DATA:
enabled: true
join:
tcp-ip:
enabled: true
member-list:
- secondServer
- server
port:
auto-increment: true
port-count: 100
port: 5801
properties:
hazelcast.invocation.max.retry.count: 20
hazelcast.tcp.join.port.try.count: 30
hazelcast.logging.type: log4j2
hazelcast.operation.generic.thread.count: 50
hazelcast.heartbeat.failuredetector.type: phi-accrual
hazelcast.heartbeat.interval.seconds: 2
hazelcast.max.no.heartbeat.seconds: 180
hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

hazelcast:
cluster-name: seatunnel
network:
rest-api:
enabled: true
endpoint-groups:
CLUSTER_WRITE:
enabled: true
DATA:
enabled: true
join:
tcp-ip:
enabled: true
member-list:
- secondServer
- server
port:
auto-increment: true
port-count: 100
port: 5801
properties:
hazelcast.invocation.max.retry.count: 20
hazelcast.tcp.join.port.try.count: 30
hazelcast.logging.type: log4j2
hazelcast.operation.generic.thread.count: 50
hazelcast.heartbeat.failuredetector.type: phi-accrual
hazelcast.heartbeat.interval.seconds: 2
hazelcast.max.no.heartbeat.seconds: 180
hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# JVM Heap
-Xms2g
-Xmx2g

# JVM Dump
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server

# Only used for test!!! We should make sure soft reference be collected ASAP
-XX:SoftRefLRUPolicyMSPerMB=1
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# JVM Heap
-Xms2g
-Xmx2g

# JVM Dump
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server

# Only used for test!!! We should make sure soft reference be collected ASAP
-XX:SoftRefLRUPolicyMSPerMB=1
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

seatunnel:
engine:
history-job-expire-minutes: 1440
backup-count: 2
queue-type: blockingqueue
print-execution-info-interval: 10
slot-service:
dynamic-slot: true
checkpoint:
interval: 300000
timeout: 100000
storage:
type: localfile
max-retained: 3
plugin-config:
namespace: /tmp/seatunnel/checkpoint_snapshot/
http:
enable-http: true
port: 8080
telemetry:
metric:
enabled: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
parallelism = 1
result_table_name = "fake"
schema = {
fields {
c_map = "map<string, array<int>>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
c_row = {
c_map = "map<string, map<string, string>>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}
}

transform {
}

sink {
console {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,39 +79,21 @@ public static HazelcastInstanceImpl createMasterAndWorkerHazelcastInstance(
seaTunnelConfig
.getEngineConfig()
.setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
return ((HazelcastInstanceProxy)
HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
HazelcastInstanceFactory.createInstanceName(
seaTunnelConfig.getHazelcastConfig()),
new SeaTunnelNodeContext(seaTunnelConfig)))
.getOriginal();
return initializeHazelcastInstance(seaTunnelConfig, null);
}

public static HazelcastInstanceImpl createMasterHazelcastInstance(
@NonNull SeaTunnelConfig seaTunnelConfig) {
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
return ((HazelcastInstanceProxy)
HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
HazelcastInstanceFactory.createInstanceName(
seaTunnelConfig.getHazelcastConfig()),
new SeaTunnelNodeContext(seaTunnelConfig)))
.getOriginal();
return initializeHazelcastInstance(seaTunnelConfig, null);
}

public static HazelcastInstanceImpl createWorkerHazelcastInstance(
@NonNull SeaTunnelConfig seaTunnelConfig) {
seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);
// in hazelcast lite node will not store IMap data.
seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
return ((HazelcastInstanceProxy)
HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
HazelcastInstanceFactory.createInstanceName(
seaTunnelConfig.getHazelcastConfig()),
new SeaTunnelNodeContext(seaTunnelConfig)))
.getOriginal();
return initializeHazelcastInstance(seaTunnelConfig, null);
}

public static HazelcastInstanceImpl createHazelcastInstance() {
Expand Down
Loading
Loading