Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
[Core] Optimize block_manager_v2 vs block_manager_v1 (to make V2 defa…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexm-redhat authored and robertgshaw2-redhat committed Jul 7, 2024
1 parent 1732299 commit 77f588c
Show file tree
Hide file tree
Showing 19 changed files with 1,189 additions and 532 deletions.
4 changes: 4 additions & 0 deletions benchmarks/benchmark_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def main(args: argparse.Namespace):
load_format=args.load_format,
distributed_executor_backend=args.distributed_executor_backend,
otlp_traces_endpoint=args.otlp_traces_endpoint,
enable_prefix_caching=args.enable_prefix_caching,
)

sampling_params = SamplingParams(
Expand Down Expand Up @@ -220,6 +221,9 @@ def run_to_completion(profile_dir: Optional[str] = None):
action='store_true',
help='If True, the prefill requests can be chunked based on the '
'max_num_batched_tokens')
parser.add_argument("--enable-prefix-caching",
action='store_true',
help="Enable automatic prefix caching")
parser.add_argument('--use-v2-block-manager', action='store_true')
parser.add_argument(
"--ray-workers-use-nsight",
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ def generate(
req_sample_output_strs: List[str] = []
for sample in req_output.outputs:
output_str = sample.text
output_ids = sample.token_ids
output_ids = list(sample.token_ids)
req_sample_output_ids.append(prompt_ids + output_ids)
req_sample_output_strs.append(prompt_str + output_str)
outputs.append((req_sample_output_ids, req_sample_output_strs))
Expand Down
5 changes: 3 additions & 2 deletions tests/core/block/test_block_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,9 @@ def test_cow(block_size: int, sequence_len: int, append_len: int,
block_size) - (sequence_len // block_size)

original_block_table.allocate(token_ids=token_ids, device=Device.GPU)
original_block_ids = original_block_table.physical_block_ids
original_block_ids = original_block_table.physical_block_ids[:]

print("original_block_ids = {}".format(original_block_ids))
forked_block_table = original_block_table.fork()

# Expect no additional allocation (copy on _write_).
Expand Down Expand Up @@ -462,7 +463,7 @@ def test_cow_lookahead_simple(block_size: int, sequence_len: int,

# Allocate lookahead slots.
original_block_table.ensure_num_empty_slots(lookahead_slots)
original_block_ids = original_block_table.physical_block_ids
original_block_ids = original_block_table.physical_block_ids[:]

forked_block_table = original_block_table.fork()

Expand Down
24 changes: 12 additions & 12 deletions tests/core/block/test_cpu_gpu_block_allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
@pytest.mark.parametrize("num_gpu_blocks", [1024])
@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_allocate_mutable(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
def test_allocate_mutable_block(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
Expand All @@ -26,14 +26,14 @@ def test_allocate_mutable(num_cpu_blocks: int, num_gpu_blocks: int,
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks

cpu_blocks = [
allocator.allocate_mutable(prev_block=None, device=Device.CPU)
allocator.allocate_mutable_block(prev_block=None, device=Device.CPU)
for _ in range(num_cpu_blocks)
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks

gpu_blocks = [
allocator.allocate_mutable(prev_block=None, device=Device.GPU)
allocator.allocate_mutable_block(prev_block=None, device=Device.GPU)
for _ in range(num_gpu_blocks)
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
Expand All @@ -52,8 +52,8 @@ def test_allocate_mutable(num_cpu_blocks: int, num_gpu_blocks: int,
@pytest.mark.parametrize("num_gpu_blocks", [1024])
@pytest.mark.parametrize("block_size", [2])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_allocate_immutable(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
def test_allocate_immutable_block(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
Expand All @@ -72,18 +72,18 @@ def test_allocate_immutable(num_cpu_blocks: int, num_gpu_blocks: int,
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks

cpu_blocks = [
allocator.allocate_immutable(prev_block=None,
token_ids=token_ids,
device=Device.CPU)
allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids,
device=Device.CPU)
for token_ids in cpu_token_ids
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks

gpu_blocks = [
allocator.allocate_immutable(prev_block=None,
token_ids=token_ids,
device=Device.GPU)
allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids,
device=Device.GPU)
for token_ids in gpu_token_ids
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
Expand Down
6 changes: 3 additions & 3 deletions tests/core/block/test_naive_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ def create_allocate_lambda(allocate_type: str,
prev_block: Optional[Block],
token_ids: List[int]):
if allocate_type == "immutable":
allocate_block = lambda: allocator.allocate_immutable(
allocate_block = lambda: allocator.allocate_immutable_block(
prev_block=prev_block, token_ids=token_ids)
elif allocate_type == "mutable":
allocate_block = lambda: allocator.allocate_mutable(prev_block=
prev_block)
allocate_block = lambda: allocator.allocate_mutable_block(
prev_block=prev_block)
else:
raise ValueError()

Expand Down
106 changes: 67 additions & 39 deletions tests/core/block/test_prefix_caching_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ def test_first_block_has_correct_content_hash(seed: int, block_size: int,
token_ids = list(range(num_to_fill))
mock_allocator = MagicMock(spec=PrefixCachingBlockAllocator)

block_with_prev = PrefixCachingBlock(
prev_block=None,
token_ids=token_ids,
block_size=block_size,
prefix_caching_allocator=mock_allocator)
block_with_prev = PrefixCachingBlock(prev_block=None,
token_ids=token_ids,
block_size=block_size,
allocator=mock_allocator)

if is_curr_block_full:
# Expect hash since block is full.
Expand Down Expand Up @@ -76,7 +75,7 @@ def test_nth_block_has_correct_content_hash(seed: int, block_size: int,
prev_block=previous_block,
token_ids=token_ids,
block_size=block_size,
prefix_caching_allocator=mock_allocator,
allocator=mock_allocator,
)

if is_curr_block_full and prev_block_has_hash:
Expand Down Expand Up @@ -143,7 +142,7 @@ def create_chain(block_size: int,
prev_block=prev_block,
token_ids=[],
block_size=block_size,
prefix_caching_allocator=allocator,
allocator=allocator,
)

tokens_to_append = token_ids[block_number *
Expand All @@ -164,11 +163,11 @@ def create_allocate_lambda(allocate_type: str, allocator: BlockAllocator,
prev_block: Optional[Block],
token_ids: List[int]):
if allocate_type == "immutable":
allocate_block = lambda: allocator.allocate_immutable(
allocate_block = lambda: allocator.allocate_immutable_block(
prev_block=prev_block, token_ids=token_ids)
elif allocate_type == "mutable":
allocate_block = lambda: allocator.allocate_mutable(prev_block=
prev_block)
allocate_block = lambda: allocator.allocate_mutable_block(
prev_block=prev_block)
else:
raise ValueError()

Expand Down Expand Up @@ -238,12 +237,13 @@ def test_allocate_immutable_ooms_many_hash(num_blocks: int,

# Expect allocation with unseen hash to fail.
with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocator.allocate_immutable(prev_block=chain[-1],
token_ids=list(range(block_size)))
allocator.allocate_immutable_block(prev_block=chain[-1],
token_ids=list(
range(block_size)))

# Expect mutable allocation to fail.
with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocator.allocate_mutable(prev_block=chain[-1])
allocator.allocate_mutable_block(prev_block=chain[-1])

# Expect allocation of exact same chain to pass.
second_chain = TestPrefixCachingBlockAllocator.create_immutable_chain(
Expand Down Expand Up @@ -275,7 +275,7 @@ def test_free_prevents_oom(num_blocks: int, block_size: int):

# Expect mutable allocation to fail.
with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocator.allocate_mutable(prev_block=None)
allocator.allocate_mutable_block(prev_block=None)

block_to_free = chain[-1]

Expand All @@ -285,11 +285,11 @@ def test_free_prevents_oom(num_blocks: int, block_size: int):
allocator.free(block_to_free)
assert block_to_free.block_id is None, i

new_block = allocator.allocate_mutable(prev_block=None)
new_block = allocator.allocate_mutable_block(prev_block=None)
assert new_block.block_id == block_id, i

with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocator.allocate_mutable(prev_block=None)
allocator.allocate_mutable_block(prev_block=None)

block_to_free = new_block

Expand Down Expand Up @@ -381,17 +381,13 @@ def test_get_common_computed_block_ids(num_blocks: int, block_size: int,

# Create token ids that will exhaust all blocks.
token_ids = list(range(num_blocks_to_consume * block_size))
blocks = list(range(num_blocks_to_consume))

first_chain = TestPrefixCachingBlockAllocator.create_immutable_chain(
block_size=block_size,
token_ids=token_ids,
allocator=allocator,
)

# mark all blocks in first chain as computed
allocator.mark_blocks_as_computed(blocks)

# After zero_point, second_chain's token_ids would be set -1, which
# make it different from here comparing with first_chain
zero_point = random.randint(1, len(token_ids) - 1)
Expand Down Expand Up @@ -429,15 +425,16 @@ def test_alloc_promotion(num_blocks: int, block_size: int, seed: int):
block_size=block_size)
token_ids = list(range(block_size))

block = allocator.allocate_immutable(prev_block=None,
token_ids=token_ids)
block = allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids)

assert allocator._refcounter.get(block.block_id) == 1
m = allocator.allocate_mutable(prev_block=None)
m = allocator.allocate_mutable_block(prev_block=None)

block_id = m.block_id
for i in range(block_size):
m.append_token_ids([i])

# After block get promoted to immutable from mutable, if there is
# already same content hash block, then it shall be released into
# hashless_allocator
Expand All @@ -457,48 +454,79 @@ def test_eviction_alloc_mixed(num_blocks: int, block_size: int, seed: int):

all_blocks_list = [i for i in range(num_blocks)]
zero_ref = {i: 0 for i in range(num_blocks)}
one_ref = {i: 1 for i in range(num_blocks)}
allocator = PrefixCachingBlockAllocator(num_blocks=num_blocks,
block_size=block_size)
token_ids = list(range(num_blocks * block_size))

# now we have num_blocks free blocks in hashless allocator
# with internal tracking list _blocks _cached_blocks and evictor
# empty and block's ref shall be 0
# Verify initial/pre-alloc state

# Ensure all blocks are free inside hashless allocator
assert list(allocator._hashless_allocator._free_block_indices
) == all_blocks_list
assert len(allocator._blocks.keys()) == 0
# Ensure no tracked blocks
assert len(allocator._block_tracker.keys()) == num_blocks
for block_id in range(num_blocks):
assert not allocator._block_tracker[block_id].active
# Ensure no cached blocks
assert len(allocator._cached_blocks.values()) == 0
# Ensure no evicted blocks
assert len(allocator.evictor.free_table.keys()) == 0
# Ensure 0s ref counts for all blocks
assert allocator._refcounter._refcounts == zero_ref

# Allocate immutable chains with only one block residuled in
new_block = []
for i in range(num_blocks):
block = allocator.allocate_immutable(
block = allocator.allocate_immutable_block(
prev_block=None,
token_ids=token_ids[block_size * i:block_size * (i + 1)])
new_block.append(block)

# Verify post-alloc state

# Ensure no blocks are free inside hashless allocator
assert (len(allocator._hashless_allocator._free_block_indices) == 0)
# Ensure all blocks are tracked
assert len(allocator._block_tracker.keys()) == num_blocks
for block_id in range(num_blocks):
assert allocator._block_tracker[block_id].active
# Ensure all blocks are cached (all promoted)
assert len(allocator._cached_blocks.values()) == num_blocks
# Ensure no evicted blocks
assert len(allocator.evictor.free_table.keys()) == 0
# Ensure 1s ref counts for all blocks
assert allocator._refcounter._refcounts == one_ref

# Free all blocks, and now all blocks shall be in the evictor
# there shall be no tracking data left in _blocks
# there shall be no tracking data left in _block_tracker
# all blocks shall be tracked in _cached_blocks
# all blocks' ref shall be zero
for block in new_block:
allocator.free(block)

assert len(allocator._blocks.keys()) == 0
# Verify post-free state

# Ensure no tracked blocks
assert len(allocator._block_tracker.keys()) == num_blocks
for block_id in range(num_blocks):
assert not allocator._block_tracker[block_id].active
# Ensure no blocks in hashless allocator (all promoted)
assert len(allocator._hashless_allocator._free_block_indices) == 0
# Ensure all blocks are cached
assert list(allocator._cached_blocks.values()) == all_blocks_list
# Ensure all blocks are inside the evictor
assert list(allocator.evictor.free_table.keys()) == all_blocks_list
# Ensure 0s refcounts
assert allocator._refcounter._refcounts == zero_ref

# Allocate a mutable block, and the first block shall be evicted
# and set its content hash into None, ref to 1
mutable = allocator.allocate_mutable(prev_block=None)
mutable = allocator.allocate_mutable_block(prev_block=None)

assert mutable.block_id == 0
assert mutable.content_hash is None
assert 0 in allocator._blocks
assert allocator._block_tracker[0].active
assert allocator._refcounter.get(0) == 1
assert 0 not in allocator._cached_blocks
assert 0 not in allocator.evictor
Expand All @@ -507,27 +535,27 @@ def test_eviction_alloc_mixed(num_blocks: int, block_size: int, seed: int):
# hashless allocator
allocator.free(mutable)

assert len(allocator._blocks.keys()) == 0
assert not allocator._block_tracker[0].active
assert allocator._refcounter._refcounts == zero_ref
assert 0 not in allocator._cached_blocks
assert 0 not in allocator.evictor
assert 0 in allocator._hashless_allocator._free_block_indices

# when allocate immutable with first block_size tokens, we
# When allocate immutable with first block_size tokens, we
# shall get free block from hashless allocator, thus no block left
# in hashless
block = allocator.allocate_immutable(prev_block=None,
token_ids=token_ids[:block_size])
block = allocator.allocate_immutable_block(
prev_block=None, token_ids=token_ids[:block_size])

assert block.block_id == 0
assert len(allocator._hashless_allocator._free_block_indices) == 0
assert 0 in allocator._blocks
assert allocator._block_tracker[0].active
assert 0 in allocator._cached_blocks.values()
assert allocator._refcounter.get(0) == 1
assert 0 not in allocator.evictor

# allocate mutable block again, it shall be popped from evictor
mutable = allocator.allocate_mutable(prev_block=None)
mutable = allocator.allocate_mutable_block(prev_block=None)
assert len(allocator._hashless_allocator._free_block_indices) == 0
assert mutable.block_id not in allocator.evictor.free_table
assert allocator._refcounter.get(mutable.block_id) == 1
Expand Down Expand Up @@ -624,7 +652,7 @@ def create_immutable_chain(
block_token_ids = token_ids[block_number *
block_size:(block_number + 1) *
block_size]
prev_block = allocator.allocate_immutable(
prev_block = allocator.allocate_immutable_block(
prev_block=prev_block, token_ids=block_token_ids)
blocks.append(prev_block)

Expand Down
8 changes: 4 additions & 4 deletions tests/spec_decode/test_batch_expansion.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ def test_create_single_target_seq_group_metadata(k: int):

assert output.request_id == input_seq_group_metadata.request_id
assert len(output.seq_data) == 1
assert output.seq_data[target_seq_id].get_prompt_token_ids(
) == prompt_tokens
assert output.seq_data[target_seq_id].get_output_token_ids(
) == prev_output_tokens + token_ids
assert output.seq_data[target_seq_id].get_prompt_token_ids() == tuple(
prompt_tokens)
assert output.seq_data[target_seq_id].get_output_token_ids() == tuple(
prev_output_tokens + token_ids)

assert len(output.block_tables) == 1
assert output.block_tables[
Expand Down
Loading

0 comments on commit 77f588c

Please sign in to comment.