-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#1945 Issue: update SingerHelper read method #2053
Changes from 5 commits
c6d1b5f
9bb2ee1
6e2c3c7
ed383c5
2de4c33
4119d48
0badd97
c2abace
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -163,48 +163,67 @@ def get_catalogs(logger, shell_command: str, sync_mode_overrides: Dict[str, Sync | |||||
@staticmethod | ||||||
def read(logger, shell_command, is_message=(lambda x: True), transform=(lambda x: x)) -> Generator[AirbyteMessage, None, None]: | ||||||
with subprocess.Popen(shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) as p: | ||||||
sel = selectors.DefaultSelector() | ||||||
sel.register(p.stdout, selectors.EVENT_READ) | ||||||
sel.register(p.stderr, selectors.EVENT_READ) | ||||||
ok = True | ||||||
while ok: | ||||||
for key, _ in sel.select(): | ||||||
line = key.fileobj.readline() | ||||||
if not line: | ||||||
ok = False | ||||||
try: | ||||||
p.wait(timeout=60) | ||||||
except subprocess.TimeoutExpired: | ||||||
raise Exception(f"Underlying command {shell_command} is hanging") | ||||||
|
||||||
if p.returncode != 0: | ||||||
raise Exception(f"Underlying command {shell_command} failed with exit code {p.returncode}") | ||||||
|
||||||
elif key.fileobj is p.stdout: | ||||||
out_json = to_json(line) | ||||||
if out_json is not None and is_message(out_json): | ||||||
transformed_json = transform(out_json) | ||||||
if transformed_json is not None: | ||||||
if transformed_json.get("type") == "SCHEMA" or transformed_json.get("type") == "ACTIVATE_VERSION": | ||||||
pass | ||||||
elif transformed_json.get("type") == "STATE": | ||||||
out_record = AirbyteStateMessage(data=transformed_json["value"]) | ||||||
out_message = AirbyteMessage(type=Type.STATE, state=out_record) | ||||||
yield transform(out_message) | ||||||
else: | ||||||
# todo: check that messages match the discovered schema | ||||||
stream_name = transformed_json["stream"] | ||||||
out_record = AirbyteRecordMessage( | ||||||
stream=stream_name, | ||||||
data=transformed_json["record"], | ||||||
emitted_at=int(datetime.now().timestamp()) * 1000, | ||||||
) | ||||||
out_message = AirbyteMessage(type=Type.RECORD, record=out_record) | ||||||
yield transform(out_message) | ||||||
else: | ||||||
logger.log_by_prefix(line, "INFO") | ||||||
for std_data in SingerHelper._read_std_rows(p, is_message, transform): | ||||||
if isinstance(std_data, AirbyteMessage): | ||||||
yield std_data | ||||||
else: | ||||||
logger.log_by_prefix(*std_data) | ||||||
|
||||||
@staticmethod | ||||||
def _read_std_rows(process: subprocess.Popen, is_message, transform) -> Generator[AirbyteMessage, None, None]: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
sel = selectors.DefaultSelector() | ||||||
sel.register(process.stdout, selectors.EVENT_READ) | ||||||
sel.register(process.stderr, selectors.EVENT_READ) | ||||||
eof = False | ||||||
while not eof: | ||||||
selects_list = sel.select() | ||||||
empty_line_counter = 0 | ||||||
for key, _ in selects_list: | ||||||
line = key.fileobj.readline() | ||||||
if not line: | ||||||
empty_line_counter += 1 | ||||||
if empty_line_counter >= len(selects_list): | ||||||
eof = True | ||||||
|
||||||
try: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should only run this section of code (the |
||||||
process.wait(timeout=60) | ||||||
except subprocess.TimeoutExpired: | ||||||
raise Exception(f"Underlying command {process.args} is hanging") | ||||||
|
||||||
if process.returncode != 0: | ||||||
raise Exception(f"Underlying command {process.args} failed with exit code {process.returncode}") | ||||||
|
||||||
elif key.fileobj is process.stdout: | ||||||
out_json = to_json(line) | ||||||
if out_json is not None and is_message(out_json): | ||||||
message_data = SingerHelper._classify_and_convert_out_json_to_airbyte_message(out_json, transform) | ||||||
if message_data is not None: | ||||||
yield message_data | ||||||
else: | ||||||
logger.log_by_prefix(line, "ERROR") | ||||||
yield line, "INFO" | ||||||
else: | ||||||
yield line, "ERROR" | ||||||
|
||||||
@staticmethod | ||||||
def _classify_and_convert_out_json_to_airbyte_message(out_json: Dict, transform) -> AirbyteMessage: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
transformed_json = transform(out_json) | ||||||
if transformed_json is not None: | ||||||
if transformed_json.get("type") == "SCHEMA" or transformed_json.get("type") == "ACTIVATE_VERSION": | ||||||
pass | ||||||
elif transformed_json.get("type") == "STATE": | ||||||
out_record = AirbyteStateMessage(data=transformed_json["value"]) | ||||||
out_message = AirbyteMessage(type=Type.STATE, state=out_record) | ||||||
return transform(out_message) | ||||||
else: | ||||||
# todo: check that messages match the discovered schema | ||||||
stream_name = transformed_json["stream"] | ||||||
out_record = AirbyteRecordMessage( | ||||||
stream=stream_name, | ||||||
data=transformed_json["record"], | ||||||
emitted_at=int(datetime.now().timestamp()) * 1000, | ||||||
) | ||||||
out_message = AirbyteMessage(type=Type.RECORD, record=out_record) | ||||||
return transform(out_message) | ||||||
eugene-kulak marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
@staticmethod | ||||||
def create_singer_catalog_with_selection(masked_airbyte_catalog: ConfiguredAirbyteCatalog, discovered_singer_catalog: object) -> str: | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove transform, I don't see its usage it is not transparent enough why it is here