Skip to content

Commit

Permalink
test commit
Browse files Browse the repository at this point in the history
  • Loading branch information
JayajP committed Dec 14, 2023
1 parent 54c1894 commit 08f8840
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.DirtyState;
import org.apache.beam.runners.core.metrics.MetricCell;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricName;

/**
* Version of {@link CounterCell} supporting multi-thread safe mutations and extraction of delta
* values.
*/
public class RemoveSafeDeltaCounterCell implements Counter, MetricCell<Long> {

private final MetricName metricName;
private final ConcurrentHashMap<MetricName, Long> countersMap;

public RemoveSafeDeltaCounterCell(MetricName metricName, ConcurrentHashMap<MetricName, Long> countersMap) {
this.metricName = metricName;
this.countersMap = countersMap;
}

@Override
public void reset() {
countersMap.computeIfPresent(name, (unusedName, unusedValue) -> 0L);
}

@Override
public void inc(long n) {
countersMap.compute(name, (name, value) -> {
if (value == null) {
return n;
}
return value + n;
});
}

@Override
public void inc() {
inc(1);
}

@Override
public void dec() {
inc(-1);
}

@Override
public void dec(long n) {
inc(-1 * n);
}

@Override
public MetricName getName() {
return metricName;
}

@Override
public DirtyState getDirty() {
throw new UnsupportedOperationException(
String.format("%s doesn't support the getDirty", getClass().getSimpleName()));
}

@Override
public Long getCumulative() {
throw new UnsupportedOperationException("getCumulative is not supported by Streaming Metrics");
}

public void deleteIfZero() {
if (metricName == null) {
return;
}
countersMap.remove(metricName, 0L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.dataflow.worker;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.is;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.beam.sdk.metrics.MetricName;

/** Tests for {@link RemoveSafeDeltaCounterCell}. */
@RunWith(JUnit4.class)
public class RemoveSafeDeltaCounterCellTest {

@Test
public void testRemoveSafeDeltaCounterCell_safeDeletes() throws Exception {
ConcurrentHashMap<MetricName, Long> metric_map = new ConcurrentHashMap<>();
MetricName metric1 = MetricName.named("namespace", "name_1");

RemoveSafeDeltaCounterCell cell_1 = new RemoveSafeDeltaCounterCell(metric1, metric_map);
RemoveSafeDeltaCounterCell cell_2 = new RemoveSafeDeltaCounterCell(metric1, metric_map);

cell_2.inc(10);
assertThat(metric_map.get(metric1), equalTo(10L));

cell_2.reset();
assertThat(metric_map.get(metric1), equalTo(0L));
cell_2.deleteIfZero();

assertThat(metric_map.get(metric1), is(nullValue()));

// Incrementing a deleted counter will recreate it.
cell_1.inc(20);
assertThat(metric_map.get(metric1), equalTo(20L));
}

}

0 comments on commit 08f8840

Please sign in to comment.