-
Notifications
You must be signed in to change notification settings - Fork 708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Combined*SequenceFile for VKVS, WritableSequenceFileScheme, SequenceFileScheme #1647
Use Combined*SequenceFile for VKVS, WritableSequenceFileScheme, SequenceFileScheme #1647
Conversation
how was this tested and can you add any comments about the motivation or test/benchmark results? |
@johnynek motivation is simple, in many cases, VKVS produce small files and these changes give us the flexibility to have ability combine input of VKVS in fewer mappers. Performance impact should mitigate by tuning settings of maxSplitSize. and scalding has tests for VKVS (VersionedKeyValSourceTest). |
May be worth running our Tsar end-to-end tests with this change to verify. |
@@ -24,6 +26,11 @@ public KeyValueByteScheme(Fields fields) { | |||
super(fields, BytesWritable.class, BytesWritable.class); | |||
} | |||
|
|||
@Override | |||
public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { | |||
conf.setInputFormat(CombineSequenceFileInputFormat.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's a good idea to hardcode Hadoop's CombineSequenceFileInputFormat especially since we normally use the one from ElephnantBird. It's also disabled by default which will make it a backwards-compatible change.
The simplest way to hook it up in VersionedKeyValSource
import com.twitter.elephantbird.cascading2.scheme.CombinedSequenceFile
...
def hdfsScheme =
HadoopSchemeInstance(new CombinedSequenceFile(fields).asInstanceOf[Scheme[_, _, _, _, _]])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did little refactoring to keep backward capability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My one-line change suggestion for VersionedKeyValSource is not sufficient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that it should be opt-in and affect not only VKVS.
And also, it better to moves to hadoop implementation instead elephantbird.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EB is opt-in, you have to set elephantbird.use.combine.input.format=true
How do you measure "better" for hadoop's impl. The code change is simpler in the EB case.
You can make input format configurable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hadoop has better community support, EB used only inside Twitter.
@isnotinvain / @johnynek what do you think?
87f23ed
to
e224b32
Compare
now this is an even bigger change, where we need even more evidence it is safe. Unfortunately, sources are things that are hard to test well because you really need to interact with Hadoop. He have some hadoop tests, but we do not have performance or memory tests. I agree that using standard hadoop is preferable if possible. That said, we already have an elephant-bird dependency. @gerashegalov what is it about the elephantbird implementation that is better here? It is a shame if we should assume a custom approach because it is indeed hard for people to keep it straight. |
@johnynek we widely use EB combiner with LzoThrift (and even Parquet via Multiformat sources), whereas I am not aware of significant usage of this Hadoop class. We use EB CombinedSequenceFile specifically, specifically on thousands jobs for intermediate data daily. I also pointed out the simplicity of such a change with the EB. Other than that I don't have any data to compare. However, as stated before, we can make it configurable to give users a choice. |
|
e224b32
to
3d881e3
Compare
@gerashegalov I changed approach to use EB API, can you look at now? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 LGTM
build.sbt
Outdated
@@ -329,6 +329,8 @@ lazy val scaldingCore = module("core").settings( | |||
"com.twitter" %% "bijection-macros" % bijectionVersion, | |||
"com.twitter" %% "chill" % chillVersion, | |||
"com.twitter" %% "chill-algebird" % chillVersion, | |||
"com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@johnynek what's your take on our favorite subject of transitive dependencies? If they are allowed we can remove eb entries from scaldingCommons.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so, strictly speaking, I thought elephant-bird had GPL'd dependencies (lzo). That was actually one of the original reasons for not putting this in the core.
Do we need both of these dependencies here?
I start to wonder if there is a single consumer of elephant-bird-cascading2 other than scalding. Looks like the answer is no:
https://mvnrepository.com/artifact/com.twitter.elephantbird/elephant-bird-cascading2/usages
Maybe we should move that code into this repo so we don't have the cross repo publish issue?
We could move it into a separate module, like we did with maple
and just have the java code there. As far as elephant-bird-core, that is used elsewhere. I wonder if the gpl lzo code can be is a separate module in elephant-bird, then we could have scalding-lzo
that depends on that and split scalding-commons
into core
and lzo
?
Each new dependency is a possible pain point for others... So, I think we should add dependencies very carefully (and usually strive to remove them, not add them).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@johnynek got it, let's keep elephantbird isolated in commons for now and use combined for VKVS and have separated CombinedSchemas for sequence files as helpers.
4cb54dd
to
40dbad9
Compare
40dbad9
to
38a1cd9
Compare
38a1cd9
to
c416872
Compare
@gerashegalov / @johnynek please review again :) |
trait CombinedWritableSequenceFileScheme extends WritableSequenceFileScheme { | ||
// TODO Cascading doesn't support local mode yet | ||
override def hdfsScheme = | ||
HadoopSchemeInstance(new CombinedWritableSequenceFile(fields, keyType, valueType).asInstanceOf[cascading.scheme.Scheme[_, _, _, _, _]]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the fully qualified name here but not on line 10?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, thanks
c416872
to
b1927ce
Compare
Side note, one reason to use EB's combine input format and not hadoop's is I noticed that the hadoop one doesn't properly report mapper progress for some reason (mappers go from 0% to 100% and nothing in between). Maybe it's not that commonly used. |
👍 |
…SequenceFileScheme
b1927ce
to
d7c2708
Compare
That give us the flexibility to combine small files in fewer mappers.