-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#1945 Issue: update SingerHelper read method #2053
Conversation
I left this PR as a WIP, as I want to work on it a little more and optimize it, removing such a large number of code nestings for method |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch 👍
…/bug-read-method-singerhelper � Conflicts: � airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py
logger.log_by_prefix(*std_data) | ||
|
||
@staticmethod | ||
def _read_std_rows(process: subprocess.Popen, is_message, transform) -> Generator[AirbyteMessage, None, None]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def _read_std_rows(process: subprocess.Popen, is_message, transform) -> Generator[AirbyteMessage, None, None]: | |
def _read_std_rows(process: subprocess.Popen, is_message, transform) -> Iterator[Tuple[str, IOTextWrapper]: |
airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py
Outdated
Show resolved
Hide resolved
@@ -163,48 +163,67 @@ def get_catalogs(logger, shell_command: str, sync_mode_overrides: Dict[str, Sync | |||
@staticmethod | |||
def read(logger, shell_command, is_message=(lambda x: True), transform=(lambda x: x)) -> Generator[AirbyteMessage, None, None]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove transform, I don't see its usage it is not transparent enough why it is here
yield line, "ERROR" | ||
|
||
@staticmethod | ||
def _classify_and_convert_out_json_to_airbyte_message(out_json: Dict, transform) -> AirbyteMessage: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def _classify_and_convert_out_json_to_airbyte_message(out_json: Dict, transform) -> AirbyteMessage: | |
def _airbyte_message_from_json(out_json: Mapping[str, Ayy], transform) -> Optional[AirbyteMessage]: |
/test connector=source-slack-singer
|
/test connector=source-slack-singer
|
/test connector=source-slack-singer
|
/test connector=source-slack-singer
|
/test connector=source-googleanalytics-singer
|
/test connector=source-googleanalytics-singer
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One comment about EOF, once it's addressed we can merge. Good catch! 🎉
if empty_line_counter >= len(selects_list): | ||
eof = True | ||
|
||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should only run this section of code (the try except
then if process.returncode
etc.. if EOF has not been reached, so this should go outside the while loop
/publish connector=connectors/source-slack-singer
|
/publish connector=connectors/source-googleanalytics-singer
|
|
||
@staticmethod | ||
def _airbyte_message_from_json(transformed_json: Mapping[str, Any]) -> Optional[AirbyteMessage]: | ||
if transformed_json is None or transformed_json.get("type") == "SCHEMA" or transformed_json.get("type") == "ACTIVATE_VERSION": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if transformed_json is None or transformed_json.get("type") in {"SCHEMA", "ACTIVATE_VERSION"}:
What
It was found that for all Singer-based connectors, the data read from the console can be not always quite correct. The problem was that we use two
IOTextWrappers
: one forstdout
, and the second forstderror
. In the code, we exit the loop and stop receiving data from the console as soon as one of theIOTextWrappers
gives us an empty string (this means the end of the file). However, we need bothIOTextWrappers
to return an empty string and only then stop reading from the output console. Thus, the previous code sometimes exited the loop earlier and did not read a certain number of records and states, this was clearly seen on the "users" stream for the Slack connector..How
Describe the solution
Pre-merge Checklist
Recommended reading order
test.java
component.ts