diff --git a/hbase-coprocessor/pom.xml b/hbase-coprocessor/pom.xml
index 8af093c2a..74804a4ce 100644
--- a/hbase-coprocessor/pom.xml
+++ b/hbase-coprocessor/pom.xml
@@ -53,6 +53,11 @@
org.apache.hbase
hbase-server
+
+ org.apache.hbase
+ hbase-hadoop-compat
+ ${hbase.version}
+
diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java
index 12ecbb027..b79b1afac 100644
--- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java
@@ -25,6 +25,7 @@
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.Client;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
+import org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics;
import org.apache.omid.transaction.CellUtils;
import org.apache.omid.transaction.CellInfo;
import org.apache.hadoop.hbase.Cell;
@@ -62,14 +63,20 @@ public class CompactorScanner implements InternalScanner {
private final Region hRegion;
private boolean hasMoreRows = false;
- private List currentRowWorthValues = new ArrayList();
+ private List currentRowWorthValues = new ArrayList<>();
+
+ // Metrics
+ private CompactorCoprocessorMetrics metrics;
public CompactorScanner(ObserverContext e,
InternalScanner internalScanner,
Client commitTableClient,
Queue commitTableClientQueue,
boolean isMajorCompaction,
- boolean preserveNonTransactionallyDeletedCells) throws IOException {
+ boolean preserveNonTransactionallyDeletedCells,
+ CompactorCoprocessorMetrics metrics) throws IOException
+ {
+
this.internalScanner = internalScanner;
this.commitTableClient = commitTableClient;
this.commitTableClientQueue = commitTableClientQueue;
@@ -78,6 +85,7 @@ public CompactorScanner(ObserverContext e,
this.lowWatermark = getLowWatermarkFromCommitTable();
// Obtain the table in which the scanner is going to operate
this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
+ this.metrics = metrics;
LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
lowWatermark, hRegion.getRegionInfo());
}
@@ -101,6 +109,9 @@ public boolean next(List result, int limit) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Row: Result {} limit {} more rows? {}", scanResult, limit, hasMoreRows);
}
+ if (hasMoreRows) {
+ metrics.incrScannedRows();
+ }
// 2) Traverse result list separating normal cells from shadow
// cells and building a map to access easily the shadow cells.
SortedMap> cellToSc = CellUtils.mapCellsToShadowCells(scanResult);
@@ -111,17 +122,21 @@ public boolean next(List result, int limit) throws IOException {
PeekingIterator>> iter
= Iterators.peekingIterator(cellToSc.entrySet().iterator());
while (iter.hasNext()) {
+ long cellProcessingTimeStartTimeInMs = System.currentTimeMillis();
+ metrics.incrTotalCells();
Map.Entry> entry = iter.next();
Cell cell = entry.getKey();
Optional shadowCellOp = entry.getValue();
if (cell.getTimestamp() > lowWatermark) {
retain(currentRowWorthValues, cell, shadowCellOp);
+ metrics.updateCellProcessingTime(System.currentTimeMillis() - cellProcessingTimeStartTimeInMs);
continue;
}
if (shouldRetainNonTransactionallyDeletedCell(cell)) {
retain(currentRowWorthValues, cell, shadowCellOp);
+ metrics.updateCellProcessingTime(System.currentTimeMillis() - cellProcessingTimeStartTimeInMs);
continue;
}
@@ -142,6 +157,8 @@ public boolean next(List result, int limit) throws IOException {
skipToNextColumn(cell, iter);
}
}
+ metrics.incrTombstoneCells();
+ metrics.updateCellProcessingTime(System.currentTimeMillis() - cellProcessingTimeStartTimeInMs);
continue;
}
}
@@ -155,12 +172,16 @@ public boolean next(List result, int limit) throws IOException {
byte[] shadowCellValue = Bytes.toBytes(commitTimestamp.get().getValue());
Cell shadowCell = CellUtils.buildShadowCellFromCell(cell, shadowCellValue);
saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCell);
+ metrics.incrHealedShadowCells();
} else {
LOG.trace("Discarding cell {}", cell);
+ metrics.incrDiscardedCells();
}
}
+ metrics.updateCellProcessingTime(System.currentTimeMillis() - cellProcessingTimeStartTimeInMs);
}
retainLastTimestampedCellsSaved(currentRowWorthValues, lastTimestampedCellsInRow);
+ metrics.incrRetainedCells(lastTimestampedCellsInRow.values().size());
// 4) Sort the list
Collections.sort(currentRowWorthValues, KeyValue.COMPARATOR);
@@ -229,7 +250,9 @@ private long getLowWatermarkFromCommitTable() throws IOException {
private Optional queryCommitTimestamp(Cell cell) throws IOException {
try {
+ long queryCommitTableStartTimeInMs = System.currentTimeMillis();
Optional ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
+ metrics.updateCommitTableQueryTime(System.currentTimeMillis() - queryCommitTableStartTimeInMs);
if (ct.isPresent()) {
return Optional.of(ct.get());
} else {
@@ -276,11 +299,16 @@ private void retainLastTimestampedCellsSaved(List| result, Map>> iter) {
+
+ int skippedCellsCount = 0;
while (iter.hasNext()
&& CellUtil.matchingFamily(iter.peek().getKey(), cell)
&& CellUtil.matchingQualifier(iter.peek().getKey(), cell)) {
iter.next();
+ skippedCellsCount++;
}
+ metrics.incrSkippedCells(skippedCellsCount);
+
}
}
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/hbase/coprocessor/metrics/CompactorCoprocessorMetrics.java b/hbase-coprocessor/src/main/java/org/apache/omid/hbase/coprocessor/metrics/CompactorCoprocessorMetrics.java
new file mode 100644
index 000000000..f0e4acd8c
--- /dev/null
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/hbase/coprocessor/metrics/CompactorCoprocessorMetrics.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.hbase.coprocessor.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+
+public class CompactorCoprocessorMetrics extends BaseSourceImpl implements CompactorCoprocessorMetricsSource {
+
+ private static final String METRICS_NAME = "CompactorCoprocessor";
+ private static final String METRICS_CONTEXT = "omid.coprocessor.compactor";
+ private static final String METRICS_DESCRIPTION = "Omid Compactor Coprocessor Metrics";
+ private static final String METRICS_JMX_CONTEXT = "Omid,sub=" + METRICS_NAME;
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Metrics
+ // ----------------------------------------------------------------------------------------------------------------
+
+ // Histogram-related keys & descriptions
+ static final String COMPACTIONS_KEY = "compactions";
+ static final String COMPACTIONS_DESC = "Histogram about Compactions";
+ static final String CELL_PROCESSING_KEY = "cellProcessing";
+ static final String CELL_PROCESSING_DESC = "Histogram about Cell Processing";
+ static final String COMMIT_TABLE_QUERY_KEY = "commitTableQuery";
+ static final String COMMIT_TABLE_QUERY_DESC = "Histogram about Commit Table Query";
+
+ // Counter-related keys & descriptions
+ static final String MAJOR_COMPACTION_KEY = "major-compactions";
+ static final String MAJOR_COMPACTION_DESC = "Number of major compactions";
+ static final String MINOR_COMPACTION_KEY = "minor-compactions";
+ static final String MINOR_COMPACTION_DESC = "Number of minor compactions";
+ static final String SCANNED_ROWS_KEY = "scanned-rows";
+ static final String SCANNED_ROWS_DESC = "Number of rows scanned";
+ static final String TOTAL_CELLS_KEY = "total-cells";
+ static final String TOTAL_CELLS_DESC = "Number of cells processed";
+ static final String RETAINED_CELLS_KEY = "retained-cells";
+ static final String RETAINED_CELLS_DESC = "Number of cells retained when compacting";
+ static final String SKIPPED_CELLS_KEY = "skipped-cells";
+ static final String SKIPPED_CELLS_DESC = "Number of cells skipped when compacting";
+ static final String HEALED_SHADOW_CELLS_KEY = "healed-shadow-cells";
+ static final String HEALED_SHADOW_CELLS_DESC = "Number of cells healed when compacting";
+ static final String DISCARDED_CELLS_KEY = "discarded-cells";
+ static final String DISCARDED_CELLS_DESC = "Number of cells discarded when compacting";
+ static final String TOMBSTONE_CELLS_KEY = "tombstone-cells";
+ static final String TOMBSTONE_CELLS_DESC = "Number of tombstone cells found when compacting";
+
+ // *************************** Elements **********************************/
+
+ // Histograms
+ private final MetricHistogram compactionsHistogram;
+ private final MetricHistogram cellProcessingHistogram;
+ private final MetricHistogram commitTableQueryHistogram;
+
+ // Counters
+ private final MetricMutableCounterLong majorCompactionsCounter;
+ private final MetricMutableCounterLong minorCompactionsCounter;
+ private final MetricMutableCounterLong scannedRowsCounter;
+ private final MetricMutableCounterLong totalCellsCounter;
+ private final MetricMutableCounterLong retainedCellsCounter;
+ private final MetricMutableCounterLong skippedCellsCounter;
+ private final MetricMutableCounterLong healedShadowCellsCounter;
+ private final MetricMutableCounterLong discardedCellsCounter;
+ private final MetricMutableCounterLong tombstoneCellsCounter;
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // End of Metrics
+ // ----------------------------------------------------------------------------------------------------------------
+
+ public CompactorCoprocessorMetrics() {
+ this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+ }
+
+ public CompactorCoprocessorMetrics(String metricsName,
+ String metricsDescription,
+ String metricsContext,
+ String metricsJmxContext) {
+
+ super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+
+ // Histograms
+ compactionsHistogram = getMetricsRegistry().newHistogram(COMPACTIONS_KEY, COMPACTIONS_DESC);
+ cellProcessingHistogram = getMetricsRegistry().newHistogram(CELL_PROCESSING_KEY, CELL_PROCESSING_DESC);
+ commitTableQueryHistogram = getMetricsRegistry().newHistogram(COMMIT_TABLE_QUERY_KEY, COMMIT_TABLE_QUERY_DESC);
+
+ // Counters
+ majorCompactionsCounter = getMetricsRegistry().newCounter(MAJOR_COMPACTION_KEY, MAJOR_COMPACTION_DESC, 0L);
+ minorCompactionsCounter = getMetricsRegistry().newCounter(MINOR_COMPACTION_KEY, MINOR_COMPACTION_DESC, 0L);
+ scannedRowsCounter = getMetricsRegistry().newCounter(SCANNED_ROWS_KEY, SCANNED_ROWS_DESC, 0L);
+ totalCellsCounter = getMetricsRegistry().newCounter(TOTAL_CELLS_KEY, TOTAL_CELLS_DESC, 0L);
+ retainedCellsCounter = getMetricsRegistry().newCounter(RETAINED_CELLS_KEY, RETAINED_CELLS_DESC, 0L);
+ skippedCellsCounter = getMetricsRegistry().newCounter(SKIPPED_CELLS_KEY, SKIPPED_CELLS_DESC, 0L);
+ healedShadowCellsCounter = getMetricsRegistry().newCounter(HEALED_SHADOW_CELLS_KEY, HEALED_SHADOW_CELLS_DESC, 0L);
+ discardedCellsCounter = getMetricsRegistry().newCounter(DISCARDED_CELLS_KEY, DISCARDED_CELLS_DESC, 0L);
+ tombstoneCellsCounter = getMetricsRegistry().newCounter(TOMBSTONE_CELLS_KEY, TOMBSTONE_CELLS_DESC, 0L);
+
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // CompactorCoprocessorMetricsSource Interface Impl
+ // ----------------------------------------------------------------------------------------------------------------
+
+ @Override
+ public void updateCompactionTime(long timeInMs) {
+ compactionsHistogram.add(timeInMs);
+ }
+
+ @Override
+ public void updateCellProcessingTime(long timeInMs) {
+ cellProcessingHistogram.add(timeInMs);
+ }
+
+ @Override
+ public void updateCommitTableQueryTime(long timeInMs) {
+ commitTableQueryHistogram.add(timeInMs);
+ }
+
+ @Override
+ public void incrMajorCompactions() {
+ majorCompactionsCounter.incr();
+ }
+
+ @Override
+ public void incrMinorCompactions() {
+ minorCompactionsCounter.incr();
+ }
+
+ @Override
+ public void incrScannedRows() {
+ scannedRowsCounter.incr();
+ }
+
+ @Override
+ public void incrTotalCells() {
+ totalCellsCounter.incr();
+ }
+
+ @Override
+ public void incrRetainedCells(long delta) {
+ retainedCellsCounter.incr(delta);
+ }
+
+ @Override
+ public void incrSkippedCells(long delta) {
+ skippedCellsCounter.incr(delta);
+ }
+
+ @Override
+ public void incrHealedShadowCells() {
+ healedShadowCellsCounter.incr();
+ }
+
+ @Override
+ public void incrDiscardedCells() {
+ discardedCellsCounter.incr();
+ }
+
+ @Override
+ public void incrTombstoneCells() {
+ tombstoneCellsCounter.incr();
+ }
+
+}
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/hbase/coprocessor/metrics/CompactorCoprocessorMetricsSource.java b/hbase-coprocessor/src/main/java/org/apache/omid/hbase/coprocessor/metrics/CompactorCoprocessorMetricsSource.java
new file mode 100644
index 000000000..0dd5ff0a0
--- /dev/null
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/hbase/coprocessor/metrics/CompactorCoprocessorMetricsSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.hbase.coprocessor.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface CompactorCoprocessorMetricsSource extends BaseSource {
+
+ /**
+ * Update the compaction time histogram
+ * @param timeInMs time it took
+ */
+ void updateCompactionTime(long timeInMs);
+
+ /**
+ * Update the time it took processing a cell
+ * @param timeInMs time it took
+ */
+ void updateCellProcessingTime(long timeInMs);
+
+ /**
+ * Update the time it took to query the commit table for trying to find
+ * the commit timestamp
+ * @param timeInMs time it took
+ */
+ void updateCommitTableQueryTime(long timeInMs);
+
+ /**
+ * Increment the number of major compactions
+ */
+ public void incrMajorCompactions();
+
+ /**
+ * Increment the number of minor compactions
+ */
+ public void incrMinorCompactions();
+
+ /**
+ * Increment the number of scanned rows when compacting
+ */
+ void incrScannedRows();
+
+ /**
+ * Increment the number of total cells processed when compacting
+ */
+ void incrTotalCells();
+
+ /**
+ * Increment the number of retained cells when compacting
+ * @param delta the delta to increment the counter
+ */
+ void incrRetainedCells(long delta);
+
+ /**
+ * Increment the number of skipped cells when compacting
+ * @param delta the delta to increment the counter
+ */
+ void incrSkippedCells(long delta);
+
+ /**
+ * Increment the number of healed shadow cells when compacting
+ */
+ void incrHealedShadowCells();
+
+ /**
+ * Increment the number of discarded cells when compacting
+ */
+ void incrDiscardedCells();
+
+ /**
+ * Increment the number of tombstone cells when compacting
+ */
+ void incrTombstoneCells();
+
+}
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
index 887a2f69f..e2e47faa0 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
@@ -18,6 +18,7 @@
package org.apache.omid.transaction;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
@@ -34,6 +35,7 @@
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +72,10 @@ public class OmidCompactor extends BaseRegionObserver {
// will be deleted anyways after a major one
private boolean retainNonTransactionallyDeletedCells;
+ // Metrics
+ private CompactorCoprocessorMetrics metrics;
+ private long compactionStartTimeInMs;
+
public OmidCompactor() {
LOG.info("Compactor coprocessor initialized via empty constructor");
}
@@ -86,6 +92,8 @@ public void start(CoprocessorEnvironment env) throws IOException {
retainNonTransactionallyDeletedCells =
conf.getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
+ LOG.info("\tStarting coprocessor metrics...");
+ metrics = new CompactorCoprocessorMetrics();
LOG.info("Compactor coprocessor started");
}
@@ -106,12 +114,11 @@ public InternalScanner preCompact(ObserverContext
InternalScanner scanner,
ScanType scanType,
CompactionRequest request) throws IOException {
+
HTableDescriptor desc = e.getEnvironment().getRegion().getTableDesc();
- HColumnDescriptor famDesc
- = desc.getFamily(Bytes.toBytes(store.getColumnFamilyName()));
+ HColumnDescriptor famDesc = desc.getFamily(Bytes.toBytes(store.getColumnFamilyName()));
boolean omidCompactable = Boolean.valueOf(famDesc.getValue(OMID_COMPACTABLE_CF_FLAG));
- // only column families tagged as compactable are compacted
- // with omid compactor
+ // only column families tagged as compactable are compacted with omid compactor
if (!omidCompactable) {
return scanner;
} else {
@@ -120,15 +127,31 @@ public InternalScanner preCompact(ObserverContext
commitTableClient = initAndGetCommitTableClient();
}
boolean isMajorCompaction = request.isMajor();
+ if (isMajorCompaction) {
+ metrics.incrMajorCompactions();
+ } else {
+ metrics.incrMinorCompactions();
+ }
+ compactionStartTimeInMs = System.currentTimeMillis();
return new CompactorScanner(e,
- scanner,
- commitTableClient,
- commitTableClientQueue,
- isMajorCompaction,
- retainNonTransactionallyDeletedCells);
+ scanner,
+ commitTableClient,
+ commitTableClientQueue,
+ isMajorCompaction,
+ retainNonTransactionallyDeletedCells,
+ metrics);
}
}
+ @Override
+ public void postCompact(ObserverContext e,
+ final Store store,
+ final StoreFile resultFile) throws IOException {
+
+ metrics.updateCompactionTime(System.currentTimeMillis() - compactionStartTimeInMs);
+
+ }
+
private CommitTable.Client initAndGetCommitTableClient() throws IOException {
LOG.info("Trying to get the commit table client");
CommitTable commitTable = new HBaseCommitTable(conf, commitTableConf);
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/hbase/coprocessor/metrics/TestCompactorCoprocessorMetrics.java b/hbase-coprocessor/src/test/java/org/apache/omid/hbase/coprocessor/metrics/TestCompactorCoprocessorMetrics.java
new file mode 100644
index 000000000..7f0307891
--- /dev/null
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/hbase/coprocessor/metrics/TestCompactorCoprocessorMetrics.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.hbase.coprocessor.metrics;
+
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics.DISCARDED_CELLS_KEY;
+import static org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics.HEALED_SHADOW_CELLS_KEY;
+import static org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics.MAJOR_COMPACTION_KEY;
+import static org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics.MINOR_COMPACTION_KEY;
+import static org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics.RETAINED_CELLS_KEY;
+import static org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics.SCANNED_ROWS_KEY;
+import static org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics.SKIPPED_CELLS_KEY;
+import static org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics.TOMBSTONE_CELLS_KEY;
+import static org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics.TOTAL_CELLS_KEY;
+
+public class TestCompactorCoprocessorMetrics {
+
+ public static MetricsAssertHelper HELPER = CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+
+ private CompactorCoprocessorMetrics compactorMetrics;
+
+ @BeforeClass
+ public static void classSetUp() {
+ HELPER.init();
+ }
+
+ @BeforeMethod
+ public void setUp() {
+ compactorMetrics = new CompactorCoprocessorMetrics();
+ }
+
+ @Test
+ public void testCounters() {
+
+ for (int i = 0; i < 10; i++) {
+ compactorMetrics.incrMajorCompactions();
+ }
+ HELPER.assertCounter(MAJOR_COMPACTION_KEY, 10, compactorMetrics);
+
+ for (int i = 0; i < 11; i++) {
+ compactorMetrics.incrMinorCompactions();
+ }
+ HELPER.assertCounter(MINOR_COMPACTION_KEY, 11, compactorMetrics);
+
+ for (int i = 0; i < 12; i++) {
+ compactorMetrics.incrScannedRows();
+ }
+ HELPER.assertCounter(SCANNED_ROWS_KEY, 12, compactorMetrics);
+
+ for (int i = 0; i < 13; i++) {
+ compactorMetrics.incrTotalCells();
+ }
+ HELPER.assertCounter(TOTAL_CELLS_KEY, 13, compactorMetrics);
+
+ for (int i = 0; i < 14; i++) {
+ compactorMetrics.incrRetainedCells(14);
+ }
+ HELPER.assertCounter(RETAINED_CELLS_KEY, 14 * 14, compactorMetrics);
+
+ for (int i = 0; i < 15; i++) {
+ compactorMetrics.incrSkippedCells(15);
+ }
+ HELPER.assertCounter(SKIPPED_CELLS_KEY, 15 * 15, compactorMetrics);
+
+ for (int i = 0; i < 16; i++) {
+ compactorMetrics.incrHealedShadowCells();
+ }
+ HELPER.assertCounter(HEALED_SHADOW_CELLS_KEY, 16, compactorMetrics);
+
+ for (int i = 0; i < 17; i++) {
+ compactorMetrics.incrDiscardedCells();
+ }
+ HELPER.assertCounter(DISCARDED_CELLS_KEY, 17, compactorMetrics);
+
+ for (int i = 0; i < 18; i++) {
+ compactorMetrics.incrTombstoneCells();
+ }
+ HELPER.assertCounter(TOMBSTONE_CELLS_KEY, 18, compactorMetrics);
+
+ }
+
+}
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
index 8a217b31a..b1aa064b9 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.Client;
+import org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.DataProvider;
@@ -60,6 +61,7 @@ public void testShouldRetainNonTransactionallyDeletedCellMethod(int optionIdx, b
throws Exception {
// Create required mocks
+ CompactorCoprocessorMetrics metrics = mock(CompactorCoprocessorMetrics.class);
@SuppressWarnings("unchecked")
ObserverContext ctx = mock(ObserverContext.class);
InternalScanner internalScanner = mock(InternalScanner.class);
@@ -80,11 +82,12 @@ public void testShouldRetainNonTransactionallyDeletedCellMethod(int optionIdx, b
LOG.info("Testing when retain is {}", retainOption);
try (CompactorScanner scanner = spy(new CompactorScanner(ctx,
- internalScanner,
- ctClient,
- queue,
- false,
- retainOption))) {
+ internalScanner,
+ ctClient,
+ queue,
+ false,
+ retainOption,
+ metrics))) {
// Different cell types to test
KeyValue regularKV = new KeyValue(Bytes.toBytes("test-row"), TEST_TS, Type.Put);
| | | | | | | | | | |