Cannot stream rows larger than ~1MB #36

Open
bchazalet opened this issue Dec 16, 2015 · 11 comments

@bchazalet
Contributor

I am getting this error from bottledwater:

./kafka/bottledwater: While reading snapshot: PGRES_FATAL_ERROR: ERROR:  bottledwater_export: Avro conversion failed: Cannot write 4145327 bytes in memory buffer

There might very well be large blobs (in a bytea column) in the table it's processing before it fails: is there an implicit limit on the size of a row due to some Avro limitation?

@bchazalet changed the title from "is there a limit on a row/fiels's value size?" to "is there a limit on a row/fields' value size?" on Dec 16, 2015
@bchazalet
Contributor Author

I've found a

#define MAX_BUFFER_LENGTH 1048576

in io_util.c. My problem might be related to that.
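
For illustration, here is a rough sketch of how encoding into a fixed-size avro-c memory buffer fails once a row needs more than MAX_BUFFER_LENGTH bytes. The helper below is a simplified stand-in, not the actual io_util.c code:

    #include <avro.h>
    #include <stddef.h>

    #define MAX_BUFFER_LENGTH 1048576

    /* Simplified stand-in for the extension's row encoding: write an Avro value
     * into a fixed-size memory buffer. If the encoded row needs more than
     * MAX_BUFFER_LENGTH bytes, avro_value_write fails, which surfaces as an
     * error like "Cannot write 4145327 bytes in memory buffer". */
    static int encode_row(avro_value_t *row, char *buf, size_t *out_len) {
        avro_writer_t writer = avro_writer_memory(buf, MAX_BUFFER_LENGTH);
        int err = avro_value_write(writer, row);  /* fails for rows over ~1MB */
        if (err == 0) {
            *out_len = (size_t) avro_writer_tell(writer);
        }
        avro_writer_free(writer);
        return err;
    }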

@ept
Contributor

ept commented Dec 30, 2015

Yeah, seems likely that you're hitting that limit. I put it there to avoid accidentally allocating unreasonably large amounts of memory in the case of a bug. Can you try increasing the limit?

I guess we might be able to bump it up — allocating a few megabytes probably isn't going to hurt anyone these days — but we should have some sort of limit. It's not ideal to load a large blob entirely into memory, but I'm not sure the APIs allow streaming large blobs incrementally.
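
For anyone wanting to try that workaround, the change is just the one constant in io_util.c; the new value below is only an example, and keeping some limit still guards against runaway allocations:

    /* io_util.c: raise the cap, e.g. to 16MB, so larger rows can be encoded. */
    #define MAX_BUFFER_LENGTH (16 * 1024 * 1024)   /* was 1048576 (1MB) */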

@msakrejda

This is also potentially an issue with Kafka itself, right? If the message exceeds Kafka's message.max.bytes, there's not much bottledwater can do here...

/cc @samstokes
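
For reference, even with the extension's buffer limit raised, the client's librdkafka producer rejects messages above its own message.max.bytes (default 1000000 bytes), and the broker enforces its message.max.bytes too. A sketch of raising the producer-side limit; the configuration key is librdkafka's, but the helper function itself is illustrative:

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    /* Illustrative helper: build a producer config that allows larger messages.
     * The broker's own message.max.bytes must be raised to match, or it will
     * still reject oversized messages. */
    static rd_kafka_conf_t *make_producer_conf(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "message.max.bytes", "10000000",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
            fprintf(stderr, "config error: %s\n", errstr);
            rd_kafka_conf_destroy(conf);
            return NULL;
        }
        return conf;
    }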

@samstokes
Contributor

Documenting current behaviour:

  • if the length in bytes of an encoded message (depending on the --output-format) exceeds Kafka's message.max.bytes, the broker will refuse the message. If the Bottled Water client is running with --on-error=log, it will log the error but continue running, dropping the offending row and acknowledging the corresponding WAL as flushed. If the client is running with --on-error=exit, it will stop running without consuming the corresponding WAL.
  • if the length in bytes of an individual value (e.g. a TEXT or BYTEA) exceeds MAX_BUFFER_LENGTH in io_util.c, the extension will refuse to send the offending row, and will terminate the replication stream, regardless of the client's --on-error setting.

I'm pretty tempted to just truncate large values in the extension, since they're almost certainly going to hit problems at every step of whatever data pipeline they're flowing into (broker message.max.bytes, consumer fetch.message.max.bytes, whatever system they're flowing into next), and somebody along the line will have to decide what to do with arbitrarily large messages.

@samstokes
Contributor

I'm pretty tempted to just truncate large values in the extension

Unfortunately I wasn't thinking clearly when I wrote this. Large values could be strings or byte arrays, but they could also be records or other not-cleanly-truncatable things (especially since a large string is probably occurring within a record). They're written as Avro, and truncating them would produce binary garbage, so the Avro API doesn't even allow it.

Our choices are to drop the value or abort.

@mcapitanio
Contributor

I agree @samstokes, we don't have many choices.

One option I could add is to manage a sort of "dead letter topic", in which the extension could serialize (in Avro format) the messages that don't meet the max length requirement (for example, a folder on the local filesystem for each table/topic, or a single folder with a namespace for the serialized messages).

That way the choice to abort would not be so destructive: I could recover the dropped messages from the dead letter topic and, based on each message's key, decide how to handle it.

Some drawbacks to evaluate are the possible impact on extension performance and the risk of saturating local storage.
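
A minimal sketch of what that dead-letter idea could look like, assuming a local directory per table; every name here (dead_letter_write, DEAD_LETTER_DIR) is hypothetical, not existing extension code:

    #include <stdio.h>
    #include <time.h>

    #define DEAD_LETTER_DIR "/var/lib/bottledwater/dead-letter"   /* hypothetical */

    /* Hypothetical: dump an encoded row that exceeded the size limit to a
     * per-table file so it can be inspected or replayed later instead of
     * being silently dropped. Returns 0 on success. */
    static int dead_letter_write(const char *table, const void *avro_bytes, size_t len) {
        char path[1024];
        snprintf(path, sizeof(path), "%s/%s-%ld.avro",
                 DEAD_LETTER_DIR, table, (long) time(NULL));

        FILE *f = fopen(path, "wb");
        if (f == NULL) return -1;

        size_t written = fwrite(avro_bytes, 1, len, f);
        fclose(f);
        return written == len ? 0 : -1;
    }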

@msakrejda

Another possible solution (long-term; I definitely don't think this is worth doing short-term) is to break up the value into separate smaller messages that fit under the limit. This is pretty ugly and may not be worth doing, but I think it's an option, no?

@samstokes
Contributor

@uhoh-itsmaciek we could if we changed the framing protocol to have a "packet switched" semantics. Currently each frame contains an entire row, encoded as an Avro record. (Frames themselves are also Avro records, so the row record is stored as a byte array in a field of the frame record.) We could instead split up the record Avro into multiple frames, with the initial frame having a "length" field saying how many frames to expect, and have the client recombine the parts. (We might also need sequence numbers, and a checksum... :)) Is that the sort of thing you had in mind?

Unfortunately, that doesn't get around the reason the code has a maximum size limit in the first place, which is to avoid allocating large memory buffers in the extension. If you have a table with a single BLOB column which is used to store 200MB Docker images, you're going to need a 200MB buffer to write the Avro value representing each row. I'm not sure if there's a C API for generating Avro in a streaming basis.
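
To make the multi-frame idea above concrete, a fragment schema might look something like the following. This is not the actual Bottled Water frame format, just a hypothetical sketch of the "packet switched" approach:

    #include <avro.h>

    /* Hypothetical fragment record: each Kafka message carries one slice of a
     * large row, plus enough metadata for the client to reassemble it. */
    static avro_schema_t make_fragment_schema(void) {
        avro_schema_t schema = NULL;
        avro_schema_error_t error;
        const char *json =
            "{\"type\": \"record\", \"name\": \"frame_fragment\", \"fields\": ["
            "  {\"name\": \"row_id\",   \"type\": \"long\"},"   /* identifies the original row */
            "  {\"name\": \"seq\",      \"type\": \"int\"},"    /* fragment index */
            "  {\"name\": \"total\",    \"type\": \"int\"},"    /* how many fragments to expect */
            "  {\"name\": \"checksum\", \"type\": \"long\"},"   /* checksum of the reassembled row */
            "  {\"name\": \"payload\",  \"type\": \"bytes\"}"   /* this fragment's slice of the encoded row */
            "]}";
        if (avro_schema_from_json(json, 0, &schema, &error) != 0) {
            return NULL;
        }
        return schema;
    }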

@msakrejda

Yeah, that's exactly what I had in mind (N.B.: I know almost nothing about Avro or bottledwater =D ). The memory usage is certainly a concern, too, but that'd be a technical concern we could theoretically overcome (rather than a fundamental limitation of the design).

I think this is largely moot, though, and there's more value in working with the more common smaller messages.

@samstokes
Contributor

I've submitted PR #115 which offers a mitigation: if running the client with the --on-error=log policy, it will also configure the initial snapshot and replication output plugin to run with a similar error policy, so that if it cannot encode a row (e.g. due to hitting the size limit as discussed here), it will skip the offending row and continue on, instead of terminating the snapshot or replication stream.

N.B. once again this is a tradeoff between availability and consistency. Skipping the value means that the Kafka stream will be inconsistent with Postgres.

@mcapitanio your "dead letter" idea makes sense, although writing the entire value to the dead letter area seems a bit redundant given it's already stored in Postgres. Maybe it would make sense to write the primary key to the dead letter area, so that you could look up the full row in Postgres to diagnose. It's a tricky area though since often the primary key will be personally identifiable information (e.g. if you're using a username or email address as a primary key, or even a sequential id if your site exposes its database ids to end users), so we couldn't implement this simply by writing the primary key to the log.

(Also if the primary key by itself is larger than the limit this wouldn't help. On the other hand if you have primary keys larger than 1MB you probably have bigger problems :))

@samstokes changed the title from "is there a limit on a row/fields' value size?" to "Cannot stream rows larger than ~1MB" on Aug 26, 2016
@maparent

maparent commented Mar 6, 2017

Not sure if this was considered: in some cases, it would make sense to exclude certain columns in the configuration, or to allow replacing them with something else. Many blobs are immutable in practice, and it makes more sense to store a reference to another form of storage in the event stream than the blob itself.
