From 4eab50b0e23d5d2248ad41279720d2fc64b6406e Mon Sep 17 00:00:00 2001 From: Xuan Gu <162244362+xuang7@users.noreply.github.com> Date: Fri, 23 Jan 2026 21:22:15 -0800 Subject: [PATCH 1/2] update. --- .../service/resource/DatasetResource.scala | 108 +++++++++++++----- .../resource/DatasetResourceSpec.scala | 24 ++++ 2 files changed, 106 insertions(+), 26 deletions(-) diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala index 39cba2c84f..e365931c3e 100644 --- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala +++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala @@ -19,6 +19,7 @@ package org.apache.texera.service.resource +import com.fasterxml.jackson.databind.ObjectMapper import io.dropwizard.auth.Auth import jakarta.annotation.security.RolesAllowed import jakarta.ws.rs._ @@ -175,6 +176,47 @@ object DatasetResource { normalized } + /** + * Converts LakeFS ApiException to appropriate HTTP exception + */ + private def handleLakeFSException(e: io.lakefs.clients.sdk.ApiException): Nothing = { + val rawBody = Option(e.getResponseBody).filter(_.nonEmpty).getOrElse(e.getMessage) + + val message = + Try(new ObjectMapper().readTree(rawBody).get("message").asText()).getOrElse(rawBody) + + def errorResponse(status: Int): Response = + Response + .status(status) + .entity(Map("message" -> message).asJava) + .`type`(MediaType.APPLICATION_JSON) + .build() + + throw (e.getCode match { + case 400 => new BadRequestException(errorResponse(400)) + case 401 => new NotAuthorizedException(errorResponse(401)) + case 403 => new ForbiddenException(errorResponse(403)) + case 404 => new NotFoundException(errorResponse(404)) + case 409 => new WebApplicationException(errorResponse(409)) + case 410 => new WebApplicationException(errorResponse(410)) + case 412 => new 
WebApplicationException(errorResponse(412)) + case 416 => new WebApplicationException(errorResponse(416)) + case 420 => new WebApplicationException(errorResponse(420)) + case _ => new InternalServerErrorException(errorResponse(500)) + }) + } + + /** + * Wraps a LakeFS call with centralized error handling. + */ + private def withLakeFSErrorHandling[T](lakeFsCall: => T): T = { + try { + lakeFsCall + } catch { + case e: io.lakefs.clients.sdk.ApiException => handleLakeFSException(e) + } + } + case class DashboardDataset( dataset: Dataset, ownerEmail: String, @@ -360,7 +402,9 @@ class DatasetResource { val repositoryName = dataset.getRepositoryName // Check if there are any changes in LakeFS before creating a new version - val diffs = LakeFSStorageClient.retrieveUncommittedObjects(repoName = repositoryName) + val diffs = withLakeFSErrorHandling { + LakeFSStorageClient.retrieveUncommittedObjects(repoName = repositoryName) + } if (diffs.isEmpty) { throw new WebApplicationException( @@ -384,11 +428,13 @@ class DatasetResource { } // Create a commit in LakeFS - val commit = LakeFSStorageClient.createCommit( - repoName = repositoryName, - branch = "main", - commitMessage = s"Created dataset version: $newVersionName" - ) + val commit = withLakeFSErrorHandling { + LakeFSStorageClient.createCommit( + repoName = repositoryName, + branch = "main", + commitMessage = s"Created dataset version: $newVersionName" + ) + } if (commit == null || commit.getId == null) { throw new WebApplicationException( @@ -412,7 +458,9 @@ class DatasetResource { .into(classOf[DatasetVersion]) // Retrieve committed file structure - val fileNodes = LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, commit.getId) + val fileNodes = withLakeFSErrorHandling { + LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, commit.getId) + } DashboardDatasetVersion( insertedVersion, @@ -973,7 +1021,9 @@ class DatasetResource { // Retrieve staged (uncommitted) changes from LakeFS val dataset = 
getDatasetByID(ctx, did) - val lakefsDiffs = LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName) + val lakefsDiffs = withLakeFSErrorHandling { + LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName) + } // Convert LakeFS Diff objects to our custom Diff case class lakefsDiffs.map(d => @@ -1578,11 +1628,13 @@ class DatasetResource { ) } - val presign = LakeFSStorageClient.initiatePresignedMultipartUploads( - repositoryName, - filePath, - numPartsValue - ) + val presign = withLakeFSErrorHandling { + LakeFSStorageClient.initiatePresignedMultipartUploads( + repositoryName, + filePath, + numPartsValue + ) + } val uploadIdStr = presign.getUploadId val physicalAddr = presign.getPhysicalAddress @@ -1772,13 +1824,15 @@ class DatasetResource { ) .toList - val objectStats = LakeFSStorageClient.completePresignedMultipartUploads( - dataset.getRepositoryName, - filePath, - uploadId, - partsList, - physicalAddr - ) + val objectStats = withLakeFSErrorHandling { + LakeFSStorageClient.completePresignedMultipartUploads( + dataset.getRepositoryName, + filePath, + uploadId, + partsList, + physicalAddr + ) + } // FINAL SERVER-SIDE SIZE CHECK (do not rely on init) val actualSizeBytes = @@ -1884,12 +1938,14 @@ class DatasetResource { ) } - LakeFSStorageClient.abortPresignedMultipartUploads( - dataset.getRepositoryName, - filePath, - session.getUploadId, - physicalAddr - ) + withLakeFSErrorHandling { + LakeFSStorageClient.abortPresignedMultipartUploads( + dataset.getRepositoryName, + filePath, + session.getUploadId, + physicalAddr + ) + } // Delete session; parts removed via ON DELETE CASCADE ctx diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index 2902626a70..035c4f5da6 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ 
b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -1847,4 +1847,28 @@ class DatasetResourceSpec response.getStatus shouldEqual 307 response.getHeaderString("Location") should not be null } + + "LakeFS error handling" should "return 500 when ETag is invalid, with the message included in the error response body" in { + val filePath = uniqueFilePath("error-body") + + initUpload(filePath, 2).getStatus shouldEqual 200 + uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200 + uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200 + + val uploadId = fetchUploadIdOrFail(filePath) + getDSLContext + .update(DATASET_UPLOAD_SESSION_PART) + .set(DATASET_UPLOAD_SESSION_PART.ETAG, "BAD") + .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId)) + .execute() + + val ex = intercept[WebApplicationException] { + finishUpload(filePath) + } + + ex.getResponse.getStatus shouldEqual 500 + ex.getResponse.getEntity should not be null + + abortUpload(filePath) + } } From 8d3e219458570c289d39854250b77c888b99e0b0 Mon Sep 17 00:00:00 2001 From: Xuan Gu <162244362+xuang7@users.noreply.github.com> Date: Sun, 25 Jan 2026 17:26:01 -0800 Subject: [PATCH 2/2] update. 
---
 .../service/resource/DatasetResource.scala    |  43 +------
 .../service/util/LakeFSExceptionHandler.scala | 112 ++++++++++++++++++
 .../resource/DatasetResourceSpec.scala        |  30 ++++-
 3 files changed, 142 insertions(+), 43 deletions(-)
 create mode 100644 file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala

diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
index e365931c3e..5a2658e065 100644
--- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
@@ -19,7 +19,6 @@

 package org.apache.texera.service.resource

-import com.fasterxml.jackson.databind.ObjectMapper
 import io.dropwizard.auth.Auth
 import jakarta.annotation.security.RolesAllowed
 import jakarta.ws.rs._
@@ -73,6 +72,7 @@ import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATA
 import org.jooq.exception.DataAccessException
 import software.amazon.awssdk.services.s3.model.UploadPartResponse
 import org.apache.commons.io.FilenameUtils
+import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling

 import java.sql.SQLException
 import scala.util.Try
@@ -176,47 +176,6 @@ object DatasetResource {
     normalized
   }

-  /**
-    * Converts LakeFS ApiException to appropriate HTTP exception
-    */
-  private def handleLakeFSException(e: io.lakefs.clients.sdk.ApiException): Nothing = {
-    val rawBody = Option(e.getResponseBody).filter(_.nonEmpty).getOrElse(e.getMessage)
-
-    val message =
-      Try(new ObjectMapper().readTree(rawBody).get("message").asText()).getOrElse(rawBody)
-
-    def errorResponse(status: Int): Response =
-      Response
-        .status(status)
-        .entity(Map("message" -> message).asJava)
-        .`type`(MediaType.APPLICATION_JSON)
-        .build()
-
-    throw (e.getCode match {
-      case 400 => new BadRequestException(errorResponse(400))
-      case 401 => new NotAuthorizedException(errorResponse(401))
-      case 403 => new ForbiddenException(errorResponse(403))
-      case 404 => new NotFoundException(errorResponse(404))
-      case 409 => new WebApplicationException(errorResponse(409))
-      case 410 => new WebApplicationException(errorResponse(410))
-      case 412 => new WebApplicationException(errorResponse(412))
-      case 416 => new WebApplicationException(errorResponse(416))
-      case 420 => new WebApplicationException(errorResponse(420))
-      case _ => new InternalServerErrorException(errorResponse(500))
-    })
-  }
-
-  /**
-    * Wraps a LakeFS call with centralized error handling.
-    */
-  private def withLakeFSErrorHandling[T](lakeFsCall: => T): T = {
-    try {
-      lakeFsCall
-    } catch {
-      case e: io.lakefs.clients.sdk.ApiException => handleLakeFSException(e)
-    }
-  }
-
   case class DashboardDataset(
       dataset: Dataset,
       ownerEmail: String,
diff --git a/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala
new file mode 100644
index 0000000000..c1997fb647
--- /dev/null
+++ b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import jakarta.ws.rs._
+import jakarta.ws.rs.core.{MediaType, Response}
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+
+/**
+  * Translates LakeFS client failures into JAX-RS exceptions whose response body is a JSON
+  * object of the form `{"message": "..."}`.
+  */
+object LakeFSExceptionHandler {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  // Caller-facing text for the LakeFS status codes that have a dedicated explanation.
+  private val fallbackMessages: Map[Int, String] = Map(
+    400 -> "LakeFS rejected the request. Please verify the parameters (repository/branch/path) and try again.",
+    401 -> "Authentication with LakeFS failed.",
+    403 -> "Permission denied by LakeFS.",
+    404 -> "LakeFS resource not found. The repository/branch/object may not exist.",
+    409 -> "LakeFS reported a conflict. Another operation may be in progress.",
+    420 -> "Too many requests to LakeFS."
+  )
+
+  // Used for 4xx codes without a dedicated entry (e.g. 410, 412, 416) so that a request-side
+  // failure is not described to the caller as a server error.
+  private val genericClientErrorMessage =
+    "LakeFS could not process the request."
+
+  // Used for every code outside the 4xx range.
+  private val genericServerErrorMessage =
+    "LakeFS request failed due to an unexpected server error."
+
+  /**
+    * Wraps a LakeFS call with centralized error handling.
+    *
+    * @param call the LakeFS client invocation; evaluated inside the `try` block
+    * @tparam T the result type of the LakeFS call
+    * @return the value produced by `call` when it completes normally
+    * @throws jakarta.ws.rs.WebApplicationException when `call` throws a LakeFS `ApiException`
+    */
+  def withLakeFSErrorHandling[T](call: => T): T = {
+    try {
+      call
+    } catch {
+      case e: io.lakefs.clients.sdk.ApiException => handleException(e)
+    }
+  }
+
+  /**
+    * Picks the caller-facing message for a LakeFS status code: the dedicated entry when one
+    * exists, otherwise a generic client-error or server-error message.
+    */
+  private def messageFor(code: Int): String =
+    fallbackMessages.getOrElse(
+      code,
+      if (code >= 400 && code < 500) genericClientErrorMessage else genericServerErrorMessage
+    )
+
+  /**
+    * Converts LakeFS ApiException to appropriate HTTP exception
+    *
+    * The raw LakeFS message and response body are written to the log only; the HTTP response
+    * carries the fixed message chosen by `messageFor`.
+    */
+  private def handleException(e: io.lakefs.clients.sdk.ApiException): Nothing = {
+    val code = e.getCode
+    val rawBody = Option(e.getResponseBody).filter(_.nonEmpty)
+    val message = messageFor(code)
+
+    // Pass the exception itself so the stack trace is kept in the log.
+    logger.warn(s"LakeFS error $code, ${e.getMessage}, body: ${rawBody.getOrElse("N/A")}", e)
+
+    def errorResponse(status: Int): Response =
+      Response
+        .status(status)
+        .entity(Map("message" -> message).asJava)
+        .`type`(MediaType.APPLICATION_JSON)
+        .build()
+
+    throw (code match {
+      case 400 => new BadRequestException(errorResponse(400))
+      case 401 => new NotAuthorizedException(errorResponse(401))
+      case 403 => new ForbiddenException(errorResponse(403))
+      case 404 => new NotFoundException(errorResponse(404))
+      // Any other 4xx keeps its original status code.
+      case c if c >= 400 && c < 500 => new WebApplicationException(errorResponse(c))
+      // Everything else (5xx, or codes outside the HTTP error ranges) is reported as 500.
+      case _ => new InternalServerErrorException(errorResponse(500))
+    })
+  }
+}
diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index 035c4f5da6..0d37298e9e 100644
--- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -1867,8 +1867,36 @@ class DatasetResourceSpec
     }

     ex.getResponse.getStatus shouldEqual 500
-    ex.getResponse.getEntity should not be null
+    Option(ex.getResponse.getEntity).map(_.toString).getOrElse("") should include(
+      "LakeFS request failed due to an unexpected server error."
+    )

     abortUpload(filePath)
   }
+
+  it should "return 400 when physicalAddress is invalid" in {
+    val filePath = uniqueFilePath("missing-physical-address")
+
+    initUpload(filePath, 2).getStatus shouldEqual 200
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    getDSLContext
+      .update(DATASET_UPLOAD_SESSION)
+      .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, "BAD")
+      .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadId))
+      .execute()
+
+    val ex = intercept[WebApplicationException] { finishUpload(filePath) }
+    ex.getResponse.getStatus shouldEqual 400
+    Option(ex.getResponse.getEntity).map(_.toString).getOrElse("") should include(
+      "LakeFS rejected the request"
+    )
+
+    intercept[WebApplicationException] {
+      abortUpload(filePath)
+    }.getResponse.getStatus shouldEqual 400
+  }
 }