Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1817] change some deprecated code and fix minor codestyle #3678

Merged
merged 2 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
Expand Down Expand Up @@ -69,17 +71,17 @@
@Slf4j
public class HiveSerDeConverter extends InstrumentedConverter<Object, Object, Writable, Writable> {

private SerDe serializer;
private SerDe deserializer;
private Serializer serializer;
private Deserializer deserializer;

@Override
public HiveSerDeConverter init(WorkUnitState state) {
super.init(state);
Configuration conf = HadoopUtils.getConfFromState(state);

try {
this.serializer = HiveSerDeWrapper.getSerializer(state).getSerDe();
this.deserializer = HiveSerDeWrapper.getDeserializer(state).getSerDe();
this.serializer = (Serializer) HiveSerDeWrapper.getSerializer(state).getSerDe();
this.deserializer = (Deserializer) HiveSerDeWrapper.getDeserializer(state).getSerDe();
this.deserializer.initialize(conf, state.getProperties());
setColumnsIfPossible(state);
this.serializer.initialize(conf, state.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.io.Writable;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -54,7 +55,7 @@ public DataWriter<Writable> build() throws IOException {

if (!properties.contains(WRITER_WRITABLE_CLASS) || !properties.contains(WRITER_OUTPUT_FORMAT_CLASS)) {
HiveSerDeWrapper serializer = HiveSerDeWrapper.getSerializer(properties);
properties.setProp(WRITER_WRITABLE_CLASS, serializer.getSerDe().getSerializedClass().getName());
properties.setProp(WRITER_WRITABLE_CLASS, ((Serializer) serializer.getSerDe()).getSerializedClass().getName());
properties.setProp(WRITER_OUTPUT_FORMAT_CLASS, serializer.getOutputFormatClassName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
Expand All @@ -28,6 +29,7 @@
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
Expand All @@ -39,6 +41,7 @@

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.Either;


/**
Expand Down Expand Up @@ -88,7 +91,7 @@ public String toString() {
}
}

private Optional<SerDe> serDe = Optional.absent();
private Optional<Either<AbstractSerDe, VectorizedSerde>> serDe = Optional.absent();
private final String serDeClassName;
private final String inputFormatClassName;
private final String outputFormatClassName;
Expand All @@ -107,15 +110,21 @@ private HiveSerDeWrapper(String serDeClassName, String inputFormatClassName, Str
* Get the {@link SerDe} instance associated with this {@link HiveSerDeWrapper}.
* This method performs lazy initialization.
*/
public SerDe getSerDe() throws IOException {
public Object getSerDe() throws IOException {
if (!this.serDe.isPresent()) {
try {
this.serDe = Optional.of(SerDe.class.cast(Class.forName(this.serDeClassName).newInstance()));
Object serde = Class.forName(this.serDeClassName).newInstance();
if (serde instanceof OrcSerde) {
this.serDe = Optional.of(Either.right(VectorizedSerde.class.cast(serde)));
} else {
this.serDe = Optional.of(Either.left(AbstractSerDe.class.cast(serde)));
}
} catch (Throwable t) {
throw new IOException("Failed to instantiate SerDe " + this.serDeClassName, t);
}
}
return this.serDe.get();

return this.serDe.get().get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public class HelixInstancePurgerWithMetrics {


/**
* Blocking call for purging all offline helix instances. Provides boiler plate code for providing periodic updates
* Blocking call for purging all offline helix instances. Provides boilerplate code for providing periodic updates
 * and sending a GTE if it takes an unexpected amount of time.
*
* <p>
* All previous helix instances should be purged on startup. Gobblin task runners are stateless from helix
* perspective because all important state is persisted separately in Workunit State Store or Watermark store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public void handleContainerReleaseRequest(ContainerReleaseRequest containerRelea
// Record that this container was explicitly released so that a new one is not spawned to replace it
// Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion()
// can check for the container id and skip spawning a replacement container.
// Note that this is best effort since these are asynchronous operations and a container may abort concurrently
// Note that this is best-effort since these are asynchronous operations and a container may abort concurrently
// with the release call. So in some cases a replacement container may have already been spawned before
// the container is put into the black list.
this.releasedContainerCache.put(container.getId(), "");
Expand Down Expand Up @@ -453,7 +453,7 @@ private EventSubmitter buildEventSubmitter() {
/**
* Request an allocation of containers. If numTargetContainers is larger than the max of current and expected number
* of containers then additional containers are requested.
*
* <p>
arjun4084346 marked this conversation as resolved.
Show resolved Hide resolved
* If numTargetContainers is less than the current number of allocated containers then release free containers.
* Shrinking is relative to the number of currently allocated containers since it takes time for containers
* to be allocated and assigned work and we want to avoid releasing a container prematurely before it is assigned
Expand Down Expand Up @@ -709,7 +709,7 @@ private boolean shouldStickToTheSameNode(int containerExitStatus) {
* Handle the completion of a container. A new container will be requested to replace the one
* that just exited. Depending on the exit status and if container host affinity is enabled,
* the new container may or may not try to be started on the same node.
*
* <p>
* A container completes in either of the following conditions: 1) some error happens in the
* container and caused the container to exit, 2) the container gets killed due to some reason,
* for example, if it runs over the allowed amount of virtual or physical memory, 3) the gets
Expand Down Expand Up @@ -860,11 +860,11 @@ private ImmutableMap.Builder<String, String> buildContainerStatusEventMetadata(C
* Get the number of matching container requests for the specified resource memory and cores.
* Due to YARN-1902 and YARN-660, this API is not 100% accurate. {@link AMRMClientCallbackHandler#onContainersAllocated(List)}
 * contains logic for best-effort cleanup of requests, and the resource tends to match the allocated container. So in practice the count is pretty accurate.
*
* <p>
 * This API call gets the count of container requests for containers that are > resource if there is no request with the exact same resource.
 * The RM can return containers that are larger (because of normalization etc.).
 * A container may be larger by memory or cpu (e.g. a container (1000M, 3cpu) can fit request (1000M, 1cpu) or request (500M, 3cpu)).
*
* <p>
* Thankfully since each helix tag / resource has a different priority, matching requests for one helix tag / resource
* have complete isolation from another helix tag / resource
*/
Expand Down