Checkpoint does not preserve reader and writer features for the table protocol. #2288

Closed
bryan-c-castillo-ms opened this issue Mar 15, 2024 · 3 comments · Fixed by #2293
@bryan-c-castillo-ms

Environment

Delta-rs version:
0.17.1

Binding:
Rust

Environment:

  • Cloud provider: Azure
  • OS: Windows
  • Other:

Bug

Taking a checkpoint does not preserve the reader and writer features of the table protocol.

What happened:

I was writing RecordBatches to a Delta table in append-only mode using the Rust APIs. Writes stop working once I take a checkpoint: after creating a checkpoint and attempting to write a new batch, I see the following error.

DeltatableError: Transaction failed: Writer features must be specified for writerversion >= 7, please specify: AppendOnly
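
The error above reads like the result of a protocol check that runs before a commit. Below is a minimal sketch of such a check, assuming a simplified `Protocol` struct and a hypothetical `check_can_write` function; neither is the delta-rs type or API, they only model the rule stated in the message (writer version >= 7 requires the needed features to be listed).

```rust
/// Simplified stand-in for a table protocol action (not the delta-rs type).
#[derive(Debug, Clone, PartialEq)]
pub struct Protocol {
    pub min_reader_version: i32,
    pub min_writer_version: i32,
    pub reader_features: Option<Vec<String>>,
    pub writer_features: Option<Vec<String>>,
}

/// Hypothetical error type mirroring the variant name seen in the output.
#[derive(Debug, PartialEq)]
pub enum ProtocolError {
    WriterFeaturesRequired(String),
}

/// Hypothetical check: at writer version 7 or above, every feature the
/// operation relies on must appear in `writer_features`.
pub fn check_can_write(protocol: &Protocol, needed: &[&str]) -> Result<(), ProtocolError> {
    // Below version 7 there are no explicit feature lists to consult.
    if protocol.min_writer_version < 7 {
        return Ok(());
    }
    // A missing list is treated the same as an empty one.
    let listed: &[String] = protocol.writer_features.as_deref().unwrap_or(&[]);
    for feature in needed {
        if !listed.iter().any(|f| f.as_str() == *feature) {
            return Err(ProtocolError::WriterFeaturesRequired(feature.to_string()));
        }
    }
    Ok(())
}
```

With a protocol that carries only the two version numbers (as read back from the checkpoint), a check of this shape fails for `appendOnly`; with the feature lists from the JSON commit, it passes.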

What you expected to happen:
I expect the write after a checkpoint to succeed, and the checkpoint to retain readerFeatures and writerFeatures for the table protocol.

How to reproduce it:

  1. Create a table
  2. Write a batch to the table
  3. Take a checkpoint
  4. Attempt to write a batch to the table in append mode.

More details:
I was not seeing this error when using 0.16.2.

I noticed that the Parquet file for the checkpoint, 00000000000000000001.checkpoint.parquet, contains only the following protocol information:

{'minReaderVersion': 3.0, 'minWriterVersion': 7.0}

If I look at the protocol information for "00000000000000000000.json" I see the following:

{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["appendOnly","timestampNtz"]}}

I think the code in checkpoint.rs is not serializing readerFeatures or writerFeatures.
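
To make the comparison above concrete, here is a small diagnostic sketch that reports which protocol fields are present in the JSON commit but absent from the checkpoint. The `Protocol` struct and the `fields_lost` helper are illustrative names I made up for this example, not part of delta-rs.

```rust
/// Simplified stand-in for a table protocol action (not the delta-rs type).
#[derive(Debug, Clone, PartialEq)]
pub struct Protocol {
    pub min_reader_version: i32,
    pub min_writer_version: i32,
    pub reader_features: Option<Vec<String>>,
    pub writer_features: Option<Vec<String>>,
}

/// Hypothetical helper: names of the optional protocol fields that the
/// commit log has but the checkpoint does not.
pub fn fields_lost(log: &Protocol, checkpoint: &Protocol) -> Vec<&'static str> {
    let mut lost = Vec::new();
    if log.reader_features.is_some() && checkpoint.reader_features.is_none() {
        lost.push("readerFeatures");
    }
    if log.writer_features.is_some() && checkpoint.writer_features.is_none() {
        lost.push("writerFeatures");
    }
    lost
}
```

Fed the two protocol records quoted above, this returns both `readerFeatures` and `writerFeatures`, which matches what I see in the files.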

@bryan-c-castillo-ms bryan-c-castillo-ms added the bug Something isn't working label Mar 15, 2024
@ion-elgreco ion-elgreco self-assigned this Mar 15, 2024
@ion-elgreco
Collaborator

I'll take a look this weekend

@bryan-c-castillo-ms
Author

Here is some stripped down code to reproduce the issue I saw.

// HashMap is used by create_if_not_exists below.
use std::collections::HashMap;
use std::future::IntoFuture;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use deltalake::kernel::{Action, DataType, StructField};

use deltalake::arrow::array::{builder, StringBuilder};
use deltalake::arrow::array::RecordBatch;
use deltalake::arrow::datatypes::Field as AField;
use deltalake::arrow::datatypes::Schema as ASchema;
use deltalake::arrow::datatypes::DataType as ADataType;
use deltalake::arrow::datatypes::TimeUnit as ATimeUnit;
use deltalake::operations::transaction::commit;
use deltalake::operations::writer::{DeltaWriter, WriterConfig};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::{open_table, DeltaOps, DeltaTableBuilder};

use deltalake::checkpoints::create_checkpoint;

use async_std::task::block_on;

type AppResult<T> = Result<T, Box<dyn std::error::Error>>;

pub fn now() -> u128 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros()
}

struct Person {
    pub name: String,
    pub favorite_number: Option<i32>,
    pub created: u128,
}

impl Person {
    pub fn new(name: impl Into<String>, favorite_number: Option<i32>) -> Self {
        Self {
            name: name.into(),
            favorite_number,
            created: now(),
        }
    }
}

struct PersonBatch {
    pub name: StringBuilder,
    pub favorite_number: builder::Int32Builder,
    pub created: builder::TimestampMicrosecondBuilder,
}

impl PersonBatch {
    pub fn new() -> Self {
        Self {
            name: StringBuilder::new(),
            favorite_number: builder::Int32Builder::new(),
            created: builder::TimestampMicrosecondBuilder::new(),
        }
    }

    pub fn append(&mut self, person: &Person) {
        self.name.append_value(&person.name);
        self.favorite_number.append_option(person.favorite_number);
        self.created.append_value(person.created as i64);
    }

    pub fn finish(&mut self) -> RecordBatch {
        let schema = PersonBatch::get_arrow_schema();
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(self.name.finish()),
                Arc::new(self.favorite_number.finish()),
                Arc::new(self.created.finish()),
            ],
        )
        .unwrap();
        batch
    }

    pub fn get_table_schema() -> Vec<StructField> {
        vec![
            StructField::new("name", DataType::STRING, false),
            StructField::new("favorite_number", DataType::INTEGER, true),
            StructField::new("created", DataType::TIMESTAMPNTZ, false),
        ]
    }
    
    pub fn get_arrow_schema() -> ASchema {
        ASchema::new(vec![
            AField::new("name", ADataType::Utf8, false),
            AField::new("favorite_number", ADataType::Int32, true),
            AField::new("created", ADataType::Timestamp(ATimeUnit::Microsecond, None), false),
        ])
    }

    pub async fn create_if_not_exists(table_uri: impl AsRef<str>) -> AppResult<()> {
        let table = DeltaTableBuilder::from_uri(table_uri)
            .build()?;
    
        let config: HashMap<String, Option<String>> = HashMap::new();
        let storage_options: HashMap<String, String> = HashMap::new();
    
        let builder = DeltaOps(table)
            .create()
            .with_save_mode(SaveMode::Ignore)
            .with_configuration(config)
            .with_storage_options(storage_options)
            .with_columns(PersonBatch::get_table_schema())
            .with_partition_columns(vec![String::from("name")]);
        
        let _ = builder.into_future().await?;
        Ok(())
    }
}

fn create_batch(people: Vec<Person>) -> AppResult<RecordBatch> {
    let mut person_batch = PersonBatch::new();

    for person in people {
        person_batch.append(&person);
    }

    let batch = person_batch.finish();
    Ok(batch)
}

async fn write_batch(table_uri: impl AsRef<str>, people: Vec<Person>) -> AppResult<()> {
    let mut table = open_table(table_uri).await?;
    table.load().await?;
    let batch = create_batch(people)?;

    let object_store = table.object_store();
    let log_store = table.log_store();

    let write_config = WriterConfig::new(batch.schema().clone(), vec![], None, None, None);
    let mut writer = DeltaWriter::new(object_store.clone(), write_config);

    writer.write(&batch).await?;
    let write_actions = writer.close().await?;

    let actions: Vec<Action> = write_actions
                .iter()
                .map(|add| Action::Add(add.clone()))
                .collect();

    let operation = DeltaOperation::Write {
        mode: SaveMode::Append,
        partition_by: Some(vec![]),
        predicate: None,
    };
    
    let _ = commit(&*log_store, &actions, operation, Some(table.snapshot()?), None).await?;
    Ok(())
}

pub async fn checkpoint(table_uri: impl AsRef<str>) -> AppResult<()> {
    let mut table = open_table(table_uri).await?;
    table.load().await?;

    create_checkpoint(&mut table).await?;
    Ok(())
}

async fn run_main() -> AppResult<()> {
    let table_path = "data/people";

    PersonBatch::create_if_not_exists(table_path).await?; 

    write_batch(table_path, vec![
        Person::new("Alice", Some(42)),
        Person::new("Bob", None),
    ]).await?;

    checkpoint(table_path).await?;

    write_batch(table_path, vec![
        Person::new("Bailey", Some(12)),
        Person::new("Ollie", None),
    ]).await?;

    Ok(())
}

fn main() -> AppResult<()>{
    block_on(run_main())?;
    Ok(())
}

Output:

Error: Transaction { source: WriterFeaturesRequired(AppendOnly) }


@bryan-c-castillo-ms
Author

To reproduce this bug, the batch needs to include a column of type TIMESTAMPNTZ.
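
One plausible reading of why the column type matters: in the Delta protocol, timestampNtz is a table feature, and table features require reader version 3 and writer version 7, which are the versions at which feature lists are written explicitly (as in the 00000000000000000000.json protocol above). The sketch below models only that rule. The function names and the caller-supplied baseline versions are assumptions for illustration, not delta-rs APIs.

```rust
/// Protocol versions at which table features are listed explicitly.
pub const TABLE_FEATURES_READER_VERSION: i32 = 3;
pub const TABLE_FEATURES_WRITER_VERSION: i32 = 7;

/// True if any column uses the Delta type name for timestamps without a time zone.
pub fn uses_timestamp_ntz(column_types: &[&str]) -> bool {
    column_types.iter().any(|t| *t == "timestamp_ntz")
}

/// Hypothetical helper: minimum (reader, writer) versions implied by the
/// schema, starting from a baseline chosen by the caller.
pub fn min_versions(column_types: &[&str], baseline: (i32, i32)) -> (i32, i32) {
    if uses_timestamp_ntz(column_types) {
        (
            baseline.0.max(TABLE_FEATURES_READER_VERSION),
            baseline.1.max(TABLE_FEATURES_WRITER_VERSION),
        )
    } else {
        baseline
    }
}
```

Under this reading, a table without such a column stays on lower protocol versions where no feature lists exist, so a checkpoint that drops them would go unnoticed.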

ion-elgreco added a commit that referenced this issue Mar 18, 2024
# Description
The CheckPoint schema simply didn't contain the features as fields in
the Protocol struct, and we weren't propagating them within the
checkpoint function either.

- closes #2288
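
A rough sketch of the shape of fix the description refers to, assuming a simplified model: the checkpoint row gains optional feature columns and the conversion copies them across. `Protocol` and `CheckpointProtocolRow` here are hypothetical stand-ins, not the actual delta-rs types or the code in the linked commit.

```rust
/// Simplified stand-in for a table protocol action (not the delta-rs type).
#[derive(Debug, Clone, PartialEq)]
pub struct Protocol {
    pub min_reader_version: i32,
    pub min_writer_version: i32,
    pub reader_features: Option<Vec<String>>,
    pub writer_features: Option<Vec<String>>,
}

/// Hypothetical checkpoint row whose schema includes the feature lists.
#[derive(Debug, Clone, PartialEq)]
pub struct CheckpointProtocolRow {
    pub min_reader_version: i32,
    pub min_writer_version: i32,
    pub reader_features: Option<Vec<String>>,
    pub writer_features: Option<Vec<String>>,
}

impl From<&Protocol> for CheckpointProtocolRow {
    fn from(p: &Protocol) -> Self {
        Self {
            min_reader_version: p.min_reader_version,
            min_writer_version: p.min_writer_version,
            // Propagate the feature lists instead of dropping them.
            reader_features: p.reader_features.clone(),
            writer_features: p.writer_features.clone(),
        }
    }
}

impl From<&CheckpointProtocolRow> for Protocol {
    fn from(r: &CheckpointProtocolRow) -> Self {
        Self {
            min_reader_version: r.min_reader_version,
            min_writer_version: r.min_writer_version,
            reader_features: r.reader_features.clone(),
            writer_features: r.writer_features.clone(),
        }
    }
}
```

The property worth testing is the round trip: converting a protocol to a checkpoint row and back should yield an equal protocol, feature lists included.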