distsql: add disk spilling to lookup joiner #40208

Merged 1 commit into cockroachdb:master from the lookup-join-disk-spilling branch on Aug 28, 2019

Conversation

solongordon (Contributor)

In lookup joins on partial index keys, there is no limit on how many
rows might be returned by any particular lookup, so the joinreader may
be buffering an unbounded number of rows into memory. I changed
joinreader to use a disk-backed row container rather than just storing
the rows in memory with no accounting.

Fixes #39044

Release note (bug fix): Lookup joins now spill to disk if the index
lookups return more rows than can be stored in memory.
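
For illustration, a minimal self-contained Go sketch of the buffer-then-spill pattern the description refers to; the types, the byte limit, and the fake "disk" below are made up and do not reflect the real joinreader or rowcontainer code:

package main

import "fmt"

type row []string

// spillingBuffer keeps rows in memory while a byte budget allows, then
// diverts further rows to a stand-in for a disk-backed container.
type spillingBuffer struct {
    memLimit int64
    memUsed  int64
    inMemory []row
    onDisk   []row // stands in for a real disk-backed container
}

func (b *spillingBuffer) add(r row) {
    var size int64
    for _, s := range r {
        size += int64(len(s))
    }
    if b.memUsed+size <= b.memLimit {
        // Account for the row and keep it in memory while the budget allows.
        b.memUsed += size
        b.inMemory = append(b.inMemory, r)
        return
    }
    // Budget exhausted: "spill" instead of growing memory without bound.
    b.onDisk = append(b.onDisk, r)
}

func main() {
    b := &spillingBuffer{memLimit: 16}
    for i := 0; i < 5; i++ {
        b.add(row{fmt.Sprintf("value-%d", i)})
    }
    fmt.Printf("in memory: %d, spilled: %d\n", len(b.inMemory), len(b.onDisk))
}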

@solongordon added the do-not-merge label ("bors won't merge a PR with this label") on Aug 26, 2019
@solongordon requested review from asubiotto and a team on August 26, 2019 at 12:58
@cockroach-teamcity (Member) commented:

This change is Reviewable

@solongordon (Contributor, Author) left a comment:

This is still WIP, but I would appreciate some early feedback to make sure I'm going down the right path. I added a few comments where I'm feeling unsure.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/distsqlrun/joinreader.go, line 90 at r1 (raw file):

	// State variables for each batch of input rows.
	inputRows            sqlbase.EncDatumRows
	lookedUpRows         *rowcontainer.DiskBackedIndexedRowContainer

Does DiskBackedIndexedRowContainer seem like a reasonable choice here? Basically we are replacing a map from int (the input row index) to EncDatumRows.
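
For reference, a hypothetical sketch of the data-structure change being discussed: the unaccounted map from input-row index to looked-up rows becomes one flat container (which a disk-backed implementation can spill) plus per-input-row lists of positions into it. The types here are simplified placeholders, not the real EncDatumRows or container API:

package main

import "fmt"

type encDatumRow []string

func main() {
    // Before: map[int]encDatumRows, all in memory with no accounting.
    before := map[int][]encDatumRow{
        0: {{"a"}, {"b"}},
        1: {{"c"}},
    }

    // After: one flat, accountable container of looked-up rows, plus the
    // positions of each input row's matches within that container.
    var lookedUpRows []encDatumRow
    inputRowIdxToLookedUpRowIdx := make([][]int, 2)
    for inputIdx, rows := range before {
        for _, r := range rows {
            inputRowIdxToLookedUpRowIdx[inputIdx] = append(inputRowIdxToLookedUpRowIdx[inputIdx], len(lookedUpRows))
            lookedUpRows = append(lookedUpRows, r)
        }
    }
    fmt.Println(inputRowIdxToLookedUpRowIdx, lookedUpRows)
}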


pkg/sql/distsqlrun/joinreader.go, line 228 at r1 (raw file):

		flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, flowCtx.Cfg, "joinreader-mem")
	jr.diskMonitor = NewMonitor(flowCtx.EvalCtx.Ctx(), flowCtx.Cfg.DiskMonitor, "joinreader-disk")
	jr.lookedUpRows = rowcontainer.MakeDiskBackedIndexedRowContainer(

It seems like I need logic here to check if sql.distsql.temp_storage.joins is enabled, right? And if it's disabled just use an in-memory row container? (There's currently no such thing as an in-memory indexed row container but should be easy enough to add.)
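
A hypothetical sketch of the fallback being asked about: consult a boolean setting and construct either a disk-backed or a purely in-memory container. The interface and constructor names below are placeholders, not the actual rowcontainer API:

package main

import "fmt"

type rowContainer interface {
    AddRow(row []string)
}

type memRowContainer struct{ rows [][]string }

func (m *memRowContainer) AddRow(r []string) { m.rows = append(m.rows, r) }

// diskBackedRowContainer stands in for a container that can spill to temp storage.
type diskBackedRowContainer struct{ memRowContainer }

func newLookedUpRowContainer(tempStorageJoinsEnabled bool) rowContainer {
    if tempStorageJoinsEnabled {
        return &diskBackedRowContainer{}
    }
    // Setting disabled: fall back to a purely in-memory container.
    return &memRowContainer{}
}

func main() {
    fmt.Printf("%T\n", newLookedUpRowContainer(true)) // *main.diskBackedRowContainer
}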


pkg/sql/distsqlrun/joinreader.go, line 458 at r1 (raw file):

		}
		if !isJoinTypePartialJoin {
			// Replace missing values with nulls to appease the row container.

Is there a good alternative to this?

@yuzefovich (Member) left a comment:

Nice work! The approach looks good to me, and I think DiskBackedIndexedRowContainer is the right guy for the job.

Reviewed 2 of 2 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @solongordon)


pkg/sql/distsqlrun/joinreader.go, line 90 at r1 (raw file):

Previously, solongordon (Solon) wrote…

Does DiskBackedIndexedRowContainer seem like a reasonable choice here? Basically we are replacing a map from int (the input row index) to EncDatumRows.

Yes, I think that it does exactly what's needed.


pkg/sql/distsqlrun/joinreader.go, line 104 at r1 (raw file):

	// inputRowIdxToOutputRows.
	emitCursor struct {
		// inputRowIdx contains the index into inputRowIdxToOutputRows that we're

[nit]: it seems like inputRowIdxToOutputRows is no longer present; it was probably renamed at some point, but not all occurrences were updated?


pkg/sql/distsqlrun/joinreader.go, line 228 at r1 (raw file):

Previously, solongordon (Solon) wrote…

It seems like I need logic here to check if sql.distsql.temp_storage.joins is enabled, right? And if it's disabled just use an in-memory row container? (There's currently no such thing as an in-memory indexed row container but should be easy enough to add.)

I think my answer to both questions is "yes."


pkg/sql/distsqlrun/joinreader.go, line 458 at r1 (raw file):

Previously, solongordon (Solon) wrote…

Is there a good alternative to this?

Hm, we can have missing values in case of a partial key lookup?


pkg/sql/distsqlrun/joinreader.go, line 487 at r1 (raw file):

						continue
					}
					jr.inputRowIdxToLookedUpRowIdx[inputRowIdx] = []int{-1}

[nit]: I'm not sure whether it's important allocation- and performance-wise, but maybe we should have a global slice of length 1 with -1 as the single value and reuse that slice here?
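
A minimal sketch of this nit, assuming the marker slice is never mutated: allocate the []int{-1} sentinel once at package level and reuse it, avoiding a one-element allocation per unmatched input row. The name below is illustrative:

package main

import "fmt"

// partialJoinSentinel is allocated once and shared; it must never be mutated.
var partialJoinSentinel = []int{-1}

func main() {
    inputRowIdxToLookedUpRowIdx := make([][]int, 3)
    for i := range inputRowIdxToLookedUpRowIdx {
        inputRowIdxToLookedUpRowIdx[i] = partialJoinSentinel // no per-row allocation
    }
    fmt.Println(inputRowIdxToLookedUpRowIdx)
}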


pkg/sql/distsqlrun/joinreader_test.go, line 385 at r1 (raw file):

			t.Run(fmt.Sprintf("%d/%s", i, c.description), func(t *testing.T) {
				st := cluster.MakeTestingClusterSettings()
				tempEngine, err := engine.NewTempEngine(base.DefaultTestTempStorageConfig(st), base.DefaultTestStoreSpec)

I think instantiation of tempEngine and diskMonitor can be brought out of the t.Run and reused on all iterations.
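
A hypothetical sketch of that test restructuring: the expensive fixtures are built once outside the subtest loop and shared by every t.Run. The fake engine type below stands in for the real engine.NewTempEngine result, and this would live in a _test.go file:

package distsqlrun_test // hypothetical test package

import "testing"

// fakeTempEngine stands in for a temp engine that is costly to create.
type fakeTempEngine struct{}

func (e *fakeTempEngine) Close() {}

func TestJoinReaderSubtests(t *testing.T) {
    // Expensive fixtures created once, outside the subtest loop...
    tempEngine := &fakeTempEngine{}
    defer tempEngine.Close()

    for _, tc := range []string{"case-a", "case-b"} {
        t.Run(tc, func(t *testing.T) {
            _ = tempEngine // ...and reused by every subtest.
        })
    }
}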

@solongordon (Contributor, Author) left a comment:

Thanks for taking a look! Will be back with a more complete PR soon.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @solongordon, and @yuzefovich)


pkg/sql/distsqlrun/joinreader.go, line 104 at r1 (raw file):

Previously, yuzefovich wrote…

[nit]: it seems like inputRowIdxToOutputRows is not present, probably it was renamed at some point, but not all occurrences were updated?

Good eye, I'll fix this.


pkg/sql/distsqlrun/joinreader.go, line 458 at r1 (raw file):

Previously, yuzefovich wrote…

Hm, we can have missing values in case of a partial key lookup?

They're missing because the row fetcher always returns an element for every column in the index, but only the "needed" columns are actually set.
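
A simplified illustration of that workaround, using toy types rather than the real EncDatum API: slots the fetcher left unset are overwritten with an explicit NULL before the row goes into the row container, which expects every column to hold a value:

package main

import "fmt"

// encDatum is a toy stand-in for an encoded datum that may be unset.
type encDatum struct {
    set   bool
    value string
}

var nullDatum = encDatum{set: true, value: "NULL"}

// fillMissingWithNulls replaces unset slots with explicit NULLs.
func fillMissingWithNulls(row []encDatum) {
    for i := range row {
        if !row[i].set {
            row[i] = nullDatum
        }
    }
}

func main() {
    row := []encDatum{{set: true, value: "a"}, {}, {set: true, value: "c"}}
    fillMissingWithNulls(row)
    fmt.Println(row)
}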

@solongordon changed the title from "WIP: distsql: add disk spilling to lookup joiner" to "distsql: add disk spilling to lookup joiner" on Aug 27, 2019
@solongordon removed the do-not-merge label ("bors won't merge a PR with this label") on Aug 27, 2019
@solongordon (Contributor, Author) left a comment:

I need to figure out why TestJoinReaderDrain is failing, but otherwise this is ready for another look. I added logic to use an in-memory row container if temp storage is disabled, and I added a unit test which triggers disk spilling.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @solongordon, and @yuzefovich)


pkg/sql/distsqlrun/joinreader.go, line 104 at r1 (raw file):

Previously, solongordon (Solon) wrote…

Good eye, I'll fix this.

Done.


pkg/sql/distsqlrun/joinreader.go, line 228 at r1 (raw file):

Previously, yuzefovich wrote…

I think my answer to both questions is "yes."

Done.


pkg/sql/distsqlrun/joinreader.go, line 487 at r1 (raw file):

Previously, yuzefovich wrote…

[nit]: I'm not sure whether it's important allocation- and performance-wise, but maybe we should have a global slice of length 1 with -1 as the single value and reuse that slice here?

Done.

@solongordon (Contributor, Author) commented:

Fixed TestJoinReaderDrain, but vectorized queries with lookup joins are panicking because flowCtx.Cfg.Settings is nil. Looking into that.

@solongordon (Contributor, Author) commented:

It turns out the nil pointer panic was occurring during the SupportsVectorized check, because a fake/incomplete FlowCtx gets passed in that case:

ctx, &distsqlrun.FlowCtx{
    EvalCtx: &evalCtx.EvalContext,
    Cfg:     &distsqlrun.ServerConfig{},
    NodeID:  -1,
}, spec.Processors,

I worked around this by moving the row container initialization out of newJoinReader and into the Start method. It feels a bit weird that wrapped row sources get initialized at all during SupportsVectorized. I wonder if that is avoidable.
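
A hypothetical sketch of that workaround, with placeholder types: the constructor only stores the config, and the row container is built lazily in Start, by which point the FlowCtx is fully populated:

package main

import "fmt"

type serverConfig struct {
    Settings string // empty in the fake FlowCtx built for SupportsVectorized
}

type joinReader struct {
    cfg          *serverConfig
    lookedUpRows []string // stands in for the disk-backed row container
}

// newJoinReader does only cheap wiring; nothing here touches config fields
// that the fake FlowCtx leaves unset.
func newJoinReader(cfg *serverConfig) *joinReader {
    return &joinReader{cfg: cfg}
}

// Start performs the heavy initialization once the processor actually runs
// with a fully populated config.
func (jr *joinReader) Start() {
    jr.lookedUpRows = make([]string, 0)
    fmt.Println("row container initialized with settings:", jr.cfg.Settings)
}

func main() {
    jr := newJoinReader(&serverConfig{Settings: "real settings"})
    jr.Start()
}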

@yuzefovich (Member) left a comment:

I think that SupportsVectorized is supposed to simulate the flow setup as closely as possible to the "real" instantiation, so wrapped row sources should also be created, because wrapping a row source can return an error. If we did not do that, we might choose to use the vectorized flow, it would then fail during the actual setup, and the query would always return an error unless the user turned off vectorized execution completely.

I think the solution is to put the real ServerConfig in the "artificial" flow context you linked to above.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @solongordon, and @yuzefovich)

@solongordon (Contributor, Author) commented:

OK, I'll try that. I think it'll also need some other things populated like the Cfg.DiskMonitor and Cfg.TempStorage but hopefully that's easy enough.

What feels off to me is that we are initializing a bunch of machinery which is never going to get used: memory and disk monitors, the row container, the row fetcher. But maybe, like you say, this is necessary to make sure the real deal doesn't error out.

@solongordon force-pushed the lookup-join-disk-spilling branch 2 times, most recently from 7747af6 to a23739c, on August 28, 2019 at 17:51
@solongordon (Contributor, Author) commented:

Tests are now passing 🙌

@yuzefovich (Member) left a comment:

Reviewed 4 of 4 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @solongordon, and @yuzefovich)


pkg/sql/distsqlrun/joinreader.go, line 458 at r1 (raw file):

Previously, solongordon (Solon) wrote…

They're missing because the row fetcher always returns an element for every column in the index, but only the "needed" columns are actually set.

I see, thanks.


pkg/sql/distsqlrun/joinreader.go, line 252 at r2 (raw file):

			0, /* rowCapacity */
		)
		if limit < mon.DefaultPoolAllocationSize {

This conditional, for some reason, is bugging me, but I don't know which version I would prefer. It also seems suspicious that in the test you're actually setting limit to mon.DefaultPoolAllocationSize, so it feels like the cache is not actually disabled.


pkg/sql/distsqlrun/joinreader_test.go, line 385 at r1 (raw file):

Previously, yuzefovich wrote…

I think instantiation of tempEngine and diskMonitor can be brought out of the t.Run and reused on all iterations.

Ping.


pkg/sql/distsqlrun/joinreader_test.go, line 527 at r2 (raw file):

	// We need MemoryLimitBytes to be at least DefaultPoolAllocationSize so that
	// we can buffer some rows before spilling to disk.
	flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = mon.DefaultPoolAllocationSize

Why do we need to "buffer some rows before spilling"?


pkg/sql/distsqlrun/joinreader_test.go, line 564 at r2 (raw file):

		expected := fmt.Sprintf("['%s']", stringColVal)
		actual := row.String([]types.T{*types.String})
		if actual != expected {

[nit]: lately we've been using require.Equal for this comparison (require package can also be used below).
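
A small sketch of the require.Equal suggestion (testify's require package), replacing the manual if actual != expected comparison; the values below are placeholders for the real test's row.String output, and this would live in a _test.go file:

package distsqlrun_test // hypothetical test package

import (
    "testing"

    "github.com/stretchr/testify/require"
)

func TestRowString(t *testing.T) {
    expected := "['foo']"
    actual := "['foo']" // would come from row.String(...) in the real test
    require.Equal(t, expected, actual)
}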

@solongordon (Contributor, Author) left a comment:

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/distsqlrun/joinreader.go, line 252 at r2 (raw file):

Previously, yuzefovich wrote…

This conditional, for some reason, is bugging me, but I don't know which version I would prefer. It also seems suspicious that in the test you're actually setting limit to mon.DefaultPoolAllocationSize, so it feels like the cache is not actually disabled.

Actually the cache is not intended to be disabled for that test. This conditional is for tests like the fakedist-disk logic tests, which set MemoryLimitBytes to 1 to force disk spilling.
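
A hypothetical sketch of what that conditional achieves, with made-up names and constants: when the configured memory limit is smaller than the monitor's default allocation size, the in-memory cache in front of the disk-backed container is disabled, so tiny limits (like the 1-byte limit in the fakedist-disk logic tests) still force rows to disk:

package main

import "fmt"

// defaultPoolAllocationSize stands in for mon.DefaultPoolAllocationSize.
const defaultPoolAllocationSize = 10 << 10

// cacheSizeForLimit disables the cache when the limit cannot hold even one
// monitor allocation; otherwise the cache is bounded by the limit.
func cacheSizeForLimit(limit int64) int64 {
    if limit < defaultPoolAllocationSize {
        return 0
    }
    return limit
}

func main() {
    fmt.Println(cacheSizeForLimit(1))                         // 0: spill everything
    fmt.Println(cacheSizeForLimit(defaultPoolAllocationSize)) // cache stays enabled
}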


pkg/sql/distsqlrun/joinreader_test.go, line 385 at r1 (raw file):

Previously, yuzefovich wrote…

Ping.

Done.


pkg/sql/distsqlrun/joinreader_test.go, line 527 at r2 (raw file):

Previously, yuzefovich wrote…

Why do we need to "buffer some rows before spilling?"

Yeah, this isn't strictly necessary now that it's possible to disable caching in DiskBackedIndexedRowContainer. However, I think there's still value in it: it's more realistic to store some rows in memory before spilling to disk, and it exercises the caching logic. I updated the comment to reflect this.


pkg/sql/distsqlrun/joinreader_test.go, line 564 at r2 (raw file):

Previously, yuzefovich wrote…

[nit]: lately we've been using require.Equal for this comparison (require package can also be used below).

Done.

@yuzefovich (Member) left a comment:

Nice work! :lgtm:

Reviewed 1 of 1 files at r3.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto, @solongordon, and @yuzefovich)


pkg/sql/distsqlrun/joinreader.go, line 252 at r2 (raw file):

Previously, solongordon (Solon) wrote…

Actually the cache is not intended to be disabled for that test. This conditional is for tests like the fakedist-disk logic tests, which set MemoryLimitBytes to 1 to force disk spilling.

I see, cool.


pkg/sql/distsqlrun/joinreader_test.go, line 527 at r2 (raw file):

Previously, solongordon (Solon) wrote…

Yeah, this isn't strictly necessary now that it's possible to disable caching in DiskBackedIndexedRowContainer. However I think there's still value to it because it's more realistic to store some rows in memory before spilling to disk. Also it's good to exercise the caching logic since it's more realistic. I updated the comment to reflect this.

This makes sense now, thanks.

@solongordon (Contributor, Author) commented:

Thanks much for the review!

bors r+

craig bot pushed a commit that referenced this pull request Aug 28, 2019
40208: distsql: add disk spilling to lookup joiner r=solongordon a=solongordon

In lookup joins on partial index keys, there is no limit on how many
rows might be returned by any particular lookup, so the joinreader may
be buffering an unbounded number of rows into memory. I changed
joinreader to use a disk-backed row container rather than just storing
the rows in memory with no accounting.

Fixes #39044

Release note (bug fix): Lookup joins now spill to disk if the index
lookups return more rows than can be stored in memory.

40284: storage: issue swaps on AllocatorConsiderRebalance r=nvanbenschoten a=tbg

Change the rebalancing code so that it not only looks up a new replica to
add, but also picks one to remove. Both actions are then given to a
ChangeReplicas invocation which will carry it out atomically as long as
that feature is enabled.

Release note (bug fix): Replicas can now be moved between stores without
entering an intermediate configuration that violates the zone constraints.
Violations may still occur during zone config changes, decommissioning, and
in the presence of dead nodes (NB: the remainder be addressed in a future
change, so merge the corresponding release note)

40300: store: pull updateMVCCGauges out of StoreMetrics lock, use atomics r=nvanbenschoten a=nvanbenschoten

The operations it performs are already atomic, so we can use atomic add instructions to avoid any critical section.

This was responsible for 8.15% of mutex contention on a YCSB run.

The change also removes MVCCStats from the `storeMetrics` interface, which addresses a long-standing TODO.

40301: roachtest: Deflake clock jump test r=tbg a=bdarnell

These tests perform various clock jumps, then reverse them. The
reverse can cause a crash even if the original jump did not.
Add some sleeps to make things more deterministic and improve the
recovery process at the end of the test.

Fixes #38723

Release note: None

40305: exec: modify tests to catch bad selection vector access r=rafiss a=rafiss

The runTests helper will now cause a panic if a vectorized operator
tries to access a part of the selection vector that is out of bounds.
This identified bugs in the projection operator.

Release note: None

Co-authored-by: Solon Gordon <solon@cockroachlabs.com>
Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Ben Darnell <ben@cockroachlabs.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
@craig (craig bot, Contributor) commented on Aug 28, 2019:

Build succeeded

@craig (craig bot) merged commit dfcf20f into cockroachdb:master on Aug 28, 2019
@solongordon deleted the lookup-join-disk-spilling branch on August 29, 2019 at 11:59
Successfully merging this pull request may close these issues.

distsqlrun: lookup join still unboundedly buffers, but this time in a different way