Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hbase-coprocessor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<version>${hbase.version}</version>
</dependency>

<!-- end storage related -->

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.Client;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.hbase.coprocessor.metrics.CompactorCoprocessorMetrics;
import org.apache.omid.transaction.CellUtils;
import org.apache.omid.transaction.CellInfo;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -62,14 +63,20 @@ public class CompactorScanner implements InternalScanner {
private final Region hRegion;

private boolean hasMoreRows = false;
private List<Cell> currentRowWorthValues = new ArrayList<Cell>();
private List<Cell> currentRowWorthValues = new ArrayList<>();

// Metrics
private CompactorCoprocessorMetrics metrics;

public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
InternalScanner internalScanner,
Client commitTableClient,
Queue<CommitTable.Client> commitTableClientQueue,
boolean isMajorCompaction,
boolean preserveNonTransactionallyDeletedCells) throws IOException {
boolean preserveNonTransactionallyDeletedCells,
CompactorCoprocessorMetrics metrics) throws IOException
{

this.internalScanner = internalScanner;
this.commitTableClient = commitTableClient;
this.commitTableClientQueue = commitTableClientQueue;
Expand All @@ -78,6 +85,7 @@ public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
this.lowWatermark = getLowWatermarkFromCommitTable();
// Obtain the table in which the scanner is going to operate
this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
this.metrics = metrics;
LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
lowWatermark, hRegion.getRegionInfo());
}
Expand All @@ -101,6 +109,9 @@ public boolean next(List<Cell> result, int limit) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Row: Result {} limit {} more rows? {}", scanResult, limit, hasMoreRows);
}
if (hasMoreRows) {
metrics.incrScannedRows();
}
// 2) Traverse result list separating normal cells from shadow
// cells and building a map to access easily the shadow cells.
SortedMap<Cell, Optional<Cell>> cellToSc = CellUtils.mapCellsToShadowCells(scanResult);
Expand All @@ -111,17 +122,21 @@ public boolean next(List<Cell> result, int limit) throws IOException {
PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter
= Iterators.peekingIterator(cellToSc.entrySet().iterator());
while (iter.hasNext()) {
long cellProcessingTimeStartTimeInMs = System.currentTimeMillis();
metrics.incrTotalCells();
Map.Entry<Cell, Optional<Cell>> entry = iter.next();
Cell cell = entry.getKey();
Optional<Cell> shadowCellOp = entry.getValue();

if (cell.getTimestamp() > lowWatermark) {
retain(currentRowWorthValues, cell, shadowCellOp);
metrics.updateCellProcessingTime(System.currentTimeMillis() - cellProcessingTimeStartTimeInMs);
continue;
}

if (shouldRetainNonTransactionallyDeletedCell(cell)) {
retain(currentRowWorthValues, cell, shadowCellOp);
metrics.updateCellProcessingTime(System.currentTimeMillis() - cellProcessingTimeStartTimeInMs);
continue;
}

Expand All @@ -142,6 +157,8 @@ public boolean next(List<Cell> result, int limit) throws IOException {
skipToNextColumn(cell, iter);
}
}
metrics.incrTombstoneCells();
metrics.updateCellProcessingTime(System.currentTimeMillis() - cellProcessingTimeStartTimeInMs);
continue;
}
}
Expand All @@ -155,12 +172,16 @@ public boolean next(List<Cell> result, int limit) throws IOException {
byte[] shadowCellValue = Bytes.toBytes(commitTimestamp.get().getValue());
Cell shadowCell = CellUtils.buildShadowCellFromCell(cell, shadowCellValue);
saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCell);
metrics.incrHealedShadowCells();
} else {
LOG.trace("Discarding cell {}", cell);
metrics.incrDiscardedCells();
}
}
metrics.updateCellProcessingTime(System.currentTimeMillis() - cellProcessingTimeStartTimeInMs);
}
retainLastTimestampedCellsSaved(currentRowWorthValues, lastTimestampedCellsInRow);
metrics.incrRetainedCells(lastTimestampedCellsInRow.values().size());

// 4) Sort the list
Collections.sort(currentRowWorthValues, KeyValue.COMPARATOR);
Expand Down Expand Up @@ -229,7 +250,9 @@ private long getLowWatermarkFromCommitTable() throws IOException {

private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
try {
long queryCommitTableStartTimeInMs = System.currentTimeMillis();
Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
metrics.updateCommitTableQueryTime(System.currentTimeMillis() - queryCommitTableStartTimeInMs);
if (ct.isPresent()) {
return Optional.of(ct.get());
} else {
Expand Down Expand Up @@ -276,11 +299,16 @@ private void retainLastTimestampedCellsSaved(List<Cell> result, Map<String, Cell
}

private void skipToNextColumn(Cell cell, PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter) {

int skippedCellsCount = 0;
while (iter.hasNext()
&& CellUtil.matchingFamily(iter.peek().getKey(), cell)
&& CellUtil.matchingQualifier(iter.peek().getKey(), cell)) {
iter.next();
skippedCellsCount++;
}
metrics.incrSkippedCells(skippedCellsCount);

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.hbase.coprocessor.metrics;

import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;

public class CompactorCoprocessorMetrics extends BaseSourceImpl implements CompactorCoprocessorMetricsSource {

private static final String METRICS_NAME = "CompactorCoprocessor";
private static final String METRICS_CONTEXT = "omid.coprocessor.compactor";
private static final String METRICS_DESCRIPTION = "Omid Compactor Coprocessor Metrics";
private static final String METRICS_JMX_CONTEXT = "Omid,sub=" + METRICS_NAME;

// ----------------------------------------------------------------------------------------------------------------
// Metrics
// ----------------------------------------------------------------------------------------------------------------

// Histogram-related keys & descriptions
static final String COMPACTIONS_KEY = "compactions";
static final String COMPACTIONS_DESC = "Histogram about Compactions";
static final String CELL_PROCESSING_KEY = "cellProcessing";
static final String CELL_PROCESSING_DESC = "Histogram about Cell Processing";
static final String COMMIT_TABLE_QUERY_KEY = "commitTableQuery";
static final String COMMIT_TABLE_QUERY_DESC = "Histogram about Commit Table Query";

// Counter-related keys & descriptions
static final String MAJOR_COMPACTION_KEY = "major-compactions";
static final String MAJOR_COMPACTION_DESC = "Number of major compactions";
static final String MINOR_COMPACTION_KEY = "minor-compactions";
static final String MINOR_COMPACTION_DESC = "Number of minor compactions";
static final String SCANNED_ROWS_KEY = "scanned-rows";
static final String SCANNED_ROWS_DESC = "Number of rows scanned";
static final String TOTAL_CELLS_KEY = "total-cells";
static final String TOTAL_CELLS_DESC = "Number of cells processed";
static final String RETAINED_CELLS_KEY = "retained-cells";
static final String RETAINED_CELLS_DESC = "Number of cells retained when compacting";
static final String SKIPPED_CELLS_KEY = "skipped-cells";
static final String SKIPPED_CELLS_DESC = "Number of cells skipped when compacting";
static final String HEALED_SHADOW_CELLS_KEY = "healed-shadow-cells";
static final String HEALED_SHADOW_CELLS_DESC = "Number of cells healed when compacting";
static final String DISCARDED_CELLS_KEY = "discarded-cells";
static final String DISCARDED_CELLS_DESC = "Number of cells discarded when compacting";
static final String TOMBSTONE_CELLS_KEY = "tombstone-cells";
static final String TOMBSTONE_CELLS_DESC = "Number of tombstone cells found when compacting";

// *************************** Elements **********************************/

// Histograms
private final MetricHistogram compactionsHistogram;
private final MetricHistogram cellProcessingHistogram;
private final MetricHistogram commitTableQueryHistogram;

// Counters
private final MetricMutableCounterLong majorCompactionsCounter;
private final MetricMutableCounterLong minorCompactionsCounter;
private final MetricMutableCounterLong scannedRowsCounter;
private final MetricMutableCounterLong totalCellsCounter;
private final MetricMutableCounterLong retainedCellsCounter;
private final MetricMutableCounterLong skippedCellsCounter;
private final MetricMutableCounterLong healedShadowCellsCounter;
private final MetricMutableCounterLong discardedCellsCounter;
private final MetricMutableCounterLong tombstoneCellsCounter;

// ----------------------------------------------------------------------------------------------------------------
// End of Metrics
// ----------------------------------------------------------------------------------------------------------------

public CompactorCoprocessorMetrics() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
}

public CompactorCoprocessorMetrics(String metricsName,
String metricsDescription,
String metricsContext,
String metricsJmxContext) {

super(metricsName, metricsDescription, metricsContext, metricsJmxContext);

// Histograms
compactionsHistogram = getMetricsRegistry().newHistogram(COMPACTIONS_KEY, COMPACTIONS_DESC);
cellProcessingHistogram = getMetricsRegistry().newHistogram(CELL_PROCESSING_KEY, CELL_PROCESSING_DESC);
commitTableQueryHistogram = getMetricsRegistry().newHistogram(COMMIT_TABLE_QUERY_KEY, COMMIT_TABLE_QUERY_DESC);

// Counters
majorCompactionsCounter = getMetricsRegistry().newCounter(MAJOR_COMPACTION_KEY, MAJOR_COMPACTION_DESC, 0L);
minorCompactionsCounter = getMetricsRegistry().newCounter(MINOR_COMPACTION_KEY, MINOR_COMPACTION_DESC, 0L);
scannedRowsCounter = getMetricsRegistry().newCounter(SCANNED_ROWS_KEY, SCANNED_ROWS_DESC, 0L);
totalCellsCounter = getMetricsRegistry().newCounter(TOTAL_CELLS_KEY, TOTAL_CELLS_DESC, 0L);
retainedCellsCounter = getMetricsRegistry().newCounter(RETAINED_CELLS_KEY, RETAINED_CELLS_DESC, 0L);
skippedCellsCounter = getMetricsRegistry().newCounter(SKIPPED_CELLS_KEY, SKIPPED_CELLS_DESC, 0L);
healedShadowCellsCounter = getMetricsRegistry().newCounter(HEALED_SHADOW_CELLS_KEY, HEALED_SHADOW_CELLS_DESC, 0L);
discardedCellsCounter = getMetricsRegistry().newCounter(DISCARDED_CELLS_KEY, DISCARDED_CELLS_DESC, 0L);
tombstoneCellsCounter = getMetricsRegistry().newCounter(TOMBSTONE_CELLS_KEY, TOMBSTONE_CELLS_DESC, 0L);

}

// ----------------------------------------------------------------------------------------------------------------
// CompactorCoprocessorMetricsSource Interface Impl
// ----------------------------------------------------------------------------------------------------------------

@Override
public void updateCompactionTime(long timeInMs) {
compactionsHistogram.add(timeInMs);
}

@Override
public void updateCellProcessingTime(long timeInMs) {
cellProcessingHistogram.add(timeInMs);
}

@Override
public void updateCommitTableQueryTime(long timeInMs) {
commitTableQueryHistogram.add(timeInMs);
}

@Override
public void incrMajorCompactions() {
majorCompactionsCounter.incr();
}

@Override
public void incrMinorCompactions() {
minorCompactionsCounter.incr();
}

@Override
public void incrScannedRows() {
scannedRowsCounter.incr();
}

@Override
public void incrTotalCells() {
totalCellsCounter.incr();
}

@Override
public void incrRetainedCells(long delta) {
retainedCellsCounter.incr(delta);
}

@Override
public void incrSkippedCells(long delta) {
skippedCellsCounter.incr(delta);
}

@Override
public void incrHealedShadowCells() {
healedShadowCellsCounter.incr();
}

@Override
public void incrDiscardedCells() {
discardedCellsCounter.incr();
}

@Override
public void incrTombstoneCells() {
tombstoneCellsCounter.incr();
}

}
Loading