From dde8a92541013387dda7b726c21b84a974cafaa7 Mon Sep 17 00:00:00 2001
From: Gang Wu
Date: Wed, 31 Dec 2025 22:05:41 +0800
Subject: [PATCH] feat(avro): support writing multiple blocks

---
Review notes (placed after the three-dash line, so not part of the commit
message):

* avro_writer.cc: DirectEncoderBackend::WriteRow() now calls
  writer_->syncIfNeeded() before encoding each row.
  NOTE(review): presumably this lets avro-cpp close the current block once
  the configured sync interval is reached; confirm against the avro-cpp
  DataFileWriterBase documentation. Also confirm how an exception thrown
  from syncIfNeeded() is handled, since WriteRow() reports errors via Status.
* avro_test.cc: WriteAvroFile() takes an optional `extra_properties` map.
  Entries are applied with operator[] instead of emplace() so that an extra
  property overrides a key that is already present (emplace() leaves an
  existing entry untouched), e.g. kAvroSkipDatum, set just before the loop.
  NOTE(review): assumes mutable_configs() returns a standard map type.
* avro_test.cc: the new MultipleAvroBlocks test writes five rows with sync
  intervals "32" and "65536" and expects 5 and 1 blocks respectively. Blocks
  are counted by watching avro_reader.previousSync() change while reading.
  The final ASSERT_EQ streams the sync interval so that a failure names the
  failing case.
* NOTE(review): text inside angle brackets (#include targets, template
  arguments, the author e-mail) is missing from this copy of the patch and
  is left as found; restore it from the original commit before applying.
  The post-image blob id on the avro_test.cc `index` line predates the two
  test-helper tweaks described above.

 src/iceberg/avro/avro_writer.cc |  1 +
 src/iceberg/test/avro_test.cc   | 54 ++++++++++++++++++++++++++++++++-
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index b426a756c..307f2fd62 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -80,6 +80,7 @@ class DirectEncoderBackend : public AvroWriteBackend {
 
   Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
                   int64_t row_index) override {
+    writer_->syncIfNeeded();
     ICEBERG_RETURN_UNEXPECTED(EncodeArrowToAvro(avro_root_node_, writer_->encoder(),
                                                 write_schema, array, row_index,
                                                 encode_ctx_));
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index f84d3c81c..b7a0f5463 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -18,20 +18,25 @@
  */
 
 #include
+#include
 
 #include
 #include
 #include
+#include
+#include
 #include
 
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
 #include "iceberg/avro/avro_register.h"
+#include "iceberg/avro/avro_stream_internal.h"
 #include "iceberg/avro/avro_writer.h"
 #include "iceberg/file_reader.h"
 #include "iceberg/schema.h"
 #include "iceberg/schema_internal.h"
 #include "iceberg/test/matchers.h"
 #include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
 
 namespace iceberg::avro {
 
@@ -527,7 +532,9 @@ class AvroWriterTest : public ::testing::Test,
     skip_datum_ = GetParam();
   }
 
-  void WriteAvroFile(std::shared_ptr schema, const std::string& json_data) {
+  void WriteAvroFile(
+      std::shared_ptr schema, const std::string& json_data,
+      const std::unordered_map& extra_properties = {}) {
     ArrowSchema arrow_c_schema;
     ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
 
@@ -548,6 +555,9 @@ class AvroWriterTest : public ::testing::Test,
 
     auto writer_properties = WriterProperties::default_properties();
     writer_properties->Set(WriterProperties::kAvroSkipDatum, skip_datum_);
+    for (const auto& [key, value] : extra_properties) {
+      writer_properties->mutable_configs()[key] = value;
+    }
 
     auto writer_result = WriterFactoryRegistry::Open(
         FileFormatType::kAvro, {.path = temp_avro_file_,
@@ -772,6 +782,48 @@ TEST_P(AvroWriterTest, WriteLargeDataset) {
   VerifyWrittenData(json.str());
 }
 
+TEST_P(AvroWriterTest, MultipleAvroBlocks) {
+  auto schema = std::make_shared(
+      std::vector{SchemaField::MakeRequired(1, "id", int32()),
+                  SchemaField::MakeRequired(2, "name", string())});
+
+  const std::string json_data = R"([
+    [1, "Alice_with_a_very_long_name_to_exceed_sync_interval"],
+    [2, "Bob_with_another_very_long_name_to_exceed_sync_interval"],
+    [3, "Charlie_with_yet_another_very_long_name_to_exceed_sync"],
+    [4, "David_with_a_super_long_name_that_will_exceed_interval"],
+    [5, "Eve_with_an_extremely_long_name_to_force_new_block_here"]
+  ])";
+
+  const std::vector>
+      test_cases = {{"32", 5}, {"65536", 1}};
+
+  for (const auto& [interval, num_blocks] : test_cases) {
+    WriteAvroFile(schema, json_data,
+                  {{WriterProperties::kAvroSyncInterval.key(), interval}});
+    VerifyWrittenData(json_data);
+
+    // Use raw avro-cpp reader to count blocks by tracking previousSync() changes
+    auto mock_io = internal::checked_pointer_cast(file_io_);
+    auto input = mock_io->fs()->OpenInputFile(temp_avro_file_).ValueOrDie();
+    auto input_stream = std::make_unique(std::move(input), 1024 * 1024);
+    ::avro::DataFileReader<::avro::GenericDatum> avro_reader(std::move(input_stream));
+    ::avro::GenericDatum datum(avro_reader.dataSchema());
+
+    size_t block_count = 0;
+    int64_t last_sync = -1;
+
+    while (avro_reader.read(datum)) {
+      if (int64_t current_sync = avro_reader.previousSync(); current_sync != last_sync) {
+        block_count++;
+        last_sync = current_sync;
+      }
+    }
+
+    ASSERT_EQ(block_count, num_blocks) << "sync interval: " << interval;
+  }
+}
+
 // Instantiate parameterized tests for both direct encoder and GenericDatum paths
 INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterTest,
                          ::testing::Values(true, false),