) ois.readObject();
try {
state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
new file mode 100644
index 0000000000000..c07a0697e8f18
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics;
+
+import com.codahale.metrics.Metric;
+
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+
+/**
+ * An adapter between the {@link NamedAggregators} and codahale's {@link Metric}
+ * interface.
+ */
+public class AggregatorMetric implements Metric {
+
+ private final NamedAggregators namedAggregators;
+
+ private AggregatorMetric(final NamedAggregators namedAggregators) {
+ this.namedAggregators = namedAggregators;
+ }
+
+ public static AggregatorMetric of(final NamedAggregators namedAggregators) {
+ return new AggregatorMetric(namedAggregators);
+ }
+
+ NamedAggregators getNamedAggregators() {
+ return namedAggregators;
+ }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
new file mode 100644
index 0000000000000..0658e049a999e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.spark.metrics.source.Source;
+
+/**
+ * A Spark {@link Source} that is tailored to expose an {@link AggregatorMetric},
+ * wrapping an underlying {@link NamedAggregators} instance.
+ */
+public class AggregatorMetricSource implements Source {
+
+ private static final String SOURCE_NAME = "NamedAggregators";
+
+ private final MetricRegistry metricRegistry = new MetricRegistry();
+
+ public AggregatorMetricSource(final NamedAggregators aggregators) {
+ metricRegistry.register(SOURCE_NAME, AggregatorMetric.of(aggregators));
+ }
+
+ @Override
+ public String sourceName() {
+ return SOURCE_NAME;
+ }
+
+ @Override
+ public MetricRegistry metricRegistry() {
+ return metricRegistry;
+ }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
new file mode 100644
index 0000000000000..88e2211cf9e1c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+
+import java.util.Map;
+import java.util.SortedMap;
+
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link MetricRegistry} decorator-like* that supports {@link AggregatorMetric} by exposing
+ * the underlying * {@link org.apache.beam.runners.spark.aggregators.NamedAggregators}'
+ * aggregators as {@link Gauge}s.
+ *
+ * *{@link MetricRegistry} is not an interface, so this is not a by-the-book decorator.
+ * That said, it delegates all metric related getters to the "decorated" instance.
+ *
+ */
+public class WithNamedAggregatorsSupport extends MetricRegistry {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WithNamedAggregatorsSupport.class);
+
+ private MetricRegistry internalMetricRegistry;
+
+ private WithNamedAggregatorsSupport(final MetricRegistry internalMetricRegistry) {
+ this.internalMetricRegistry = internalMetricRegistry;
+ }
+
+ public static WithNamedAggregatorsSupport forRegistry(final MetricRegistry metricRegistry) {
+ return new WithNamedAggregatorsSupport(metricRegistry);
+ }
+
+ @Override
+ public SortedMap getTimers(final MetricFilter filter) {
+ return internalMetricRegistry.getTimers(filter);
+ }
+
+ @Override
+ public SortedMap getMeters(final MetricFilter filter) {
+ return internalMetricRegistry.getMeters(filter);
+ }
+
+ @Override
+ public SortedMap getHistograms(final MetricFilter filter) {
+ return internalMetricRegistry.getHistograms(filter);
+ }
+
+ @Override
+ public SortedMap getCounters(final MetricFilter filter) {
+ return internalMetricRegistry.getCounters(filter);
+ }
+
+ @Override
+ public SortedMap getGauges(final MetricFilter filter) {
+ return
+ new ImmutableSortedMap.Builder(
+ Ordering.from(String.CASE_INSENSITIVE_ORDER))
+ .putAll(internalMetricRegistry.getGauges(filter))
+ .putAll(extractGauges(internalMetricRegistry, filter))
+ .build();
+ }
+
+ private Map extractGauges(final MetricRegistry metricRegistry,
+ final MetricFilter filter) {
+
+ // find the AggregatorMetric metrics from within all currently registered metrics
+ final Optional