Refactored Benchmark Scripts + Added Faiss Dense Vector Extractor #4

Merged: 6 commits, Sep 12, 2024
8 changes: 8 additions & 0 deletions .gitignore
@@ -8,6 +8,8 @@ collections/*
indexes/*
.vscode/
venv/
*.txt
*.duckdb

# build directories from `python3 setup.py sdist bdist_wheel`
build/
@@ -19,3 +21,9 @@ runs/

# logs should also be ignored
logs/

# binaries should also be ignored
bin/*
lib*
pyvenv*
share*
1 change: 0 additions & 1 deletion collections/.gitkeep

This file was deleted.

19 changes: 19 additions & 0 deletions duckdb_in_memory.py
@@ -0,0 +1,19 @@
import duckdb

# Open the file-based DuckDB database
file_con = duckdb.connect('my_database.duckdb')

# Create an in-memory DuckDB database and load the vector similarity search
# extension, which provides the HNSW index type
mem_con = duckdb.connect(database=':memory:')
mem_con.execute("INSTALL vss")
mem_con.execute("LOAD vss")

# Extract data from the file-based msmarco table into a Pandas DataFrame
df = file_con.execute("SELECT * FROM msmarco").fetchdf()

# Materialize the DataFrame as a base table in the in-memory database
# (an index cannot be created on a registered DataFrame view)
mem_con.execute("CREATE TABLE msmarco AS SELECT * FROM df")

# Now you can create the HNSW index on the msmarco table in the in-memory database
mem_con.execute("CREATE INDEX hnsw_idx ON msmarco USING HNSW(vector) WITH (metric = 'ip')")

# Continue with your operations...
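
A minimal sketch of how the in-memory table could then be queried for nearest neighbours, assuming a 768-dimensional vector column as used elsewhere in this PR; the query vector below is only a random placeholder:

import numpy as np

query_vector = np.random.rand(768).astype('float32').tolist()  # placeholder query vector
results = mem_con.execute(
    "SELECT id, array_inner_product(vector, ?::FLOAT[768]) AS score "
    "FROM msmarco ORDER BY score DESC LIMIT 20",
    (query_vector,)
).fetchall()
print(results[:3])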
17 changes: 17 additions & 0 deletions duckdb_server.py
@@ -0,0 +1,17 @@
import duckdb
from flask import Flask, request, jsonify

app = Flask(__name__)
con = duckdb.connect('my_database.duckdb')

@app.route('/query', methods=['POST'])
def query_duckdb():
    query = request.json.get('query')
    try:
        result = con.execute(query).fetchdf()
        return result.to_json(orient='split')
    except Exception as e:
        return jsonify({'error': str(e)}), 400

if __name__ == '__main__':
    app.run(port=5000)
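
A minimal client sketch for this endpoint, assuming the server above is running locally on port 5000; the SQL statement is only illustrative:

import json
import urllib.request

payload = json.dumps({'query': 'SELECT COUNT(*) FROM msmarco'}).encode('utf-8')
req = urllib.request.Request(
    'http://localhost:5000/query',
    data=payload,
    headers={'Content-Type': 'application/json'},
)
with urllib.request.urlopen(req) as response:
    print(json.loads(response.read()))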
23 changes: 0 additions & 23 deletions faiss_to_pgvector.py

This file was deleted.

27 changes: 27 additions & 0 deletions msmarco_benchmark.py
@@ -0,0 +1,27 @@
import argparse
import faiss
import faiss_vector_extractor
import duckdb_faiss_index_adaptor

TREC_DOT_PRODUCT_OUTPUT_FILE_PATH = "/store/scratch/x59song/trec_dot_product_output.txt"

def run_benchmark(trec_output_file_path, metric, query_index_path, table_name, adaptor):
    # load_index_and_docids is expected to be provided by the faiss_vector_extractor helpers
    query_vector_map = load_index_and_docids(query_index_path)
    # Argument order follows DuckDBVectorDBFaissIndexAdaptor.run_benchmark:
    # (table_name, metric, K, vector_size, trec_file_path, query_vector_map)
    adaptor.run_benchmark(table_name, metric, 20, 768, trec_output_file_path, query_vector_map=query_vector_map)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='FAISS Vector DB Index Constructor')
    parser.add_argument('--index_name', type=str, required=True, help='name of the FAISS index file')
    parser.add_argument('--metric', type=str, required=True, help='metric of the FAISS index')
    parser.add_argument('--table_name', type=str, required=True, help='name of the table to store the vectors')
    args = parser.parse_args()

    DBConfig = {
        'temp_directory': '/store/scratch/x59song/temp',
        'memory_limit': '50GB'
    }

    adaptor = duckdb_faiss_index_adaptor.DuckDBVectorDBFaissIndexAdaptor(args.index_name, DBConfig)
    adaptor.extract_vectors_and_construct_index(args.table_name, args.metric)
    run_benchmark(TREC_DOT_PRODUCT_OUTPUT_FILE_PATH, args.metric, args.index_name, args.table_name, adaptor)


Empty file modified scripts/msmarco-passage/encode_queries.py
100644 → 100755
Empty file.
16 changes: 12 additions & 4 deletions benchmark_duckdb.py → vectordb_benchmark/benchmark_duckdb.py
@@ -79,7 +79,7 @@ def run_trec_eval(trec_output_file_path):

def run_benchmark(con, trec_output_file_path, metric):
"""Runs the benchmark and writes results in TREC format."""
total_time = 0
query_times = []
with open(trec_output_file_path, 'w') as trec_file:
with open(QUERY_JSONL_FILE_PATH, 'r') as query_file:
for line in query_file:
@@ -100,16 +100,24 @@ def run_benchmark(con, trec_output_file_path, metric):
start_time = time.time()
results = con.execute(sql_query, (vector, K)).fetchall()
end_time = time.time()
# aggregate time
total_time += end_time - start_time

# Calculate the time for this query and add it to the list
query_time = end_time - start_time
query_times.append(query_time)

# Write results in TREC format
for rank, (doc_id, score) in enumerate(results, start=1):
trec_file.write(f"{query_id} Q0 {doc_id} {rank} {score} {RUN_ID}\n")

print(f"TREC results written to {trec_output_file_path}")
run_trec_eval(trec_output_file_path)
return total_time
# Aggregate statistics
total_time = sum(query_times)
mean_time = np.mean(query_times)
variance_time = np.var(query_times)
min_time = min(query_times)
max_time = max(query_times)
return total_time, mean_time, variance_time, min_time, max_time

if __name__ == "__main__":
con = setup_database()
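
The __main__ block is truncated in this diff; a minimal sketch of how a caller could consume the new return value (the output path and metric below are illustrative):

total_time, mean_time, variance_time, min_time, max_time = run_benchmark(con, 'trec_output.txt', 'ip')
print(f"total={total_time:.2f}s mean={mean_time:.4f}s var={variance_time:.6f} min={min_time:.4f}s max={max_time:.4f}s")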
7 changes: 7 additions & 0 deletions vectordb_benchmark/benchmark_msmarco.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
python3 ./run_benchmark.py \
--index_name='msmarco-v1-passage.bge-base-en-v1.5' \
--table_name='msmarco' \
--metric='ip' \
--query_index_path='/store/scratch/x59song/Research/pyserini/indexes/msmarco-dev.bge-base-en-v1.5' \
--db_config_file='duckdb_db_config.txt' \
--db_type='duckdb'
@@ -1,3 +1,4 @@
import numpy as np
import psycopg2
import json
import subprocess
@@ -73,7 +74,7 @@ def run_trec_eval(trec_output_file_path):
subprocess.run(command)

def run_benchmark(cur, trec_output_file_path, metric):
total_time = 0
query_times = []
"""Runs the benchmark and writes results in TREC format."""
with open(trec_output_file_path, 'w') as trec_file:
with open(QUERY_JSONL_FILE_PATH, 'r') as query_file:
@@ -95,16 +96,24 @@ def run_benchmark(cur, trec_output_file_path, metric):
cur.execute(sql_query, (vector, K))
results = cur.fetchall()
end_time = time.time()
# aggregate the time
total_time += end_time - start_time

# Calculate the time for this query and add it to the list
query_time = end_time - start_time
query_times.append(query_time)

# Write results in TREC format
for rank, (doc_id, score) in enumerate(results, start=1):
trec_file.write(f"{query_id} Q0 {doc_id} {rank} {score} {RUN_ID}\n")

print(f"TREC results written to {trec_output_file_path}")
run_trec_eval(trec_output_file_path)
return total_time
# Aggregate statistics
total_time = sum(query_times)
mean_time = np.mean(query_times)
variance_time = np.var(query_times)
min_time = min(query_times)
max_time = max(query_times)
return total_time, mean_time, variance_time, min_time, max_time

if __name__ == "__main__":
cur, conn = setup_database()
1 change: 1 addition & 0 deletions vectordb_benchmark/duckdb_db_config.txt
@@ -0,0 +1 @@
memory_limit:100GB
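
The config file uses a simple key:value line format. A minimal sketch of how such a file could be parsed into the DBConfig dict used by the adaptors; this parser is illustrative and not necessarily what run_benchmark.py does:

def load_db_config(path):
    config = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            key, value = line.split(':', 1)
            config[key.strip()] = value.strip()
    return config

# e.g. load_db_config('duckdb_db_config.txt') -> {'memory_limit': '100GB'}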
116 changes: 116 additions & 0 deletions vectordb_benchmark/duckdb_faiss_index_adaptor.py
@@ -0,0 +1,116 @@
import faiss_index_adaptor
import duckdb
import faiss_vector_extractor
import time
import numpy as np

class DuckDBVectorDBFaissIndexAdaptor(faiss_index_adaptor.VectorDBFaissIndexAdaptor):
    def __init__(self, index_name, DBConfig):
        super().__init__(index_name, DBConfig)
        self.con = None

    def initialize_database_and_table(self, table_name, DBConfig, vector_size):
        memory_limit = DBConfig['memory_limit']
        self.con = duckdb.connect(database=':memory:')
        self.con.execute("INSTALL vss")
        self.con.execute("LOAD vss")
        self.con.execute(f"PRAGMA memory_limit='{memory_limit}'")

        # Create documents table
        self.con.execute(f"""
            CREATE TABLE {table_name} (
                id INT,
                vector FLOAT[{vector_size}]
            )
        """)
        print(f"created table {table_name}")

    def construct_index(self, table_name, metric):
        self.con.execute(f"CREATE INDEX {metric}_idx ON {table_name} USING HNSW(vector) WITH (metric = '{metric}')")
        print(f"Index constructed for {table_name} using {metric} metric")

    def insert_vector_map_into_table(self, table_name, metric):
        start_time = time.time()
        for id, vector in self.vector_map.items():
            self.con.execute(f"INSERT INTO {table_name} (id, vector) VALUES (?, ?)", (id, vector))
        self.con.commit()
        end_time = time.time()
        print(f"Inserted {len(self.vector_map)} vectors into {table_name} in {end_time - start_time} seconds")

    def get_connection(self):
        return self.con

    # close the connection
    def close(self):
        self.con.close()

    def run_benchmark(self, table_name, metric, K, vector_size, trec_file_path, query_vector_map=None):
        print(f"running benchmark for {table_name} with metric {metric}")
        # Select the scoring function that matches the index metric
        if metric == 'l2sq':
            evaluation_metric = 'array_distance'
        elif metric == 'cosine':
            evaluation_metric = 'array_cosine_similarity'
        elif metric == 'ip':
            evaluation_metric = 'array_inner_product'
        else:
            raise ValueError(f"Unsupported metric: {metric}")
        with open(trec_file_path, 'w') as trec_file:
            count = 0
            query_times = []
            for query_id, query_vector in query_vector_map.items():
                sql_query = f"SELECT id, {evaluation_metric}(vector, ?::FLOAT[{vector_size}]) as score FROM {table_name} ORDER BY score DESC LIMIT ?"
                # time the execution
                start_time = time.time()
                results = self.con.execute(sql_query, (query_vector, K)).fetchall()
                end_time = time.time()

                # Calculate the time for this query and add it to the list
                query_time = end_time - start_time
                query_times.append(query_time)

                # Write results in TREC format
                for rank, (doc_id, score) in enumerate(results, start=1):
                    trec_file.write(f"{query_id} Q0 {doc_id} {rank} {score} DuckDB\n")
                count += 1
                if count % 100 == 0:
                    print(f"processed {count} queries")

        print(f"TREC results written to {trec_file_path}")
        ans = self.run_trec_eval(trec_file_path)
        # Aggregate statistics
        total_time = sum(query_times)
        mean_time = np.mean(query_times)
        variance_time = np.var(query_times)
        min_time = min(query_times)
        max_time = max(query_times)
        # create a file to store results
        with open(f"{table_name}_benchmark_results.txt", "w") as f:
            f.write(f"Total time: {total_time}\n")
            f.write(f"Mean time: {mean_time}\n")
            f.write(f"Variance time: {variance_time}\n")
            f.write(f"Min time: {min_time}\n")
            f.write(f"Max time: {max_time}\n")
            f.write(f"TREC eval output: {ans}\n")
        return total_time, mean_time, variance_time, min_time, max_time

    def create_in_memory_hnsw_index_on_file(self, file_path, table_name):
        # Open the file-based DuckDB database
        file_con = duckdb.connect(file_path)

        # Create an in-memory DuckDB database
        self.con = duckdb.connect(database=':memory:')
        self.con.execute("INSTALL vss")
        self.con.execute("LOAD vss")

        # Extract data from the file-based table into a Pandas DataFrame
        df = file_con.execute(f"SELECT * FROM {table_name}").fetchdf()

        df['vector'] = df['vector'].apply(lambda x: x if isinstance(x, list) else list(x))

        # Create a new table in the in-memory DuckDB database
        self.con.execute("CREATE TABLE msmarco AS SELECT * FROM df")

        # Cast the vector column to the required FLOAT[N] type if necessary
        self.con.execute("ALTER TABLE msmarco ALTER COLUMN vector SET DATA TYPE FLOAT[]")

        # Now you can create the HNSW index on the msmarco table in the in-memory database
        self.con.execute("CREATE INDEX hnsw_idx ON msmarco USING HNSW(vector) WITH (metric = 'ip')")
13 changes: 13 additions & 0 deletions vectordb_benchmark/encode_msmarco.sh
@@ -0,0 +1,13 @@
python3 tools/scripts/msmarco-passage/encode_queries.py \
--encoder=bge-base-en-v1.5 \
--input=collections/msmarco-passage/queries.dev.small.tsv \
--output=collections/faiss-queries/msmarco-passage/queries.pkl

python -m pyserini.encode \
input --corpus collections/faiss-queries/msmarco-passage/queries.jsonl \
output --embeddings indexes/msmarco-dev.bge-base-en-v1.5 \
--to-faiss \
encoder --encoder BAAI/bge-base-en-v1.5 --l2-norm \
--device cpu \
--pooling mean \
--batch 32
9 changes: 9 additions & 0 deletions vectordb_benchmark/evaluate_trec.py
@@ -0,0 +1,9 @@
import subprocess

# Score the TREC run file against the MS MARCO passage dev qrels with trec_eval,
# reporting reciprocal rank with a cutoff of 10 (MRR@10)
command = [
    "python", "-m", "pyserini.eval.trec_eval",
    "-c", "-M", "10", "-m", "recip_rank",
    "../collections/msmarco-passage/qrels.dev.small.trec",
    "../trec_dot_product_output.txt"
]
subprocess.run(command)