Source Salesforce capped records to page_size value for incremental syncs #13657

Closed
marcosmarxm opened this issue Jun 9, 2022 · 0 comments · Fixed by #13658

Environment

  • Airbyte version: 0.39.14-alpha
  • OS Version / Instance: macOS
  • Deployment: Docker
  • Source Connector and version: Salesforce 1.0.9
  • Destination Connector and version: Postgres
  • Step where error happened: Sync Job

Current Behavior

This only happens for streams with more records than page_size; today the page_size value is 30k records, so the issue cannot be reproduced against our integration account without a few extra steps:

Sync with Salesforce connector version 1.0.9, selecting only the ObjectPermissions stream with the dedup + history sync mode.
(screenshot of the sync record count)

Reset the connection between test runs, because this is an incremental sync.
Then change page_size to 1000 here:

class BulkSalesforceStream(SalesforceStream):
    page_size = 30000
    DEFAULT_WAIT_TIMEOUT_SECONDS = 600
    MAX_CHECK_INTERVAL_SECONDS = 2.0
    MAX_RETRY_NUMBER = 3
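
For the dev build used in the test below, the only change is lowering that class attribute. This is a reproduction aid only, not a fix (sketch, not the exact dev-build diff):

class BulkSalesforceStream(SalesforceStream):
    page_size = 1000  # lowered from 30000 only to reproduce the issue with a small stream
    DEFAULT_WAIT_TIMEOUT_SECONDS = 600
    MAX_CHECK_INTERVAL_SECONDS = 2.0
    MAX_RETRY_NUMBER = 3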

Syncing with a dev build that includes this change, the sync is capped at 1k records.
(screenshot of the capped sync)

The reason this is happening shows up in the logs:

2022-06-09 19:07:51 source > total count records 100
2022-06-09 19:07:51 source > total counter records 1000

Airbyte compares the count variable with page_size to decide whether it has reached the last page:

count = 0
record: Mapping[str, Any] = {}
for count, record in self.read_with_chunks(self.download_data(url=job_full_url)):
    yield record
self.delete_job(url=job_full_url)
if count < self.page_size:
    # this is a last page
    break

It works this way because Salesforce does not provide a next-page token to detect the last page. In theory the approach should work as follows: with 70k records, the data is fetched in three batches of 30k + 30k + 10k, and the last batch stops the pagination because its record count is lower than page_size.
The problem is that the count variable is not the running total of records read for the page, but the index within the current chunk, which is capped at the chunk size of 100:

def read_with_chunks(self, path: str = None, chunk_size: int = 100) -> Iterable[Tuple[int, Mapping[str, Any]]]:
    """
    Reads the downloaded binary data, using lines chunks, set by `chunk_size`.
    @ path: string - the path to the downloaded temporarily binary data.
    @ chunk_size: int - the number of lines to read at a time, default: 100 lines / time.
    """
    try:
        with open(path, "r", encoding=self.encoding) as data:
            chunks = pd.read_csv(data, chunksize=chunk_size, iterator=True, dialect="unix")
            for chunk in chunks:
                chunk = chunk.replace({nan: None}).to_dict(orient="records")
                for n, row in enumerate(chunk, 1):
                    yield n, row
    # (exception handling and cleanup elided from this excerpt)
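
To make the failure mode concrete, here is a small standalone sketch (synthetic CSV data, not the connector code itself) showing that enumerate(chunk, 1) restarts for every chunk, so the value unpacked into count can never exceed chunk_size:

import io

import pandas as pd

# 1000 data rows under a single "id" column, read back in chunks of 100 lines.
csv_buffer = io.StringIO("id\n" + "\n".join(str(i) for i in range(1000)))

def read_with_chunks(buffer, chunk_size=100):
    chunks = pd.read_csv(buffer, chunksize=chunk_size, iterator=True)
    for chunk in chunks:
        for n, row in enumerate(chunk.to_dict(orient="records"), 1):
            yield n, row  # n restarts at 1 for every chunk

count = 0
for count, _record in read_with_chunks(csv_buffer):
    pass

page_size = 30000
print(count)              # 100 -> the index within the last chunk, not the 1000 records actually read
print(count < page_size)  # always True whenever chunk_size < page_size, so pagination stops after one page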

To solve the problem we could set the chunk size to exactly the page size, BUT that could cause OOM errors for streams with many or large columns. In my opinion a better fix is to accumulate the counter in the caller (note that a count variable already exists, but it is overwritten by the for loop on every iteration), as sketched after this excerpt:

count = 0
record: Mapping[str, Any] = {}
for count, record in self.read_with_chunks(self.download_data(url=job_full_url)):
    yield record
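
A minimal sketch of that suggestion, reusing the names from the excerpt above (an illustration of the idea only, not the actual patch from #13658):

count = 0
record: Mapping[str, Any] = {}
for _, record in self.read_with_chunks(self.download_data(url=job_full_url)):
    count += 1  # accumulate across all chunks instead of taking the per-chunk index
    yield record
self.delete_job(url=job_full_url)
if count < self.page_size:
    # fewer records than page_size, so this really was the last page
    break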

Another example using version 1.0.9:
(screenshot)

and after applying the counter as suggested above:
(screenshot)

Expected Behavior

Sync all data in the first run

Logs

Steps to Reproduce

  1. Set up a Salesforce source using the integration account.
  2. Select the ObjectPermissions stream with the dedup + history sync mode.
  3. See the description above.

Are you willing to submit a PR?

Yes
