Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9c48cfd
[OMID-83] Attributes added to Put, Get, and Scan are not propagated t…
Feb 7, 2018
7d6c030
[OMID-84] Today, all the writes done by a transaction are taking part…
Feb 7, 2018
ea4c739
[OMID-85] Writing directly to HBase using specific version marks the …
Feb 7, 2018
f3d980c
[OMID-70] - reopen in order to bind WorldClockOracleImpl in TSOMockMo…
Mar 22, 2018
540e22f
[OMID-74] - A bug fix. The update of the write set is
Mar 22, 2018
0515a94
[OMID-93] This commit adds an option to add commit
Mar 22, 2018
a4ab246
[OMID-94] Tune Omid for Phoenix testing environment.
Mar 22, 2018
76400f6
[OMID-96] Enable compactor on all column families during initializati…
Apr 3, 2018
5ab09f3
[OMID-99] Change TestNG version to 6.10.
May 22, 2018
d4dab87
[OMID-100] James Taylor's patch to:
Jun 3, 2018
0f14b27
[OMID-72] bug fix, accessed tables should be sent to transaction mana…
Jun 5, 2018
41554ec
Support for user Filter when using coprocessor for snapshot filtering
yonigottesman Jul 25, 2018
ba57538
In coprocessor filtering, get shadow cell of delete family before goi…
yonigottesman Jul 25, 2018
26469a0
add inMemoryCommitTable client option in omid coprocessor for testing
yonigottesman Jul 26, 2018
8d63267
Fix visibilityFilter to check if delete family is in current TX
yonigottesman Jul 31, 2018
7d0986e
[OMID-105] When a tentative family deletion marker is
Jul 31, 2018
1fdd260
[OMID-106] Delete should use write timestamp when writing
Jul 31, 2018
b2980c7
Merge OMID-105 changes
yonigottesman Aug 1, 2018
575c437
fix server filtering
yonigottesman Aug 6, 2018
c890c00
fix snapshot filter to work with SNAPSHOT_ALL in client
yonigottesman Aug 6, 2018
2071125
Add server side filtering test. Improve server filtering memory usage
yonigottesman Aug 7, 2018
46b0c81
remove wildcard importas
yonigottesman Aug 7, 2018
a79d375
Fix coprocessor scanning bug. add scan with filter test
yonigottesman Aug 8, 2018
a86afc5
Add CellSkipFilter
yonigottesman Aug 16, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,23 @@ private TSOProto.Transaction.Builder getBuilder(HBaseTransaction transaction) {
}

@Override
public Result get(TTable ttable, Get get, HBaseTransaction transaction) throws IOException {
public Result get(Get get, HBaseTransaction transaction) throws IOException {
get.setAttribute(CellUtils.TRANSACTION_ATTRIBUTE, getBuilder(transaction).build().toByteArray());
get.setAttribute(CellUtils.CLIENT_GET_ATTRIBUTE, Bytes.toBytes(true));

return table.get(get);
}

@Override
public ResultScanner getScanner(TTable ttable, Scan scan, HBaseTransaction transaction) throws IOException {
public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException {
scan.setAttribute(CellUtils.TRANSACTION_ATTRIBUTE, getBuilder(transaction).build().toByteArray());

return table.getScanner(scan);
}

@Override
public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
int versionsToRequest, Map<String, List<Cell>> familyDeletionCache) throws IOException {
int versionsToRequest, Map<String, Long> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commit
this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency"));
}

private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture) {
Put put = new Put(cell.getRow());
put.add(cell.getFamily(),
CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
cell.getTimestamp(),
Bytes.toBytes(tx.getCommitTimestamp()));
try {
cell.getTable().put(put);
} catch (IOException e) {
LOG.warn("{}: Error inserting shadow cell {}", tx, cell, e);
updateSCFuture.setException(
new TransactionManagerException(tx + ": Error inserting shadow cell " + cell, e));
}
}

@Override
public ListenableFuture<Void> updateShadowCells(AbstractTransaction<? extends CellId> transaction) {

Expand All @@ -63,18 +78,11 @@ public ListenableFuture<Void> updateShadowCells(AbstractTransaction<? extends Ce

// Add shadow cells
for (HBaseCellId cell : tx.getWriteSet()) {
Put put = new Put(cell.getRow());
put.add(cell.getFamily(),
CellUtils.addShadowCellSuffix(cell.getQualifier(), 0, cell.getQualifier().length),
cell.getTimestamp(),
Bytes.toBytes(tx.getCommitTimestamp()));
try {
cell.getTable().put(put);
} catch (IOException e) {
LOG.warn("{}: Error inserting shadow cell {}", tx, cell, e);
updateSCFuture.setException(
new TransactionManagerException(tx + ": Error inserting shadow cell " + cell, e));
}
addShadowCell(cell, tx, updateSCFuture);
}

for (HBaseCellId cell : tx.getConflictFreeWriteSet()) {
addShadowCell(cell, tx, updateSCFuture);
}

// Flush affected tables before returning to avoid loss of shadow cells updates when autoflush is disabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,35 @@
public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
private static final Logger LOG = LoggerFactory.getLogger(HBaseTransaction.class);

public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet, AbstractTransactionManager tm) {
super(transactionId, epoch, writeSet, tm);
public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm) {
super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm);
}

public HBaseTransaction(long transactionId, long readTimestamp, VisibilityLevel visibilityLevel, long epoch, Set<HBaseCellId> writeSet, AbstractTransactionManager tm) {
super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, tm);
public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm, long readTimestamp, long writeTimestamp) {
super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, readTimestamp, writeTimestamp);
}

public HBaseTransaction(long transactionId, long readTimestamp, VisibilityLevel visibilityLevel, long epoch, Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm) {
super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, conflictFreeWriteSet, tm);
}

private void deleteCell(HBaseCellId cell) {
Delete delete = new Delete(cell.getRow());
delete.deleteColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
try {
cell.getTable().delete(delete);
} catch (IOException e) {
LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e);
}
}
@Override
public void cleanup() {
Set<HBaseCellId> writeSet = getWriteSet();
for (final HBaseCellId cell : writeSet) {
Delete delete = new Delete(cell.getRow());
delete.deleteColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
try {
cell.getTable().delete(delete);
} catch (IOException e) {
LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e);
}
for (final HBaseCellId cell : getWriteSet()) {
deleteCell(cell);
}

for (final HBaseCellId cell : getConflictFreeWriteSet()) {
deleteCell(cell);
}
try {
flushTables();
Expand Down Expand Up @@ -80,6 +90,10 @@ private Set<HTableInterface> getWrittenTables() {
for (HBaseCellId cell : writeSet) {
tables.add(cell.getTable());
}
writeSet = (HashSet<HBaseCellId>) getConflictFreeWriteSet();
for (HBaseCellId cell : writeSet) {
tables.add(cell.getTable());
}
return tables;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
*/
package org.apache.omid.transaction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.omid.tools.hbase.HBaseLogin;
Expand Down Expand Up @@ -53,7 +50,7 @@ private static class HBaseTransactionFactory implements TransactionFactory<HBase
@Override
public HBaseTransaction createTransaction(long transactionId, long epoch, AbstractTransactionManager tm) {

return new HBaseTransaction(transactionId, epoch, new HashSet<HBaseCellId>(), tm);
return new HBaseTransaction(transactionId, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), tm);

}

Expand All @@ -74,8 +71,7 @@ public static TransactionManager newInstance(HBaseOmidClientConfiguration config
return builder(configuration).build();
}

@VisibleForTesting
static class Builder {
public static class Builder {

// Required parameters
private final HBaseOmidClientConfiguration hbaseOmidClientConf;
Expand All @@ -89,12 +85,12 @@ private Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
this.hbaseOmidClientConf = hbaseOmidClientConf;
}

Builder tsoClient(TSOClient tsoClient) {
public Builder tsoClient(TSOClient tsoClient) {
this.tsoClient = Optional.of(tsoClient);
return this;
}

Builder commitTableClient(CommitTable.Client client) {
public Builder commitTableClient(CommitTable.Client client) {
this.commitTableClient = Optional.of(client);
return this;
}
Expand All @@ -104,7 +100,7 @@ Builder postCommitter(PostCommitActions postCommitter) {
return this;
}

HBaseTransactionManager build() throws IOException, InterruptedException {
public HBaseTransactionManager build() throws IOException, InterruptedException {

CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
Expand Down Expand Up @@ -152,8 +148,7 @@ private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commit

}

@VisibleForTesting
static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
public static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
return new Builder(hbaseOmidClientConf);
}

Expand Down Expand Up @@ -229,6 +224,10 @@ static HBaseTransaction enforceHBaseTransactionAsParam(AbstractTransaction<? ext

}

public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
tsoClient.setConflictDetectionLevel(conflictDetectionLevel);
}

public ConflictDetectionLevel getConflictDetectionLevel() {
return tsoClient.getConflictDetectionLevel();
}
Expand Down Expand Up @@ -265,7 +264,7 @@ public Optional<Long> readCommitTimestampFromShadowCell(long startTimestamp) thr

Get get = new Get(hBaseCellId.getRow());
byte[] family = hBaseCellId.getFamily();
byte[] shadowCellQualifier = CellUtils.addShadowCellSuffix(hBaseCellId.getQualifier());
byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(hBaseCellId.getQualifier());
get.addColumn(family, shadowCellQualifier);
get.setMaxVersions(1);
get.setTimeStamp(startTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
*/
package org.apache.omid.transaction;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

import java.io.IOException;
import java.util.List;



// This class wraps the HTableInterface object when doing client side filtering.
public class HTableAccessWrapper implements TableAccessWrapper {
Expand Down Expand Up @@ -51,4 +55,9 @@ public void put(Put put) throws IOException {
writeTable.put(put);
}

@Override
public ResultScanner getScanner(Scan scan) throws IOException {
return readTable.getScanner(scan);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@

public interface SnapshotFilter {

public Result get(TTable ttable, Get get, HBaseTransaction transaction) throws IOException;
public Result get(Get get, HBaseTransaction transaction) throws IOException;

public ResultScanner getScanner(TTable ttable, Scan scan, HBaseTransaction transaction) throws IOException;
public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException;

public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
int versionsToRequest, Map<String, List<Cell>> familyDeletionCache) throws IOException;
int versionsToRequest, Map<String, Long> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException;

public boolean isCommitted(HBaseCellId hBaseCellId, long epoch) throws TransactionException;

Expand Down
Loading