Skip to content

Commit

Permalink
Remove .close() method from kafka producer in python sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
woop committed Nov 16, 2019
1 parent 6c62d9f commit 0eebd72
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions sdk/python/feast/loaders/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,23 @@ def ingest_kafka(
num_chunks = max(dataframe.shape[0] / max(chunk_size, 100), 1)
df_chunks = np.array_split(dataframe, num_chunks)

# Create queue through which encoding and ingestion will coordinate
chunk_queue = Queue()

# Start ingestion process to push feature rows to Kafka
ingestion_process = Process(
target=_kafka_feature_row_chunk_producer,
args=(
chunk_queue,
num_chunks,
producer,
feature_set.get_kafka_source_topic(),
progress_bar,
),
)

try:
# Create queue through which encoding and ingestion will coordinate
chunk_queue = Queue()

# Start ingestion process to push feature rows to Kafka
ingestion_process = Process(
target=_kafka_feature_row_chunk_producer,
args=(
chunk_queue,
num_chunks,
producer,
feature_set.get_kafka_source_topic(),
progress_bar,
),
)
# Start ingestion process
ingestion_process.start()

# Create a pool of workers to convert df chunks into feature row chunks
Expand All @@ -99,7 +101,6 @@ def ingest_kafka(
finally:
producer.flush()
ingestion_process.join()
ingestion_process.close()
rows_ingested = progress_bar.total
progress_bar.close()
print(
Expand Down

0 comments on commit 0eebd72

Please sign in to comment.