diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java index 690693997..f4b619196 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java @@ -49,7 +49,7 @@ private TSOProto.Transaction.Builder getBuilder(HBaseTransaction transaction) { } @Override - public Result get(TTable ttable, Get get, HBaseTransaction transaction) throws IOException { + public Result get(Get get, HBaseTransaction transaction) throws IOException { get.setAttribute(CellUtils.TRANSACTION_ATTRIBUTE, getBuilder(transaction).build().toByteArray()); get.setAttribute(CellUtils.CLIENT_GET_ATTRIBUTE, Bytes.toBytes(true)); @@ -57,7 +57,7 @@ public Result get(TTable ttable, Get get, HBaseTransaction transaction) throws I } @Override - public ResultScanner getScanner(TTable ttable, Scan scan, HBaseTransaction transaction) throws IOException { + public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException { scan.setAttribute(CellUtils.TRANSACTION_ATTRIBUTE, getBuilder(transaction).build().toByteArray()); return table.getScanner(scan); @@ -65,7 +65,7 @@ public ResultScanner getScanner(TTable ttable, Scan scan, HBaseTransaction trans @Override public List filterCellsForSnapshot(List rawCells, HBaseTransaction transaction, - int versionsToRequest, Map> familyDeletionCache) throws IOException { + int versionsToRequest, Map familyDeletionCache, Map attributeMap) throws IOException { throw new UnsupportedOperationException(); } diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java index 06e5c8903..4b3560f7a 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java @@ -51,6 +51,21 @@ public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commit this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency")); } + private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture updateSCFuture) { + Put put = new Put(cell.getRow()); + put.add(cell.getFamily(), + CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length), + cell.getTimestamp(), + Bytes.toBytes(tx.getCommitTimestamp())); + try { + cell.getTable().put(put); + } catch (IOException e) { + LOG.warn("{}: Error inserting shadow cell {}", tx, cell, e); + updateSCFuture.setException( + new TransactionManagerException(tx + ": Error inserting shadow cell " + cell, e)); + } + } + @Override public ListenableFuture updateShadowCells(AbstractTransaction transaction) { @@ -63,18 +78,11 @@ public ListenableFuture updateShadowCells(AbstractTransaction { private static final Logger LOG = LoggerFactory.getLogger(HBaseTransaction.class); - public HBaseTransaction(long transactionId, long epoch, Set writeSet, AbstractTransactionManager tm) { - super(transactionId, epoch, writeSet, tm); + public HBaseTransaction(long transactionId, long epoch, Set writeSet, Set conflictFreeWriteSet, AbstractTransactionManager tm) { + super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm); } - public HBaseTransaction(long transactionId, long readTimestamp, VisibilityLevel visibilityLevel, long epoch, Set writeSet, AbstractTransactionManager tm) { - super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, tm); + public HBaseTransaction(long transactionId, long epoch, Set writeSet, Set conflictFreeWriteSet, AbstractTransactionManager tm, long readTimestamp, long writeTimestamp) { + super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, readTimestamp, writeTimestamp); } + public HBaseTransaction(long transactionId, long readTimestamp, VisibilityLevel visibilityLevel, long epoch, Set writeSet, Set conflictFreeWriteSet, AbstractTransactionManager tm) { + super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, conflictFreeWriteSet, tm); + } + + private void deleteCell(HBaseCellId cell) { + Delete delete = new Delete(cell.getRow()); + delete.deleteColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp()); + try { + cell.getTable().delete(delete); + } catch (IOException e) { + LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e); + } + } @Override public void cleanup() { - Set writeSet = getWriteSet(); - for (final HBaseCellId cell : writeSet) { - Delete delete = new Delete(cell.getRow()); - delete.deleteColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp()); - try { - cell.getTable().delete(delete); - } catch (IOException e) { - LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e); - } + for (final HBaseCellId cell : getWriteSet()) { + deleteCell(cell); + } + + for (final HBaseCellId cell : getConflictFreeWriteSet()) { + deleteCell(cell); } try { flushTables(); @@ -80,6 +90,10 @@ private Set getWrittenTables() { for (HBaseCellId cell : writeSet) { tables.add(cell.getTable()); } + writeSet = (HashSet) getConflictFreeWriteSet(); + for (HBaseCellId cell : writeSet) { + tables.add(cell.getTable()); + } return tables; } diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java index 71d1f98cb..e20f8739e 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java @@ -17,15 +17,12 @@ */ package org.apache.omid.transaction; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.omid.committable.CommitTable; -import org.apache.omid.committable.CommitTable.CommitTimestamp; import org.apache.omid.committable.hbase.HBaseCommitTable; import org.apache.omid.committable.hbase.HBaseCommitTableConfig; import org.apache.omid.tools.hbase.HBaseLogin; @@ -53,7 +50,7 @@ private static class HBaseTransactionFactory implements TransactionFactory(), tm); + return new HBaseTransaction(transactionId, epoch, new HashSet(), new HashSet(), tm); } @@ -74,8 +71,7 @@ public static TransactionManager newInstance(HBaseOmidClientConfiguration config return builder(configuration).build(); } - @VisibleForTesting - static class Builder { + public static class Builder { // Required parameters private final HBaseOmidClientConfiguration hbaseOmidClientConf; @@ -89,12 +85,12 @@ private Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) { this.hbaseOmidClientConf = hbaseOmidClientConf; } - Builder tsoClient(TSOClient tsoClient) { + public Builder tsoClient(TSOClient tsoClient) { this.tsoClient = Optional.of(tsoClient); return this; } - Builder commitTableClient(CommitTable.Client client) { + public Builder commitTableClient(CommitTable.Client client) { this.commitTableClient = Optional.of(client); return this; } @@ -104,7 +100,7 @@ Builder postCommitter(PostCommitActions postCommitter) { return this; } - HBaseTransactionManager build() throws IOException, InterruptedException { + public HBaseTransactionManager build() throws IOException, InterruptedException { CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get(); PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get(); @@ -152,8 +148,7 @@ private Optional buildPostCommitter(CommitTable.Client commit } - @VisibleForTesting - static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) { + public static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) { return new Builder(hbaseOmidClientConf); } @@ -229,6 +224,10 @@ static HBaseTransaction enforceHBaseTransactionAsParam(AbstractTransaction readCommitTimestampFromShadowCell(long startTimestamp) thr Get get = new Get(hBaseCellId.getRow()); byte[] family = hBaseCellId.getFamily(); - byte[] shadowCellQualifier = CellUtils.addShadowCellSuffix(hBaseCellId.getQualifier()); + byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(hBaseCellId.getQualifier()); get.addColumn(family, shadowCellQualifier); get.setMaxVersions(1); get.setTimeStamp(startTimestamp); diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java index 1994cad4f..84c8d2ce7 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java @@ -17,13 +17,17 @@ */ package org.apache.omid.transaction; -import java.io.IOException; -import java.util.List; - import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; + +import java.io.IOException; +import java.util.List; + + // This class wraps the HTableInterface object when doing client side filtering. public class HTableAccessWrapper implements TableAccessWrapper { @@ -51,4 +55,9 @@ public void put(Put put) throws IOException { writeTable.put(put); } + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return readTable.getScanner(scan); + } + } diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java index 112544fd0..4d2b8da69 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java @@ -32,12 +32,12 @@ public interface SnapshotFilter { - public Result get(TTable ttable, Get get, HBaseTransaction transaction) throws IOException; + public Result get(Get get, HBaseTransaction transaction) throws IOException; - public ResultScanner getScanner(TTable ttable, Scan scan, HBaseTransaction transaction) throws IOException; + public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException; public List filterCellsForSnapshot(List rawCells, HBaseTransaction transaction, - int versionsToRequest, Map> familyDeletionCache) throws IOException; + int versionsToRequest, Map familyDeletionCache, Map attributeMap) throws IOException; public boolean isCommitted(HBaseCellId hBaseCellId, long epoch) throws TransactionException; diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java index a656aeccc..65bbcc5da 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java @@ -23,21 +23,20 @@ import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL; import java.io.IOException; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -45,10 +44,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.omid.committable.CommitTable; import org.apache.omid.committable.CommitTable.CommitTimestamp; -import org.apache.omid.proto.TSOProto; import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel; import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl; -import org.apache.omid.transaction.TTable.TransactionalClientScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,8 +63,16 @@ public class SnapshotFilterImpl implements SnapshotFilter { private TableAccessWrapper tableAccessWrapper; + public void closeCommitTableClient() throws IOException { + commitTableClient.close(); + } + private CommitTable.Client commitTableClient; - + + public TableAccessWrapper getTableAccessWrapper() { + return tableAccessWrapper; + } + public SnapshotFilterImpl(TableAccessWrapper tableAccessWrapper, CommitTable.Client commitTableClient) throws IOException { this.tableAccessWrapper = tableAccessWrapper; this.commitTableClient = commitTableClient; @@ -85,6 +90,14 @@ void setTableAccessWrapper(TableAccessWrapper tableAccessWrapper) { this.tableAccessWrapper = tableAccessWrapper; } + void setCommitTableClient(CommitTable.Client commitTableClient) { + this.commitTableClient = commitTableClient; + } + + private String getRowFamilyString(Cell cell) { + return Bytes.toString((CellUtil.cloneRow(cell))) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)); + } + /** * Check whether a cell was deleted using family deletion marker * @@ -94,19 +107,11 @@ void setTableAccessWrapper(TableAccessWrapper tableAccessWrapper) { * @param commitCache Holds shadow cells information * @return Whether the cell was deleted */ - private boolean checkFamilyDeletionCache(Cell cell, HBaseTransaction transaction, Map> familyDeletionCache, Map commitCache) throws IOException { - List familyDeletionCells = familyDeletionCache.get(Bytes.toString((cell.getRow()))); - if (familyDeletionCells != null) { - for(Cell familyDeletionCell : familyDeletionCells) { - String family = Bytes.toString(cell.getFamily()); - String familyDeletion = Bytes.toString(familyDeletionCell.getFamily()); - if (family.equals(familyDeletion)) { - Optional familyDeletionCommitTimestamp = getCommitTimestamp(familyDeletionCell, transaction, commitCache); - if (familyDeletionCommitTimestamp.isPresent() && familyDeletionCommitTimestamp.get() >= cell.getTimestamp()) { - return true; - } - } - } + private boolean checkFamilyDeletionCache(Cell cell, HBaseTransaction transaction, Map familyDeletionCache, Map commitCache) throws IOException { + String key = getRowFamilyString(cell); + Long familyDeletionCommitTimestamp = familyDeletionCache.get(key); + if (familyDeletionCommitTimestamp != null && familyDeletionCommitTimestamp >= cell.getTimestamp()) { + return true; } return false; } @@ -114,7 +119,7 @@ private boolean checkFamilyDeletionCache(Cell cell, HBaseTransaction transaction private void healShadowCell(Cell cell, long commitTimestamp) { Put put = new Put(CellUtil.cloneRow(cell)); byte[] family = CellUtil.cloneFamily(cell); - byte[] shadowCellQualifier = CellUtils.addShadowCellSuffix(cell.getQualifierArray(), + byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); put.add(family, shadowCellQualifier, cell.getTimestamp(), Bytes.toBytes(commitTimestamp)); @@ -219,7 +224,7 @@ public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long e } - private Optional tryToLocateCellCommitTimestamp(long epoch, + public Optional tryToLocateCellCommitTimestamp(long epoch, Cell cell, Map commitCache) throws IOException { @@ -294,27 +299,57 @@ private Map buildCommitCache(List rawCells) { return commitCache; } - private void buildFamilyDeletionCache(List rawCells, Map> familyDeletionCache) { + private void buildFamilyDeletionCache(HBaseTransaction transaction, List rawCells, Map familyDeletionCache, Map commitCache, Map attributeMap) throws IOException { for (Cell cell : rawCells) { - if (CellUtil.matchingQualifier(cell, CellUtils.FAMILY_DELETE_QUALIFIER) && - CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY)) { - - String row = Bytes.toString(cell.getRow()); - List cells = familyDeletionCache.get(row); - if (cells == null) { - cells = new ArrayList<>(); - familyDeletionCache.put(row, cells); + if (CellUtils.isFamilyDeleteCell(cell)) { + String key = getRowFamilyString(cell); + + if (familyDeletionCache.containsKey(key)) + return; + + Optional commitTimeStamp = getTSIfInTransaction(cell, transaction); + + if (!commitTimeStamp.isPresent()) { + commitTimeStamp = getTSIfInSnapshot(cell, transaction, commitCache); } - cells.add(cell); + if (commitTimeStamp.isPresent()) { + familyDeletionCache.put(key, commitTimeStamp.get()); + } else { + Cell lastCell = cell; + Map cmtCache; + boolean foundCommittedFamilyDeletion = false; + while (!foundCommittedFamilyDeletion) { + + Get g = createPendingGet(lastCell, 3); + + Result result = tableAccessWrapper.get(g); + List resultCells = result.listCells(); + if (resultCells == null) { + break; + } + + cmtCache = buildCommitCache(resultCells); + for (Cell c : resultCells) { + if (CellUtils.isFamilyDeleteCell(c)) { + commitTimeStamp = getTSIfInSnapshot(c, transaction, cmtCache); + if (commitTimeStamp.isPresent()) { + familyDeletionCache.put(key, commitTimeStamp.get()); + foundCommittedFamilyDeletion = true; + break; + } + lastCell = c; + } + } + } + } } } - } - private boolean isCellInTransaction(Cell kv, HBaseTransaction transaction, Map commitCache) { + public Optional getTSIfInTransaction(Cell kv, HBaseTransaction transaction) { long startTimestamp = transaction.getStartTimestamp(); long readTimestamp = transaction.getReadTimestamp(); @@ -322,25 +357,29 @@ private boolean isCellInTransaction(Cell kv, HBaseTransaction transaction, Map= startTimestamp && kv.getTimestamp() <= readTimestamp) { - return true; + return Optional.of(kv.getTimestamp()); } - return false; + return Optional.absent(); } - private boolean isCellInSnapshot(Cell kv, HBaseTransaction transaction, Map commitCache) + + public Optional getTSIfInSnapshot(Cell kv, HBaseTransaction transaction, Map commitCache) throws IOException { Optional commitTimestamp = getCommitTimestamp(kv, transaction, commitCache); - return commitTimestamp.isPresent() && commitTimestamp.get() < transaction.getStartTimestamp(); + if (commitTimestamp.isPresent() && commitTimestamp.get() < transaction.getStartTimestamp()) + return commitTimestamp; + + return Optional.absent(); } private Get createPendingGet(Cell cell, int versionCount) throws IOException { Get pendingGet = new Get(CellUtil.cloneRow(cell)); pendingGet.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)); - pendingGet.addColumn(CellUtil.cloneFamily(cell), CellUtils.addShadowCellSuffix(cell.getQualifierArray(), + pendingGet.addColumn(CellUtil.cloneFamily(cell), CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); pendingGet.setMaxVersions(versionCount); @@ -362,7 +401,7 @@ private Get createPendingGet(Cell cell, int versionCount) throws IOException { */ @Override public List filterCellsForSnapshot(List rawCells, HBaseTransaction transaction, - int versionsToRequest, Map> familyDeletionCache) throws IOException { + int versionsToRequest, Map familyDeletionCache, Map attributeMap) throws IOException { assert (rawCells != null && transaction != null && versionsToRequest >= 1); @@ -375,42 +414,49 @@ public List filterCellsForSnapshot(List rawCells, HBaseTransaction t } Map commitCache = buildCommitCache(rawCells); - buildFamilyDeletionCache(rawCells, familyDeletionCache); + buildFamilyDeletionCache(transaction, rawCells, familyDeletionCache, commitCache, attributeMap); + + ImmutableList> filteredCells; + if (transaction.getVisibilityLevel() == VisibilityLevel.SNAPSHOT_ALL) { + filteredCells = groupCellsByColumnFilteringShadowCells(rawCells); + } else { + filteredCells = groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(rawCells); + } - for (Collection columnCells : groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(rawCells)) { + for (Collection columnCells : filteredCells) { boolean snapshotValueFound = false; Cell oldestCell = null; for (Cell cell : columnCells) { - snapshotValueFound = checkFamilyDeletionCache(cell, transaction, familyDeletionCache, commitCache); + oldestCell = cell; + if (getTSIfInTransaction(cell, transaction).isPresent() || + getTSIfInSnapshot(cell, transaction, commitCache).isPresent()) { - if (snapshotValueFound == true) { if (transaction.getVisibilityLevel() == VisibilityLevel.SNAPSHOT_ALL) { - snapshotValueFound = false; - } else { - break; - } - } - - if (isCellInTransaction(cell, transaction, commitCache) || - isCellInSnapshot(cell, transaction, commitCache)) { - if (!CellUtil.matchingValue(cell, CellUtils.DELETE_TOMBSTONE)) { keyValuesInSnapshot.add(cell); - } - - // We can finish looking for additional results in two cases: - // 1. if we found a result and we are not in SNAPSHOT_ALL mode. - // 2. if we found a result that was not written by the current transaction. - if (transaction.getVisibilityLevel() != VisibilityLevel.SNAPSHOT_ALL || - !isCellInTransaction(cell, transaction, commitCache)) { + if (getTSIfInTransaction(cell, transaction).isPresent()) { + snapshotValueFound = false; + continue; + } else { + snapshotValueFound = true; + break; + } + } else { + if (!checkFamilyDeletionCache(cell, transaction, familyDeletionCache, commitCache) && + !CellUtils.isTombstone(cell)) { + keyValuesInSnapshot.add(cell); + } snapshotValueFound = true; break; + } } - oldestCell = cell; } if (!snapshotValueFound) { assert (oldestCell != null); Get pendingGet = createPendingGet(oldestCell, numberOfVersionsToFetch); + for (Map.Entry entry : attributeMap.entrySet()) { + pendingGet.setAttribute(entry.getKey(), entry.getValue()); + } pendingGetsList.add(pendingGet); } } @@ -420,7 +466,7 @@ public List filterCellsForSnapshot(List rawCells, HBaseTransaction t for (Result pendingGetResult : pendingGetsResults) { if (!pendingGetResult.isEmpty()) { keyValuesInSnapshot.addAll( - filterCellsForSnapshot(pendingGetResult.listCells(), transaction, numberOfVersionsToFetch, familyDeletionCache)); + filterCellsForSnapshot(pendingGetResult.listCells(), transaction, numberOfVersionsToFetch, familyDeletionCache, attributeMap)); } } } @@ -431,21 +477,21 @@ public List filterCellsForSnapshot(List rawCells, HBaseTransaction t } @Override - public Result get(TTable ttable, Get get, HBaseTransaction transaction) throws IOException { + public Result get(Get get, HBaseTransaction transaction) throws IOException { Result result = tableAccessWrapper.get(get); List filteredKeyValues = Collections.emptyList(); if (!result.isEmpty()) { - filteredKeyValues = ttable.filterCellsForSnapshot(result.listCells(), transaction, get.getMaxVersions(), new HashMap>()); + filteredKeyValues = filterCellsForSnapshot(result.listCells(), transaction, get.getMaxVersions(), new HashMap(), get.getAttributesMap()); } return Result.create(filteredKeyValues); } @Override - public ResultScanner getScanner(TTable ttable, Scan scan, HBaseTransaction transaction) throws IOException { + public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException { - return ttable.new TransactionalClientScanner(transaction, scan, 1); + return new TransactionalClientScanner(transaction, scan, 1); } @@ -483,8 +529,7 @@ static ImmutableList> groupCellsByColumnFilteringShadowCellsAnd @Override public boolean apply(Cell cell) { - boolean familyDeletionMarkerCondition = CellUtil.matchingQualifier(cell, CellUtils.FAMILY_DELETE_QUALIFIER) && - CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY); + boolean familyDeletionMarkerCondition = CellUtils.isFamilyDeleteCell(cell); return cell != null && !CellUtils.isShadowCell(cell) && !familyDeletionMarkerCondition; } @@ -505,4 +550,137 @@ public ColumnWrapper apply(Cell cell) { .asList(); } + + static ImmutableList> groupCellsByColumnFilteringShadowCells(List rawCells) { + + Predicate shadowCellFilter = new Predicate() { + @Override + public boolean apply(Cell cell) { + return cell != null && !CellUtils.isShadowCell(cell); + } + }; + + Function cellToColumnWrapper = new Function() { + + @Override + public ColumnWrapper apply(Cell cell) { + return new ColumnWrapper(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)); + } + + }; + + return Multimaps.index(Iterables.filter(rawCells, shadowCellFilter), cellToColumnWrapper) + .asMap().values() + .asList(); + } + + + public class TransactionalClientScanner implements ResultScanner { + + private HBaseTransaction state; + private ResultScanner innerScanner; + private int maxVersions; + Map familyDeletionCache; + private Map attributeMap; + + TransactionalClientScanner(HBaseTransaction state, Scan scan, int maxVersions) + throws IOException { + if (scan.hasFilter()) { + LOG.warn("Client scanner with filter will return un expected results. Use Coprocessor scanning"); + } + this.state = state; + this.innerScanner = tableAccessWrapper.getScanner(scan); + this.maxVersions = maxVersions; + this.familyDeletionCache = new HashMap(); + this.attributeMap = scan.getAttributesMap(); + } + + + @Override + public Result next() throws IOException { + List filteredResult = Collections.emptyList(); + while (filteredResult.isEmpty()) { + Result result = innerScanner.next(); + if (result == null) { + return null; + } + if (!result.isEmpty()) { + filteredResult = filterCellsForSnapshot(result.listCells(), state, maxVersions, familyDeletionCache, attributeMap); + } + } + return Result.create(filteredResult); + } + + // In principle no need to override, copied from super.next(int) to make + // sure it works even if super.next(int) + // changes its implementation + @Override + public Result[] next(int nbRows) throws IOException { + // Collect values to be returned here + ArrayList resultSets = new ArrayList<>(nbRows); + for (int i = 0; i < nbRows; i++) { + Result next = next(); + if (next != null) { + resultSets.add(next); + } else { + break; + } + } + return resultSets.toArray(new Result[resultSets.size()]); + } + + @Override + public void close() { + innerScanner.close(); + } + + @Override + public Iterator iterator() { + return new ResultIterator(this); + } + + // ------------------------------------------------------------------------------------------------------------ + // --------------------------------- Helper class for TransactionalClientScanner ------------------------------ + // ------------------------------------------------------------------------------------------------------------ + + class ResultIterator implements Iterator { + + TransactionalClientScanner scanner; + Result currentResult; + + ResultIterator(TransactionalClientScanner scanner) { + try { + this.scanner = scanner; + currentResult = scanner.next(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() { + return currentResult != null && !currentResult.isEmpty(); + } + + @Override + public Result next() { + try { + Result result = currentResult; + currentResult = scanner.next(); + return result; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove() { + throw new RuntimeException("Not implemented"); + } + + } + + } + + } diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java index 12dfb715e..4813d5b26 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java @@ -17,8 +17,14 @@ */ package org.apache.omid.transaction; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -33,6 +39,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.OperationWithAttributes; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -41,23 +48,12 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.omid.committable.CommitTable; import org.apache.omid.committable.CommitTable.CommitTimestamp; -import org.apache.omid.proto.TSOProto; import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Set; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; /** * Provides transactional methods for accessing and modifying a given snapshot of data identified by an opaque {@link @@ -73,6 +69,8 @@ public class TTable implements Closeable { private SnapshotFilter snapshotFilter; + private boolean serverSideFilter; + // ---------------------------------------------------------------------------------------------------------------- // Construction // ---------------------------------------------------------------------------------------------------------------- @@ -98,32 +96,46 @@ public TTable(Configuration conf, String tableName, CommitTable.Client commitTab } public TTable(HTableInterface hTable) throws IOException { + this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false)); + } + + public TTable(HTableInterface hTable, boolean serverSideFilter) throws IOException { + table = hTable; + healerTable = new HTable(table.getConfiguration(), table.getTableName()); + this.serverSideFilter = serverSideFilter; + snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) : + new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable)); + } + + public TTable(HTableInterface hTable, SnapshotFilter snapshotFilter ) throws IOException { table = hTable; healerTable = new HTable(table.getConfiguration(), table.getTableName()); - boolean serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false); - snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) : new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable)); + this.snapshotFilter = snapshotFilter; } public TTable(HTableInterface hTable, CommitTable.Client commitTableClient) throws IOException { table = hTable; healerTable = new HTable(table.getConfiguration(), table.getTableName()); - boolean serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false); - snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) : new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable), commitTableClient); + serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false); + snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) : + new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable), commitTableClient); } public TTable(HTableInterface hTable, HTableInterface healerTable) throws IOException { table = hTable; this.healerTable = healerTable; Configuration config = table.getConfiguration(); - boolean serverSideFilter = (config == null) ? false : config.getBoolean("omid.server.side.filter", false); - snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) : new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable)); + serverSideFilter = (config == null) ? false : config.getBoolean("omid.server.side.filter", false); + snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) : + new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable)); } public TTable(HTableInterface hTable, HTableInterface healerTable, CommitTable.Client commitTableClient) throws IOException { table = hTable; this.healerTable = healerTable; - boolean serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false); - snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) : new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable), commitTableClient); + serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false); + snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) : + new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable), commitTableClient); } @@ -162,6 +174,7 @@ public Result get(Transaction tx, final Get get) throws IOException { final long readTimestamp = transaction.getReadTimestamp(); final Get tsget = new Get(get.getRow()).setFilter(get.getFilter()); + propagateAttributes(get, tsget); TimeRange timeRange = get.getTimeRange(); long startTime = timeRange.getMin(); long endTime = Math.min(timeRange.getMax(), readTimestamp + 1); @@ -175,15 +188,23 @@ public Result get(Transaction tx, final Get get) throws IOException { } else { for (byte[] qualifier : qualifiers) { tsget.addColumn(family, qualifier); - tsget.addColumn(family, CellUtils.addShadowCellSuffix(qualifier)); + tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier)); } tsget.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER); - tsget.addColumn(family, CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER)); + tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER)); } } LOG.trace("Initial Get = {}", tsget); - return snapshotFilter.get(this, tsget, transaction); + return snapshotFilter.get(tsget, transaction); + } + + static private void propagateAttributes(OperationWithAttributes from, OperationWithAttributes to) { + Map attributeMap = from.getAttributesMap(); + + for (Map.Entry entry : attributeMap.entrySet()) { + to.setAttribute(entry.getKey(), entry.getValue()); + } } private void familyQualifierBasedDeletion(HBaseTransaction tx, Put deleteP, Get deleteG) throws IOException { @@ -199,6 +220,8 @@ private void familyQualifierBasedDeletion(HBaseTransaction tx, Put deleteP, Get } deleteP.add(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(), HConstants.EMPTY_BYTE_ARRAY); + tx.addWriteSetElement(new HBaseCellId(table, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER, + tx.getWriteTimestamp())); } } } @@ -209,9 +232,10 @@ private void familyQualifierBasedDeletionWithOutRead(HBaseTransaction tx, Put d for (byte[] family : fset) { deleteP.add(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(), HConstants.EMPTY_BYTE_ARRAY); + tx.addWriteSetElement(new HBaseCellId(table, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER, + tx.getWriteTimestamp())); + } - tx.addWriteSetElement(new HBaseCellId(table, deleteP.getRow(), null, null, - tx.getWriteTimestamp())); } /** @@ -227,11 +251,13 @@ public void delete(Transaction tx, Delete delete) throws IOException { HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx); - final long writeTimestamp = transaction.getStartTimestamp(); + final long writeTimestamp = transaction.getWriteTimestamp(); boolean deleteFamily = false; final Put deleteP = new Put(delete.getRow(), writeTimestamp); final Get deleteG = new Get(delete.getRow()); + propagateAttributes(delete, deleteP); + propagateAttributes(delete, deleteG); Map> fmap = delete.getFamilyCellMap(); if (fmap.isEmpty()) { familyQualifierBasedDeletion(transaction, deleteP, deleteG); @@ -293,6 +319,10 @@ public void delete(Transaction tx, Delete delete) throws IOException { } + public void markPutAsConflictFreeMutation(Put put) { + put.setAttribute(CellUtils.CONFLICT_FREE_MUTATION, Bytes.toBytes(true)); + } + /** * Transactional version of {@link HTableInterface#put(Put put)} * @@ -301,6 +331,44 @@ public void delete(Transaction tx, Delete delete) throws IOException { * @throws IOException if a remote or network exception occurs. */ public void put(Transaction tx, Put put) throws IOException { + put(tx, put, false); + } + + + /** + * @param put an instance of Put + * @param timestamp timestamp to be used as cells version + * @param commitTimestamp timestamp to be used as commit timestamp + * @throws IOException if a remote or network exception occurs. + */ + static public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) throws IOException { + final Put tsput = new Put(put.getRow(), timestamp); + propagateAttributes(put, tsput); + + Map> kvs = put.getFamilyCellMap(); + for (List kvl : kvs.values()) { + for (Cell c : kvl) { + KeyValue kv = KeyValueUtil.ensureKeyValue(c); + Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), timestamp); + tsput.add(kv); + tsput.add(CellUtil.cloneFamily(kv), + CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length), + kv.getTimestamp(), + Bytes.toBytes(commitTimestamp)); + } + } + + return tsput; + } + + + /** + * @param put an instance of Put + * @param tx an instance of transaction to be used + * @param autoCommit denotes whether to automatically commit the put + * @throws IOException if a remote or network exception occurs. + */ + public void put(Transaction tx, Put put, boolean autoCommit) throws IOException { throwExceptionIfOpSetsTimerange(put); @@ -310,6 +378,7 @@ public void put(Transaction tx, Put put) throws IOException { // create put with correct ts final Put tsput = new Put(put.getRow(), writeTimestamp); + propagateAttributes(put, tsput); Map> kvs = put.getFamilyCellMap(); for (List kvl : kvs.values()) { for (Cell c : kvl) { @@ -321,12 +390,25 @@ public void put(Transaction tx, Put put) throws IOException { Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), writeTimestamp); tsput.add(kv); - transaction.addWriteSetElement( - new HBaseCellId(table, - CellUtil.cloneRow(kv), - CellUtil.cloneFamily(kv), - CellUtil.cloneQualifier(kv), - kv.getTimestamp())); + if (autoCommit) { + tsput.add(CellUtil.cloneFamily(kv), + CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length), + kv.getTimestamp(), + Bytes.toBytes(kv.getTimestamp())); + } else { + byte[] conflictFree = put.getAttribute(CellUtils.CONFLICT_FREE_MUTATION); + HBaseCellId cellId = new HBaseCellId(table, + CellUtil.cloneRow(kv), + CellUtil.cloneFamily(kv), + CellUtil.cloneQualifier(kv), + kv.getTimestamp()); + + if (conflictFree != null && conflictFree[0]!=0) { + transaction.addConflictFreeWriteSetElement(cellId); + } else { + transaction.addWriteSetElement(cellId); + } + } } } @@ -350,6 +432,7 @@ public ResultScanner getScanner(Transaction tx, Scan scan) throws IOException { Scan tsscan = new Scan(scan); tsscan.setMaxVersions(1); tsscan.setTimeRange(0, transaction.getReadTimestamp() + 1); + propagateAttributes(scan, tsscan); Map> kvs = scan.getFamilyMap(); for (Map.Entry> entry : kvs.entrySet()) { byte[] family = entry.getKey(); @@ -358,123 +441,14 @@ public ResultScanner getScanner(Transaction tx, Scan scan) throws IOException { continue; } for (byte[] qualifier : qualifiers) { - tsscan.addColumn(family, CellUtils.addShadowCellSuffix(qualifier)); + tsscan.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier)); } if (!qualifiers.isEmpty()) { tsscan.addColumn(entry.getKey(), CellUtils.FAMILY_DELETE_QUALIFIER); } } - return snapshotFilter.getScanner(this, tsscan, transaction); - } - - - List filterCellsForSnapshot(List rawCells, HBaseTransaction transaction, - int versionsToRequest, Map> familyDeletionCache) throws IOException { - return snapshotFilter.filterCellsForSnapshot(rawCells, transaction, versionsToRequest, familyDeletionCache); - } - - - public class TransactionalClientScanner implements ResultScanner { - - private HBaseTransaction state; - private ResultScanner innerScanner; - private int maxVersions; - Map> familyDeletionCache; - - TransactionalClientScanner(HBaseTransaction state, Scan scan, int maxVersions) - throws IOException { - this.state = state; - this.innerScanner = table.getScanner(scan); - this.maxVersions = maxVersions; - this.familyDeletionCache = new HashMap>(); - } - - - @Override - public Result next() throws IOException { - List filteredResult = Collections.emptyList(); - while (filteredResult.isEmpty()) { - Result result = innerScanner.next(); - if (result == null) { - return null; - } - if (!result.isEmpty()) { - filteredResult = filterCellsForSnapshot(result.listCells(), state, maxVersions, familyDeletionCache); - } - } - return Result.create(filteredResult); - } - - // In principle no need to override, copied from super.next(int) to make - // sure it works even if super.next(int) - // changes its implementation - @Override - public Result[] next(int nbRows) throws IOException { - // Collect values to be returned here - ArrayList resultSets = new ArrayList<>(nbRows); - for (int i = 0; i < nbRows; i++) { - Result next = next(); - if (next != null) { - resultSets.add(next); - } else { - break; - } - } - return resultSets.toArray(new Result[resultSets.size()]); - } - - @Override - public void close() { - innerScanner.close(); - } - - @Override - public Iterator iterator() { - return new ResultIterator(this); - } - - // ------------------------------------------------------------------------------------------------------------ - // --------------------------------- Helper class for TransactionalClientScanner ------------------------------ - // ------------------------------------------------------------------------------------------------------------ - - class ResultIterator implements Iterator { - - TransactionalClientScanner scanner; - Result currentResult; - - ResultIterator(TransactionalClientScanner scanner) { - try { - this.scanner = scanner; - currentResult = scanner.next(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean hasNext() { - return currentResult != null && !currentResult.isEmpty(); - } - - @Override - public Result next() { - try { - Result result = currentResult; - currentResult = scanner.next(); - return result; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void remove() { - throw new RuntimeException("Not implemented"); - } - - } - + return snapshotFilter.getScanner(tsscan, transaction); } /** diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java b/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java index 2fc53625c..050aed092 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java @@ -17,12 +17,16 @@ */ package org.apache.omid.transaction; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; + import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Get; + //This interface is used to wrap the HTableInterface and Region object when doing client and server side filtering accordingly. public interface TableAccessWrapper { @@ -30,5 +34,5 @@ public interface TableAccessWrapper { public Result[] get(List get) throws IOException; public Result get(Get get) throws IOException; public void put(Put put) throws IOException; - + public ResultScanner getScanner(Scan scan) throws IOException; } diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java index a07be9071..1b793a1ae 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java @@ -437,4 +437,41 @@ public void testInterleavedScanReturnsTheRightSnapshotResultsWhenATransactionAbo } + @Test(timeOut = 30_000) + public void testAutoCommit(ITestContext context) + throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + byte[] rowName1 = Bytes.toBytes("row1"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + + Transaction tx1 = tm.begin(); + + Put row1 = new Put(rowName1); + row1.add(famName1, colName1, dataValue1); + tt.put(tx1, row1); + + Transaction tx2 = tm.begin(); + + Transaction tx3 = tm.begin(); + + Get g = new Get(rowName1).setMaxVersions(); + g.addColumn(famName1, colName1); + Result r = tt.get(tx3, g); + assertEquals(r.size(), 0, "Unexpected size for read."); + + row1 = new Put(rowName1); + row1.add(famName1, colName1, dataValue1); + tt.put(tx2, row1, true); + + r = tt.get(tx3, g); + assertEquals(r.size(), 1, "Unexpected size for read."); + + tt.close(); + } + } diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java index 2fdd6a921..8a689caca 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.SortedMap; +import static org.apache.omid.transaction.CellUtils.SHADOW_CELL_PREFIX; import static org.apache.omid.transaction.CellUtils.SHADOW_CELL_SUFFIX; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -99,11 +100,11 @@ public void testShadowCellQualifiers(byte[] shadowCellSuffixToTest) throws IOExc public void testCorrectMapingOfCellsToShadowCells() throws IOException { // Create the required data final byte[] validShadowCellQualifier = - com.google.common.primitives.Bytes.concat(qualifier, SHADOW_CELL_SUFFIX); + com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier, SHADOW_CELL_SUFFIX); final byte[] qualifier2 = Bytes.toBytes("test-qual2"); final byte[] validShadowCellQualifier2 = - com.google.common.primitives.Bytes.concat(qualifier2, SHADOW_CELL_SUFFIX); + com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier2, SHADOW_CELL_SUFFIX); final byte[] qualifier3 = Bytes.toBytes("test-qual3"); @@ -172,10 +173,10 @@ public void testCorrectMapingOfCellsToShadowCells() throws IOException { public void testShadowCellSuffixConcatenationToQualifier() { Cell cell = new KeyValue(row, family, qualifier, 1, Bytes.toBytes("value")); - byte[] suffixedQualifier = CellUtils.addShadowCellSuffix(cell.getQualifierArray(), + byte[] suffixedQualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - byte[] expectedQualifier = com.google.common.primitives.Bytes.concat(qualifier, SHADOW_CELL_SUFFIX); + byte[] expectedQualifier = com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier, SHADOW_CELL_SUFFIX); assertEquals(suffixedQualifier, expectedQualifier); } @@ -184,19 +185,19 @@ public void testShadowCellSuffixConcatenationToQualifier() { public void testShadowCellSuffixRemovalFromQualifier(byte[] shadowCellSuffixToTest) throws IOException { // Test removal from a correclty suffixed qualifier - byte[] suffixedQualifier = com.google.common.primitives.Bytes.concat(qualifier, shadowCellSuffixToTest); + byte[] suffixedQualifier = com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier, shadowCellSuffixToTest); Cell cell = new KeyValue(row, family, suffixedQualifier, 1, Bytes.toBytes("value")); - byte[] resultedQualifier = CellUtils.removeShadowCellSuffix(cell.getQualifierArray(), + byte[] resultedQualifier = CellUtils.removeShadowCellSuffixPrefix(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); byte[] expectedQualifier = qualifier; assertEquals(resultedQualifier, expectedQualifier); // Test removal from a badly suffixed qualifier - byte[] badlySuffixedQualifier = com.google.common.primitives.Bytes.concat(qualifier, Bytes.toBytes("BAD")); + byte[] badlySuffixedQualifier = com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier, Bytes.toBytes("BAD")); Cell badCell = new KeyValue(row, family, badlySuffixedQualifier, 1, Bytes.toBytes("value")); try { - CellUtils.removeShadowCellSuffix(badCell.getQualifierArray(), + CellUtils.removeShadowCellSuffixPrefix(badCell.getQualifierArray(), badCell.getQualifierOffset(), badCell.getQualifierLength()); fail(); @@ -215,7 +216,7 @@ public void testMatchingQualifiers() { @Test(dataProvider = "shadow-cell-suffixes", timeOut = 10_000) public void testQualifierLengthFromShadowCellQualifier(byte[] shadowCellSuffixToTest) { // Test suffixed qualifier - byte[] suffixedQualifier = com.google.common.primitives.Bytes.concat(qualifier, shadowCellSuffixToTest); + byte[] suffixedQualifier = com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier, shadowCellSuffixToTest); int originalQualifierLength = CellUtils.qualifierLengthFromShadowCellQualifier(suffixedQualifier, 0, suffixedQualifier.length); assertEquals(originalQualifierLength, qualifier.length); diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java index 078ea5f42..65a2ac5f8 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -297,6 +298,42 @@ public void testSNAPSHOT_EXCLUDE_CURRENT(ITestContext context) throws Exception tt.close(); } + @Test(timeOut = 30_000) + public void testDeleteAfterCheckpoint(ITestContext context) throws Exception { + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + byte[] rowName1 = Bytes.toBytes("row1"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + + Transaction tx1 = tm.begin(); + + Put row1 = new Put(rowName1); + row1.add(famName1, colName1, dataValue1); + tt.put(tx1, row1); + + tm.commit(tx1); + + Transaction tx2 = tm.begin(); + + HBaseTransaction hbaseTx2 = enforceHBaseTransactionAsParam(tx1); + + hbaseTx2.checkpoint(); + + Delete d = new Delete(rowName1); + tt.delete(tx2, d); + + try { + tm.commit(tx2); + } catch (TransactionException e) { + Assert.fail(); + } + + tt.close(); + } + @Test(timeOut = 30_000) public void testOutOfCheckpoints(ITestContext context) throws Exception { TransactionManager tm = newTransactionManager(context); diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java index 5b728561e..f7281cd5f 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java @@ -47,13 +47,13 @@ public class TestColumnIterator { // Group 1 (3 elems but grouping should filter shadow cell, so check for 2) new KeyValue(row, family1, qualifier1, 0, data), new KeyValue(row, family1, qualifier1, 1, data), - new KeyValue(row, family1, CellUtils.addShadowCellSuffix(qualifier1), 0, data), + new KeyValue(row, family1, CellUtils.addShadowCellSuffixPrefix(qualifier1), 0, data), // Group 2 (2 elems but grouping should filter shadow cell, so check for 1) new KeyValue(row, family1, qualifier2, 0, data), - new KeyValue(row, family1, CellUtils.addShadowCellSuffix(qualifier2), 0, data), + new KeyValue(row, family1, CellUtils.addShadowCellSuffixPrefix(qualifier2), 0, data), // Group 3 (2 elems but grouping should filter shadow cell, so check for 1) new KeyValue(row, family2, qualifier1, 0, data), - new KeyValue(row, family2, CellUtils.addShadowCellSuffix(qualifier1), 0, data) + new KeyValue(row, family2, CellUtils.addShadowCellSuffixPrefix(qualifier1), 0, data) ) ); diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java index b25883060..c426c1b79 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.ITestContext; @@ -47,7 +48,7 @@ public class TestDeletion extends OmidTestBase { private byte[] colA = Bytes.toBytes("testdataA"); private byte[] colB = Bytes.toBytes("testdataB"); private byte[] data1 = Bytes.toBytes("testWrite-1"); - private byte[] modrow = Bytes.toBytes("test-del" + 3); + private byte[] modrow = Bytes.toBytes("test-del" + 0); private static class FamCol { @@ -61,6 +62,102 @@ private static class FamCol { } + @Test(timeOut = 10_000) + public void runTestDeleteFamilyRow(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + ((HBaseTransactionManager) tm).setConflictDetectionLevel(ConflictDetectionLevel.ROW); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + int rowsWritten = 1; + FamCol famColA = new FamCol(famA, colA); + writeRows(tt, t1, rowsWritten, famColA); + tm.commit(t1); + + Transaction t2 = tm.begin(); + Delete d = new Delete(modrow); + d.deleteFamily(famA); + tt.delete(t2, d); + + Transaction tscan = tm.begin(); + ResultScanner rs = tt.getScanner(tscan, new Scan()); + + Map count = countColsInRows(rs, famColA); + assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten"); + tm.commit(t2); + + tscan = tm.begin(); + rs = tt.getScanner(tscan, new Scan()); + + count = countColsInRows(rs, famColA); + Integer countFamColA = count.get(famColA); + assertEquals(countFamColA, null); + + Transaction t3 = tm.begin(); + d.deleteFamily(famA); + tt.delete(t3, d); + + tscan = tm.begin(); + rs = tt.getScanner(tscan, new Scan()); + + count = countColsInRows(rs, famColA); + countFamColA = count.get(famColA); + assertEquals(countFamColA, null); + + ((HBaseTransactionManager) tm).setConflictDetectionLevel(ConflictDetectionLevel.CELL); + + } + + @Test(timeOut = 10_000) + public void runTestDeleteFamilyCell(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + int rowsWritten = 1; + FamCol famColA = new FamCol(famA, colA); + writeRows(tt, t1, rowsWritten, famColA); + tm.commit(t1); + + Transaction t2 = tm.begin(); + Delete d = new Delete(modrow); + d.deleteFamily(famA); + tt.delete(t2, d); + + Transaction tscan = tm.begin(); + ResultScanner rs = tt.getScanner(tscan, new Scan()); + + Map count = countColsInRows(rs, famColA); + assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten"); + tm.commit(t2); + + tscan = tm.begin(); + rs = tt.getScanner(tscan, new Scan()); + + count = countColsInRows(rs, famColA); + Integer countFamColA = count.get(famColA); + assertEquals(countFamColA, null); + + Transaction t3 = tm.begin(); + d.deleteFamily(famA); + tt.delete(t3, d); + + tscan = tm.begin(); + rs = tt.getScanner(tscan, new Scan()); + + count = countColsInRows(rs, famColA); + countFamColA = count.get(famColA); + assertEquals(countFamColA, null); + + } + @Test(timeOut = 10_000) public void runTestDeleteFamily(ITestContext context) throws Exception { @@ -95,7 +192,87 @@ public void runTestDeleteFamily(ITestContext context) throws Exception { count = countColsInRows(rs, famColA, famColB); assertEquals((int) count.get(famColA), (rowsWritten - 1), "ColA count should be equal to rowsWritten - 1"); assertEquals((int) count.get(famColB), rowsWritten, "ColB count should be equal to rowsWritten"); + } + + @Test(timeOut = 10_000) + public void runTestDeleteFamilyRowLevelCA(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + ((HBaseTransactionManager) tm).setConflictDetectionLevel(ConflictDetectionLevel.ROW); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + int rowsWritten = 10; + FamCol famColA = new FamCol(famA, colA); + FamCol famColB = new FamCol(famB, colB); + writeRows(tt, t1, rowsWritten, famColA, famColB); + tm.commit(t1); + + Transaction t2 = tm.begin(); + Delete d = new Delete(modrow); + d.deleteFamily(famA); + tt.delete(t2, d); + + Transaction tscan = tm.begin(); + ResultScanner rs = tt.getScanner(tscan, new Scan()); + + Map count = countColsInRows(rs, famColA, famColB); + assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten"); + assertEquals((int) count.get(famColB), rowsWritten, "ColB count should be equal to rowsWritten"); + tm.commit(t2); + + tscan = tm.begin(); + rs = tt.getScanner(tscan, new Scan()); + + count = countColsInRows(rs, famColA, famColB); + assertEquals((int) count.get(famColA), (rowsWritten - 1), "ColA count should be equal to rowsWritten - 1"); + assertEquals((int) count.get(famColB), rowsWritten, "ColB count should be equal to rowsWritten"); + + ((HBaseTransactionManager) tm).setConflictDetectionLevel(ConflictDetectionLevel.CELL); + } + + @Test(timeOut = 10_000) + public void runTestDeleteFamilyAborts(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + ((HBaseTransactionManager) tm).setConflictDetectionLevel(ConflictDetectionLevel.ROW); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + int rowsWritten = 10; + FamCol famColA = new FamCol(famA, colA); + FamCol famColB = new FamCol(famB, colB); + writeRows(tt, t1, rowsWritten, famColA, famColB); + + Transaction t2 = tm.begin(); + + tm.commit(t1); + + Delete d = new Delete(modrow); + d.deleteFamily(famA); + tt.delete(t2, d); + + try { + tm.commit(t2); + } catch(RollbackException e) { + System.out.println("Rollback"); + System.out.flush(); + } + + Transaction tscan = tm.begin(); + ResultScanner rs = tt.getScanner(tscan, new Scan()); + + Map count = countColsInRows(rs, famColA, famColB); + assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten"); + assertEquals((int) count.get(famColB), rowsWritten, "ColB count should be equal to rowsWritten"); + ((HBaseTransactionManager) tm).setConflictDetectionLevel(ConflictDetectionLevel.CELL); } @Test(timeOut = 10_000) diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java index 347c4ce67..51ec0c18f 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java @@ -83,7 +83,7 @@ public void testReadOnlyTransactionsDoNotContactTSOServer(ITestContext context) txTable.put(tx1, put); tm.commit(tx1); - verify(tsoClient, times(EXPECTED_INVOCATIONS_FOR_COMMIT)).commit(anyLong(), anySetOf(HBaseCellId.class)); + verify(tsoClient, times(EXPECTED_INVOCATIONS_FOR_COMMIT)).commit(anyLong(), anySetOf(HBaseCellId.class), anySetOf(HBaseCellId.class)); // Create a read-only tx and verify that commit has not been invoked again in the TSOClient AbstractTransaction readOnlyTx = (AbstractTransaction) tm.begin(); @@ -93,7 +93,7 @@ public void testReadOnlyTransactionsDoNotContactTSOServer(ITestContext context) assertTrue(readOnlyTx.getWriteSet().isEmpty()); tm.commit(readOnlyTx); - verify(tsoClient, times(EXPECTED_INVOCATIONS_FOR_COMMIT)).commit(anyLong(), anySetOf(HBaseCellId.class)); + verify(tsoClient, times(EXPECTED_INVOCATIONS_FOR_COMMIT)).commit(anyLong(), anySetOf(HBaseCellId.class), anySetOf(HBaseCellId.class)); assertEquals(readOnlyTx.getStatus(), Transaction.Status.COMMITTED_RO); } diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestMarkPutAsCommitted.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestMarkPutAsCommitted.java new file mode 100644 index 000000000..5ae4dd2e7 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestMarkPutAsCommitted.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.ITestContext; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.omid.transaction.CellUtils.hasCell; +import static org.apache.omid.transaction.CellUtils.hasShadowCell; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +@Test(groups = "sharedHBase") +public class TestMarkPutAsCommitted extends OmidTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(TestMarkPutAsCommitted.class); + + private static final String TEST_FAMILY = "data"; + + static final byte[] row = Bytes.toBytes("test-sc"); + static final byte[] family = Bytes.toBytes(TEST_FAMILY); + private static final byte[] qualifier = Bytes.toBytes("testdata-1"); + private static final byte[] data1 = Bytes.toBytes("testWrite-1"); + private static final byte[] data2 = Bytes.toBytes("testWrite-2"); + + @Test(timeOut = 60_000) + public void testShadowCellsExistanceInAutocommit(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + + TTable table = new TTable(hbaseConf, TEST_TABLE); + + HBaseTransaction t1 = (HBaseTransaction) tm.begin(); + + // Test shadow cells are created properly + Put put = new Put(row); + put.add(family, qualifier, data1); + + put = TTable.markPutAsCommitted(put, t1.getWriteTimestamp(), t1.getWriteTimestamp()); + + table.getHTable().put(put); + + // After markPutAsCommitted test that both cell and shadow cell are there + assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)), + "Cell should be there"); + assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)), + "Shadow cell should be there"); + } + + @Test(timeOut = 60_000) + public void testReadAfterAutocommit(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + + TTable table = new TTable(hbaseConf, TEST_TABLE); + + HBaseTransaction t1 = (HBaseTransaction) tm.begin(); + + Put put = new Put(row); + put.add(family, qualifier, data1); + + table.put(t1, put); + + tm.commit(t1); + + // After commit test that both cell and shadow cell are there + assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)), + "Cell should be there"); + assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)), + "Shadow cell should be there"); + + Transaction t2 = tm.begin(); + Get get = new Get(row); + get.addColumn(family, qualifier); + + Result getResult = table.get(t2, get); + assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same"); + + + HBaseTransaction t3 = (HBaseTransaction) tm.begin(); + + Put put1 = new Put(row); + put1.add(family, qualifier, data2); + + put1 = TTable.markPutAsCommitted(put1, t3.getWriteTimestamp(), t3.getWriteTimestamp()); + + table.getHTable().put(put1); + + // After markPutAsCommitted test that both cell and shadow cell are there + assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)), + "Cell should be there"); + assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)), + "Shadow cell should be there"); + + Transaction t4 = tm.begin(); + + getResult = table.get(t4, get); + //Test that t4 reads t3's write even though t3 was not committed + assertTrue(Arrays.equals(data2, getResult.getValue(family, qualifier)), "Values should be the same"); + } +} diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java index 8a35f9af2..18606a8f0 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java @@ -21,19 +21,21 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.omid.committable.CommitTable; -import org.apache.omid.metrics.NullMetricsProvider; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; + import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.omid.committable.CommitTable; +import org.apache.omid.metrics.NullMetricsProvider; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Bytes; import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; @@ -44,7 +46,6 @@ import org.testng.annotations.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -334,7 +335,11 @@ public void run() { LOG.info("Waiting readAfterCommit barrier"); try { readAfterCommit.await(); - final TTable table = spy(new TTable(hbaseConf, TEST_TABLE)); + HTable htable = new HTable(hbaseConf, TEST_TABLE); + HTable healer = new HTable(hbaseConf, TEST_TABLE); + + final SnapshotFilter snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, healer))); + final TTable table = new TTable(htable ,snapshotFilter); doAnswer(new Answer>() { @SuppressWarnings("unchecked") @Override @@ -345,8 +350,8 @@ public List answer(InvocationOnMock invocation) throws Throwable { postCommitEnd.await(); return (List) invocation.callRealMethod(); } - }).when(table).filterCellsForSnapshot(Matchers.>any(), - any(HBaseTransaction.class), anyInt(), Matchers.>>any()); + }).when(snapshotFilter).filterCellsForSnapshot(Matchers.>any(), + any(HBaseTransaction.class), anyInt(), Matchers.>any(), Matchers.>any()); TransactionManager tm = newTransactionManager(context); if (hasShadowCell(row, @@ -438,7 +443,7 @@ public void testGetOldShadowCells(ITestContext context) throws Exception { // delete new shadow cell Delete del = new Delete(row2); - del.deleteColumn(family, CellUtils.addShadowCellSuffix(qualifier)); + del.deleteColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier)); htable.delete(del); htable.flushCommits(); diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionCleanup.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionCleanup.java index 543e0c2b7..2060bc3dc 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionCleanup.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionCleanup.java @@ -77,7 +77,7 @@ public void testTransactionIsCleanedUpAfterBeingAborted(ITestContext context) th .when(mockedTSOClient).getNewStartTimestamp(); doReturn(abortingFF) - .when(mockedTSOClient).commit(eq(START_TS), anySetOf(HBaseCellId.class)); + .when(mockedTSOClient).commit(eq(START_TS), anySetOf(HBaseCellId.class), anySetOf(HBaseCellId.class)); try (TransactionManager tm = newTransactionManager(context, mockedTSOClient); TTable txTable = new TTable(hbaseConf, TEST_TABLE)) { diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java index 78cdd2657..21745c465 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java @@ -300,4 +300,210 @@ public void testMultipleCellChangesOnSameRow(ITestContext context) throws Except tm.commit(t1); } + @Test(timeOut = 10_000) + public void runTestWriteWriteConflictWithAdditionalConflictFreeWrites(ITestContext context) throws Exception { + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + Transaction t2 = tm.begin(); + LOG.info("Transaction created" + t2); + + byte[] row = Bytes.toBytes("test-simple"); + byte[] fam = Bytes.toBytes(TEST_FAMILY); + byte[] col = Bytes.toBytes("testdata"); + byte[] data1 = Bytes.toBytes("testWrite-1"); + byte[] data2 = Bytes.toBytes("testWrite-2"); + + Put p = new Put(row); + p.add(fam, col, data1); + tt.put(t1, p); + + Put p2 = new Put(row); + p2.add(fam, col, data2); + tt.put(t2, p2); + + row = Bytes.toBytes("test-simple-cf"); + p = new Put(row); + p.add(fam, col, data1); + tt.markPutAsConflictFreeMutation(p); + tt.put(t1, p); + + p2 = new Put(row); + p2.add(fam, col, data2); + tt.markPutAsConflictFreeMutation(p2); + tt.put(t2, p2); + + tm.commit(t2); + + try { + tm.commit(t1); + fail("Transaction should not commit successfully"); + } catch (RollbackException e) { + } + } + + @Test(timeOut = 10_000) + public void runTestWriteWriteConflictFreeWrites(ITestContext context) throws Exception { + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + Transaction t2 = tm.begin(); + LOG.info("Transaction created" + t2); + + byte[] row = Bytes.toBytes("test-simple"); + byte[] fam = Bytes.toBytes(TEST_FAMILY); + byte[] col = Bytes.toBytes("testdata"); + byte[] data1 = Bytes.toBytes("testWrite-1"); + byte[] data2 = Bytes.toBytes("testWrite-2"); + + Put p = new Put(row); + p.add(fam, col, data1); + tt.markPutAsConflictFreeMutation(p); + tt.put(t1, p); + + Put p2 = new Put(row); + p2.add(fam, col, data2); + tt.markPutAsConflictFreeMutation(p2); + tt.put(t2, p2); + + row = Bytes.toBytes("test-simple-cf"); + p = new Put(row); + p.add(fam, col, data1); + tt.markPutAsConflictFreeMutation(p); + tt.put(t1, p); + + p2 = new Put(row); + p2.add(fam, col, data2); + tt.markPutAsConflictFreeMutation(p2); + tt.put(t2, p2); + + tm.commit(t2); + + try { + tm.commit(t1); + } catch (RollbackException e) { + fail("Transaction should not commit successfully"); + } + } + + @Test(timeOut = 10_000) + public void runTestWriteWriteConflictFreeWritesWithOtherWrites(ITestContext context) throws Exception { + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + Transaction t2 = tm.begin(); + LOG.info("Transaction created" + t2); + + byte[] row = Bytes.toBytes("test-simple"); + byte[] row1 = Bytes.toBytes("test-simple-1"); + byte[] fam = Bytes.toBytes(TEST_FAMILY); + byte[] col = Bytes.toBytes("testdata"); + byte[] data1 = Bytes.toBytes("testWrite-1"); + byte[] data2 = Bytes.toBytes("testWrite-2"); + + Put p = new Put(row); + p.add(fam, col, data1); + tt.put(t1, p); + + Put p2 = new Put(row1); + p2.add(fam, col, data2); + tt.put(t2, p2); + + row = Bytes.toBytes("test-simple-cf"); + p = new Put(row); + p.add(fam, col, data1); + tt.markPutAsConflictFreeMutation(p); + tt.put(t1, p); + + p2 = new Put(row); + p2.add(fam, col, data2); + tt.markPutAsConflictFreeMutation(p2); + tt.put(t2, p2); + + tm.commit(t2); + + try { + tm.commit(t1); + } catch (RollbackException e) { + fail("Transaction should not commit successfully"); + } + } + + @Test(timeOut = 10_000) + public void runTestCleanupConflictFreeWritesAfterConflict(ITestContext context) throws Exception { + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + Transaction t2 = tm.begin(); + LOG.info("Transaction created" + t2); + + byte[] row = Bytes.toBytes("test-simple"); + byte[] row1 = Bytes.toBytes("test-simple-1"); + byte[] fam = Bytes.toBytes(TEST_FAMILY); + byte[] col = Bytes.toBytes("testdata"); + byte[] data1 = Bytes.toBytes("testWrite-1"); + byte[] data2 = Bytes.toBytes("testWrite-2"); + + Put p = new Put(row); + p.add(fam, col, data1); + tt.put(t1, p); + + Get g = new Get(row).setMaxVersions(); + g.addColumn(fam, col); + Result r = tt.getHTable().get(g); + assertEquals(r.size(), 1, "Unexpected size for read."); + assertTrue(Bytes.equals(data1, r.getValue(fam, col)), + "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col))); + + Put p2 = new Put(row); + p2.add(fam, col, data2); + tt.put(t2, p2); + + Put p3 = new Put(row1); + p3.add(fam, col, data2); + tt.markPutAsConflictFreeMutation(p3); + tt.put(t2, p3); + + r = tt.getHTable().get(g); + assertEquals(r.size(), 2, "Unexpected size for read."); + r = tt.get(t2, g); + assertEquals(r.size(),1, "Unexpected size for read."); + assertTrue(Bytes.equals(data2, r.getValue(fam, col)), + "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col))); + + Get g1 = new Get(row1).setMaxVersions(); + g1.addColumn(fam, col); + r = tt.getHTable().get(g1); + assertEquals(r.size(), 1, "Unexpected size for read."); + + tm.commit(t1); + + boolean aborted = false; + try { + tm.commit(t2); + fail("Transaction commited successfully"); + } catch (RollbackException e) { + aborted = true; + } + assertTrue(aborted, "Transaction didn't raise exception"); + + r = tt.getHTable().get(g); + assertEquals(r.size(), 1, "Unexpected size for read."); + assertTrue(Bytes.equals(data1, r.getValue(fam, col)), + "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col))); + r = tt.getHTable().get(g1); + assertEquals(r.size(), 0, "Unexpected size for read."); + } } diff --git a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java index 14b9d72a6..04900a6c9 100644 --- a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java +++ b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; @@ -48,10 +49,15 @@ public final class CellUtils { private static final Logger LOG = LoggerFactory.getLogger(CellUtils.class); static final byte[] SHADOW_CELL_SUFFIX = "\u0080".getBytes(Charsets.UTF_8); // Non printable char (128 ASCII) - static byte[] DELETE_TOMBSTONE = Bytes.toBytes("__OMID_TOMBSTONE__"); - public static final byte[] FAMILY_DELETE_QUALIFIER = new byte[0]; + //Prefix starts with 0 to apear before other cells in TransactionVisibilityFilter + static final byte[] SHADOW_CELL_PREFIX = "\u0000\u0080".getBytes(Charsets.UTF_8); + static byte[] DELETE_TOMBSTONE = HConstants.EMPTY_BYTE_ARRAY; + static byte[] LEGACY_DELETE_TOMBSTONE = Bytes.toBytes("__OMID_TOMBSTONE__"); + public static final byte[] FAMILY_DELETE_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY; public static final String TRANSACTION_ATTRIBUTE = "__OMID_TRANSACTION__"; + /**/ public static final String CLIENT_GET_ATTRIBUTE = "__OMID_CLIENT_GET__"; + public static final String CONFLICT_FREE_MUTATION = "__OMID_CONFLICT_FREE_MUTATION__"; /** * Utility interface to get rid of the dependency on HBase server package @@ -100,7 +106,7 @@ public static boolean hasShadowCell(byte[] row, byte[] qualifier, long version, CellGetter cellGetter) throws IOException { - return hasCell(row, family, addShadowCellSuffix(qualifier), + return hasCell(row, family, addShadowCellSuffixPrefix(qualifier), version, cellGetter); } @@ -111,10 +117,12 @@ public static boolean hasShadowCell(byte[] row, * @param qualLength the qualifier length * @return the suffixed qualifier */ - public static byte[] addShadowCellSuffix(byte[] qualifierArray, int qualOffset, int qualLength) { - byte[] result = new byte[qualLength + SHADOW_CELL_SUFFIX.length]; - System.arraycopy(qualifierArray, qualOffset, result, 0, qualLength); - System.arraycopy(SHADOW_CELL_SUFFIX, 0, result, qualLength, SHADOW_CELL_SUFFIX.length); + public static byte[] addShadowCellSuffixPrefix(byte[] qualifierArray, int qualOffset, int qualLength) { + byte[] result = new byte[qualLength + SHADOW_CELL_SUFFIX.length + SHADOW_CELL_PREFIX.length]; + System.arraycopy(SHADOW_CELL_PREFIX, 0, result,0 , SHADOW_CELL_PREFIX.length); + System.arraycopy(qualifierArray, qualOffset, result, SHADOW_CELL_PREFIX.length, qualLength); + System.arraycopy(SHADOW_CELL_SUFFIX, 0, result, qualLength + SHADOW_CELL_PREFIX.length, + SHADOW_CELL_SUFFIX.length); return result; } @@ -125,8 +133,8 @@ public static byte[] addShadowCellSuffix(byte[] qualifierArray, int qualOffset, * the qualifier to be suffixed * @return the suffixed qualifier */ - public static byte[] addShadowCellSuffix(byte[] qualifier) { - return addShadowCellSuffix(qualifier, 0, qualifier.length); + public static byte[] addShadowCellSuffixPrefix(byte[] qualifier) { + return addShadowCellSuffixPrefix(qualifier, 0, qualifier.length); } /** @@ -137,12 +145,18 @@ public static byte[] addShadowCellSuffix(byte[] qualifier) { * @param qualLength the qualifier length * @return the new qualifier without the suffix */ - public static byte[] removeShadowCellSuffix(byte[] qualifier, int qualOffset, int qualLength) { - + public static byte[] removeShadowCellSuffixPrefix(byte[] qualifier, int qualOffset, int qualLength) { if (endsWith(qualifier, qualOffset, qualLength, SHADOW_CELL_SUFFIX)) { - return Arrays.copyOfRange(qualifier, - qualOffset, - qualOffset + (qualLength - SHADOW_CELL_SUFFIX.length)); + if (startsWith(qualifier, qualOffset,qualLength, SHADOW_CELL_PREFIX)) { + return Arrays.copyOfRange(qualifier, + qualOffset + SHADOW_CELL_PREFIX.length, + qualOffset + (qualLength - SHADOW_CELL_SUFFIX.length)); + } else { + //support backward competatbiliy + return Arrays.copyOfRange(qualifier, + qualOffset,qualOffset + (qualLength - SHADOW_CELL_SUFFIX.length)); + } + } throw new IllegalArgumentException( @@ -151,7 +165,7 @@ public static byte[] removeShadowCellSuffix(byte[] qualifier, int qualOffset, in } /** - * Returns the qualifier length removing the shadow cell suffix. In case that que suffix is not found, + * Returns the qualifier length removing the shadow cell suffix and prefix. In case that que suffix is not found, * just returns the length of the qualifier passed. * @param qualifier the qualifier to remove the suffix from * @param qualOffset the offset where the qualifier starts @@ -161,11 +175,13 @@ public static byte[] removeShadowCellSuffix(byte[] qualifier, int qualOffset, in public static int qualifierLengthFromShadowCellQualifier(byte[] qualifier, int qualOffset, int qualLength) { if (endsWith(qualifier, qualOffset, qualLength, SHADOW_CELL_SUFFIX)) { - return qualLength - SHADOW_CELL_SUFFIX.length; + if (startsWith(qualifier,qualOffset, qualLength, SHADOW_CELL_PREFIX)) { + return qualLength - SHADOW_CELL_SUFFIX.length - SHADOW_CELL_PREFIX.length; + } else { + return qualLength - SHADOW_CELL_SUFFIX.length; + } } - return qualLength; - } /** @@ -200,6 +216,17 @@ public static void validateCell(Cell cell, long startTimestamp) { } } + /** + * Returns whether a cell contains a qualifier that is a delete cell + * column qualifier or not. + * @param cell the cell to check if contains the delete cell qualifier + * @return whether the cell passed contains a delete cell qualifier or not + */ + public static boolean isFamilyDeleteCell(Cell cell) { + return CellUtil.matchingQualifier(cell, CellUtils.FAMILY_DELETE_QUALIFIER) && + CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY); + } + /** * Returns whether a cell contains a qualifier that is a shadow cell * column qualifier or not. @@ -225,15 +252,27 @@ private static boolean endsWith(byte[] value, int offset, int length, byte[] suf return result == 0; } + private static boolean startsWith(byte[] value, int offset, int length, byte[] prefix) { + if (length <= prefix.length) { + return false; + } + + int result = Bytes.compareTo(value, offset, prefix.length, + prefix, 0, prefix.length); + return result == 0; + } + /** * Returns if a cell is marked as a tombstone. * @param cell the cell to check * @return whether the cell is marked as a tombstone or not */ public static boolean isTombstone(Cell cell) { - return CellUtil.matchingValue(cell, DELETE_TOMBSTONE); + return CellUtil.matchingValue(cell, DELETE_TOMBSTONE) || + CellUtil.matchingValue(cell, LEGACY_DELETE_TOMBSTONE); } + /** * Returns a new shadow cell created from a particular cell. * @param cell @@ -243,7 +282,7 @@ public static boolean isTombstone(Cell cell) { * @return the brand-new shadow cell */ public static Cell buildShadowCellFromCell(Cell cell, byte[] shadowCellValue) { - byte[] shadowCellQualifier = addShadowCellSuffix(cell.getQualifierArray(), + byte[] shadowCellQualifier = addShadowCellSuffixPrefix(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); return new KeyValue( @@ -349,8 +388,13 @@ public boolean equals(Object o) { int qualifierLength = qualifierLengthFromShadowCellQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + int qualifierOffset = cell.getQualifierOffset(); + if (startsWith(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), SHADOW_CELL_PREFIX)) { + qualifierOffset = qualifierOffset + SHADOW_CELL_PREFIX.length; + } if (!matchingQualifier(otherCell, - cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength)) { + cell.getQualifierArray(), qualifierOffset, qualifierLength)) { return false; } } else { @@ -370,12 +414,17 @@ public int hashCode() { hasher.putBytes(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); hasher.putBytes(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); int qualifierLength = cell.getQualifierLength(); - if (isShadowCell()) { // Update qualifier length when qualifier is shadow cell + int qualifierOffset = cell.getQualifierOffset(); + if (isShadowCell()) { qualifierLength = qualifierLengthFromShadowCellQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + if (startsWith(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), SHADOW_CELL_PREFIX)) { + qualifierOffset = qualifierOffset + SHADOW_CELL_PREFIX.length; + } } - hasher.putBytes(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength); + hasher.putBytes(cell.getQualifierArray(),qualifierOffset , qualifierLength); hasher.putLong(cell.getTimestamp()); return hasher.hash().asInt(); } @@ -393,7 +442,7 @@ public String toString() { cell.getQualifierOffset(), cell.getQualifierLength()); helper.add("qualifier whithout shadow cell suffix", - Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength)); + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset() + 1, qualifierLength)); } helper.add("ts", cell.getTimestamp()); return helper.toString(); diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java index 12ecbb027..cf9316376 100644 --- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java +++ b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java @@ -235,7 +235,7 @@ private Optional queryCommitTimestamp(Cell cell) throws IOExcep } else { Get g = new Get(CellUtil.cloneRow(cell)); byte[] family = CellUtil.cloneFamily(cell); - byte[] qualifier = CellUtils.addShadowCellSuffix(cell.getQualifierArray(), + byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); g.addColumn(family, qualifier); diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java deleted file mode 100644 index 752e9a787..000000000 --- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.omid.transaction.HBaseTransaction; -import org.apache.omid.transaction.SnapshotFilterImpl; - -public class OmidRegionScanner implements RegionScanner { - - private RegionScanner scanner; - private SnapshotFilterImpl snapshotFilter; - private HBaseTransaction transaction; - private int maxVersions; - private Map> familyDeletionCache; - - public OmidRegionScanner(SnapshotFilterImpl snapshotFilter, - RegionScanner s, - HBaseTransaction transaction, - int maxVersions) { - this.snapshotFilter = snapshotFilter; - this.scanner = s; - this.transaction = transaction; - this.maxVersions = maxVersions; - this.familyDeletionCache = new HashMap>(); - } - - @Override - public boolean next(List results) throws IOException { - return next(results, Integer.MAX_VALUE); - } - - public boolean next(List result, int limit) throws IOException { - return nextRaw(result, limit); - } - - @Override - public void close() throws IOException { - scanner.close(); - } - - @Override - public HRegionInfo getRegionInfo() { - return scanner.getRegionInfo(); - } - - @Override - public boolean isFilterDone() throws IOException { - return scanner.isFilterDone(); - } - - @Override - public boolean reseek(byte[] row) throws IOException { - throw new RuntimeException("Not implemented"); - } - - @Override - public long getMaxResultSize() { - return scanner.getMaxResultSize(); - } - - @Override - public long getMvccReadPoint() { - return scanner.getMvccReadPoint(); - } - - @Override - public boolean nextRaw(List result) throws IOException { - return nextRaw(result,Integer.MAX_VALUE); - } - - public boolean next(List result, - ScannerContext scannerContext) throws IOException { - return next(result, scannerContext.getBatchLimit()); - } - - public boolean nextRaw(List result, - ScannerContext scannerContext) throws IOException { - return nextRaw(result, scannerContext.getBatchLimit()); - } - - public int getBatch() { - return Integer.MAX_VALUE; - } - - public boolean nextRaw(List result, int limit) throws IOException { - List filteredResult = new ArrayList(); - while (filteredResult.isEmpty()) { - scanner.nextRaw(filteredResult); - if (filteredResult.isEmpty()) { - return false; - } - - filteredResult = snapshotFilter.filterCellsForSnapshot(filteredResult, transaction, maxVersions, familyDeletionCache); - } - - for (Cell cell : filteredResult) { - result.add(cell); - } - - return true; - } - -} diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java index 28a4be3e3..4786eda78 100644 --- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java +++ b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java @@ -20,10 +20,12 @@ import java.io.IOException; import java.util.List; + import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.omid.transaction.TableAccessWrapper; // This class wraps the Region object when doing server side filtering. @@ -56,4 +58,9 @@ public void put(Put put) throws IOException { region.put(put); } + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return null; + } + } diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilter.java new file mode 100644 index 000000000..e80ac96f5 --- /dev/null +++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; + +import java.io.IOException; +import java.util.List; + +/** + * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue} + * for which the underlying filter returned the {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL}, + * so that when {@link #filterKeyValue} is called again for the same {@link KeyValue} with different + * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}. + * Please see TEPHRA-169 for more details. + */ + +public class CellSkipFilter extends FilterBase { + private final Filter filter; + // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL + private KeyValue skipColumn = null; + + public CellSkipFilter(Filter filter) { + this.filter = filter; + } + + /** + * Determines whether the current cell should be skipped. The cell will be skipped + * if the previous keyvalue had the same key as the current cell. This means filter already responded + * for the previous keyvalue with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL. + * @param cell the {@link Cell} to be tested for skipping + * @return true is current cell should be skipped, false otherwise + */ + private boolean skipCellVersion(Cell cell) { + return skipColumn != null + && CellUtil.matchingRow(cell, skipColumn.getRowArray(), skipColumn.getRowOffset(), + skipColumn.getRowLength()) + && CellUtil.matchingFamily(cell, skipColumn.getFamilyArray(), skipColumn.getFamilyOffset(), + skipColumn.getFamilyLength()) + && CellUtil.matchingQualifier(cell, skipColumn.getQualifierArray(), skipColumn.getQualifierOffset(), + skipColumn.getQualifierLength()); + } + + @Override + public ReturnCode filterKeyValue(Cell cell) throws IOException { + if (skipCellVersion(cell)) { + return ReturnCode.NEXT_COL; + } + + ReturnCode code = filter.filterKeyValue(cell); + if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) { + // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL + skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength()); + } else { + skipColumn = null; + } + return code; + } + + @Override + public boolean filterRow() throws IOException { + return filter.filterRow(); + } + + @Override + public Cell transformCell(Cell cell) throws IOException { + return filter.transformCell(cell); + } + + @Override + public void reset() throws IOException { + filter.reset(); + } + + @Override + public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { + return filter.filterRowKey(buffer, offset, length); + } + + @Override + public boolean filterAllRemaining() throws IOException { + return filter.filterAllRemaining(); + } + + @Override + public void filterRowCells(List kvs) throws IOException { + filter.filterRowCells(kvs); + } + + @Override + public boolean hasFilterRow() { + return filter.hasFilterRow(); + } + + @SuppressWarnings("deprecation") + @Override + public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException { + return filter.getNextKeyHint(currentKV); + } + + @Override + public Cell getNextCellHint(Cell currentKV) throws IOException { + return filter.getNextCellHint(currentKV); + } + + @Override + public boolean isFamilyEssential(byte[] name) throws IOException { + return filter.isFamilyEssential(name); + } + + @Override + public byte[] toByteArray() throws IOException { + return filter.toByteArray(); + } +} diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java index 887a2f69f..e839187d5 100644 --- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java +++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java @@ -18,11 +18,13 @@ package org.apache.omid.transaction; import com.google.common.annotations.VisibleForTesting; + import org.apache.omid.committable.CommitTable; import org.apache.omid.committable.hbase.HBaseCommitTable; import org.apache.omid.committable.hbase.HBaseCommitTableConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; @@ -58,6 +60,8 @@ public class OmidCompactor extends BaseRegionObserver { final static String OMID_COMPACTABLE_CF_FLAG = "OMID_ENABLED"; + private boolean enableCompactorForAllFamilies = false; + private HBaseCommitTableConfig commitTableConf = null; private Configuration conf = null; @VisibleForTesting @@ -71,7 +75,12 @@ public class OmidCompactor extends BaseRegionObserver { private boolean retainNonTransactionallyDeletedCells; public OmidCompactor() { - LOG.info("Compactor coprocessor initialized via empty constructor"); + this(false); + } + + public OmidCompactor(boolean enableCompactorForAllFamilies) { + LOG.info("Compactor coprocessor initialized"); + this.enableCompactorForAllFamilies = enableCompactorForAllFamilies; } @Override @@ -101,31 +110,43 @@ public void stop(CoprocessorEnvironment e) throws IOException { } @Override - public InternalScanner preCompact(ObserverContext e, + public InternalScanner preCompact(ObserverContext env, Store store, InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException { - HTableDescriptor desc = e.getEnvironment().getRegion().getTableDesc(); - HColumnDescriptor famDesc + boolean omidCompactable; + try { + if (enableCompactorForAllFamilies) { + omidCompactable = true; + } else { + HTableDescriptor desc = env.getEnvironment().getRegion().getTableDesc(); + HColumnDescriptor famDesc = desc.getFamily(Bytes.toBytes(store.getColumnFamilyName())); - boolean omidCompactable = Boolean.valueOf(famDesc.getValue(OMID_COMPACTABLE_CF_FLAG)); - // only column families tagged as compactable are compacted - // with omid compactor - if (!omidCompactable) { - return scanner; - } else { - CommitTable.Client commitTableClient = commitTableClientQueue.poll(); - if (commitTableClient == null) { - commitTableClient = initAndGetCommitTableClient(); + omidCompactable = Boolean.valueOf(famDesc.getValue(OMID_COMPACTABLE_CF_FLAG)); + } + + // only column families tagged as compactable are compacted + // with omid compactor + if (!omidCompactable) { + return scanner; + } else { + CommitTable.Client commitTableClient = commitTableClientQueue.poll(); + if (commitTableClient == null) { + commitTableClient = initAndGetCommitTableClient(); + } + boolean isMajorCompaction = request.isMajor(); + return new CompactorScanner(env, + scanner, + commitTableClient, + commitTableClientQueue, + isMajorCompaction, + retainNonTransactionallyDeletedCells); } - boolean isMajorCompaction = request.isMajor(); - return new CompactorScanner(e, - scanner, - commitTableClient, - commitTableClientQueue, - isMajorCompaction, - retainNonTransactionallyDeletedCells); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new DoNotRetryIOException(e); } } diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java index daf319cd4..024d8993f 100644 --- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java +++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java @@ -17,8 +17,13 @@ */ package org.apache.omid.transaction; -import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; + +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.omid.committable.CommitTable; import org.apache.omid.committable.hbase.HBaseCommitTable; import org.apache.omid.committable.hbase.HBaseCommitTableConfig; @@ -29,22 +34,21 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.OmidRegionScanner; import org.apache.hadoop.hbase.regionserver.RegionAccessWrapper; -import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; + import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY; @@ -57,12 +61,14 @@ public class OmidSnapshotFilter extends BaseRegionObserver { private HBaseCommitTableConfig commitTableConf = null; private Configuration conf = null; - @VisibleForTesting - private CommitTable.Client commitTableClient; - - private SnapshotFilterImpl snapshotFilter; + private Queue snapshotFilterQueue = new ConcurrentLinkedQueue<>(); + private Map snapshotFilterMap = new ConcurrentHashMap(); + private CommitTable.Client inMemoryCommitTable = null; - final static String OMID_SNAPSHOT_FILTER_CF_FLAG = "OMID_SNAPSHOT_FILTER_ENABLED"; + public OmidSnapshotFilter(CommitTable.Client commitTableClient) { + LOG.info("Compactor coprocessor initialized with constructor for testing"); + this.inMemoryCommitTable = commitTableClient; + } public OmidSnapshotFilter() { LOG.info("Compactor coprocessor initialized via empty constructor"); @@ -77,80 +83,126 @@ public void start(CoprocessorEnvironment env) throws IOException { if (commitTableName != null) { commitTableConf.setTableName(commitTableName); } - commitTableClient = initAndGetCommitTableClient(); - - snapshotFilter = new SnapshotFilterImpl(commitTableClient); - LOG.info("Snapshot filter started"); } @Override public void stop(CoprocessorEnvironment e) throws IOException { LOG.info("Stopping snapshot filter coprocessor"); - commitTableClient.close(); + if (snapshotFilterQueue != null) { + for (SnapshotFilterImpl snapshotFilter: snapshotFilterQueue) { + snapshotFilter.closeCommitTableClient(); + } + } LOG.info("Snapshot filter stopped"); } + @Override - public void preGetOp(ObserverContext c, Get get, List result) throws IOException { + public void postGetOp(ObserverContext e, Get get, List results) + throws IOException { + SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(get); + if (snapshotFilter != null) { + snapshotFilterQueue.add(snapshotFilter); + } + } - if (get.getAttribute(CellUtils.CLIENT_GET_ATTRIBUTE) == null) return; - get.setAttribute(CellUtils.CLIENT_GET_ATTRIBUTE, null); - RegionAccessWrapper regionAccessWrapper = new RegionAccessWrapper(HBaseShims.getRegionCoprocessorRegion(c.getEnvironment())); - Result res = regionAccessWrapper.get(get); // get parameters were set at the client side + @Override + public void preGetOp(ObserverContext e, Get get, List results) + throws IOException { - snapshotFilter.setTableAccessWrapper(regionAccessWrapper); + if (get.getAttribute(CellUtils.CLIENT_GET_ATTRIBUTE) == null) return; - List filteredKeyValues = Collections.emptyList(); - if (!res.isEmpty()) { - TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(get.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE)); + HBaseTransaction hbaseTransaction = getHBaseTransaction(get.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE)); + SnapshotFilterImpl snapshotFilter = getSnapshotFilter(e); + snapshotFilterMap.put(get, snapshotFilter); + // In order to get hbase FilterBase framework to keep getting more versions + get.setMaxVersions(); + Filter newFilter = TransactionFilters.getVisibilityFilter(get.getFilter(), + snapshotFilter, hbaseTransaction); + get.setFilter(newFilter); + } - long id = transaction.getTimestamp(); - long readTs = transaction.getReadTimestamp(); - long epoch = transaction.getEpoch(); - VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel()); - HBaseTransaction hbaseTransaction = new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet(), null); - filteredKeyValues = snapshotFilter.filterCellsForSnapshot(res.listCells(), hbaseTransaction, get.getMaxVersions(), new HashMap>()); + private SnapshotFilterImpl getSnapshotFilter(ObserverContext e) + throws IOException { + SnapshotFilterImpl snapshotFilter= snapshotFilterQueue.poll(); + if (snapshotFilter == null) { + RegionAccessWrapper regionAccessWrapper = + new RegionAccessWrapper(HBaseShims.getRegionCoprocessorRegion(e.getEnvironment())); + snapshotFilter = new SnapshotFilterImpl(regionAccessWrapper, initAndGetCommitTableClient()); } + return snapshotFilter; + } - for (Cell cell : filteredKeyValues) { - result.add(cell); + + @Override + public RegionScanner preScannerOpen(ObserverContext e, + Scan scan, + RegionScanner s) throws IOException { + + byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE); + + if (byteTransaction == null) { + return s; } - c.bypass(); + HBaseTransaction hbaseTransaction = getHBaseTransaction(byteTransaction); + SnapshotFilterImpl snapshotFilter = getSnapshotFilter(e); + scan.setMaxVersions(); + Filter newFilter = TransactionFilters.getVisibilityFilter(scan.getFilter(), + snapshotFilter, hbaseTransaction); + scan.setFilter(newFilter); + snapshotFilterMap.put(scan, snapshotFilter); + return s; } @Override public RegionScanner postScannerOpen(ObserverContext e, - Scan scan, - RegionScanner s) throws IOException { + Scan scan, + RegionScanner s) throws IOException { byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE); if (byteTransaction == null) { return s; } - TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction); + SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(scan); + assert(snapshotFilter != null); + snapshotFilterMap.remove(scan); + snapshotFilterMap.put(s, snapshotFilter); + return s; + } + + @Override + public void preScannerClose(ObserverContext e, InternalScanner s) + throws IOException { + SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(s); + if (snapshotFilter != null) { + snapshotFilterQueue.add(snapshotFilter); + } + } + + + private HBaseTransaction getHBaseTransaction(byte[] byteTransaction) + throws InvalidProtocolBufferException { + TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction); long id = transaction.getTimestamp(); long readTs = transaction.getReadTimestamp(); long epoch = transaction.getEpoch(); VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel()); - HBaseTransaction hbaseTransaction = new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet(), null); - - RegionAccessWrapper regionAccessWrapper = new RegionAccessWrapper(HBaseShims.getRegionCoprocessorRegion(e.getEnvironment())); - - snapshotFilter.setTableAccessWrapper(regionAccessWrapper); - - return new OmidRegionScanner(snapshotFilter, s, hbaseTransaction, 1); + return new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet(), new HashSet(), null); } private CommitTable.Client initAndGetCommitTableClient() throws IOException { LOG.info("Trying to get the commit table client"); + if (inMemoryCommitTable != null) { + return inMemoryCommitTable; + } CommitTable commitTable = new HBaseCommitTable(conf, commitTableConf); CommitTable.Client commitTableClient = commitTable.getClient(); LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName()); diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionFilters.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionFilters.java new file mode 100644 index 000000000..8b5e68792 --- /dev/null +++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionFilters.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import org.apache.hadoop.hbase.filter.Filter; + +public class TransactionFilters { + + public static Filter getVisibilityFilter(Filter cellFilter, + SnapshotFilterImpl regionAccessWrapper, + HBaseTransaction hbaseTransaction) { + return new CellSkipFilter(new TransactionVisibilityFilter(cellFilter, regionAccessWrapper, hbaseTransaction)); + } +} diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java new file mode 100644 index 000000000..dafdf3f2c --- /dev/null +++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import com.google.common.base.Optional; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class TransactionVisibilityFilter extends FilterBase { + + // optional sub-filter to apply to visible cells + private final Filter userFilter; + private final SnapshotFilterImpl snapshotFilter; + private final Map commitCache; + private final HBaseTransaction hbaseTransaction; + + // This cache is cleared when moving to the next row + // So no need to keep row name + private final Map familyDeletionCache; + + public TransactionVisibilityFilter(Filter cellFilter, + SnapshotFilterImpl snapshotFilter, + HBaseTransaction hbaseTransaction) { + this.userFilter = cellFilter; + this.snapshotFilter = snapshotFilter; + commitCache = new HashMap<>(); + this.hbaseTransaction = hbaseTransaction; + familyDeletionCache = new HashMap<>(); + } + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + if (CellUtils.isShadowCell(v)) { + Long commitTs = Bytes.toLong(CellUtil.cloneValue(v)); + commitCache.put(v.getTimestamp(), commitTs); + // Continue getting shadow cells until one of them fits this transaction + if (hbaseTransaction.getStartTimestamp() >= commitTs) { + return ReturnCode.NEXT_COL; + } else { + return ReturnCode.SKIP; + } + } + + Optional ct = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v)); + if (ct.isPresent()) { + commitCache.put(v.getTimestamp(), ct.get()); + if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL && + snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) { + return runUserFilter(v, ReturnCode.INCLUDE); + } + if (CellUtils.isFamilyDeleteCell(v)) { + familyDeletionCache.put(createImmutableBytesWritable(v), ct.get()); + if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) { + return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL); + } else { + return ReturnCode.NEXT_COL; + } + } + Long deleteCommit = familyDeletionCache.get(createImmutableBytesWritable(v)); + if (deleteCommit != null && deleteCommit >= v.getTimestamp()) { + if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) { + return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL); + } else { + return ReturnCode.NEXT_COL; + } + } + if (CellUtils.isTombstone(v)) { + if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) { + return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL); + } else { + return ReturnCode.NEXT_COL; + } + } + + return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL); + } + + return ReturnCode.SKIP; + } + + + private ImmutableBytesWritable createImmutableBytesWritable(Cell v) { + return new ImmutableBytesWritable(v.getFamilyArray(), + v.getFamilyOffset(),v.getFamilyLength()); + } + + private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn) + throws IOException { + assert(snapshotReturn == ReturnCode.INCLUDE_AND_NEXT_COL || snapshotReturn == ReturnCode.INCLUDE); + if (userFilter == null) { + return snapshotReturn; + } + + ReturnCode userRes = userFilter.filterKeyValue(v); + switch (userRes) { + case INCLUDE: + return snapshotReturn; + case SKIP: + return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL; + default: + return userRes; + } + + } + + // For family delete cells, the sc hasn't arrived yet so get sc from region before going to ct + private Optional getCommitIfInSnapshot(Cell v, boolean getShadowCellBeforeCT) throws IOException { + Long cachedCommitTS = commitCache.get(v.getTimestamp()); + if (cachedCommitTS != null && hbaseTransaction.getStartTimestamp() >= cachedCommitTS) { + return Optional.of(cachedCommitTS); + } + if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) { + return Optional.of(v.getTimestamp()); + } + + if (getShadowCellBeforeCT) { + + // Try to get shadow cell from region + final Get get = new Get(CellUtil.cloneRow(v)); + get.setTimeStamp(v.getTimestamp()).setMaxVersions(1); + get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER)); + Result shadowCell = snapshotFilter.getTableAccessWrapper().get(get); + + if (!shadowCell.isEmpty()) { + long commitTS = Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0])); + if (commitTS <= hbaseTransaction.getStartTimestamp()) { + return Optional.of(commitTS); + } + } + } + + return snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache); + } + + + @Override + public void reset() throws IOException { + commitCache.clear(); + familyDeletionCache.clear(); + if (userFilter != null) { + userFilter.reset(); + } + } + + @Override + public boolean filterRow() throws IOException { + if (userFilter != null) { + return userFilter.filterRow(); + } + return super.filterRow(); + } + + + @Override + public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { + if (userFilter != null) { + return userFilter.filterRowKey(buffer, offset, length); + } + return super.filterRowKey(buffer, offset, length); + } + + @Override + public boolean filterAllRemaining() throws IOException { + if (userFilter != null) { + return userFilter.filterAllRemaining(); + } + return super.filterAllRemaining(); + } + + @Override + public void filterRowCells(List kvs) throws IOException { + if (userFilter != null) { + userFilter.filterRowCells(kvs); + } else { + super.filterRowCells(kvs); + } + } + + @Override + public boolean hasFilterRow() { + if (userFilter != null) { + return userFilter.hasFilterRow(); + } + return super.hasFilterRow(); + } + + + @Override + public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException { + if (userFilter != null) { + return userFilter.getNextKeyHint(currentKV); + } + return super.getNextKeyHint(currentKV); + } + + @Override + public Cell getNextCellHint(Cell currentKV) throws IOException { + if (userFilter != null) { + return userFilter.getNextCellHint(currentKV); + } + return super.getNextCellHint(currentKV); + } + + @Override + public boolean isFamilyEssential(byte[] name) throws IOException { + if (userFilter != null) { + return userFilter.isFamilyEssential(name); + } + return super.isFamilyEssential(name); + } + + @Override + public byte[] toByteArray() throws IOException { + return super.toByteArray(); + } +} diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java index 29f0a4b24..90dd5ddba 100644 --- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java +++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java @@ -17,15 +17,18 @@ */ package org.apache.omid.transaction; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; @@ -33,6 +36,14 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; + +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.omid.TestUtils; import org.apache.omid.committable.CommitTable; @@ -41,6 +52,9 @@ import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig; import org.apache.omid.tso.TSOServer; import org.apache.omid.tso.TSOServerConfig; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -49,7 +63,11 @@ import org.testng.annotations.Test; import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -83,6 +101,7 @@ public void setupTestSnapshotFilter() throws Exception { TSOServerConfig tsoConfig = new TSOServerConfig(); tsoConfig.setPort(5678); tsoConfig.setConflictMapSize(1); + tsoConfig.setWaitStrategy("LOW_CPU"); injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig)); hbaseConf = injector.getInstance(Configuration.class); hbaseConf.setBoolean("omid.server.side.filter", true); @@ -170,9 +189,9 @@ private TransactionManager newTransactionManager() throws Exception { .build(); } + @Test(timeOut = 60_000) public void testGetFirstResult() throws Throwable { - byte[] rowName1 = Bytes.toBytes("row1"); byte[] famName1 = Bytes.toBytes(TEST_FAMILY); byte[] colName1 = Bytes.toBytes("col1"); @@ -225,15 +244,289 @@ public void testGetFirstResult() throws Throwable { tt.close(); } + + // This test will fail if filtering is done before snapshot filtering @Test(timeOut = 60_000) - public void testGetSecondResult() throws Throwable { + public void testServerSideSnapshotFiltering() throws Throwable { + byte[] rowName1 = Bytes.toBytes("row1"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + byte[] dataValue2 = Bytes.toBytes("testWrite-2"); + String TEST_TABLE = "testServerSideSnapshotFiltering"; + createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); + + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction tx1 = tm.begin(); + Put put1 = new Put(rowName1); + put1.add(famName1, colName1, dataValue1); + tt.put(tx1, put1); + tm.commit(tx1); + + Transaction tx2 = tm.begin(); + Put put2 = new Put(rowName1); + put2.add(famName1, colName1, dataValue2); + tt.put(tx2, put2); + + Transaction tx3 = tm.begin(); + Get get = new Get(rowName1); + + // If snapshot filtering is not done in the server then the first value is + // "testWrite-2" and the whole row will be filtered out. + SingleColumnValueFilter filter = new SingleColumnValueFilter( + famName1, + colName1, + CompareFilter.CompareOp.EQUAL, + new SubstringComparator("testWrite-1")); + + get.setFilter(filter); + Result results = tt.get(tx3, get); + assertTrue(results.size() == 1); + } + + + // This test will fail if filtering is done before snapshot filtering + @Test(timeOut = 60_000) + public void testServerSideSnapshotScannerFiltering() throws Throwable { byte[] rowName1 = Bytes.toBytes("row1"); byte[] famName1 = Bytes.toBytes(TEST_FAMILY); byte[] colName1 = Bytes.toBytes("col1"); byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - String TEST_TABLE = "testGetFirstResult"; + String TEST_TABLE = "testServerSideSnapshotFiltering"; + createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); + + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction tx1 = tm.begin(); + Put put1 = new Put(rowName1); + put1.add(famName1, colName1, dataValue1); + tt.put(tx1, put1); + tm.commit(tx1); + + Transaction tx2 = tm.begin(); + Put put2 = new Put(rowName1); + put2.add(famName1, colName1, dataValue2); +// tt.put(tx2, put2); + + Transaction tx3 = tm.begin(); + + // If snapshot filtering is not done in the server then the first value is + // "testWrite-2" and the whole row will be filtered out. + SingleColumnValueFilter filter = new SingleColumnValueFilter( + famName1, + colName1, + CompareFilter.CompareOp.EQUAL, + new SubstringComparator("testWrite-1")); + + + Scan scan = new Scan(); + scan.setFilter(filter); + + ResultScanner iterableRS = tt.getScanner(tx3, scan); + Result result = iterableRS.next(); + + assertTrue(result.size() == 1); + } + + + @Test(timeOut = 60_000) + public void testGetWithFamilyDelete() throws Throwable { + byte[] rowName1 = Bytes.toBytes("row1"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] famName2 = Bytes.toBytes("test-fam2"); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] colName2 = Bytes.toBytes("col2"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + + String TEST_TABLE = "testGetWithFamilyDelete"; + createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction tx1 = tm.begin(); + + Put put1 = new Put(rowName1); + put1.add(famName1, colName1, dataValue1); + tt.put(tx1, put1); + + tm.commit(tx1); + + Transaction tx2 = tm.begin(); + Put put2 = new Put(rowName1); + put2.add(famName2, colName2, dataValue1); + tt.put(tx2, put2); + tm.commit(tx2); + + Transaction tx3 = tm.begin(); + + Delete d = new Delete(rowName1); + d.deleteFamily(famName2); + tt.delete(tx3, d); + + + Transaction tx4 = tm.begin(); + + Get get = new Get(rowName1); + + Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE, + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))), + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2))); + + get.setFilter(filter1); + Result result = tt.get(tx4, get); + assertTrue(result.size() == 2, "Result should be 2"); + + tm.commit(tx3); + + Transaction tx5 = tm.begin(); + result = tt.get(tx5, get); + assertTrue(result.size() == 1, "Result should be 1"); + + tt.close(); + } + + @Test(timeOut = 60_000) + public void testReadFromCommitTable() throws Exception { + final byte[] rowName1 = Bytes.toBytes("row1"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + final String TEST_TABLE = "testReadFromCommitTable"; + final byte[] famName2 = Bytes.toBytes("test-fam2"); + + final CountDownLatch readAfterCommit = new CountDownLatch(1); + final CountDownLatch postCommitBegin = new CountDownLatch(1); + + final AtomicBoolean readFailed = new AtomicBoolean(false); + final AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(); + createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2); + + doAnswer(new Answer>() { + @Override + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { + LOG.info("Releasing readAfterCommit barrier"); + readAfterCommit.countDown(); + LOG.info("Waiting postCommitBegin barrier"); + postCommitBegin.await(); + ListenableFuture result = (ListenableFuture) invocation.callRealMethod(); + return result; + } + }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class)); + + Thread readThread = new Thread("Read Thread") { + @Override + public void run() { + + try { + LOG.info("Waiting readAfterCommit barrier"); + readAfterCommit.await(); + + Transaction tx4 = tm.begin(); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + Get get = new Get(rowName1); + + Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE, + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))), + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2))); + + get.setFilter(filter1); + Result result = tt.get(tx4, get); + + if (result.size() == 2) { + readFailed.set(false); + } + else { + readFailed.set(false); + } + + postCommitBegin.countDown(); + } catch (Throwable e) { + readFailed.set(false); + LOG.error("Error whilst reading", e); + } + } + }; + readThread.start(); + + TTable table = new TTable(hbaseConf, TEST_TABLE); + final HBaseTransaction t1 = (HBaseTransaction) tm.begin(); + Put put1 = new Put(rowName1); + put1.add(famName1, colName1, dataValue1); + table.put(t1, put1); + tm.commit(t1); + + readThread.join(); + + assertFalse(readFailed.get(), "Read should have succeeded"); + + } + + + + @Test(timeOut = 60_000) + public void testGetWithFilter() throws Throwable { + byte[] rowName1 = Bytes.toBytes("row1"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] famName2 = Bytes.toBytes("test-fam2"); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] colName2 = Bytes.toBytes("col2"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + + String TEST_TABLE = "testGetWithFilter"; + createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction tx1 = tm.begin(); + + Put put1 = new Put(rowName1); + put1.add(famName1, colName1, dataValue1); + tt.put(tx1, put1); + + tm.commit(tx1); + + Transaction tx2 = tm.begin(); + Put put2 = new Put(rowName1); + put2.add(famName2, colName2, dataValue1); + tt.put(tx2, put2); + tm.commit(tx2); + + Transaction tx3 = tm.begin(); + + Get get = new Get(rowName1); + + Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE, + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))), + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2))); + + get.setFilter(filter1); + Result result = tt.get(tx3, get); + assertTrue(result.size() == 2, "Result should be 2"); + + + Filter filter2 = new FilterList(FilterList.Operator.MUST_PASS_ONE, + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY)))); + + get.setFilter(filter2); + result = tt.get(tx3, get); + assertTrue(result.size() == 1, "Result should be 2"); + + tm.commit(tx3); + + tt.close(); + } + + + @Test(timeOut = 60_000) + public void testGetSecondResult() throws Throwable { + byte[] rowName1 = Bytes.toBytes("row1"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + + String TEST_TABLE = "testGetSecondResult"; createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); TTable tt = new TTable(hbaseConf, TEST_TABLE); @@ -273,7 +566,7 @@ public void testScanFirstResult() throws Throwable { byte[] colName1 = Bytes.toBytes("col1"); byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - String TEST_TABLE = "testGetFirstResult"; + String TEST_TABLE = "testScanFirstResult"; createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); TTable tt = new TTable(hbaseConf, TEST_TABLE); @@ -314,10 +607,62 @@ public void testScanFirstResult() throws Throwable { assertFalse(iterableRS2.next() != null); tm.commit(tx4); + tt.close(); + } + + @Test(timeOut = 60_000) + public void testScanWithFilter() throws Throwable { + + byte[] rowName1 = Bytes.toBytes("row1"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] famName2 = Bytes.toBytes("test-fam2"); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] colName2 = Bytes.toBytes("col2"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + + String TEST_TABLE = "testScanWithFilter"; + createTableIfNotExists(TEST_TABLE, famName1, famName2); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction tx1 = tm.begin(); + Put put1 = new Put(rowName1); + put1.add(famName1, colName1, dataValue1); + tt.put(tx1, put1); + tm.commit(tx1); + + Transaction tx2 = tm.begin(); + Put put2 = new Put(rowName1); + put2.add(famName2, colName2, dataValue1); + tt.put(tx2, put2); + + tm.commit(tx2); + Transaction tx3 = tm.begin(); + + Scan scan = new Scan(); + scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE, + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))))); + scan.setStartRow(rowName1).setStopRow(rowName1); + + ResultScanner iterableRS = tt.getScanner(tx3, scan); + Result result = iterableRS.next(); + assertTrue(result.containsColumn(famName1, colName1)); + assertFalse(result.containsColumn(famName2, colName2)); + + scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE, + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))), + new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)))); + + iterableRS = tt.getScanner(tx3, scan); + result = iterableRS.next(); + assertTrue(result.containsColumn(famName1, colName1)); + assertTrue(result.containsColumn(famName2, colName2)); + + tm.commit(tx3); tt.close(); } + @Test(timeOut = 60_000) public void testScanSecondResult() throws Throwable { @@ -326,7 +671,7 @@ public void testScanSecondResult() throws Throwable { byte[] colName1 = Bytes.toBytes("col1"); byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - String TEST_TABLE = "testGetFirstResult"; + String TEST_TABLE = "testScanSecondResult"; createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); TTable tt = new TTable(hbaseConf, TEST_TABLE); @@ -370,7 +715,7 @@ public void testScanFewResults() throws Throwable { byte[] dataValue1 = Bytes.toBytes("testWrite-1"); byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - String TEST_TABLE = "testGetFirstResult"; + String TEST_TABLE = "testScanFewResults"; createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); TTable tt = new TTable(hbaseConf, TEST_TABLE); @@ -420,7 +765,7 @@ public void testScanFewResultsDifferentTransaction() throws Throwable { byte[] dataValue1 = Bytes.toBytes("testWrite-1"); byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - String TEST_TABLE = "testGetFirstResult"; + String TEST_TABLE = "testScanFewResultsDifferentTransaction"; createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); TTable tt = new TTable(hbaseConf, TEST_TABLE); @@ -473,7 +818,7 @@ public void testScanFewResultsSameTransaction() throws Throwable { byte[] dataValue1 = Bytes.toBytes("testWrite-1"); byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - String TEST_TABLE = "testGetFirstResult"; + String TEST_TABLE = "testScanFewResultsSameTransaction"; createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); TTable tt = new TTable(hbaseConf, TEST_TABLE); diff --git a/pom.xml b/pom.xml index b476a8940..b2e36e7b7 100644 --- a/pom.xml +++ b/pom.xml @@ -138,7 +138,7 @@ 1.1.1 14.0.1 3.0 - 6.8.8 + 6.10 1.7.7 1.2.17 3.2.6.Final diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java index b22f024b7..2581d184e 100644 --- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java +++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java @@ -38,7 +38,7 @@ */ public abstract class AbstractTransaction implements Transaction { - enum VisibilityLevel { + public enum VisibilityLevel { // Regular snapshot isolation. Returns the last key, either from the snapshot or from the current transaction // Sets the readTimestamp to be the writeTimestamp SNAPSHOT, @@ -78,6 +78,7 @@ public static VisibilityLevel fromInteger(int number) { private long commitTimestamp; private boolean isRollbackOnly; private final Set writeSet; + private final Set conflictFreeWriteSet; private Status status = Status.RUNNING; private VisibilityLevel visibilityLevel; @@ -92,6 +93,9 @@ public static VisibilityLevel fromInteger(int number) { * @param writeSet * initial write set for the transaction. * Should be empty in most cases. + * @param conflictFreeWriteSet + * initial conflict free write set for the transaction. + * Should be empty in most cases. * @param transactionManager * transaction manager associated to this transaction. * Usually, should be the one that created the transaction @@ -100,8 +104,9 @@ public static VisibilityLevel fromInteger(int number) { public AbstractTransaction(long transactionId, long epoch, Set writeSet, + Set conflictFreeWriteSet, AbstractTransactionManager transactionManager) { - this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, transactionManager); + this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, conflictFreeWriteSet, transactionManager); } public AbstractTransaction(long transactionId, @@ -109,28 +114,69 @@ public AbstractTransaction(long transactionId, VisibilityLevel visibilityLevel, long epoch, Set writeSet, + Set conflictFreeWriteSet, AbstractTransactionManager transactionManager) { this.startTimestamp = this.writeTimestamp = transactionId; this.readTimestamp = readTimestamp; this.epoch = epoch; this.writeSet = writeSet; + this.conflictFreeWriteSet = conflictFreeWriteSet; this.transactionManager = transactionManager; this.visibilityLevel = visibilityLevel; } + /** + * Base constructor + * + * @param transactionId + * transaction identifier to assign + * @param epoch + * epoch of the TSOServer instance that created this transaction + * Used in High Availability to guarantee data consistency + * @param writeSet + * initial write set for the transaction. + * Should be empty in most cases. + * @param transactionManager + * transaction manager associated to this transaction. + * Usually, should be the one that created the transaction + * instance. + * @param readTimestamp + * the snapshot to read from + * @param writeTimestamp + * the timestamp to write to + * + */ + public AbstractTransaction(long transactionId, + long epoch, + Set writeSet, + Set conflictFreeWriteSet, + AbstractTransactionManager transactionManager, + long readTimestamp, + long writeTimestamp) { + this.startTimestamp = transactionId; + this.readTimestamp = readTimestamp; + this.writeTimestamp = writeTimestamp; + this.epoch = epoch; + this.writeSet = writeSet; + this.conflictFreeWriteSet = conflictFreeWriteSet; + this.transactionManager = transactionManager; + this.visibilityLevel = VisibilityLevel.SNAPSHOT; + } + /** * Creates a checkpoint and sets the visibility level to SNAPSHOT_EXCLUDE_CURRENT * The number of checkpoints is bounded to NUM_CHECKPOINTS in order to make checkpoint a client side operation * @return true if a checkpoint was created and false otherwise * @throws TransactionException */ - void checkpoint() throws TransactionException { + public void checkpoint() throws TransactionException { setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); this.readTimestamp = this.writeTimestamp++; if (this.writeTimestamp % AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN == 0) { - throw new TransactionException("Error: number of checkpoing cannot exceed " + (AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN - 1)); + throw new TransactionException("Error: number of checkpoing cannot exceed " + + (AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN - 1)); } } @@ -199,6 +245,7 @@ public long getStartTimestamp() { * Returns the read timestamp for this transaction. * @return read timestamp */ + @Override public long getReadTimestamp() { return readTimestamp; } @@ -207,6 +254,7 @@ public long getReadTimestamp() { * Returns the write timestamp for this transaction. * @return write timestamp */ + @Override public long getWriteTimestamp() { return writeTimestamp; } @@ -269,6 +317,14 @@ public Set getWriteSet() { return writeSet; } + /** + * Returns the current write-set for this transaction that its elements are not candidates for conflict analysis. + * @return conflictFreeWriteSet + */ + public Set getConflictFreeWriteSet() { + return conflictFreeWriteSet; + } + /** * Adds an element to the transaction write-set. * @param element @@ -278,9 +334,18 @@ public void addWriteSetElement(T element) { writeSet.add(element); } + /** + * Adds an element to the transaction conflict free write-set. + * @param element + * the element to add + */ + public void addConflictFreeWriteSetElement(T element) { + conflictFreeWriteSet.add(element); + } + @Override public String toString() { - return String.format("Tx-%s [%s] (ST=%d, RT=%d, WT=%d, CT=%d, Epoch=%d) WriteSet %s", + return String.format("Tx-%s [%s] (ST=%d, RT=%d, WT=%d, CT=%d, Epoch=%d) WriteSet %s ConflictFreeWriteSet %s", Long.toHexString(getTransactionId()), status, startTimestamp, @@ -288,7 +353,8 @@ public String toString() { writeTimestamp, commitTimestamp, epoch, - writeSet); + writeSet, + conflictFreeWriteSet); } @Override diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java index ec0302cd7..573f63135 100644 --- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java +++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java @@ -243,7 +243,7 @@ public final void commit(Transaction transaction) throws RollbackException, Tran commitTimer.start(); try { - if (tx.getWriteSet().isEmpty()) { + if (tx.getWriteSet().isEmpty() && tx.getConflictFreeWriteSet().isEmpty()) { markReadOnlyTransaction(tx); // No need for read-only transactions to contact the TSO Server } else { commitRegularTransaction(tx); @@ -359,7 +359,7 @@ private void commitRegularTransaction(AbstractTransaction tx) try { - long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet()).get(); + long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet(), tx.getConflictFreeWriteSet()).get(); certifyCommitForTx(tx, commitTs); updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter); diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/Transaction.java b/transaction-client/src/main/java/org/apache/omid/transaction/Transaction.java index b9405c654..fdc5b3f69 100644 --- a/transaction-client/src/main/java/org/apache/omid/transaction/Transaction.java +++ b/transaction-client/src/main/java/org/apache/omid/transaction/Transaction.java @@ -46,6 +46,19 @@ enum Status { */ Status getStatus(); + /** + * Returns the read timestamp for this transaction. + * @return read timestamp + */ + long getReadTimestamp(); + + /** + * Returns the write timestamp for this transaction. + * @return write timestamp + */ + + long getWriteTimestamp(); + /** * Forces the transaction to rollback, even when there's an intention * to commit it. diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java index 344d343ce..e18befcad 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java @@ -102,6 +102,11 @@ private boolean hasConflictsWithCommittedTransactions(long transactionId, Set commit(long transactionId, Set cells, Set conflictFreeWriteSet) { + return commit(transactionId, cells); + } + @Override public TSOFuture commit(long transactionId, Set cells) { synchronized (conflictMap) { diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java index 1c6287613..25e124e82 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java @@ -201,6 +201,14 @@ public TSOFuture getNewStartTimestamp() { */ @Override public TSOFuture commit(long transactionId, Set cells) { + return commit(transactionId, cells, new HashSet()); + } + + /** + * @see TSOProtocol#commit(long, Set, Set) + */ + @Override + public TSOFuture commit(long transactionId, Set cells, Set conflictFreeWriteSet) { TSOProto.Request.Builder builder = TSOProto.Request.newBuilder(); TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder(); commitbuilder.setStartTimestamp(transactionId); @@ -229,6 +237,11 @@ public TSOFuture commit(long transactionId, Set cells) { commitbuilder.addCellId(id); tableIDs.add(cell.getTableId()); } + + for (CellId cell : conflictFreeWriteSet) { + tableIDs.add(cell.getTableId()); + } + commitbuilder.addAllTableId(tableIDs); tableIDs.clear(); builder.setCommitRequest(commitbuilder.build()); @@ -307,6 +320,13 @@ public ConflictDetectionLevel getConflictDetectionLevel() { return conflictDetectionLevel; } + /** + * Used for family deletion testing + */ + public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) { + this.conflictDetectionLevel = conflictDetectionLevel; + } + // ---------------------------------------------------------------------------------------------------------------- // NodeCacheListener interface // ---------------------------------------------------------------------------------------------------------------- diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java index 5ad63262c..343610f03 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java @@ -50,6 +50,22 @@ public interface TSOProtocol { */ TSOFuture commit(long transactionId, Set writeSet); + /** + * Returns the result of the conflict detection made on the server-side for the specified transaction + * @param transactionId + * the transaction to check for conflicts + * @param writeSet + * the writeSet of the transaction, which includes all the modified cells + * @param conflictFreeWriteSet + * the conflict free writeSet of the transaction, needed only for table access information. + * @return the commit timestamp as a future if the transaction was committed. If the transaction was aborted due + * to conflicts with a concurrent transaction, the future will include an AbortException. If an error was detected, + * the future will contain a corresponding protocol exception + * see org.apache.omid.tso.TimestampOracle + * see org.apache.omid.tso.TSOServer + */ + TSOFuture commit(long transactionId, Set writeSet, Set conflictFreeWriteSet); + /** * Returns a new fence timestamp assigned by on the server-side * @param tableId diff --git a/transaction-client/src/test/java/org/apache/omid/tso/client/TestMockTSOClient.java b/transaction-client/src/test/java/org/apache/omid/tso/client/TestMockTSOClient.java index 054eb65ee..b68b552f4 100644 --- a/transaction-client/src/test/java/org/apache/omid/tso/client/TestMockTSOClient.java +++ b/transaction-client/src/test/java/org/apache/omid/tso/client/TestMockTSOClient.java @@ -18,11 +18,13 @@ package org.apache.omid.tso.client; import com.google.common.collect.Sets; + import org.apache.omid.committable.CommitTable; import org.apache.omid.committable.InMemoryCommitTable; import org.apache.omid.tso.util.DummyCellIdImpl; import org.testng.annotations.Test; +import java.util.HashSet; import java.util.concurrent.ExecutionException; import static org.testng.Assert.assertEquals; @@ -42,10 +44,10 @@ public void testConflicts() throws Exception { long tr1 = client.getNewStartTimestamp().get(); long tr2 = client.getNewStartTimestamp().get(); - client.commit(tr1, Sets.newHashSet(c1)).get(); + client.commit(tr1, Sets.newHashSet(c1), new HashSet()).get(); try { - client.commit(tr2, Sets.newHashSet(c1, c2)).get(); + client.commit(tr2, Sets.newHashSet(c1, c2), new HashSet()).get(); fail("Shouldn't have committed"); } catch (ExecutionException ee) { assertEquals(ee.getCause().getClass(), AbortException.class, "Should have aborted"); @@ -59,12 +61,12 @@ public void testWatermarkUpdate() throws Exception { CommitTable.Client commitTableClient = commitTable.getClient(); long tr1 = client.getNewStartTimestamp().get(); - client.commit(tr1, Sets.newHashSet(c1)).get(); + client.commit(tr1, Sets.newHashSet(c1), new HashSet()).get(); long initWatermark = commitTableClient.readLowWatermark().get(); long tr2 = client.getNewStartTimestamp().get(); - client.commit(tr2, Sets.newHashSet(c1)).get(); + client.commit(tr2, Sets.newHashSet(c1), new HashSet()).get(); long newWatermark = commitTableClient.readLowWatermark().get(); assertTrue(newWatermark > initWatermark, "new low watermark should be bigger"); diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java index a6d63c7b8..a704453eb 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java +++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java @@ -30,7 +30,8 @@ import java.util.concurrent.ExecutionException; import static com.codahale.metrics.MetricRegistry.name; -import static org.apache.omid.tso.PersistEvent.Type.*; +import static org.apache.omid.tso.PersistEvent.Type.COMMIT_RETRY; + public class PersistenceProcessorHandler implements WorkHandler { diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java index 19d9f0155..f30e64d32 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java +++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java @@ -62,7 +62,7 @@ public class TSOServer extends AbstractIdleService { // ---------------------------------------------------------------------------------------------------------------- - static TSOServer getInitializedTsoServer(TSOServerConfig config) throws IOException { + public static TSOServer getInitializedTsoServer(TSOServerConfig config) throws IOException { LOG.info("Configuring TSO Server..."); Injector injector = Guice.createInjector(buildModuleList(config)); LOG.info("TSO Server configured. Creating instance..."); diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java index 17fd2e003..fc30e605a 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java @@ -26,6 +26,7 @@ import org.apache.omid.metrics.MetricsRegistry; import org.apache.omid.metrics.NullMetricsProvider; import org.apache.omid.timestamp.storage.TimestampStorage; +import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE; import org.apache.omid.tso.TimestampOracleImpl.InMemoryTimestampStorage; import javax.inject.Named; @@ -51,7 +52,11 @@ protected void configure() { bind(TSOStateManager.class).to(TSOStateManagerImpl.class).in(Singleton.class); bind(CommitTable.class).to(InMemoryCommitTable.class).in(Singleton.class); bind(TimestampStorage.class).to(InMemoryTimestampStorage.class).in(Singleton.class); - bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class); + if (config.getTimestampTypeEnum() == TIMESTAMP_TYPE.WORLD_TIME) { + bind(TimestampOracle.class).to(WorldClockOracleImpl.class).in(Singleton.class); + } else { + bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class); + } bind(Panicker.class).to(MockPanicker.class).in(Singleton.class); install(new BatchPoolModule(config));