diff --git a/.gitignore b/.gitignore
index 9bb8dcb54..9f5a6a269 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,7 @@ common/src/main/resources/auron-build-info.properties
.flattened-pom.xml
-dependency-reduced-pom.xml
\ No newline at end of file
+dependency-reduced-pom.xml
+
+#lsp
+*.prefs
\ No newline at end of file
diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml b/auron-flink-extension/auron-flink-runtime/pom.xml
index 3b5dfea21..4998e04c1 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -38,6 +38,41 @@
proto
${project.version}
+
+
+
+ org.apache.arrow
+ arrow-c-data
+
+
+ org.apache.arrow
+ arrow-memory-unsafe
+
+
+ org.apache.arrow
+ arrow-vector
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ provided
+
+
+
+
+ org.apache.auron
+ auron-core
+ ${project.version}
+ test-jar
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.jupiter.version}
+ test
+
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
new file mode 100644
index 000000000..651f911b6
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Utility class for converting Flink {@link LogicalType} instances to Arrow types, fields and schemas.
+ */
+public class FlinkArrowUtils {
+
+ /**
+ * Root allocator for Arrow memory management.
+ */
+ public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE);
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread(ROOT_ALLOCATOR::close));
+ }
+
+ /**
+ * Creates a child allocator from the root allocator.
+ *
+ * @param name Name for the child allocator
+ * @return A new child allocator
+ */
+ public static BufferAllocator createChildAllocator(String name) {
+ return ROOT_ALLOCATOR.newChildAllocator(name, 0, Long.MAX_VALUE);
+ }
+
+ /**
+ * Converts a Flink LogicalType to Arrow ArrowType.
+ *
+ * @param logicalType The Flink logical type
+ * @return The corresponding Arrow type
+ * @throws UnsupportedOperationException if the type is not supported
+ */
+ public static ArrowType toArrowType(LogicalType logicalType) {
+ if (logicalType instanceof NullType) {
+ return ArrowType.Null.INSTANCE;
+ } else if (logicalType instanceof BooleanType) {
+ return ArrowType.Bool.INSTANCE;
+ } else if (logicalType instanceof TinyIntType) {
+ return new ArrowType.Int(8, true);
+ } else if (logicalType instanceof SmallIntType) {
+ return new ArrowType.Int(16, true);
+ } else if (logicalType instanceof IntType) {
+ return new ArrowType.Int(32, true);
+ } else if (logicalType instanceof BigIntType) {
+ return new ArrowType.Int(64, true);
+ } else if (logicalType instanceof FloatType) {
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+ } else if (logicalType instanceof DoubleType) {
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+ } else if (logicalType instanceof VarCharType || logicalType instanceof CharType) {
+ return ArrowType.Utf8.INSTANCE;
+ } else if (logicalType instanceof VarBinaryType || logicalType instanceof BinaryType) {
+ return ArrowType.Binary.INSTANCE;
+ } else if (logicalType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) logicalType;
+ // Note: Arrow Java only has DecimalVector (128-bit) and Decimal256Vector (256-bit).
+ // There's no Decimal64Vector, so we always use 128-bit to match the actual storage.
+ // Setting bitWidth=64 would cause FFI export issues since the actual data is 128-bit.
+ return new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128);
+ } else if (logicalType instanceof DateType) {
+ return new ArrowType.Date(DateUnit.DAY);
+ } else if (logicalType instanceof TimeType) {
+ // Flink TimeType stores time as milliseconds (int), convert to Arrow Time64 (microseconds)
+ return new ArrowType.Time(TimeUnit.MICROSECOND, 64);
+ } else if (logicalType instanceof TimestampType) {
+ return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
+ } else if (logicalType instanceof LocalZonedTimestampType) {
+ // LocalZonedTimestampType is similar to TimestampType but with UTC timezone
+ return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
+ } else {
+ throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType.asSummaryString());
+ }
+ }
+
+ /**
+ * Converts a Flink LogicalType to an Arrow Field.
+ *
+ * @param name The field name
+ * @param logicalType The Flink logical type
+ * @param nullable Whether the field is nullable
+ * @return The corresponding Arrow Field
+ */
+ public static Field toArrowField(String name, LogicalType logicalType, boolean nullable) {
+ if (logicalType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) logicalType;
+ LogicalType elementType = arrayType.getElementType();
+ FieldType fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null);
+ Field elementField = toArrowField("element", elementType, elementType.isNullable());
+ List children = new ArrayList<>();
+ children.add(elementField);
+ return new Field(name, fieldType, children);
+ } else if (logicalType instanceof RowType) {
+ RowType rowType = (RowType) logicalType;
+ FieldType fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null);
+ List children = new ArrayList<>();
+ for (RowType.RowField field : rowType.getFields()) {
+ children.add(toArrowField(
+ field.getName(), field.getType(), field.getType().isNullable()));
+ }
+ return new Field(name, fieldType, children);
+ } else if (logicalType instanceof MapType) {
+ MapType mapType = (MapType) logicalType;
+ LogicalType keyType = mapType.getKeyType();
+ LogicalType valueType = mapType.getValueType();
+
+ // Create entries field (struct)
+ FieldType entriesFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null);
+ List entriesChildren = new ArrayList<>();
+ entriesChildren.add(toArrowField(MapVector.KEY_NAME, keyType, false));
+ entriesChildren.add(toArrowField(MapVector.VALUE_NAME, valueType, valueType.isNullable()));
+ Field entriesField = new Field(MapVector.DATA_VECTOR_NAME, entriesFieldType, entriesChildren);
+
+ // Create map field
+ FieldType mapFieldType = new FieldType(nullable, new ArrowType.Map(false), null);
+ List mapChildren = new ArrayList<>();
+ mapChildren.add(entriesField);
+ return new Field(name, mapFieldType, mapChildren);
+ } else {
+ ArrowType arrowType = toArrowType(logicalType);
+ FieldType fieldType = new FieldType(nullable, arrowType, null);
+ return new Field(name, fieldType, new ArrayList<>());
+ }
+ }
+
+ /**
+ * Converts a Flink RowType to an Arrow Schema.
+ *
+ * @param rowType The Flink row type
+ * @return The corresponding Arrow Schema
+ */
+ public static Schema toArrowSchema(RowType rowType) {
+ List fields = new ArrayList<>();
+ for (RowType.RowField field : rowType.getFields()) {
+ fields.add(toArrowField(
+ field.getName(), field.getType(), field.getType().isNullable()));
+ }
+ return new Schema(fields);
+ }
+
+ private FlinkArrowUtils() {
+ // Utility class
+ }
+}
diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java
new file mode 100644
index 000000000..17bce4dba
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for FlinkArrowUtils. */
+public class FlinkArrowUtilsTest {
+
+ @Test
+ public void testBasicTypeConversion() {
+ // Boolean
+ assertEquals(ArrowType.Bool.INSTANCE, FlinkArrowUtils.toArrowType(new BooleanType()));
+
+ // Integer types
+ assertEquals(new ArrowType.Int(8, true), FlinkArrowUtils.toArrowType(new TinyIntType()));
+ assertEquals(new ArrowType.Int(16, true), FlinkArrowUtils.toArrowType(new SmallIntType()));
+ assertEquals(new ArrowType.Int(32, true), FlinkArrowUtils.toArrowType(new IntType()));
+ assertEquals(new ArrowType.Int(64, true), FlinkArrowUtils.toArrowType(new BigIntType()));
+
+ // Floating point types
+ assertEquals(
+ new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE),
+ FlinkArrowUtils.toArrowType(new FloatType()));
+ assertEquals(
+ new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE),
+ FlinkArrowUtils.toArrowType(new DoubleType()));
+
+ // String and binary types
+ assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new VarCharType(100)));
+ assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new CharType(10)));
+ assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new VarBinaryType(100)));
+ assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new BinaryType(10)));
+
+ // Decimal type
+ DecimalType decimalType = new DecimalType(10, 2);
+ ArrowType arrowDecimal = FlinkArrowUtils.toArrowType(decimalType);
+ assertTrue(arrowDecimal instanceof ArrowType.Decimal);
+ assertEquals(10, ((ArrowType.Decimal) arrowDecimal).getPrecision());
+ assertEquals(2, ((ArrowType.Decimal) arrowDecimal).getScale());
+
+ // Date and timestamp types
+ assertEquals(new ArrowType.Date(DateUnit.DAY), FlinkArrowUtils.toArrowType(new DateType()));
+ assertEquals(
+ new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), FlinkArrowUtils.toArrowType(new TimestampType(3)));
+ }
+
+ @Test
+ public void testArrayTypeConversion() {
+ ArrayType arrayType = new ArrayType(new IntType());
+ Field field = FlinkArrowUtils.toArrowField("test_array", arrayType, true);
+
+ assertEquals("test_array", field.getName());
+ assertTrue(field.isNullable());
+ assertTrue(field.getType() instanceof ArrowType.List);
+ assertEquals(1, field.getChildren().size());
+
+ Field elementField = field.getChildren().get(0);
+ assertEquals("element", elementField.getName());
+ assertTrue(elementField.getType() instanceof ArrowType.Int);
+ }
+
+ @Test
+ public void testRowTypeConversion() {
+ RowType rowType =
+ RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"});
+
+ Field field = FlinkArrowUtils.toArrowField("test_row", rowType, false);
+
+ assertEquals("test_row", field.getName());
+ assertFalse(field.isNullable());
+ assertTrue(field.getType() instanceof ArrowType.Struct);
+ assertEquals(2, field.getChildren().size());
+
+ Field idField = field.getChildren().get(0);
+ assertEquals("id", idField.getName());
+ assertTrue(idField.getType() instanceof ArrowType.Int);
+
+ Field nameField = field.getChildren().get(1);
+ assertEquals("name", nameField.getName());
+ assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType());
+ }
+
+ @Test
+ public void testMapTypeConversion() {
+ MapType mapType = new MapType(new VarCharType(100), new IntType());
+ Field field = FlinkArrowUtils.toArrowField("test_map", mapType, true);
+
+ assertEquals("test_map", field.getName());
+ assertTrue(field.isNullable());
+ assertTrue(field.getType() instanceof ArrowType.Map);
+ assertEquals(1, field.getChildren().size());
+
+ Field entriesField = field.getChildren().get(0);
+ assertEquals("entries", entriesField.getName());
+ assertTrue(entriesField.getType() instanceof ArrowType.Struct);
+ assertEquals(2, entriesField.getChildren().size());
+
+ Field keyField = entriesField.getChildren().get(0);
+ assertEquals("key", keyField.getName());
+ assertEquals(ArrowType.Utf8.INSTANCE, keyField.getType());
+
+ Field valueField = entriesField.getChildren().get(1);
+ assertEquals("value", valueField.getName());
+ assertTrue(valueField.getType() instanceof ArrowType.Int);
+ }
+
+ @Test
+ public void testSchemaConversion() {
+ RowType rowType = RowType.of(
+ new LogicalType[] {new IntType(), new VarCharType(100), new DoubleType()},
+ new String[] {"id", "name", "score"});
+
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ assertEquals(3, schema.getFields().size());
+
+ Field idField = schema.getFields().get(0);
+ assertEquals("id", idField.getName());
+ assertTrue(idField.getType() instanceof ArrowType.Int);
+
+ Field nameField = schema.getFields().get(1);
+ assertEquals("name", nameField.getName());
+ assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType());
+
+ Field scoreField = schema.getFields().get(2);
+ assertEquals("score", scoreField.getName());
+ assertTrue(scoreField.getType() instanceof ArrowType.FloatingPoint);
+ }
+
+ @Test
+ public void testTimeTypeConversion() {
+ TimeType timeType = new TimeType(3);
+ ArrowType arrowType = FlinkArrowUtils.toArrowType(timeType);
+ assertTrue(arrowType instanceof ArrowType.Time);
+ ArrowType.Time timeArrowType = (ArrowType.Time) arrowType;
+ assertEquals(TimeUnit.MICROSECOND, timeArrowType.getUnit());
+ assertEquals(64, timeArrowType.getBitWidth());
+ }
+
+ @Test
+ public void testLocalZonedTimestampTypeConversion() {
+ LocalZonedTimestampType lzType = new LocalZonedTimestampType(6);
+ ArrowType arrowType = FlinkArrowUtils.toArrowType(lzType);
+ assertTrue(arrowType instanceof ArrowType.Timestamp);
+ ArrowType.Timestamp tsType = (ArrowType.Timestamp) arrowType;
+ assertEquals(TimeUnit.MICROSECOND, tsType.getUnit());
+ assertEquals("UTC", tsType.getTimezone());
+ }
+
+ @Test
+ public void testUnsupportedTypeThrowsException() {
+ // RawType is not supported
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> FlinkArrowUtils.toArrowType(new RawType<>(String.class, StringSerializer.INSTANCE)));
+ }
+}