From b33bea0982fefb5c47386fb04c1545aa778529da Mon Sep 17 00:00:00 2001 From: x-tong Date: Sun, 25 Jan 2026 19:07:08 +0800 Subject: [PATCH 1/4] [AURON #1850] Add FlinkArrowUtils for Flink-Arrow type conversion Part 1 of Flink RowData to Arrow conversion implementation. This PR adds the foundational type conversion utilities: - FlinkArrowUtils: Bidirectional conversion between Flink LogicalType and Arrow types - Support for all common Flink types including primitives, temporal, and complex types - Comprehensive unit tests for type conversion --- .gitignore | 5 +- .../auron-flink-runtime/pom.xml | 35 +++ .../auron/flink/arrow/FlinkArrowUtils.java | 193 +++++++++++++++++ .../flink/arrow/FlinkArrowUtilsTest.java | 202 ++++++++++++++++++ 4 files changed, 434 insertions(+), 1 deletion(-) create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java diff --git a/.gitignore b/.gitignore index 9bb8dcb54..9f5a6a269 100644 --- a/.gitignore +++ b/.gitignore @@ -60,4 +60,7 @@ common/src/main/resources/auron-build-info.properties .flattened-pom.xml -dependency-reduced-pom.xml \ No newline at end of file +dependency-reduced-pom.xml + +#lsp +*.prefs \ No newline at end of file diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml b/auron-flink-extension/auron-flink-runtime/pom.xml index 3b5dfea21..4998e04c1 100644 --- a/auron-flink-extension/auron-flink-runtime/pom.xml +++ b/auron-flink-extension/auron-flink-runtime/pom.xml @@ -38,6 +38,41 @@ proto ${project.version} + + + + org.apache.arrow + arrow-c-data + + + org.apache.arrow + arrow-memory-unsafe + + + org.apache.arrow + arrow-vector + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + + + org.apache.auron + auron-core + ${project.version} + test-jar + test + + + org.junit.jupiter + junit-jupiter-api + ${junit.jupiter.version} + test + diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java new file mode 100644 index 000000000..99b4b55e8 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; + +/** + * Utility class for converting between Flink LogicalType and Arrow types. + */ +public class FlinkArrowUtils { + + /** + * Root allocator for Arrow memory management. + */ + public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE); + + static { + Runtime.getRuntime().addShutdownHook(new Thread(ROOT_ALLOCATOR::close)); + } + + /** + * Creates a child allocator from the root allocator. + * + * @param name Name for the child allocator + * @return A new child allocator + */ + public static BufferAllocator createChildAllocator(String name) { + return ROOT_ALLOCATOR.newChildAllocator(name, 0, Long.MAX_VALUE); + } + + /** + * Converts a Flink LogicalType to Arrow ArrowType. + * + * @param logicalType The Flink logical type + * @return The corresponding Arrow type + * @throws UnsupportedOperationException if the type is not supported + */ + public static ArrowType toArrowType(LogicalType logicalType) { + if (logicalType instanceof NullType) { + return ArrowType.Null.INSTANCE; + } else if (logicalType instanceof BooleanType) { + return ArrowType.Bool.INSTANCE; + } else if (logicalType instanceof TinyIntType) { + return new ArrowType.Int(8, true); + } else if (logicalType instanceof SmallIntType) { + return new ArrowType.Int(16, true); + } else if (logicalType instanceof IntType) { + return new ArrowType.Int(32, true); + } else if (logicalType instanceof BigIntType) { + return new ArrowType.Int(64, true); + } else if (logicalType instanceof FloatType) { + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + } else if (logicalType instanceof DoubleType) { + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + } else if (logicalType instanceof VarCharType || logicalType instanceof CharType) { + return ArrowType.Utf8.INSTANCE; + } else if (logicalType instanceof VarBinaryType || logicalType instanceof BinaryType) { + return ArrowType.Binary.INSTANCE; + } else if (logicalType instanceof DecimalType) { + DecimalType decimalType = (DecimalType) logicalType; + // Note: Arrow Java only has DecimalVector (128-bit) and Decimal256Vector (256-bit). + // There's no Decimal64Vector, so we always use 128-bit to match the actual storage. + // Setting bitWidth=64 would cause FFI export issues since the actual data is 128-bit. + return new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128); + } else if (logicalType instanceof DateType) { + return new ArrowType.Date(DateUnit.DAY); + } else if (logicalType instanceof TimeType) { + // Flink TimeType stores time as milliseconds (int), convert to Arrow Time64 (microseconds) + return new ArrowType.Time(TimeUnit.MICROSECOND, 64); + } else if (logicalType instanceof TimestampType) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + } else if (logicalType instanceof LocalZonedTimestampType) { + // LocalZonedTimestampType is similar to TimestampType but with UTC timezone + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); + } else { + throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType.asSummaryString()); + } + } + + /** + * Converts a Flink LogicalType to an Arrow Field. + * + * @param name The field name + * @param logicalType The Flink logical type + * @param nullable Whether the field is nullable + * @return The corresponding Arrow Field + */ + public static Field toArrowField(String name, LogicalType logicalType, boolean nullable) { + if (logicalType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) logicalType; + LogicalType elementType = arrayType.getElementType(); + FieldType fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null); + Field elementField = toArrowField("element", elementType, elementType.isNullable()); + List children = new ArrayList<>(); + children.add(elementField); + return new Field(name, fieldType, children); + } else if (logicalType instanceof RowType) { + RowType rowType = (RowType) logicalType; + FieldType fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null); + List children = new ArrayList<>(); + for (RowType.RowField field : rowType.getFields()) { + children.add(toArrowField(field.getName(), field.getType(), field.getType().isNullable())); + } + return new Field(name, fieldType, children); + } else if (logicalType instanceof MapType) { + MapType mapType = (MapType) logicalType; + LogicalType keyType = mapType.getKeyType(); + LogicalType valueType = mapType.getValueType(); + + // Create entries field (struct) + FieldType entriesFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null); + List entriesChildren = new ArrayList<>(); + entriesChildren.add(toArrowField(MapVector.KEY_NAME, keyType, false)); + entriesChildren.add(toArrowField(MapVector.VALUE_NAME, valueType, valueType.isNullable())); + Field entriesField = new Field(MapVector.DATA_VECTOR_NAME, entriesFieldType, entriesChildren); + + // Create map field + FieldType mapFieldType = new FieldType(nullable, new ArrowType.Map(false), null); + List mapChildren = new ArrayList<>(); + mapChildren.add(entriesField); + return new Field(name, mapFieldType, mapChildren); + } else { + ArrowType arrowType = toArrowType(logicalType); + FieldType fieldType = new FieldType(nullable, arrowType, null); + return new Field(name, fieldType, new ArrayList<>()); + } + } + + /** + * Converts a Flink RowType to an Arrow Schema. + * + * @param rowType The Flink row type + * @return The corresponding Arrow Schema + */ + public static Schema toArrowSchema(RowType rowType) { + List fields = new ArrayList<>(); + for (RowType.RowField field : rowType.getFields()) { + fields.add(toArrowField(field.getName(), field.getType(), field.getType().isNullable())); + } + return new Schema(fields); + } + + private FlinkArrowUtils() { + // Utility class + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java new file mode 100644 index 000000000..17bce4dba --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +/** Unit tests for FlinkArrowUtils. */ +public class FlinkArrowUtilsTest { + + @Test + public void testBasicTypeConversion() { + // Boolean + assertEquals(ArrowType.Bool.INSTANCE, FlinkArrowUtils.toArrowType(new BooleanType())); + + // Integer types + assertEquals(new ArrowType.Int(8, true), FlinkArrowUtils.toArrowType(new TinyIntType())); + assertEquals(new ArrowType.Int(16, true), FlinkArrowUtils.toArrowType(new SmallIntType())); + assertEquals(new ArrowType.Int(32, true), FlinkArrowUtils.toArrowType(new IntType())); + assertEquals(new ArrowType.Int(64, true), FlinkArrowUtils.toArrowType(new BigIntType())); + + // Floating point types + assertEquals( + new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), + FlinkArrowUtils.toArrowType(new FloatType())); + assertEquals( + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), + FlinkArrowUtils.toArrowType(new DoubleType())); + + // String and binary types + assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new VarCharType(100))); + assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new CharType(10))); + assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new VarBinaryType(100))); + assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new BinaryType(10))); + + // Decimal type + DecimalType decimalType = new DecimalType(10, 2); + ArrowType arrowDecimal = FlinkArrowUtils.toArrowType(decimalType); + assertTrue(arrowDecimal instanceof ArrowType.Decimal); + assertEquals(10, ((ArrowType.Decimal) arrowDecimal).getPrecision()); + assertEquals(2, ((ArrowType.Decimal) arrowDecimal).getScale()); + + // Date and timestamp types + assertEquals(new ArrowType.Date(DateUnit.DAY), FlinkArrowUtils.toArrowType(new DateType())); + assertEquals( + new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), FlinkArrowUtils.toArrowType(new TimestampType(3))); + } + + @Test + public void testArrayTypeConversion() { + ArrayType arrayType = new ArrayType(new IntType()); + Field field = FlinkArrowUtils.toArrowField("test_array", arrayType, true); + + assertEquals("test_array", field.getName()); + assertTrue(field.isNullable()); + assertTrue(field.getType() instanceof ArrowType.List); + assertEquals(1, field.getChildren().size()); + + Field elementField = field.getChildren().get(0); + assertEquals("element", elementField.getName()); + assertTrue(elementField.getType() instanceof ArrowType.Int); + } + + @Test + public void testRowTypeConversion() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + Field field = FlinkArrowUtils.toArrowField("test_row", rowType, false); + + assertEquals("test_row", field.getName()); + assertFalse(field.isNullable()); + assertTrue(field.getType() instanceof ArrowType.Struct); + assertEquals(2, field.getChildren().size()); + + Field idField = field.getChildren().get(0); + assertEquals("id", idField.getName()); + assertTrue(idField.getType() instanceof ArrowType.Int); + + Field nameField = field.getChildren().get(1); + assertEquals("name", nameField.getName()); + assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType()); + } + + @Test + public void testMapTypeConversion() { + MapType mapType = new MapType(new VarCharType(100), new IntType()); + Field field = FlinkArrowUtils.toArrowField("test_map", mapType, true); + + assertEquals("test_map", field.getName()); + assertTrue(field.isNullable()); + assertTrue(field.getType() instanceof ArrowType.Map); + assertEquals(1, field.getChildren().size()); + + Field entriesField = field.getChildren().get(0); + assertEquals("entries", entriesField.getName()); + assertTrue(entriesField.getType() instanceof ArrowType.Struct); + assertEquals(2, entriesField.getChildren().size()); + + Field keyField = entriesField.getChildren().get(0); + assertEquals("key", keyField.getName()); + assertEquals(ArrowType.Utf8.INSTANCE, keyField.getType()); + + Field valueField = entriesField.getChildren().get(1); + assertEquals("value", valueField.getName()); + assertTrue(valueField.getType() instanceof ArrowType.Int); + } + + @Test + public void testSchemaConversion() { + RowType rowType = RowType.of( + new LogicalType[] {new IntType(), new VarCharType(100), new DoubleType()}, + new String[] {"id", "name", "score"}); + + Schema schema = FlinkArrowUtils.toArrowSchema(rowType); + + assertEquals(3, schema.getFields().size()); + + Field idField = schema.getFields().get(0); + assertEquals("id", idField.getName()); + assertTrue(idField.getType() instanceof ArrowType.Int); + + Field nameField = schema.getFields().get(1); + assertEquals("name", nameField.getName()); + assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType()); + + Field scoreField = schema.getFields().get(2); + assertEquals("score", scoreField.getName()); + assertTrue(scoreField.getType() instanceof ArrowType.FloatingPoint); + } + + @Test + public void testTimeTypeConversion() { + TimeType timeType = new TimeType(3); + ArrowType arrowType = FlinkArrowUtils.toArrowType(timeType); + assertTrue(arrowType instanceof ArrowType.Time); + ArrowType.Time timeArrowType = (ArrowType.Time) arrowType; + assertEquals(TimeUnit.MICROSECOND, timeArrowType.getUnit()); + assertEquals(64, timeArrowType.getBitWidth()); + } + + @Test + public void testLocalZonedTimestampTypeConversion() { + LocalZonedTimestampType lzType = new LocalZonedTimestampType(6); + ArrowType arrowType = FlinkArrowUtils.toArrowType(lzType); + assertTrue(arrowType instanceof ArrowType.Timestamp); + ArrowType.Timestamp tsType = (ArrowType.Timestamp) arrowType; + assertEquals(TimeUnit.MICROSECOND, tsType.getUnit()); + assertEquals("UTC", tsType.getTimezone()); + } + + @Test + public void testUnsupportedTypeThrowsException() { + // RawType is not supported + assertThrows( + UnsupportedOperationException.class, + () -> FlinkArrowUtils.toArrowType(new RawType<>(String.class, StringSerializer.INSTANCE))); + } +} From 7de5c88208bd0e16a65d9345d055e5dd9da0d9ca Mon Sep 17 00:00:00 2001 From: xTong Date: Mon, 26 Jan 2026 22:11:24 +0800 Subject: [PATCH 2/4] Update auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java index 99b4b55e8..f463cffb6 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -52,7 +52,7 @@ import org.apache.flink.table.types.logical.VarCharType; /** - * Utility class for converting between Flink LogicalType and Arrow types. + * Utility class for converting Flink {@link LogicalType} instances to Arrow types, fields and schemas. */ public class FlinkArrowUtils { From 666fc8578a20d4c5b9f464a5869b953711033ca9 Mon Sep 17 00:00:00 2001 From: x-tong Date: Tue, 27 Jan 2026 21:03:45 +0800 Subject: [PATCH 3/4] Fix code formatting --- .../java/org/apache/auron/flink/arrow/FlinkArrowUtils.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java index f463cffb6..651f911b6 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -18,7 +18,6 @@ import java.util.ArrayList; import java.util.List; - import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.complex.MapVector; @@ -146,7 +145,8 @@ public static Field toArrowField(String name, LogicalType logicalType, boolean n FieldType fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null); List children = new ArrayList<>(); for (RowType.RowField field : rowType.getFields()) { - children.add(toArrowField(field.getName(), field.getType(), field.getType().isNullable())); + children.add(toArrowField( + field.getName(), field.getType(), field.getType().isNullable())); } return new Field(name, fieldType, children); } else if (logicalType instanceof MapType) { @@ -182,7 +182,8 @@ public static Field toArrowField(String name, LogicalType logicalType, boolean n public static Schema toArrowSchema(RowType rowType) { List fields = new ArrayList<>(); for (RowType.RowField field : rowType.getFields()) { - fields.add(toArrowField(field.getName(), field.getType(), field.getType().isNullable())); + fields.add(toArrowField( + field.getName(), field.getType(), field.getType().isNullable())); } return new Schema(fields); } From 9f6cfc6a97fb6c43b27aea2608c2fe273f3a3f79 Mon Sep 17 00:00:00 2001 From: x-tong Date: Wed, 28 Jan 2026 02:07:20 +0800 Subject: [PATCH 4/4] Address PR review comments for FlinkArrowUtils - Mark class as final to prevent subclassing - Add null check for logicalType parameter to throw IllegalArgumentException instead of NPE --- .../java/org/apache/auron/flink/arrow/FlinkArrowUtils.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java index 651f911b6..b20e5e626 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -53,7 +53,7 @@ /** * Utility class for converting Flink {@link LogicalType} instances to Arrow types, fields and schemas. */ -public class FlinkArrowUtils { +public final class FlinkArrowUtils { /** * Root allocator for Arrow memory management. @@ -82,6 +82,9 @@ public static BufferAllocator createChildAllocator(String name) { * @throws UnsupportedOperationException if the type is not supported */ public static ArrowType toArrowType(LogicalType logicalType) { + if (logicalType == null) { + throw new IllegalArgumentException("logicalType cannot be null"); + } if (logicalType instanceof NullType) { return ArrowType.Null.INSTANCE; } else if (logicalType instanceof BooleanType) {