diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
index 39cba2c84fb..5a2658e0653 100644
--- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
@@ -72,6 +72,7 @@ import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATA
 import org.jooq.exception.DataAccessException
 import software.amazon.awssdk.services.s3.model.UploadPartResponse
 import org.apache.commons.io.FilenameUtils
+import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling
 
 import java.sql.SQLException
 import scala.util.Try
@@ -360,7 +361,9 @@ class DatasetResource {
     val repositoryName = dataset.getRepositoryName
 
     // Check if there are any changes in LakeFS before creating a new version
-    val diffs = LakeFSStorageClient.retrieveUncommittedObjects(repoName = repositoryName)
+    val diffs = withLakeFSErrorHandling {
+      LakeFSStorageClient.retrieveUncommittedObjects(repoName = repositoryName)
+    }
 
     if (diffs.isEmpty) {
       throw new WebApplicationException(
@@ -384,11 +387,13 @@ class DatasetResource {
     }
 
     // Create a commit in LakeFS
-    val commit = LakeFSStorageClient.createCommit(
-      repoName = repositoryName,
-      branch = "main",
-      commitMessage = s"Created dataset version: $newVersionName"
-    )
+    val commit = withLakeFSErrorHandling {
+      LakeFSStorageClient.createCommit(
+        repoName = repositoryName,
+        branch = "main",
+        commitMessage = s"Created dataset version: $newVersionName"
+      )
+    }
 
     if (commit == null || commit.getId == null) {
       throw new WebApplicationException(
@@ -412,7 +417,9 @@ class DatasetResource {
       .into(classOf[DatasetVersion])
 
     // Retrieve committed file structure
-    val fileNodes = LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, commit.getId)
+    val fileNodes = withLakeFSErrorHandling {
+      LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, commit.getId)
+    }
 
     DashboardDatasetVersion(
       insertedVersion,
@@ -973,7 +980,9 @@ class DatasetResource {
 
     // Retrieve staged (uncommitted) changes from LakeFS
     val dataset = getDatasetByID(ctx, did)
-    val lakefsDiffs = LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
+    val lakefsDiffs = withLakeFSErrorHandling {
+      LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
+    }
 
     // Convert LakeFS Diff objects to our custom Diff case class
     lakefsDiffs.map(d =>
@@ -1578,11 +1587,13 @@ class DatasetResource {
       )
     }
 
-    val presign = LakeFSStorageClient.initiatePresignedMultipartUploads(
-      repositoryName,
-      filePath,
-      numPartsValue
-    )
+    val presign = withLakeFSErrorHandling {
+      LakeFSStorageClient.initiatePresignedMultipartUploads(
+        repositoryName,
+        filePath,
+        numPartsValue
+      )
+    }
 
     val uploadIdStr = presign.getUploadId
     val physicalAddr = presign.getPhysicalAddress
@@ -1772,13 +1783,15 @@ class DatasetResource {
       )
       .toList
 
-    val objectStats = LakeFSStorageClient.completePresignedMultipartUploads(
-      dataset.getRepositoryName,
-      filePath,
-      uploadId,
-      partsList,
-      physicalAddr
-    )
+    val objectStats = withLakeFSErrorHandling {
+      LakeFSStorageClient.completePresignedMultipartUploads(
+        dataset.getRepositoryName,
+        filePath,
+        uploadId,
+        partsList,
+        physicalAddr
+      )
+    }
 
     // FINAL SERVER-SIDE SIZE CHECK (do not rely on init)
     val actualSizeBytes =
@@ -1884,12 +1897,14 @@ class DatasetResource {
       )
     }
 
-    LakeFSStorageClient.abortPresignedMultipartUploads(
-      dataset.getRepositoryName,
-      filePath,
-      session.getUploadId,
-      physicalAddr
-    )
+    withLakeFSErrorHandling {
+      LakeFSStorageClient.abortPresignedMultipartUploads(
+        dataset.getRepositoryName,
+        filePath,
+        session.getUploadId,
+        physicalAddr
+      )
+    }
 
     // Delete session; parts removed via ON DELETE CASCADE
     ctx
diff --git a/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala
new file mode 100644
index 00000000000..c1997fb6477
--- /dev/null
+++ b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import jakarta.ws.rs._
+import jakarta.ws.rs.core.{MediaType, Response}
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+
+object LakeFSExceptionHandler {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val fallbackMessages = Map(
+    400 -> "LakeFS rejected the request. Please verify the parameters (repository/branch/path) and try again.",
+    401 -> "Authentication with LakeFS failed.",
+    403 -> "Permission denied by LakeFS.",
+    404 -> "LakeFS resource not found. The repository/branch/object may not exist.",
+    409 -> "LakeFS reported a conflict. Another operation may be in progress.",
+    429 -> "Too many requests to LakeFS."
+  ).withDefaultValue(
+    "LakeFS request failed due to an unexpected server error."
+  )
+
+  /**
+    * Wraps a LakeFS call with centralized error handling.
+ */ + def withLakeFSErrorHandling[T](call: => T): T = { + try { + call + } catch { + case e: io.lakefs.clients.sdk.ApiException => handleException(e) + } + } + + /** + * Converts LakeFS ApiException to appropriate HTTP exception + */ + private def handleException(e: io.lakefs.clients.sdk.ApiException): Nothing = { + val code = e.getCode + val rawBody = Option(e.getResponseBody).filter(_.nonEmpty) + val message = s"${fallbackMessages(code)}" + + logger.warn(s"LakeFS error $code, ${e.getMessage}, body: ${rawBody.getOrElse("N/A")}") + + def errorResponse(status: Int): Response = + Response + .status(status) + .entity(Map("message" -> message).asJava) + .`type`(MediaType.APPLICATION_JSON) + .build() + + throw (code match { + case 400 => new BadRequestException(errorResponse(400)) + case 401 => new NotAuthorizedException(errorResponse(401)) + case 403 => new ForbiddenException(errorResponse(403)) + case 404 => new NotFoundException(errorResponse(404)) + case c if c >= 400 && c < 500 => new WebApplicationException(errorResponse(c)) + case _ => new InternalServerErrorException(errorResponse(500)) + }) + } +} diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index 2902626a701..0d37298e9e7 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -1847,4 +1847,56 @@ class DatasetResourceSpec response.getStatus shouldEqual 307 response.getHeaderString("Location") should not be null } + + "LakeFS error handling" should "return 500 when ETag is invalid, with the message included in the error response body" in { + val filePath = uniqueFilePath("error-body") + + initUpload(filePath, 2).getStatus shouldEqual 200 + uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200 + uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200 + + val uploadId = fetchUploadIdOrFail(filePath) + getDSLContext + .update(DATASET_UPLOAD_SESSION_PART) + .set(DATASET_UPLOAD_SESSION_PART.ETAG, "BAD") + .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId)) + .execute() + + val ex = intercept[WebApplicationException] { + finishUpload(filePath) + } + + ex.getResponse.getStatus shouldEqual 500 + Option(ex.getResponse.getEntity).map(_.toString).getOrElse("") should include( + "LakeFS request failed due to an unexpected server error." + ) + + abortUpload(filePath) + } + + it should "return 400 when physicalAddress is invalid" in { + val filePath = uniqueFilePath("missing-physical-address") + + initUpload(filePath, 2).getStatus shouldEqual 200 + uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200 + uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200 + + val uploadId = fetchUploadIdOrFail(filePath) + + getDSLContext + .update(DATASET_UPLOAD_SESSION) + .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, "BAD") + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadId)) + .execute() + + val ex = intercept[WebApplicationException] { finishUpload(filePath) } + ex.getResponse.getStatus shouldEqual 400 + Option(ex.getResponse.getEntity).map(_.toString).getOrElse("") should include( + "LakeFS rejected the request" + ) + + intercept[WebApplicationException] { + abortUpload(filePath) + }.getResponse.getStatus shouldEqual 400 + } }