@@ -72,6 +72,7 @@ import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
import org.jooq.exception.DataAccessException
import software.amazon.awssdk.services.s3.model.UploadPartResponse
import org.apache.commons.io.FilenameUtils
import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling

import java.sql.SQLException
import scala.util.Try
@@ -360,7 +361,9 @@ class DatasetResource {
val repositoryName = dataset.getRepositoryName

// Check if there are any changes in LakeFS before creating a new version
val diffs = LakeFSStorageClient.retrieveUncommittedObjects(repoName = repositoryName)
val diffs = withLakeFSErrorHandling {
LakeFSStorageClient.retrieveUncommittedObjects(repoName = repositoryName)
}

if (diffs.isEmpty) {
throw new WebApplicationException(
@@ -384,11 +387,13 @@
}

// Create a commit in LakeFS
val commit = LakeFSStorageClient.createCommit(
repoName = repositoryName,
branch = "main",
commitMessage = s"Created dataset version: $newVersionName"
)
val commit = withLakeFSErrorHandling {
LakeFSStorageClient.createCommit(
repoName = repositoryName,
branch = "main",
commitMessage = s"Created dataset version: $newVersionName"
)
}
Comment on lines +390 to +396

Contributor:

@xuang7 There is another approach using a Scala implicit; it would look like the next code snippet. In your preference, is this cleaner?

val commit =
  LakeFSStorageClient.safeCall(_.createCommit(
    repoName = repositoryName,
    branch = "main",
    commitMessage = s"Created dataset version: $newVersionName"
  ))
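
(For illustration, a minimal sketch of how such a safeCall helper could be defined — hypothetical only: safeCall and LakeFSSafeSyntax do not exist in the codebase, and the sketch assumes it delegates to the same shared handler:)

object LakeFSSafeSyntax {
  // Pipes any call on LakeFSStorageClient through the shared
  // ApiException -> HTTP mapping, so every caller behaves the same.
  implicit class LakeFSSafeCalls(client: LakeFSStorageClient.type) {
    def safeCall[A](f: LakeFSStorageClient.type => A): A =
      LakeFSExceptionHandler.withLakeFSErrorHandling(f(client))
  }
}

// usage: import LakeFSSafeSyntax._
// val commit = LakeFSStorageClient.safeCall(_.createCommit(...))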

Contributor Author:

Thanks for the suggestion. I've centralized the LakeFS exception→HTTP mapping in the file service (LakeFSExceptionHandler). The safeCall approach also looks clean and would centralize handling at the storage layer, near LakeFSStorageClient. Would you recommend switching to LakeFSStorageClient.safeCall(...) so all callers share the same handler?

Contributor (carloea2) — Jan 26, 2026:

We can keep this approach for now.

It would be good to open a discussion, and it would be good if @Ma77Ball could give his input since he is working on logging. Here are the things to discuss later:

I think this PR is really about how we want error handling to work across service calls (LakeFS now, others later): do we force callers to deal with failures, or do we centralize and make it easy/consistent?

Option 1: change defs and force handling (typed)

Return Either[StorageError, A] (or F[Either[...]]). Caller can’t ignore it.

storage.commit(repo, branch, msg): Either[StorageError, CommitId]
storage.uploadPart(key, part, bytes): Either[StorageError, Unit]

// usage
storage.commit(repo, branch, msg).map(Ok(_)).leftMap(mapToHttp)

Pros: type-safe + enforced. Cons: signature churn + lots of callsite updates + need for error mapping.
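
(For illustration, a minimal sketch of option 1 — StorageError, its cases, and mapToHttp are hypothetical names, not existing Texera code:)

import jakarta.ws.rs._

object StorageErrors {
  sealed trait StorageError
  final case class StorageNotFound(path: String) extends StorageError
  final case class StorageConflict(detail: String) extends StorageError
  final case class StorageFailure(cause: Throwable) extends StorageError

  // Translates the typed error into the HTTP exception the resource layer throws.
  def mapToHttp(e: StorageError): WebApplicationException = e match {
    case StorageNotFound(p) => new NotFoundException(s"Not found: $p")
    case StorageConflict(d) => new WebApplicationException(d, 409)
    case StorageFailure(_)  => new InternalServerErrorException()
  }
}

// Callers cannot ignore the failure channel:
// storage.commit(repo, branch, msg) match {
//   case Right(commitId) => commitId
//   case Left(err)       => throw StorageErrors.mapToHttp(err)
// }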

Option 2: implicit + Dynamic .safe wrapper (ergonomic, but risky)

This option adds automatic logging. The callsite is nice, but it relies on reflection + weak typing (Any) + possible runtime method-lookup issues. Note this method does not require any changes to defs like commit or uploadPart; it is optional for the caller to use .safe, either by:

Changing the return type (2.1)

lakefs.safe.commit(repo, branch, msg)          // Either[Throwable, Any]
datasetResource.safe.uploadPart(key, part, bs) // Either[Throwable, Any]

Keeping the return type and throwing (2.2)

lakefs.safe.commit(repo, branch, msg)
datasetResource.safe.uploadPart(key, part, bs)
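
(For illustration, a minimal sketch of variant 2.1 above — SafeSyntax and SafeProxy are hypothetical names; the reflective lookup is exactly where the runtime method-lookup risk lives:)

import scala.language.dynamics
import scala.util.Try

object SafeSyntax {
  class SafeProxy[A](underlying: A) extends Dynamic {
    // Resolves the method by name and arity at runtime, then invokes it.
    // Overloads and primitive parameters can make this lookup ambiguous.
    def applyDynamic(name: String)(args: Any*): Either[Throwable, Any] =
      Try {
        val m = underlying.getClass.getMethods
          .find(m => m.getName == name && m.getParameterCount == args.length)
          .getOrElse(throw new NoSuchMethodException(s"$name/${args.length}"))
        m.invoke(underlying, args.map(_.asInstanceOf[AnyRef]): _*)
      }.toEither // variant 2.2 would instead rethrow and keep the raw return type
  }

  implicit class SafeOps[A](underlying: A) {
    def safe: SafeProxy[A] = new SafeProxy(underlying)
  }
}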


if (commit == null || commit.getId == null) {
throw new WebApplicationException(
@@ -412,7 +417,9 @@ class DatasetResource {
.into(classOf[DatasetVersion])

// Retrieve committed file structure
val fileNodes = LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, commit.getId)
val fileNodes = withLakeFSErrorHandling {
LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, commit.getId)
}

DashboardDatasetVersion(
insertedVersion,
@@ -973,7 +980,9 @@ class DatasetResource {

// Retrieve staged (uncommitted) changes from LakeFS
val dataset = getDatasetByID(ctx, did)
val lakefsDiffs = LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
val lakefsDiffs = withLakeFSErrorHandling {
LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
}

// Convert LakeFS Diff objects to our custom Diff case class
lakefsDiffs.map(d =>
@@ -1578,11 +1587,13 @@ class DatasetResource {
)
}

val presign = LakeFSStorageClient.initiatePresignedMultipartUploads(
repositoryName,
filePath,
numPartsValue
)
val presign = withLakeFSErrorHandling {
LakeFSStorageClient.initiatePresignedMultipartUploads(
repositoryName,
filePath,
numPartsValue
)
}

val uploadIdStr = presign.getUploadId
val physicalAddr = presign.getPhysicalAddress
@@ -1772,13 +1783,15 @@ class DatasetResource {
)
.toList

val objectStats = LakeFSStorageClient.completePresignedMultipartUploads(
dataset.getRepositoryName,
filePath,
uploadId,
partsList,
physicalAddr
)
val objectStats = withLakeFSErrorHandling {
LakeFSStorageClient.completePresignedMultipartUploads(
dataset.getRepositoryName,
filePath,
uploadId,
partsList,
physicalAddr
)
}

// FINAL SERVER-SIDE SIZE CHECK (do not rely on init)
val actualSizeBytes =
@@ -1884,12 +1897,14 @@ class DatasetResource {
)
}

LakeFSStorageClient.abortPresignedMultipartUploads(
dataset.getRepositoryName,
filePath,
session.getUploadId,
physicalAddr
)
withLakeFSErrorHandling {
LakeFSStorageClient.abortPresignedMultipartUploads(
dataset.getRepositoryName,
filePath,
session.getUploadId,
physicalAddr
)
}

// Delete session; parts removed via ON DELETE CASCADE
ctx
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.service.util

import jakarta.ws.rs._
import jakarta.ws.rs.core.{MediaType, Response}
import org.slf4j.LoggerFactory

import scala.jdk.CollectionConverters._

object LakeFSExceptionHandler {
private val logger = LoggerFactory.getLogger(getClass)

private val fallbackMessages = Map(
400 -> "LakeFS rejected the request. Please verify the parameters (repository/branch/path) and try again.",
401 -> "Authentication with LakeFS failed.",
403 -> "Permission denied by LakeFS.",
404 -> "LakeFS resource not found. The repository/branch/object may not exist.",
409 -> "LakeFS reported a conflict. Another operation may be in progress.",
429 -> "Too many requests to LakeFS."
).withDefaultValue(
"LakeFS request failed due to an unexpected server error."
)

/**
* Wraps a LakeFS call with centralized error handling.
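*
* Example, mirroring the call sites in DatasetResource:
* {{{
* val commit = withLakeFSErrorHandling {
*   LakeFSStorageClient.createCommit(...)
* }
* }}}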
*/
def withLakeFSErrorHandling[T](call: => T): T = {
try {
call
} catch {
case e: io.lakefs.clients.sdk.ApiException => handleException(e)
}
}

/**
* Converts a LakeFS ApiException to the appropriate HTTP exception.
*/
private def handleException(e: io.lakefs.clients.sdk.ApiException): Nothing = {
val code = e.getCode
val rawBody = Option(e.getResponseBody).filter(_.nonEmpty)
val message = fallbackMessages(code)

logger.warn(s"LakeFS error $code, ${e.getMessage}, body: ${rawBody.getOrElse("N/A")}")

def errorResponse(status: Int): Response =
Response
.status(status)
.entity(Map("message" -> message).asJava)
.`type`(MediaType.APPLICATION_JSON)
.build()

throw (code match {
case 400 => new BadRequestException(errorResponse(400))
case 401 => new NotAuthorizedException(errorResponse(401))
case 403 => new ForbiddenException(errorResponse(403))
case 404 => new NotFoundException(errorResponse(404))
case c if c >= 400 && c < 500 => new WebApplicationException(errorResponse(c))
case _ => new InternalServerErrorException(errorResponse(500))
})
}
}
@@ -1847,4 +1847,56 @@ class DatasetResourceSpec
response.getStatus shouldEqual 307
response.getHeaderString("Location") should not be null
}

"LakeFS error handling" should "return 500 when ETag is invalid, with the message included in the error response body" in {
val filePath = uniqueFilePath("error-body")

initUpload(filePath, 2).getStatus shouldEqual 200
uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200

val uploadId = fetchUploadIdOrFail(filePath)
getDSLContext
.update(DATASET_UPLOAD_SESSION_PART)
.set(DATASET_UPLOAD_SESSION_PART.ETAG, "BAD")
.where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId))
.execute()

val ex = intercept[WebApplicationException] {
finishUpload(filePath)
}

ex.getResponse.getStatus shouldEqual 500
Option(ex.getResponse.getEntity).map(_.toString).getOrElse("") should include(
"LakeFS request failed due to an unexpected server error."
)

abortUpload(filePath)
}

it should "return 400 when physicalAddress is invalid" in {
val filePath = uniqueFilePath("missing-physical-address")

initUpload(filePath, 2).getStatus shouldEqual 200
uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200

val uploadId = fetchUploadIdOrFail(filePath)

getDSLContext
.update(DATASET_UPLOAD_SESSION)
.set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, "BAD")
.where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadId))
.execute()

val ex = intercept[WebApplicationException] { finishUpload(filePath) }
ex.getResponse.getStatus shouldEqual 400
Option(ex.getResponse.getEntity).map(_.toString).getOrElse("") should include(
"LakeFS rejected the request"
)

intercept[WebApplicationException] {
abortUpload(filePath)
}.getResponse.getStatus shouldEqual 400
}
}