[prev in list] [next in list] [prev in thread] [next in thread]
List: rhq-commits
Subject: [rhq] Branch 'jsanda/aggregation' - 3 commits - modules/enterprise
From: John Sanda <jsanda () fedoraproject ! org>
Date: 2013-12-19 18:44:29
Message-ID: 20131219184429.D2FC360A3B () fedorahosted ! org
[Download RAW message or body]
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java | 109 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java | 66 -
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java | 62 -
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java | 116 ---
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java | 255 ------
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java | 331 --------
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java | 104 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java | 91 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java | 97 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java | 4
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java | 127 +++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java | 84 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java | 73 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java | 133 +++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java | 257 ++++++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java | 378 ++++++++++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java | 113 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java | 99 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java | 106 ++
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java | 1
20 files changed, 1373 insertions(+), 1233 deletions(-)
New commits:
commit 1339dad67f4d008561bbb99b7fd5bec8bc90eca3
Author: John Sanda <jsanda@redhat.com>
Date: Thu Dec 19 13:41:10 2013 -0500
[BZ 1009945] turn on async aggregation by default
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
index 7f5b639..d319146 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
@@ -92,7 +92,7 @@ public class MetricsServer {
private int aggregationBatchSize;
- private boolean useAsyncAggregation = \
System.getProperty("rhq.metrics.aggregation.async") != null; + private boolean \
useAsyncAggregation = \
Boolean.valueOf(System.getProperty("rhq.metrics.aggregation.async", "true"));
public void setDAO(MetricsDAO dao) {
this.dao = dao;
commit 0852a2f987de1af4c409067da2a13b6064a640bc
Author: John Sanda <jsanda@redhat.com>
Date: Thu Dec 19 11:11:50 2013 -0500
[BZ 1009945] updating test with package refactoring
diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java \
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
index 00803db..7d66cef 100644
--- a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.rhq.core.domain.measurement.MeasurementDataNumeric;
+import org.rhq.server.metrics.aggregation.Aggregator;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.AggregateType;
import org.rhq.server.metrics.domain.MetricsIndexEntry;
commit 4da0ad97bbcb2e36c70d47a96e8345825bb72615
Author: John Sanda <jsanda@redhat.com>
Date: Thu Dec 19 11:05:14 2013 -0500
[BZ 1009945] cleaning up logging and adding some javadocs
Also moving all aggregation-related classes into the
org.rhq.server.metrics.aggregation package. All classes except Aggregator now have
package-level access.
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java
deleted file mode 100644
index 4a5e78b..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Generates 6 hour data for a batch of 1 hour data futures. After data is inserted \
for the batch, aggregation of 6 hour
- * data will start immediately for the batch if the 24 hour time slice has finished.
- *
- * @see Compute6HourData
- * @author John Sanda
- */
-public class Aggregate1HourData implements Runnable {
-
- private final Log log = LogFactory.getLog(Aggregate1HourData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public Aggregate1HourData(MetricsDAO dao, AggregationState state, Set<Integer> \
scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final long start = System.currentTimeMillis();
- ListenableFuture<List<ResultSet>> queriesFuture = \
Futures.successfulAsList(queryFutures);
- Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t) throws \
Exception {
- log.error("An error occurred while fetching one hour data", t);
- return Futures.immediateFailedFuture(t);
- }
- });
- ListenableFuture<List<ResultSet>> computeFutures = \
Futures.transform(queriesFuture,
- state.getCompute6HourData(), state.getAggregationTasks());
- Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> result) {
- log.debug("Finished aggregating 1 hour data for " + result.size() + \
" schedules in " +
- (System.currentTimeMillis() - start) + " ms");
- start6HourDataAggregationIfNecessary();
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to aggregate 1 hour data", t);
- start6HourDataAggregationIfNecessary();
- }
- });
- }
-
- private void start6HourDataAggregationIfNecessary() {
- try {
- if (state.is24HourTimeSliceFinished()) {
- update6HourIndexEntries();
- List<StorageResultSetFuture> queryFutures = new \
ArrayList<StorageResultSetFuture>(scheduleIds.size());
- for (Integer scheduleId : scheduleIds) {
- queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, \
state.getTwentyFourHourTimeSlice().getMillis(),
- state.getTwentyFourHourTimeSliceEnd().getMillis()));
- }
- state.getAggregationTasks().submit(new Aggregate6HourData(dao, \
state, scheduleIds, queryFutures));
- }
- } catch (InterruptedException e) {
- log.debug("An interrupt occurred while waiting for 6 hour data index \
entries. Aborting data aggregation",
- e);
- log.info("An interrupt occurred while waiting for 6 hour data index \
entries. Aborting data aggregation: " +
- e.getMessage());
- } finally {
- state.getRemaining1HourData().addAndGet(-scheduleIds.size());
- }
- }
-
- private void update6HourIndexEntries() throws InterruptedException {
- try {
- state.getSixHourIndexEntriesArrival().await();
- try {
- state.getSixHourIndexEntriesLock().writeLock().lock();
- state.getSixHourIndexEntries().removeAll(scheduleIds);
- } finally {
- state.getSixHourIndexEntriesLock().writeLock().unlock();
- }
- } catch (AbortedException e) {
- // This means we failed to retrieve the index entries. We can however
- // continue generating 6 hour data because we do not need the index
- // here since we already have 6 hour data to aggregate along with the
- // schedule ids.
- }
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java
deleted file mode 100644
index fc8c0cb..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * @author John Sanda
- */
-public class Aggregate6HourData implements Runnable {
-
- private final Log log = LogFactory.getLog(Aggregate6HourData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public Aggregate6HourData(MetricsDAO dao, AggregationState state, Set<Integer> \
scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final long start = System.currentTimeMillis();
- ListenableFuture<List<ResultSet>> queriesFuture = \
Futures.successfulAsList(queryFutures);
- Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t) throws \
Exception {
- log.error("An error occurred while fetching 6 hour data", t);
- return Futures.immediateFailedFuture(t);
- }
- });
- ListenableFuture<List<ResultSet>> computeFutures = \
Futures.transform(queriesFuture,
- state.getCompute24HourData(), state.getAggregationTasks());
- Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> result) {
- log.debug("Finished aggregating 6 hour data for " + result.size() + \
" schedules in " +
- (System.currentTimeMillis() - start) + " ms");
- state.getRemaining6HourData().addAndGet(-scheduleIds.size());
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to aggregate 6 hour data", t);
- state.getRemaining6HourData().addAndGet(-scheduleIds.size());
- }
- });
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java
deleted file mode 100644
index 1eb9e53..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.FutureCallback;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
-* @author John Sanda
-*/
-class AggregateIndexEntriesHandler implements FutureCallback<ResultSet> {
-
- private final Log log = LogFactory.getLog(AggregateIndexEntriesHandler.class);
-
- private Set<Integer> indexEntries;
-
- private AtomicInteger remainingData;
-
- private SignalingCountDownLatch indexEntriesArrival;
-
- private long startTime;
-
- private String src;
-
- private String dest;
-
- public AggregateIndexEntriesHandler(Set<Integer> indexEntries, AtomicInteger \
remainingData,
- SignalingCountDownLatch indexEntriesArrival, long startTime, String src, \
String dest) {
- this.indexEntries = indexEntries;
- this.remainingData = remainingData;
- this.indexEntriesArrival = indexEntriesArrival;
- this.startTime = startTime;
- this.src = src;
- this.dest = dest;
- }
-
- @Override
- public void onSuccess(ResultSet resultSet) {
- for (Row row : resultSet) {
- indexEntries.add(row.getInt(1));
- }
- remainingData.set(indexEntries.size());
- indexEntriesArrival.countDown();
- if (log.isDebugEnabled()) {
- log.debug("Finished loading " + indexEntries.size() + " " + src + " \
index entries in " +
- (System.currentTimeMillis() - startTime) + " ms");
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to retrieve " + src + " index entries. Some " + dest + " \
aggregates may not get generated.",
- t);
- remainingData.set(0);
- indexEntriesArrival.abort();
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java
deleted file mode 100644
index dbfc289..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Generates 1 hour data for a batch of raw data futures. After data is inserted for \
the batch, aggregation of 1 hour
- * data will start immediately for the batch if the 6 hour time slice has finished.
- *
- * @see Compute1HourData
- * @author John Sanda
- */
-public class AggregateRawData implements Runnable {
-
- private final Log log = LogFactory.getLog(AggregateRawData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public AggregateRawData(MetricsDAO dao, AggregationState state, Set<Integer> \
scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final long start = System.currentTimeMillis();
- ListenableFuture<List<ResultSet>> rawDataFutures = \
Futures.successfulAsList(queryFutures);
- Futures.withFallback(rawDataFutures, new FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t) throws \
Exception {
- log.error("An error occurred while fetching raw data", t);
- return Futures.immediateFailedFuture(t);
- }
- });
-
- final ListenableFuture<List<ResultSet>> insert1HourDataFutures = \
Futures.transform(rawDataFutures,
- state.getCompute1HourData(), state.getAggregationTasks());
- Futures.addCallback(insert1HourDataFutures, new \
FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> resultSets) {
- log.debug("Finished aggregating raw data for " + resultSets.size() + \
" schedules in " +
- (System.currentTimeMillis() - start) + " ms");
- start1HourDataAggregationIfNecessary();
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to aggregate raw data", t);
- // TODO maybe add debug statement to log those schedule ids for \
which aggregation failed
- start1HourDataAggregationIfNecessary();
- }
- }, state.getAggregationTasks());
- }
-
- private void start1HourDataAggregationIfNecessary() {
- try {
- if (state.is6HourTimeSliceFinished()) {
- update1HourIndexEntries();
- List<StorageResultSetFuture> oneHourDataQueryFutures = new \
ArrayList<StorageResultSetFuture>(
- scheduleIds.size());
- for (Integer scheduleId : scheduleIds) {
- \
oneHourDataQueryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
- state.getSixHourTimeSlice().getMillis(), \
state.getSixHourTimeSliceEnd().getMillis()));
- }
- state.getAggregationTasks().submit(new Aggregate1HourData(dao, \
state, scheduleIds,
- oneHourDataQueryFutures));
- }
- } catch (InterruptedException e) {
- log.debug("An interrupt occurred while waiting for 1 hour data index \
entries. Aborting data aggregation",
- e);
- log.info("An interrupt occurred while waiting for 1 hour data index \
entries. Aborting data aggregation: " +
- e.getMessage());
- } finally {
- state.getRemainingRawData().addAndGet(-scheduleIds.size());
- }
- }
-
- private void update1HourIndexEntries() throws InterruptedException {
- try {
- // Wait for the arrival so that we can remove the schedules ids in this
- // batch from the one hour index entries. This will prevent duplicate \
tasks
- // being submitted to process the same 1 hour data.
- state.getOneHourIndexEntriesArrival().await();
- try {
- state.getOneHourIndexEntriesLock().writeLock().lock();
- state.getOneHourIndexEntries().removeAll(scheduleIds);
- } finally {
- state.getOneHourIndexEntriesLock().writeLock().unlock();
- }
- } catch (AbortedException e) {
- // This means we failed to retrieve the index entries. We can however
- // continue generating 1 hour data because we do not need the index
- // here since we already have 1 hour data to aggregate along with the
- // schedule ids.
- }
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java
deleted file mode 100644
index bd1ce0d..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java
+++ /dev/null
@@ -1,255 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-import org.joda.time.DateTime;
-
-/**
- * @author John Sanda
- */
-public class AggregationState {
-
- private ListeningExecutorService aggregationTasks;
-
- private SignalingCountDownLatch oneHourIndexEntriesArrival;
-
- private SignalingCountDownLatch sixHourIndexEntriesArrival;
-
- private AtomicInteger remainingRawData;
-
- private AtomicInteger remaining1HourData;
-
- private AtomicInteger remaining6HourData;
-
- private Set<Integer> oneHourIndexEntries;
-
- private Set<Integer> sixHourIndexEntries;
-
- private ReentrantReadWriteLock oneHourIndexEntriesLock;
-
- private ReentrantReadWriteLock sixHourIndexEntriesLock;
-
- private DateTime oneHourTimeSlice;
-
- private DateTime sixHourTimeSlice;
-
- private DateTime sixHourTimeSliceEnd;
-
- private DateTime twentyFourHourTimeSlice;
-
- private DateTime twentyFourHourTimeSliceEnd;
-
- private boolean sixHourTimeSliceFinished;
-
- private boolean twentyFourHourTimeSliceFinished;
-
- private Compute1HourData compute1HourData;
-
- private Compute6HourData compute6HourData;
-
- private Compute24HourData compute24HourData;
-
- public ListeningExecutorService getAggregationTasks() {
- return aggregationTasks;
- }
-
- public AggregationState setAggregationTasks(ListeningExecutorService \
aggregationTasks) {
- this.aggregationTasks = aggregationTasks;
- return this;
- }
-
- /**
- * @return A {@link SignalingCountDownLatch} to signal the arrival of index \
entries for schedules with 1 hour
- * data to be aggregated
- */
- public SignalingCountDownLatch getOneHourIndexEntriesArrival() {
- return oneHourIndexEntriesArrival;
- }
-
- public AggregationState setOneHourIndexEntriesArrival(SignalingCountDownLatch \
oneHourIndexEntriesArrival) {
- this.oneHourIndexEntriesArrival = oneHourIndexEntriesArrival;
- return this;
- }
-
- /**
- * @return A {@link SignalingCountDownLatch} to signal the arrival of index \
entries for schedules with 6 hour
- * data to be aggregated
- */
- public SignalingCountDownLatch getSixHourIndexEntriesArrival() {
- return sixHourIndexEntriesArrival;
- }
-
- public AggregationState setSixHourIndexEntriesArrival(SignalingCountDownLatch \
sixHourIndexEntriesArrival) {
- this.sixHourIndexEntriesArrival = sixHourIndexEntriesArrival;
- return this;
- }
-
- /**
- * @return The remaining number of schedules with raw data to be aggregated
- */
- public AtomicInteger getRemainingRawData() {
- return remainingRawData;
- }
-
- public AggregationState setRemainingRawData(AtomicInteger remainingRawData) {
- this.remainingRawData = remainingRawData;
- return this;
- }
-
- /**
- * @return The remaining number of schedules with 1 hour data to be aggregated
- */
- public AtomicInteger getRemaining1HourData() {
- return remaining1HourData;
- }
-
- public AggregationState setRemaining1HourData(AtomicInteger remaining1HourData) \
{
- this.remaining1HourData = remaining1HourData;
- return this;
- }
-
- /**
- * @return The remaining number of schedules with 6 hour data to be aggregated
- */
- public AtomicInteger getRemaining6HourData() {
- return remaining6HourData;
- }
-
- public AggregationState setRemaining6HourData(AtomicInteger remaining6HourData) \
{
- this.remaining6HourData = remaining6HourData;
- return this;
- }
-
- /**
- * @return The schedule ids with 1 hour data to be aggregated
- */
- public Set<Integer> getOneHourIndexEntries() {
- return oneHourIndexEntries;
- }
-
- public AggregationState setOneHourIndexEntries(Set<Integer> oneHourIndexEntries) \
{
- this.oneHourIndexEntries = oneHourIndexEntries;
- return this;
- }
-
- public Set<Integer> getSixHourIndexEntries() {
- return sixHourIndexEntries;
- }
-
- public AggregationState setSixHourIndexEntries(Set<Integer> sixHourIndexEntries) \
{
- this.sixHourIndexEntries = sixHourIndexEntries;
- return this;
- }
-
- public ReentrantReadWriteLock getOneHourIndexEntriesLock() {
- return oneHourIndexEntriesLock;
- }
-
- public AggregationState setOneHourIndexEntriesLock(ReentrantReadWriteLock \
oneHourIndexEntriesLock) {
- this.oneHourIndexEntriesLock = oneHourIndexEntriesLock;
- return this;
- }
-
- public ReentrantReadWriteLock getSixHourIndexEntriesLock() {
- return sixHourIndexEntriesLock;
- }
-
- public AggregationState setSixHourIndexEntriesLock(ReentrantReadWriteLock \
sixHourIndexEntriesLock) {
- this.sixHourIndexEntriesLock = sixHourIndexEntriesLock;
- return this;
- }
-
- public DateTime getOneHourTimeSlice() {
- return oneHourTimeSlice;
- }
-
- public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
- this.oneHourTimeSlice = oneHourTimeSlice;
- return this;
- }
-
- public DateTime getSixHourTimeSlice() {
- return sixHourTimeSlice;
- }
-
- public AggregationState setSixHourTimeSlice(DateTime sixHourTimeSlice) {
- this.sixHourTimeSlice = sixHourTimeSlice;
- return this;
- }
-
- public DateTime getSixHourTimeSliceEnd() {
- return sixHourTimeSliceEnd;
- }
-
- public AggregationState setSixHourTimeSliceEnd(DateTime sixHourTimeSliceEnd) {
- this.sixHourTimeSliceEnd = sixHourTimeSliceEnd;
- return this;
- }
-
- public DateTime getTwentyFourHourTimeSlice() {
- return twentyFourHourTimeSlice;
- }
-
- public AggregationState setTwentyFourHourTimeSlice(DateTime \
twentyFourHourTimeSlice) {
- this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
- return this;
- }
-
- public DateTime getTwentyFourHourTimeSliceEnd() {
- return twentyFourHourTimeSliceEnd;
- }
-
- public AggregationState setTwentyFourHourTimeSliceEnd(DateTime \
twentyFourHourTimeSliceEnd) {
- this.twentyFourHourTimeSliceEnd = twentyFourHourTimeSliceEnd;
- return this;
- }
-
- public boolean is6HourTimeSliceFinished() {
- return sixHourTimeSliceFinished;
- }
-
- public AggregationState set6HourTimeSliceFinished(boolean \
is6HourTimeSliceFinished) {
- this.sixHourTimeSliceFinished = is6HourTimeSliceFinished;
- return this;
- }
-
- public boolean is24HourTimeSliceFinished() {
- return twentyFourHourTimeSliceFinished;
- }
-
- public AggregationState set24HourTimeSliceFinished(boolean \
is24HourTimeSliceFinished) {
- this.twentyFourHourTimeSliceFinished = is24HourTimeSliceFinished;
- return this;
- }
-
- public Compute1HourData getCompute1HourData() {
- return compute1HourData;
- }
-
- public AggregationState setCompute1HourData(Compute1HourData compute1HourData) {
- this.compute1HourData = compute1HourData;
- return this;
- }
-
- public Compute6HourData getCompute6HourData() {
- return compute6HourData;
- }
-
- public AggregationState setCompute6HourData(Compute6HourData compute6HourData) {
- this.compute6HourData = compute6HourData;
- return this;
- }
-
- public Compute24HourData getCompute24HourData() {
- return compute24HourData;
- }
-
- public AggregationState setCompute24HourData(Compute24HourData \
compute24HourData) {
- this.compute24HourData = compute24HourData;
- return this;
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java
deleted file mode 100644
index 4ff222a..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java
+++ /dev/null
@@ -1,331 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeComparator;
-import org.joda.time.Duration;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.MetricsTable;
-
-/**
- * Runs metrics aggregation for the time slice that starts at {@code startTime}. {@link #run()} looks up the
- * metrics index entries for the time slice and submits batched tasks that aggregate raw data. When the enclosing
- * 6 hour and/or 24 hour time slices have ended it also submits tasks that aggregate 1 hour and 6 hour data.
- * {@link #run()} blocks until all of the remaining-work counters have dropped to zero or the calling thread is
- * interrupted.
- *
- * @author John Sanda
- */
-public class Aggregator {
-
-    /** Orders aggregates by schedule id. */
-    private static final Comparator<AggregateNumericMetric> AGGREGATE_COMPARATOR =
-        new Comparator<AggregateNumericMetric>() {
-            @Override
-            public int compare(AggregateNumericMetric left, AggregateNumericMetric right) {
-                return (left.getScheduleId() < right.getScheduleId()) ? -1 :
-                    ((left.getScheduleId() == right.getScheduleId()) ? 0 : 1);
-            }
-        };
-
-    private final Log log = LogFactory.getLog(Aggregator.class);
-
-    private MetricsDAO dao;
-
-    private MetricsConfiguration configuration;
-
-    private DateTimeService dtService;
-
-    private DateTime startTime;
-
-    /**
-     * Signals when raw data index entries (in metrics_index) can be deleted. We cannot delete the row in
-     * metrics_index until we know that it has been read, successfully or otherwise.
-     */
-    private SignalingCountDownLatch rawDataIndexEntriesArrival;
-
-    private RateLimiter readPermits;
-    private RateLimiter writePermits;
-
-    private int batchSize;
-
-    private AggregationState state;
-
-    private Set<AggregateNumericMetric> oneHourData;
-
-    /**
-     * Number of index entry reads that still have to be settled. It starts at one for the raw data index and is
-     * incremented once for each of the 1 hour and 6 hour indexes that will be read. {@link #run()} does not return
-     * normally until it is back at zero.
-     */
-    private AtomicInteger remainingIndexEntries;
-
-    public Aggregator(ListeningExecutorService aggregationTasks, MetricsDAO dao, MetricsConfiguration configuration,
-        DateTimeService dtService, DateTime startTime, int batchSize, RateLimiter writePermits,
-        RateLimiter readPermits) {
-        this.dao = dao;
-        this.configuration = configuration;
-        this.dtService = dtService;
-        this.startTime = startTime;
-        this.readPermits = readPermits;
-        this.writePermits = writePermits;
-        this.batchSize = batchSize;
-        oneHourData = new ConcurrentSkipListSet<AggregateNumericMetric>(AGGREGATE_COMPARATOR);
-        rawDataIndexEntriesArrival = new SignalingCountDownLatch(new CountDownLatch(1));
-        remainingIndexEntries = new AtomicInteger(1);
-
-        DateTime sixHourTimeSlice = get6HourTimeSlice();
-        DateTime twentyFourHourTimeSlice = get24HourTimeSlice();
-
-        state = new AggregationState()
-            .setAggregationTasks(aggregationTasks)
-            .setOneHourTimeSlice(startTime)
-            .setSixHourTimeSlice(sixHourTimeSlice)
-            .setSixHourTimeSliceEnd(sixHourTimeSlice.plus(configuration.getOneHourTimeSliceDuration()))
-            .setTwentyFourHourTimeSlice(twentyFourHourTimeSlice)
-            .setTwentyFourHourTimeSliceEnd(twentyFourHourTimeSlice.plus(configuration.getSixHourTimeSliceDuration()))
-            .setCompute1HourData(new Compute1HourData(startTime, sixHourTimeSlice, writePermits, dao, oneHourData))
-            .setCompute6HourData(new Compute6HourData(sixHourTimeSlice, twentyFourHourTimeSlice, writePermits, dao))
-            .setCompute24HourData(new Compute24HourData(twentyFourHourTimeSlice, writePermits, dao))
-            .set6HourTimeSliceFinished(hasTimeSliceEnded(sixHourTimeSlice,
-                configuration.getOneHourTimeSliceDuration()))
-            .set24HourTimeSliceFinished(hasTimeSliceEnded(twentyFourHourTimeSlice,
-                configuration.getSixHourTimeSliceDuration()))
-            .setRemainingRawData(new AtomicInteger(0))
-            .setRemaining1HourData(new AtomicInteger(0))
-            .setRemaining6HourData(new AtomicInteger(0))
-            .setOneHourIndexEntries(new TreeSet<Integer>())
-            .setSixHourIndexEntries(new TreeSet<Integer>())
-            .setOneHourIndexEntriesLock(new ReentrantReadWriteLock())
-            .setSixHourIndexEntriesLock(new ReentrantReadWriteLock());
-
-        // A latch with a count of one makes run() wait for the index query; a count of zero lets any waiter
-        // through immediately because that index is not going to be read.
-        if (state.is6HourTimeSliceFinished()) {
-            state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(1)));
-            remainingIndexEntries.incrementAndGet();
-        } else {
-            state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(0)));
-            state.setRemaining1HourData(new AtomicInteger(0));
-        }
-
-        if (state.is24HourTimeSliceFinished()) {
-            state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(1)));
-            remainingIndexEntries.incrementAndGet();
-        } else {
-            state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(0)));
-            state.setRemaining6HourData(new AtomicInteger(0));
-        }
-    }
-
-    /**
-     * @return the time slice of {@code startTime} computed with the configured six hour time slice duration
-     */
-    private DateTime get24HourTimeSlice() {
-        return dtService.getTimeSlice(startTime, configuration.getSixHourTimeSliceDuration());
-    }
-
-    /**
-     * @return the time slice of {@code startTime} computed with the configured one hour time slice duration
-     */
-    private DateTime get6HourTimeSlice() {
-        return dtService.getTimeSlice(startTime, configuration.getOneHourTimeSliceDuration());
-    }
-
-    /**
-     * @return true if {@link #currentHour()} is at or after {@code startTime} plus {@code duration}
-     */
-    private boolean hasTimeSliceEnded(DateTime startTime, Duration duration) {
-        DateTime endTime = startTime.plus(duration);
-        return DateTimeComparator.getInstance().compare(currentHour(), endTime) >= 0;
-    }
-
-    /**
-     * @return the time slice of the current time computed with the configured raw time slice duration
-     */
-    protected DateTime currentHour() {
-        return dtService.getTimeSlice(dtService.now(), configuration.getRawTimeSliceDuration());
-    }
-
-    /**
-     * Performs the aggregation and waits for it to finish. If the calling thread is interrupted while waiting, the
-     * interrupt is logged, the interrupt status of the thread is restored and the data collected so far is
-     * returned.
-     *
-     * @return the 1 hour aggregates collected in the set that is shared with {@link Compute1HourData}
-     */
-    public Set<AggregateNumericMetric> run() {
-        log.info("Starting aggregation for time slice " + startTime);
-        readPermits.acquire();
-        StorageResultSetFuture rawFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
-            startTime.getMillis());
-        Futures.addCallback(rawFuture, new FutureCallback<ResultSet>() {
-            @Override
-            public void onSuccess(ResultSet result) {
-                List<Row> rows = result.all();
-                state.getRemainingRawData().set(rows.size());
-                rawDataIndexEntriesArrival.countDown();
-
-                log.debug("Starting raw data aggregation for " + rows.size() + " schedules");
-                long start = System.currentTimeMillis();
-                final DateTime endTime = startTime.plus(configuration.getRawTimeSliceDuration());
-                Set<Integer> scheduleIds = new TreeSet<Integer>();
-                List<StorageResultSetFuture> rawDataFutures = new ArrayList<StorageResultSetFuture>(batchSize);
-                for (final Row row : rows) {
-                    scheduleIds.add(row.getInt(1));
-                    readPermits.acquire();
-                    rawDataFutures.add(dao.findRawMetricsAsync(row.getInt(1), startTime.getMillis(),
-                        endTime.getMillis()));
-                    if (rawDataFutures.size() == batchSize) {
-                        state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
-                            rawDataFutures));
-                        rawDataFutures = new ArrayList<StorageResultSetFuture>();
-                        scheduleIds = new TreeSet<Integer>();
-                    }
-                }
-                if (!rawDataFutures.isEmpty()) {
-                    state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
-                        rawDataFutures));
-                }
-                log.debug("Finished processing one hour index entries in " + (System.currentTimeMillis() - start) +
-                    " ms");
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                log.warn("Failed to retrieve raw data index entries. Raw data aggregation for time slice [" +
-                    startTime + "] cannot proceed.", t);
-                state.setRemainingRawData(new AtomicInteger(0));
-                rawDataIndexEntriesArrival.abort();
-                deleteIndexEntries(MetricsTable.ONE_HOUR);
-            }
-        }, state.getAggregationTasks());
-
-        if (state.is6HourTimeSliceFinished()) {
-            long start = System.currentTimeMillis();
-            log.debug("Fetching 1 hour index entries");
-            StorageResultSetFuture oneHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
-                state.getSixHourTimeSlice().getMillis());
-            Futures.addCallback(oneHourFuture, new AggregateIndexEntriesHandler(state.getOneHourIndexEntries(),
-                state.getRemaining1HourData(), state.getOneHourIndexEntriesArrival(), start, "1 hour", "6 hour"),
-                state.getAggregationTasks());
-        }
-
-        if (state.is24HourTimeSliceFinished()) {
-            long start = System.currentTimeMillis();
-            log.debug("Fetching 6 hour index entries");
-            StorageResultSetFuture sixHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR,
-                state.getTwentyFourHourTimeSlice().getMillis());
-            Futures.addCallback(sixHourFuture, new AggregateIndexEntriesHandler(state.getSixHourIndexEntries(),
-                state.getRemaining6HourData(), state.getSixHourIndexEntriesArrival(), start, "6 hour", "24 hour"),
-                state.getAggregationTasks());
-        }
-
-        try {
-            try {
-                rawDataIndexEntriesArrival.await();
-                deleteIndexEntries(MetricsTable.ONE_HOUR);
-            } catch (AbortedException e) {
-                // Nothing to do: the onFailure callback above has already logged the failure and requested the
-                // deletion of the raw data index entries.
-            }
-
-            if (state.is6HourTimeSliceFinished()) {
-                waitFor(state.getRemainingRawData());
-                try {
-                    state.getOneHourIndexEntriesArrival().await();
-                    deleteIndexEntries(MetricsTable.SIX_HOUR);
-                    submit1HourDataAggregationTasks();
-                } catch (AbortedException e) {
-                    log.warn("Failed to load 1 hour index entries. Some 6 hour aggregates may not get generated.", e);
-                    // deleteIndexEntries() is the only place that counts this index down and it was not reached,
-                    // so count it down here. Otherwise the wait loop below would never terminate.
-                    remainingIndexEntries.decrementAndGet();
-                }
-            }
-
-            if (state.is24HourTimeSliceFinished()) {
-                waitFor(state.getRemaining1HourData());
-                try {
-                    state.getSixHourIndexEntriesArrival().await();
-                    deleteIndexEntries(MetricsTable.TWENTY_FOUR_HOUR);
-                    submit6HourDataAggregationTasks();
-                } catch (AbortedException e) {
-                    log.warn("Failed to load 6 hour index entries. Some 24 hour aggregates may not get generated.", e);
-                    // See the 1 hour case above for why the counter is decremented here.
-                    remainingIndexEntries.decrementAndGet();
-                }
-            }
-
-            while (!isAggregationFinished()) {
-                Thread.sleep(50);
-            }
-        } catch (InterruptedException e) {
-            log.warn("An interrupt occurred while waiting for aggregation to finish", e);
-            // Restore the interrupt status so that the caller can see that the thread was interrupted.
-            Thread.currentThread().interrupt();
-        }
-        return oneHourData;
-    }
-
-    /**
-     * Queries the 1 hour data of every schedule id in the 1 hour index entries and submits the queries in batches of
-     * {@code batchSize} as {@link Aggregate1HourData} tasks. The index entries write lock is held while the entries
-     * are iterated. It is acquired before the try block so that the finally block only ever releases a lock that
-     * is actually held.
-     */
-    private void submit1HourDataAggregationTasks() {
-        List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
-        Set<Integer> scheduleIds = new TreeSet<Integer>();
-        state.getOneHourIndexEntriesLock().writeLock().lock();
-        try {
-            for (Integer scheduleId : state.getOneHourIndexEntries()) {
-                queryFutures.add(dao.findOneHourMetricsAsync(scheduleId, state.getSixHourTimeSlice().getMillis(),
-                    state.getSixHourTimeSliceEnd().getMillis()));
-                scheduleIds.add(scheduleId);
-                if (queryFutures.size() == batchSize) {
-                    state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
-                        queryFutures));
-                    queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
-                    scheduleIds = new TreeSet<Integer>();
-                }
-            }
-            if (!queryFutures.isEmpty()) {
-                state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds, queryFutures));
-            }
-        } finally {
-            state.getOneHourIndexEntriesLock().writeLock().unlock();
-        }
-    }
-
-    /**
-     * Queries the 6 hour data of every schedule id in the 6 hour index entries and submits the queries in batches of
-     * {@code batchSize} as {@link Aggregate6HourData} tasks. The index entries write lock is held while the entries
-     * are iterated. It is acquired before the try block so that the finally block only ever releases a lock that
-     * is actually held.
-     */
-    private void submit6HourDataAggregationTasks() {
-        List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
-        Set<Integer> scheduleIds = new TreeSet<Integer>();
-        state.getSixHourIndexEntriesLock().writeLock().lock();
-        try {
-            for (Integer scheduleId : state.getSixHourIndexEntries()) {
-                queryFutures.add(dao.findSixHourMetricsAsync(scheduleId,
-                    state.getTwentyFourHourTimeSlice().getMillis(),
-                    state.getTwentyFourHourTimeSliceEnd().getMillis()));
-                scheduleIds.add(scheduleId);
-                if (queryFutures.size() == batchSize) {
-                    state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds,
-                        queryFutures));
-                    queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
-                    scheduleIds = new TreeSet<Integer>();
-                }
-            }
-            if (!queryFutures.isEmpty()) {
-                log.debug("Submitting 6 hour aggregation task for schedule ids " + scheduleIds);
-                state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds, queryFutures));
-            }
-        } finally {
-            state.getSixHourIndexEntriesLock().writeLock().unlock();
-        }
-    }
-
-    /**
-     * Polls every 50 ms until the given counter is no longer positive.
-     *
-     * @throws InterruptedException if the thread is interrupted while sleeping
-     */
-    private void waitFor(AtomicInteger remainingData) throws InterruptedException {
-        while (remainingData.get() > 0) {
-            Thread.sleep(50);
-        }
-    }
-
-    /**
-     * @return true when the raw, 1 hour, 6 hour and index entry counters have all dropped to zero or below
-     */
-    private boolean isAggregationFinished() {
-        return state.getRemainingRawData().get() <= 0 && state.getRemaining1HourData().get() <= 0 &&
-            state.getRemaining6HourData().get() <= 0 && remainingIndexEntries.get() <= 0;
-    }
-
-    /**
-     * Asynchronously deletes the index entries of the given table for the matching time slice and decrements
-     * {@link #remainingIndexEntries} once the delete completes, whether it succeeded or failed.
-     */
-    private void deleteIndexEntries(final MetricsTable table) {
-        final DateTime time;
-        switch (table) {
-        case ONE_HOUR:
-            time = startTime;
-            break;
-        case SIX_HOUR:
-            time = state.getSixHourTimeSlice();
-            break;
-        default:
-            time = state.getTwentyFourHourTimeSlice();
-            break;
-        }
-        log.debug("Deleting " + table + " index entries for time slice " + time);
-        writePermits.acquire();
-        StorageResultSetFuture future = dao.deleteMetricsIndexEntriesAsync(table, time.getMillis());
-        Futures.addCallback(future, new FutureCallback<ResultSet>() {
-            @Override
-            public void onSuccess(ResultSet result) {
-                remainingIndexEntries.decrementAndGet();
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                // Pass the throwable along so that the cause of the failure ends up in the log.
-                log.warn("Failed to delete index entries for table " + table + " at time [" + time + "]", t);
-                remainingIndexEntries.decrementAndGet();
-            }
-        });
-    }
-
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java
deleted file mode 100644
index 7146a92..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.AggregateType;
-import org.rhq.server.metrics.domain.MetricsTable;
-
-/**
- * @author John Sanda
- */
-public class Compute1HourData implements AsyncFunction<List<ResultSet>, \
List<ResultSet>> {
-
- private final Log log = LogFactory.getLog(Compute1HourData.class);
-
- private DateTime startTime;
-
- private RateLimiter writePermits;
-
- private MetricsDAO dao;
-
- private DateTime sixHourTimeSlice;
-
- private Set<AggregateNumericMetric> oneHourData;
-
- public Compute1HourData(DateTime startTime, DateTime sixHourTimeSlice, \
RateLimiter writePermits, MetricsDAO dao,
- Set<AggregateNumericMetric> oneHourData) {
- this.startTime = startTime;
- this.sixHourTimeSlice = sixHourTimeSlice;
- this.writePermits = writePermits;
- this.dao = dao;
- this.oneHourData = oneHourData;
- }
-
- @Override
- public ListenableFuture<List<ResultSet>> apply(List<ResultSet> \
rawDataResultSets) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("Computing and storing 1 hour data for " + \
rawDataResultSets.size() + " schedules");
- }
- long start = System.currentTimeMillis();
- try {
- List<StorageResultSetFuture> insertFutures = new \
ArrayList<StorageResultSetFuture>(rawDataResultSets.size());
- for (ResultSet resultSet : rawDataResultSets) {
- AggregateNumericMetric aggregate = \
calculateAggregatedRaw(resultSet);
- oneHourData.add(aggregate);
- writePermits.acquire(4);
- insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(),
- AggregateType.MIN, aggregate.getMin()));
- insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(),
- AggregateType.MAX, aggregate.getMax()));
- insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(),
- AggregateType.AVG, aggregate.getAvg()));
- insertFutures.add(dao.updateMetricsIndex(MetricsTable.SIX_HOUR, \
aggregate.getScheduleId(),
- sixHourTimeSlice.getMillis()));
- }
- return Futures.successfulAsList(insertFutures);
- } finally {
- if (log.isDebugEnabled()) {
- log.debug("Finished computing and storing 1 hour data for " + \
rawDataResultSets.size() +
- " schedules in " + (System.currentTimeMillis() - start) + " \
ms");
- }
- }
- }
-
- private AggregateNumericMetric calculateAggregatedRaw(ResultSet resultSet) {
- double min = Double.NaN;
- double max = min;
- int count = 0;
- ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
- double value;
- List<Row> rows = resultSet.all();
-
- for (Row row : rows) {
- value = row.getDouble(2);
- if (count == 0) {
- min = value;
- max = min;
- }
- if (value < min) {
- min = value;
- } else if (value > max) {
- max = value;
- }
- mean.add(value);
- ++count;
- }
-
- return new AggregateNumericMetric(rows.get(0).getInt(0), \
mean.getArithmeticMean(), min, max,
- startTime.getMillis());
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java
deleted file mode 100644
index 6274bd3..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.AggregateType;
-
-/**
- * @author John Sanda
- */
-public class Compute24HourData implements AsyncFunction<List<ResultSet>, \
List<ResultSet>> {
-
- private final Log log = LogFactory.getLog(Compute24HourData.class);
-
- private DateTime startTime;
-
- private RateLimiter writePermits;
-
- private MetricsDAO dao;
-
- public Compute24HourData(DateTime startTime, RateLimiter writePermits, \
MetricsDAO dao) {
- this.startTime = startTime;
- this.writePermits = writePermits;
- this.dao = dao;
- }
-
- @Override
- public ListenableFuture<List<ResultSet>> apply(List<ResultSet> \
sixHourDataResultSets) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("Computing and storing 24 hour data for " + \
sixHourDataResultSets.size() + " schedules");
- }
- long start = System.currentTimeMillis();
- try {
- List<StorageResultSetFuture> insertFutures =
- new ArrayList<StorageResultSetFuture>(sixHourDataResultSets.size());
- for (ResultSet resultSet : sixHourDataResultSets) {
- AggregateNumericMetric aggregate = calculateAggregate(resultSet);
- writePermits.acquire(3);
- insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(),
- AggregateType.MIN, aggregate.getMin()));
- insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(),
- AggregateType.MAX, aggregate.getMax()));
- insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(),
- AggregateType.AVG, aggregate.getAvg()));
- }
- return Futures.successfulAsList(insertFutures);
- } finally {
- if (log.isDebugEnabled()) {
- log.debug("Finished computing and storing 24 hour data for " + \
sixHourDataResultSets.size() +
- " schedules in " + (System.currentTimeMillis() - start) + " \
ms");
- }
- }
- }
-
- private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
- double min = Double.NaN;
- double max = min;
- ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
- List<Row> rows = resultSet.all();
-
- for (int i = 0; i < rows.size(); i += 3) {
- if (i == 0) {
- min = rows.get(i + 1).getDouble(3);
- max = rows.get(i).getDouble(3);
- } else {
- if (rows.get(i + 1).getDouble(3) < min) {
- min = rows.get(i + 1).getDouble(3);
- }
- if (rows.get(i).getDouble(3) > max) {
- max = rows.get(i).getDouble(3);
- }
- }
- mean.add(rows.get(i + 2).getDouble(3));
- }
- return new AggregateNumericMetric(rows.get(0).getInt(0), \
mean.getArithmeticMean(), min, max,
- startTime.getMillis());
- }
-
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java
deleted file mode 100644
index a1efab5..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.AggregateType;
-import org.rhq.server.metrics.domain.MetricsTable;
-
-/**
- * @author John Sanda
- */
-public class Compute6HourData implements AsyncFunction<List<ResultSet>, \
List<ResultSet>> {
-
- private final Log log = LogFactory.getLog(Compute6HourData.class);
-
- private DateTime startTime;
-
- private RateLimiter writePermits;
-
- private MetricsDAO dao;
-
- private DateTime twentyFourHourTimeSlice;
-
- public Compute6HourData(DateTime startTime, DateTime twentyFourHourTimeSlice, \
RateLimiter writePermits,
- MetricsDAO dao) {
- this.startTime = startTime;
- this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
- this.writePermits = writePermits;
- this.dao = dao;
- }
-
- @Override
- public ListenableFuture<List<ResultSet>> apply(List<ResultSet> \
oneHourDataResultSets) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("Computing and storing 6 hour data for " + \
oneHourDataResultSets.size() + " schedules");
- }
- long start = System.currentTimeMillis();
- try {
- List<StorageResultSetFuture> insertFutures =
- new ArrayList<StorageResultSetFuture>(oneHourDataResultSets.size());
- for (ResultSet resultSet : oneHourDataResultSets) {
- AggregateNumericMetric aggregate = calculateAggregate(resultSet);
- writePermits.acquire(4);
- insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(),
- AggregateType.MIN, aggregate.getMin()));
- insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(),
- AggregateType.MAX, aggregate.getMax()));
- insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), \
aggregate.getTimestamp(),
- AggregateType.AVG, aggregate.getAvg()));
- insertFutures.add(dao.updateMetricsIndex(MetricsTable.TWENTY_FOUR_HOUR, \
aggregate.getScheduleId(),
- twentyFourHourTimeSlice.getMillis()));
- }
- return Futures.successfulAsList(insertFutures);
- } finally {
- if (log.isDebugEnabled()) {
- log.debug("Finished computing and storing 6 hour data for " + \
oneHourDataResultSets.size() +
- " schedules in " + (System.currentTimeMillis() - start) + " \
ms");
- }
- }
- }
-
- private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
- double min = Double.NaN;
- double max = min;
- ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
- List<Row> rows = resultSet.all();
-
- for (int i = 0; i < rows.size(); i += 3) {
- if (i == 0) {
- min = rows.get(i + 1).getDouble(3);
- max = rows.get(i).getDouble(3);
- } else {
- if (rows.get(i + 1).getDouble(3) < min) {
- min = rows.get(i + 1).getDouble(3);
- }
- if (rows.get(i).getDouble(3) > max) {
- max = rows.get(i).getDouble(3);
- }
- }
- mean.add(rows.get(i + 2).getDouble(3));
- }
- return new AggregateNumericMetric(rows.get(0).getInt(0), \
mean.getArithmeticMean(), min, max,
- startTime.getMillis());
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
index 0dc4605..7f5b639 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
@@ -33,7 +33,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -56,6 +55,7 @@ import org.joda.time.Duration;
import org.rhq.core.domain.measurement.MeasurementDataNumeric;
import org.rhq.core.domain.measurement.composite.MeasurementDataNumericHighLowComposite;
+import org.rhq.server.metrics.aggregation.Aggregator;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.AggregateType;
import org.rhq.server.metrics.domain.MetricsIndexEntry;
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
new file mode 100644
index 0000000..1698b28
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
@@ -0,0 +1,127 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * Generates 6 hour data for a batch of 1 hour data futures. After data is inserted \
for the batch, aggregation of 6 hour + * data will start immediately for the batch if \
the 24 hour time slice has finished. + *
+ * @see Compute6HourData
+ * @author John Sanda
+ */
+class Aggregate1HourData implements Runnable {
+
+ private final Log log = LogFactory.getLog(Aggregate1HourData.class);
+
+ private MetricsDAO dao;
+
+ private AggregationState state;
+
+ private Set<Integer> scheduleIds;
+
+ private List<StorageResultSetFuture> queryFutures;
+
+ public Aggregate1HourData(MetricsDAO dao, AggregationState state, Set<Integer> \
scheduleIds, + List<StorageResultSetFuture> queryFutures) {
+ this.dao = dao;
+ this.state = state;
+ this.scheduleIds = scheduleIds;
+ this.queryFutures = queryFutures;
+ }
+
+ @Override
+ public void run() {
+ final Stopwatch stopwatch = new Stopwatch().start();
+ ListenableFuture<List<ResultSet>> queriesFuture = \
Futures.successfulAsList(queryFutures); + Futures.withFallback(queriesFuture, \
new FutureFallback<List<ResultSet>>() { + @Override
+ public ListenableFuture<List<ResultSet>> create(Throwable t) throws \
Exception { + log.error("An error occurred while fetching one hour \
data", t); + return Futures.immediateFailedFuture(t);
+ }
+ });
+ ListenableFuture<List<ResultSet>> computeFutures = \
Futures.transform(queriesFuture, + state.getCompute6HourData(), \
state.getAggregationTasks()); + Futures.addCallback(computeFutures, new \
FutureCallback<List<ResultSet>>() { + @Override
+ public void onSuccess(List<ResultSet> result) {
+ stopwatch.stop();
+ log.debug("Finished aggregating 1 hour data for " + result.size() + \
" schedules in " + + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " \
ms"); + start6HourDataAggregationIfNecessary();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ // TODO should we log the schedule ids?
+ log.debug("Failed to aggregate 1 hour data for " + \
scheduleIds.size() + " schedules. An " + + "unexpected error \
occurred.", t); + } else {
+ log.warn("Failed to aggregate 1 hour data for " + \
scheduleIds.size() + " schedules. An " + + "unexpected error \
occurred: " + ThrowableUtil.getRootMessage(t)); + }
+ start6HourDataAggregationIfNecessary();
+ }
+ });
+ }
+
+ private void start6HourDataAggregationIfNecessary() {
+ try {
+ if (state.is24HourTimeSliceFinished()) {
+ update6HourIndexEntries();
+ List<StorageResultSetFuture> queryFutures = new \
ArrayList<StorageResultSetFuture>(scheduleIds.size()); + for (Integer \
scheduleId : scheduleIds) { + \
queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, \
state.getTwentyFourHourTimeSlice().getMillis(), + \
state.getTwentyFourHourTimeSliceEnd().getMillis())); + }
+ state.getAggregationTasks().submit(new Aggregate6HourData(dao, \
state, scheduleIds, queryFutures)); + }
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("An interrupt occurred while waiting for 6 hour data index \
entries. Aborting data aggregation", + e);
+ } else {
+ log.info("An interrupt occurred while waiting for 6 hour data index \
entries. Aborting data " + + "aggregation: " + e.getMessage());
+ }
+ } finally {
+ state.getRemaining1HourData().addAndGet(-scheduleIds.size());
+ }
+ }
+
+ private void update6HourIndexEntries() throws InterruptedException {
+ try {
+ state.getSixHourIndexEntriesArrival().await();
+ try {
+ state.getSixHourIndexEntriesLock().writeLock().lock();
+ state.getSixHourIndexEntries().removeAll(scheduleIds);
+ } finally {
+ state.getSixHourIndexEntriesLock().writeLock().unlock();
+ }
+ } catch (AbortedException e) {
+ // This means we failed to retrieve the index entries. We can however
+ // continue generating 6 hour data because we do not need the index
+ // here since we already have 6 hour data to aggregate along with the
+ // schedule ids.
+ }
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
new file mode 100644
index 0000000..fbd5057
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
@@ -0,0 +1,84 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * Generates 24 hour data for a batch of 6 hour data query futures. The query results are handed to
+ * {@link AggregationState#getCompute24HourData()}; once that step completes, successfully or not, the
+ * count of remaining 6 hour data is reduced by the size of the batch.
+ *
+ * @see Compute24HourData
+ * @author John Sanda
+ */
+class Aggregate6HourData implements Runnable {
+
+    private final Log log = LogFactory.getLog(Aggregate6HourData.class);
+
+    /** Not read by this task; retained because the constructor accepts it. */
+    private final MetricsDAO dao;
+
+    private final AggregationState state;
+
+    /** Schedule ids covered by this batch. */
+    private final Set<Integer> scheduleIds;
+
+    /** Pending 6 hour data queries for this batch. */
+    private final List<StorageResultSetFuture> queryFutures;
+
+    /**
+     * @param dao the metrics DAO
+     * @param state the shared aggregation state
+     * @param scheduleIds the schedule ids in this batch
+     * @param queryFutures the 6 hour data queries already issued for this batch
+     */
+    public Aggregate6HourData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
+        List<StorageResultSetFuture> queryFutures) {
+        this.dao = dao;
+        this.state = state;
+        this.scheduleIds = scheduleIds;
+        this.queryFutures = queryFutures;
+    }
+
+    @Override
+    public void run() {
+        final Stopwatch stopwatch = new Stopwatch().start();
+        ListenableFuture<List<ResultSet>> queriesFuture = Futures.successfulAsList(queryFutures);
+        // NOTE(review): the future returned by withFallback is discarded; the transform below is
+        // chained on queriesFuture directly.
+        Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
+            @Override
+            public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
+                log.error("An error occurred while fetching 6 hour data", t);
+                return Futures.immediateFailedFuture(t);
+            }
+        });
+        ListenableFuture<List<ResultSet>> computeFutures = Futures.transform(queriesFuture,
+            state.getCompute24HourData(), state.getAggregationTasks());
+        Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
+            @Override
+            public void onSuccess(List<ResultSet> result) {
+                stopwatch.stop();
+                // Only build the message when it will actually be written
+                if (log.isDebugEnabled()) {
+                    log.debug("Finished aggregating 6 hour data for " + result.size() + " schedules in " +
+                        stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+                }
+                state.getRemaining6HourData().addAndGet(-scheduleIds.size());
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (log.isDebugEnabled()) {
+                    // TODO should we log the schedule ids?
+                    log.debug("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
+                        "unexpected error occurred.", t);
+                } else {
+                    log.warn("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
+                        "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
+                }
+                // The batch is finished either way, so it no longer counts as remaining work
+                state.getRemaining6HourData().addAndGet(-scheduleIds.size());
+            }
+        });
+    }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
new file mode 100644
index 0000000..cb64fcf
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
@@ -0,0 +1,73 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.SignalingCountDownLatch;
+
+/**
+ * Callback for an index entries query. On success the schedule ids read from the result set are added
+ * to the supplied index entries set, the remaining-data counter is set to the size of that set and the
+ * arrival latch is counted down. On failure the counter is set to zero and the latch is aborted.
+ *
+ * @author John Sanda
+ */
+class AggregateIndexEntriesHandler implements FutureCallback<ResultSet> {
+
+    private final Log log = LogFactory.getLog(AggregateIndexEntriesHandler.class);
+
+    /** Receives the schedule ids found in the result set. */
+    private Set<Integer> indexEntries;
+
+    /** Counter that is set once the outcome of the query is known. */
+    private AtomicInteger remainingData;
+
+    /** Latch that is counted down on success and aborted on failure. */
+    private SignalingCountDownLatch indexEntriesArrival;
+
+    /** Supplied by the caller; stopped here once the entries have been loaded. */
+    private Stopwatch stopwatch;
+
+    /** Label of the data whose index entries are loaded; used in log messages only. */
+    private String src;
+
+    /** Label of the aggregates computed from that data; used in log messages only. */
+    private String dest;
+
+    public AggregateIndexEntriesHandler(Set<Integer> indexEntries, AtomicInteger remainingData,
+        SignalingCountDownLatch indexEntriesArrival, Stopwatch stopwatch, String src, String dest) {
+        this.indexEntries = indexEntries;
+        this.remainingData = remainingData;
+        this.indexEntriesArrival = indexEntriesArrival;
+        this.stopwatch = stopwatch;
+        this.src = src;
+        this.dest = dest;
+    }
+
+    @Override
+    public void onSuccess(ResultSet resultSet) {
+        for (Row entry : resultSet) {
+            this.indexEntries.add(entry.getInt(1));
+        }
+        this.remainingData.set(this.indexEntries.size());
+        this.indexEntriesArrival.countDown();
+        this.stopwatch.stop();
+        if (!log.isDebugEnabled()) {
+            return;
+        }
+        log.debug("Finished loading " + this.indexEntries.size() + " " + src + " index entries in " +
+            this.stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+        String summary = "Some " + dest + " aggregates may not get computed. An unexpected error occurred while " +
+            "retrieving " + src + " index entries";
+        if (log.isDebugEnabled()) {
+            log.debug(summary, t);
+        } else {
+            log.warn(summary + ": " + ThrowableUtil.getRootMessage(t));
+        }
+        this.remainingData.set(0);
+        this.indexEntriesArrival.abort();
+    }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
new file mode 100644
index 0000000..87a7266
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
@@ -0,0 +1,133 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * Generates 1 hour data for a batch of raw data query futures. Once the batch has been processed,
+ * successfully or not, aggregation of the batch's 1 hour data is started right away if the 6 hour time
+ * slice has finished, and the count of remaining raw data is reduced by the size of the batch.
+ *
+ * @see Compute1HourData
+ * @author John Sanda
+ */
+class AggregateRawData implements Runnable {
+
+    private final Log log = LogFactory.getLog(AggregateRawData.class);
+
+    private final MetricsDAO dao;
+
+    private final AggregationState state;
+
+    /** Schedule ids covered by this batch. */
+    private final Set<Integer> scheduleIds;
+
+    /** Pending raw data queries for this batch. */
+    private final List<StorageResultSetFuture> queryFutures;
+
+    /**
+     * @param dao the metrics DAO used to query 1 hour data for the follow-up task
+     * @param state the shared aggregation state
+     * @param scheduleIds the schedule ids in this batch
+     * @param queryFutures the raw data queries already issued for this batch
+     */
+    public AggregateRawData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
+        List<StorageResultSetFuture> queryFutures) {
+        this.dao = dao;
+        this.state = state;
+        this.scheduleIds = scheduleIds;
+        this.queryFutures = queryFutures;
+    }
+
+    @Override
+    public void run() {
+        final Stopwatch stopwatch = new Stopwatch().start();
+        ListenableFuture<List<ResultSet>> rawDataFutures = Futures.successfulAsList(queryFutures);
+        // NOTE(review): the future returned by withFallback is discarded; the transform below is
+        // chained on rawDataFutures directly.
+        Futures.withFallback(rawDataFutures, new FutureFallback<List<ResultSet>>() {
+            @Override
+            public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
+                log.error("An error occurred while fetching raw data", t);
+                return Futures.immediateFailedFuture(t);
+            }
+        });
+
+        final ListenableFuture<List<ResultSet>> insert1HourDataFutures = Futures.transform(rawDataFutures,
+            state.getCompute1HourData(), state.getAggregationTasks());
+        Futures.addCallback(insert1HourDataFutures, new FutureCallback<List<ResultSet>>() {
+            @Override
+            public void onSuccess(List<ResultSet> resultSets) {
+                stopwatch.stop();
+                // Only build the message when it will actually be written
+                if (log.isDebugEnabled()) {
+                    log.debug("Finished aggregating raw data for " + resultSets.size() + " schedules in " +
+                        stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+                }
+                start1HourDataAggregationIfNecessary();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (log.isDebugEnabled()) {
+                    // TODO should we log the schedule ids?
+                    log.debug("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An unexpected " +
+                        "error occurred.", t);
+                } else {
+                    log.warn("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An " +
+                        "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
+                }
+                start1HourDataAggregationIfNecessary();
+            }
+        }, state.getAggregationTasks());
+    }
+
+    /**
+     * Starts aggregation of this batch's 1 hour data when the 6 hour time slice has finished. Whatever
+     * happens, the count of remaining raw data is reduced by the size of this batch.
+     */
+    private void start1HourDataAggregationIfNecessary() {
+        try {
+            if (state.is6HourTimeSliceFinished()) {
+                update1HourIndexEntries();
+                List<StorageResultSetFuture> oneHourDataQueryFutures = new ArrayList<StorageResultSetFuture>(
+                    scheduleIds.size());
+                for (Integer scheduleId : scheduleIds) {
+                    oneHourDataQueryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
+                        state.getSixHourTimeSlice().getMillis(), state.getSixHourTimeSliceEnd().getMillis()));
+                }
+                state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
+                    oneHourDataQueryFutures));
+            }
+        } catch (InterruptedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("An interrupt occurred while waiting for 1 hour data index entries. Aborting data aggregation",
+                    e);
+            } else {
+                log.info("An interrupt occurred while waiting for 1 hour data index entries. Aborting data " +
+                    "aggregation: " + e.getMessage());
+            }
+            // Catching InterruptedException clears the flag; restore it so the owner of this thread
+            // can still observe the interruption.
+            Thread.currentThread().interrupt();
+        } finally {
+            state.getRemainingRawData().addAndGet(-scheduleIds.size());
+        }
+    }
+
+    /**
+     * Waits for the 1 hour index entries to arrive and then removes this batch's schedule ids from them
+     * while holding the 1 hour index entries write lock.
+     *
+     * @throws InterruptedException declared for the wait on the index entries arrival latch
+     */
+    private void update1HourIndexEntries() throws InterruptedException {
+        try {
+            // Wait for the arrival so that we can remove the schedules ids in this
+            // batch from the one hour index entries. This will prevent duplicate tasks
+            // being submitted to process the same 1 hour data.
+            state.getOneHourIndexEntriesArrival().await();
+            // Take the lock before entering the try block so that the finally clause can only ever
+            // release a lock this thread actually holds.
+            state.getOneHourIndexEntriesLock().writeLock().lock();
+            try {
+                state.getOneHourIndexEntries().removeAll(scheduleIds);
+            } finally {
+                state.getOneHourIndexEntriesLock().writeLock().unlock();
+            }
+        } catch (AbortedException e) {
+            // This means we failed to retrieve the index entries. We can however
+            // continue generating 1 hour data because we do not need the index
+            // here since we already have 1 hour data to aggregate along with the
+            // schedule ids.
+        }
+    }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
new file mode 100644
index 0000000..345e53a
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
@@ -0,0 +1,257 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+
+/**
+ * Mutable holder for the objects shared between the aggregation tasks: the executor, the index entry
+ * sets with their locks and arrival latches, the remaining-data counters, the time slices and the
+ * compute functions. Every setter returns this instance so that calls can be chained.
+ *
+ * @author John Sanda
+ */
+class AggregationState {
+
+    // ---- task execution ----
+    private ListeningExecutorService aggregationTasks;
+
+    // ---- index entry arrival latches ----
+    private SignalingCountDownLatch oneHourIndexEntriesArrival;
+    private SignalingCountDownLatch sixHourIndexEntriesArrival;
+
+    // ---- remaining work counters ----
+    private AtomicInteger remainingRawData;
+    private AtomicInteger remaining1HourData;
+    private AtomicInteger remaining6HourData;
+
+    // ---- index entries and their locks ----
+    private Set<Integer> oneHourIndexEntries;
+    private Set<Integer> sixHourIndexEntries;
+    private ReentrantReadWriteLock oneHourIndexEntriesLock;
+    private ReentrantReadWriteLock sixHourIndexEntriesLock;
+
+    // ---- time slices ----
+    private DateTime oneHourTimeSlice;
+    private DateTime sixHourTimeSlice;
+    private DateTime sixHourTimeSliceEnd;
+    private DateTime twentyFourHourTimeSlice;
+    private DateTime twentyFourHourTimeSliceEnd;
+    private boolean sixHourTimeSliceFinished;
+    private boolean twentyFourHourTimeSliceFinished;
+
+    // ---- compute functions ----
+    private Compute1HourData compute1HourData;
+    private Compute6HourData compute6HourData;
+    private Compute24HourData compute24HourData;
+
+    /**
+     * @return The executor service to which aggregation tasks are submitted
+     */
+    public ListeningExecutorService getAggregationTasks() {
+        return this.aggregationTasks;
+    }
+
+    public AggregationState setAggregationTasks(ListeningExecutorService aggregationTasks) {
+        this.aggregationTasks = aggregationTasks;
+        return this;
+    }
+
+    /**
+     * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries for schedules with 1 hour
+     * data to be aggregated
+     */
+    public SignalingCountDownLatch getOneHourIndexEntriesArrival() {
+        return this.oneHourIndexEntriesArrival;
+    }
+
+    public AggregationState setOneHourIndexEntriesArrival(SignalingCountDownLatch oneHourIndexEntriesArrival) {
+        this.oneHourIndexEntriesArrival = oneHourIndexEntriesArrival;
+        return this;
+    }
+
+    /**
+     * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries for schedules with 6 hour
+     * data to be aggregated
+     */
+    public SignalingCountDownLatch getSixHourIndexEntriesArrival() {
+        return this.sixHourIndexEntriesArrival;
+    }
+
+    public AggregationState setSixHourIndexEntriesArrival(SignalingCountDownLatch sixHourIndexEntriesArrival) {
+        this.sixHourIndexEntriesArrival = sixHourIndexEntriesArrival;
+        return this;
+    }
+
+    /**
+     * @return The remaining number of schedules with raw data to be aggregated
+     */
+    public AtomicInteger getRemainingRawData() {
+        return this.remainingRawData;
+    }
+
+    public AggregationState setRemainingRawData(AtomicInteger remainingRawData) {
+        this.remainingRawData = remainingRawData;
+        return this;
+    }
+
+    /**
+     * @return The remaining number of schedules with 1 hour data to be aggregated
+     */
+    public AtomicInteger getRemaining1HourData() {
+        return this.remaining1HourData;
+    }
+
+    public AggregationState setRemaining1HourData(AtomicInteger remaining1HourData) {
+        this.remaining1HourData = remaining1HourData;
+        return this;
+    }
+
+    /**
+     * @return The remaining number of schedules with 6 hour data to be aggregated
+     */
+    public AtomicInteger getRemaining6HourData() {
+        return this.remaining6HourData;
+    }
+
+    public AggregationState setRemaining6HourData(AtomicInteger remaining6HourData) {
+        this.remaining6HourData = remaining6HourData;
+        return this;
+    }
+
+    /**
+     * @return The schedule ids with 1 hour data to be aggregated
+     */
+    public Set<Integer> getOneHourIndexEntries() {
+        return this.oneHourIndexEntries;
+    }
+
+    public AggregationState setOneHourIndexEntries(Set<Integer> oneHourIndexEntries) {
+        this.oneHourIndexEntries = oneHourIndexEntries;
+        return this;
+    }
+
+    /**
+     * @return The schedule ids with 6 hour data to be aggregated
+     */
+    public Set<Integer> getSixHourIndexEntries() {
+        return this.sixHourIndexEntries;
+    }
+
+    public AggregationState setSixHourIndexEntries(Set<Integer> sixHourIndexEntries) {
+        this.sixHourIndexEntries = sixHourIndexEntries;
+        return this;
+    }
+
+    /**
+     * @return The lock taken by tasks that modify or iterate the 1 hour index entries
+     */
+    public ReentrantReadWriteLock getOneHourIndexEntriesLock() {
+        return this.oneHourIndexEntriesLock;
+    }
+
+    public AggregationState setOneHourIndexEntriesLock(ReentrantReadWriteLock oneHourIndexEntriesLock) {
+        this.oneHourIndexEntriesLock = oneHourIndexEntriesLock;
+        return this;
+    }
+
+    /**
+     * @return The lock taken by tasks that modify or iterate the 6 hour index entries
+     */
+    public ReentrantReadWriteLock getSixHourIndexEntriesLock() {
+        return this.sixHourIndexEntriesLock;
+    }
+
+    public AggregationState setSixHourIndexEntriesLock(ReentrantReadWriteLock sixHourIndexEntriesLock) {
+        this.sixHourIndexEntriesLock = sixHourIndexEntriesLock;
+        return this;
+    }
+
+    public DateTime getOneHourTimeSlice() {
+        return this.oneHourTimeSlice;
+    }
+
+    public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
+        this.oneHourTimeSlice = oneHourTimeSlice;
+        return this;
+    }
+
+    public DateTime getSixHourTimeSlice() {
+        return this.sixHourTimeSlice;
+    }
+
+    public AggregationState setSixHourTimeSlice(DateTime sixHourTimeSlice) {
+        this.sixHourTimeSlice = sixHourTimeSlice;
+        return this;
+    }
+
+    public DateTime getSixHourTimeSliceEnd() {
+        return this.sixHourTimeSliceEnd;
+    }
+
+    public AggregationState setSixHourTimeSliceEnd(DateTime sixHourTimeSliceEnd) {
+        this.sixHourTimeSliceEnd = sixHourTimeSliceEnd;
+        return this;
+    }
+
+    public DateTime getTwentyFourHourTimeSlice() {
+        return this.twentyFourHourTimeSlice;
+    }
+
+    public AggregationState setTwentyFourHourTimeSlice(DateTime twentyFourHourTimeSlice) {
+        this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
+        return this;
+    }
+
+    public DateTime getTwentyFourHourTimeSliceEnd() {
+        return this.twentyFourHourTimeSliceEnd;
+    }
+
+    public AggregationState setTwentyFourHourTimeSliceEnd(DateTime twentyFourHourTimeSliceEnd) {
+        this.twentyFourHourTimeSliceEnd = twentyFourHourTimeSliceEnd;
+        return this;
+    }
+
+    /**
+     * @return The flag recorded for the 6 hour time slice having finished
+     */
+    public boolean is6HourTimeSliceFinished() {
+        return this.sixHourTimeSliceFinished;
+    }
+
+    public AggregationState set6HourTimeSliceFinished(boolean is6HourTimeSliceFinished) {
+        this.sixHourTimeSliceFinished = is6HourTimeSliceFinished;
+        return this;
+    }
+
+    /**
+     * @return The flag recorded for the 24 hour time slice having finished
+     */
+    public boolean is24HourTimeSliceFinished() {
+        return this.twentyFourHourTimeSliceFinished;
+    }
+
+    public AggregationState set24HourTimeSliceFinished(boolean is24HourTimeSliceFinished) {
+        this.twentyFourHourTimeSliceFinished = is24HourTimeSliceFinished;
+        return this;
+    }
+
+    public Compute1HourData getCompute1HourData() {
+        return this.compute1HourData;
+    }
+
+    public AggregationState setCompute1HourData(Compute1HourData compute1HourData) {
+        this.compute1HourData = compute1HourData;
+        return this;
+    }
+
+    public Compute6HourData getCompute6HourData() {
+        return this.compute6HourData;
+    }
+
+    public AggregationState setCompute6HourData(Compute6HourData compute6HourData) {
+        this.compute6HourData = compute6HourData;
+        return this;
+    }
+
+    public Compute24HourData getCompute24HourData() {
+        return this.compute24HourData;
+    }
+
+    public AggregationState setCompute24HourData(Compute24HourData compute24HourData) {
+        this.compute24HourData = compute24HourData;
+        return this;
+    }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
new file mode 100644
index 0000000..bf0bc1a
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
@@ -0,0 +1,378 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeComparator;
+import org.joda.time.Duration;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.DateTimeService;
+import org.rhq.server.metrics.MetricsConfiguration;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * This class provides the main interface for metric data aggregation.
+ *
+ * @author John Sanda
+ */
+public class Aggregator {
+
+    /** Orders aggregate metrics by ascending schedule id. */
+    private static final Comparator<AggregateNumericMetric> AGGREGATE_COMPARATOR =
+        new Comparator<AggregateNumericMetric>() {
+            @Override
+            public int compare(AggregateNumericMetric left, AggregateNumericMetric right) {
+                if (left.getScheduleId() < right.getScheduleId()) {
+                    return -1;
+                }
+                if (left.getScheduleId() == right.getScheduleId()) {
+                    return 0;
+                }
+                return 1;
+            }
+        };
+
+ private final Log log = LogFactory.getLog(Aggregator.class);
+
+ private MetricsDAO dao;
+
+ private MetricsConfiguration configuration;
+
+ private DateTimeService dtService;
+
+ private DateTime startTime;
+
+ /**
+ * Signals when raw data index entries (in metrics_index) can be deleted. We \
cannot delete the row in metrics_index + * until we know that it has been read, \
successfully or otherwise. + */
+ private SignalingCountDownLatch rawDataIndexEntriesArrival;
+
+ private RateLimiter readPermits;
+ private RateLimiter writePermits;
+
+ private int batchSize;
+
+ private AggregationState state;
+
+ private Set<AggregateNumericMetric> oneHourData;
+
+ private AtomicInteger remainingIndexEntries;
+
+    /**
+     * Creates an aggregator for the time slice starting at <code>startTime</code> and builds the
+     * {@link AggregationState} shared by the aggregation tasks.
+     *
+     * @param aggregationTasks the executor service to which aggregation tasks are submitted
+     * @param dao the metrics DAO
+     * @param configuration supplies the time slice durations
+     * @param dtService used to compute time slices and the current time
+     * @param startTime the start of the time slice to aggregate
+     * @param batchSize the number of schedules handled by a single aggregation task
+     * @param writePermits rate limiter acquired before writes
+     * @param readPermits rate limiter acquired before reads
+     */
+    public Aggregator(ListeningExecutorService aggregationTasks, MetricsDAO dao, MetricsConfiguration configuration,
+        DateTimeService dtService, DateTime startTime, int batchSize, RateLimiter writePermits,
+        RateLimiter readPermits) {
+        this.dao = dao;
+        this.configuration = configuration;
+        this.dtService = dtService;
+        this.startTime = startTime;
+        this.readPermits = readPermits;
+        this.writePermits = writePermits;
+        this.batchSize = batchSize;
+        this.oneHourData = new ConcurrentSkipListSet<AggregateNumericMetric>(AGGREGATE_COMPARATOR);
+        this.rawDataIndexEntriesArrival = new SignalingCountDownLatch(new CountDownLatch(1));
+        // One index deletion (for the raw data index) is always pending
+        this.remainingIndexEntries = new AtomicInteger(1);
+
+        DateTime sixHourTimeSlice = get6HourTimeSlice();
+        DateTime twentyFourHourTimeSlice = get24HourTimeSlice();
+
+        state = new AggregationState();
+        state.setAggregationTasks(aggregationTasks);
+        state.setOneHourTimeSlice(startTime);
+        state.setSixHourTimeSlice(sixHourTimeSlice);
+        state.setSixHourTimeSliceEnd(sixHourTimeSlice.plus(configuration.getOneHourTimeSliceDuration()));
+        state.setTwentyFourHourTimeSlice(twentyFourHourTimeSlice);
+        state.setTwentyFourHourTimeSliceEnd(twentyFourHourTimeSlice.plus(configuration.getSixHourTimeSliceDuration()));
+        state.setCompute1HourData(new Compute1HourData(startTime, sixHourTimeSlice, writePermits, dao, oneHourData));
+        state.setCompute6HourData(new Compute6HourData(sixHourTimeSlice, twentyFourHourTimeSlice, writePermits, dao));
+        state.setCompute24HourData(new Compute24HourData(twentyFourHourTimeSlice, writePermits, dao));
+        state.set6HourTimeSliceFinished(hasTimeSliceEnded(sixHourTimeSlice,
+            configuration.getOneHourTimeSliceDuration()));
+        state.set24HourTimeSliceFinished(hasTimeSliceEnded(twentyFourHourTimeSlice,
+            configuration.getSixHourTimeSliceDuration()));
+        state.setRemainingRawData(new AtomicInteger(0));
+        state.setRemaining1HourData(new AtomicInteger(0));
+        state.setRemaining6HourData(new AtomicInteger(0));
+        state.setOneHourIndexEntries(new TreeSet<Integer>());
+        state.setSixHourIndexEntries(new TreeSet<Integer>());
+        state.setOneHourIndexEntriesLock(new ReentrantReadWriteLock());
+        state.setSixHourIndexEntriesLock(new ReentrantReadWriteLock());
+
+        // A latch with a count of one is only needed when the index entries will actually be
+        // fetched; otherwise the latch is created already open.
+        int oneHourLatchCount = state.is6HourTimeSliceFinished() ? 1 : 0;
+        state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(oneHourLatchCount)));
+        if (state.is6HourTimeSliceFinished()) {
+            remainingIndexEntries.incrementAndGet();
+        } else {
+            state.setRemaining1HourData(new AtomicInteger(0));
+        }
+
+        int sixHourLatchCount = state.is24HourTimeSliceFinished() ? 1 : 0;
+        state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(sixHourLatchCount)));
+        if (state.is24HourTimeSliceFinished()) {
+            remainingIndexEntries.incrementAndGet();
+        } else {
+            state.setRemaining6HourData(new AtomicInteger(0));
+        }
+    }
+
+    /**
+     * @return the time slice containing <code>startTime</code>, computed with the configured six hour
+     * time slice duration
+     */
+    private DateTime get24HourTimeSlice() {
+        DateTime timeSlice = dtService.getTimeSlice(startTime, configuration.getSixHourTimeSliceDuration());
+        return timeSlice;
+    }
+
+    /**
+     * @return the time slice containing <code>startTime</code>, computed with the configured one hour
+     * time slice duration
+     */
+    private DateTime get6HourTimeSlice() {
+        DateTime timeSlice = dtService.getTimeSlice(startTime, configuration.getOneHourTimeSliceDuration());
+        return timeSlice;
+    }
+
+    /**
+     * Checks whether the current hour has reached the end of a time slice.
+     *
+     * @param startTime the start of the time slice
+     * @param duration the length of the time slice
+     * @return true if {@link #currentHour()} is at or after <code>startTime</code> plus <code>duration</code>
+     */
+    private boolean hasTimeSliceEnded(DateTime startTime, Duration duration) {
+        DateTime endTime = startTime.plus(duration);
+        int order = DateTimeComparator.getInstance().compare(currentHour(), endTime);
+        return order >= 0;
+    }
+
+    /**
+     * @return the time slice containing the current time, computed with the configured raw time slice
+     * duration
+     */
+    protected DateTime currentHour() {
+        DateTime hour = dtService.getTimeSlice(dtService.now(), configuration.getRawTimeSliceDuration());
+        return hour;
+    }
+
+    /**
+     * Runs aggregation for the time slice that starts at <code>startTime</code> and blocks until all
+     * outstanding work has finished or the calling thread is interrupted.
+     * <p>
+     * The raw data index entries are loaded asynchronously and a task is submitted per batch of
+     * schedules. When the 6 hour and/or 24 hour time slice has finished, the corresponding index entries
+     * are loaded as well, and tasks are submitted for the schedules still listed in them once the
+     * preceding tier has drained.
+     *
+     * @return the 1 hour data set that was handed to {@link Compute1HourData} at construction time
+     */
+    public Set<AggregateNumericMetric> run() {
+        log.info("Starting aggregation for time slice " + startTime);
+        readPermits.acquire();
+        StorageResultSetFuture rawFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
+            startTime.getMillis());
+        Futures.addCallback(rawFuture, new FutureCallback<ResultSet>() {
+            @Override
+            public void onSuccess(ResultSet result) {
+                List<Row> rows = result.all();
+                state.getRemainingRawData().set(rows.size());
+                rawDataIndexEntriesArrival.countDown();
+
+                Stopwatch stopwatch = new Stopwatch().start();
+
+                final DateTime endTime = startTime.plus(configuration.getRawTimeSliceDuration());
+                Set<Integer> scheduleIds = new TreeSet<Integer>();
+                List<StorageResultSetFuture> rawDataFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+                for (final Row row : rows) {
+                    int scheduleId = row.getInt(1);
+                    scheduleIds.add(scheduleId);
+                    readPermits.acquire();
+                    rawDataFutures.add(dao.findRawMetricsAsync(scheduleId, startTime.getMillis(),
+                        endTime.getMillis()));
+                    if (rawDataFutures.size() == batchSize) {
+                        state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
+                            rawDataFutures));
+                        // The submitted task owns the previous collections; start a fresh batch
+                        rawDataFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+                        scheduleIds = new TreeSet<Integer>();
+                    }
+                }
+                if (!rawDataFutures.isEmpty()) {
+                    state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
+                        rawDataFutures));
+                }
+
+                if (log.isDebugEnabled()) {
+                    stopwatch.stop();
+                    log.debug("Finished scheduling raw data aggregation tasks for " + rows.size() + " schedules in " +
+                        stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Aggregation for time slice [" + startTime + "] cannot proceed. There was an " +
+                        "unexpected error while retrieving raw data index entries.", t);
+                } else {
+                    log.warn("Aggregation for time slice [" + startTime + "] cannot proceed. There was an " +
+                        "unexpected error while retrieving raw data index entries: " + ThrowableUtil.getRootMessage(t));
+                }
+                state.setRemainingRawData(new AtomicInteger(0));
+                rawDataIndexEntriesArrival.abort();
+                deleteIndexEntries(MetricsTable.ONE_HOUR);
+            }
+        }, state.getAggregationTasks());
+
+        if (state.is6HourTimeSliceFinished()) {
+            log.debug("Fetching 1 hour index entries");
+            Stopwatch stopwatch = new Stopwatch().start();
+            StorageResultSetFuture oneHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
+                state.getSixHourTimeSlice().getMillis());
+            Futures.addCallback(oneHourFuture, new AggregateIndexEntriesHandler(state.getOneHourIndexEntries(),
+                state.getRemaining1HourData(), state.getOneHourIndexEntriesArrival(), stopwatch, "1 hour", "6 hour"),
+                state.getAggregationTasks());
+        }
+
+        if (state.is24HourTimeSliceFinished()) {
+            log.debug("Fetching 6 hour index entries");
+            Stopwatch stopwatch = new Stopwatch().start();
+            StorageResultSetFuture sixHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR,
+                state.getTwentyFourHourTimeSlice().getMillis());
+            Futures.addCallback(sixHourFuture, new AggregateIndexEntriesHandler(state.getSixHourIndexEntries(),
+                state.getRemaining6HourData(), state.getSixHourIndexEntriesArrival(), stopwatch, "6 hour", "24 hour"),
+                state.getAggregationTasks());
+        }
+
+        try {
+            try {
+                rawDataIndexEntriesArrival.await();
+                deleteIndexEntries(MetricsTable.ONE_HOUR);
+            } catch (AbortedException ignored) {
+                // Nothing to do here: the onFailure callback above has already logged the failure
+                // and issued the delete for the raw data index entries.
+            }
+
+            if (state.is6HourTimeSliceFinished()) {
+                submit1HourDataAggregationTasks();
+            }
+
+            if (state.is24HourTimeSliceFinished()) {
+                submit6HourDataAggregationTasks();
+            }
+
+            while (!isAggregationFinished()) {
+                Thread.sleep(50);
+            }
+        } catch (InterruptedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("An interrupt occurred while waiting for aggregation to finish. Aborting remaining work.", e);
+            } else {
+                log.warn("An interrupt occurred while waiting for aggregation to finish. Aborting remaining work: " +
+                    ThrowableUtil.getRootMessage(e));
+            }
+            // Catching InterruptedException clears the flag; restore it for the caller
+            Thread.currentThread().interrupt();
+        }
+        return oneHourData;
+    }
+
+    /**
+     * Waits for raw data aggregation to drain and for the 1 hour index entries to arrive, then submits
+     * {@link Aggregate1HourData} tasks, in batches, for the schedules still listed in the index entries.
+     *
+     * @throws InterruptedException if interrupted while waiting
+     */
+    private void submit1HourDataAggregationTasks() throws InterruptedException {
+        waitFor(state.getRemainingRawData());
+        try {
+            state.getOneHourIndexEntriesArrival().await();
+        } catch (AbortedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Some 6 hour aggregates may not get generated. There was an unexpected error while " +
+                    "loading 1 hour index entries", e);
+            } else {
+                log.warn("Some 6 hour aggregates may not get generated. There was an unexpected error while " +
+                    "loading 1 hour index entries: " + ThrowableUtil.getRootMessage(e));
+            }
+            // No delete will be issued for these index entries, so stop counting it as pending;
+            // otherwise run() would wait forever for it to complete.
+            remainingIndexEntries.decrementAndGet();
+            return;
+        }
+        deleteIndexEntries(MetricsTable.SIX_HOUR);
+
+        // The lock is taken outside the try block so that finally only releases a held lock
+        state.getOneHourIndexEntriesLock().writeLock().lock();
+        try {
+            List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+            Set<Integer> scheduleIds = new TreeSet<Integer>();
+            log.debug("Preparing to submit 1 hour data aggregation tasks for " +
+                state.getOneHourIndexEntries().size() + " schedules");
+            for (Integer scheduleId : state.getOneHourIndexEntries()) {
+                queryFutures.add(dao.findOneHourMetricsAsync(scheduleId, state.getSixHourTimeSlice().getMillis(),
+                    state.getSixHourTimeSliceEnd().getMillis()));
+                scheduleIds.add(scheduleId);
+                if (queryFutures.size() == batchSize) {
+                    state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
+                        queryFutures));
+                    queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+                    scheduleIds = new TreeSet<Integer>();
+                }
+            }
+            if (!queryFutures.isEmpty()) {
+                state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
+                    queryFutures));
+            }
+        } finally {
+            state.getOneHourIndexEntriesLock().writeLock().unlock();
+        }
+    }
+
+    /**
+     * Waits for 1 hour data aggregation to drain and for the 6 hour index entries to arrive, then submits
+     * {@link Aggregate6HourData} tasks, in batches, for the schedules still listed in the index entries.
+     *
+     * @throws InterruptedException if interrupted while waiting
+     */
+    private void submit6HourDataAggregationTasks() throws InterruptedException {
+        waitFor(state.getRemaining1HourData());
+        try {
+            state.getSixHourIndexEntriesArrival().await();
+        } catch (AbortedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Some 24 hour aggregates may not get generated. There was an unexpected error while " +
+                    "loading 6 hour index entries", e);
+            } else {
+                log.warn("Some 24 hour aggregates may not get generated. There was an unexpected error while " +
+                    "loading 6 hour index entries: " + ThrowableUtil.getRootMessage(e));
+            }
+            // No delete will be issued for these index entries, so stop counting it as pending;
+            // otherwise run() would wait forever for it to complete.
+            remainingIndexEntries.decrementAndGet();
+            return;
+        }
+        deleteIndexEntries(MetricsTable.TWENTY_FOUR_HOUR);
+
+        // The lock is taken outside the try block so that finally only releases a held lock
+        state.getSixHourIndexEntriesLock().writeLock().lock();
+        try {
+            List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+            Set<Integer> scheduleIds = new TreeSet<Integer>();
+            log.debug("Preparing to submit 6 hour data aggregation tasks for " +
+                state.getSixHourIndexEntries().size() + " schedules");
+            for (Integer scheduleId : state.getSixHourIndexEntries()) {
+                queryFutures.add(dao.findSixHourMetricsAsync(scheduleId,
+                    state.getTwentyFourHourTimeSlice().getMillis(),
+                    state.getTwentyFourHourTimeSliceEnd().getMillis()));
+                scheduleIds.add(scheduleId);
+                if (queryFutures.size() == batchSize) {
+                    state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds,
+                        queryFutures));
+                    queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+                    scheduleIds = new TreeSet<Integer>();
+                }
+            }
+            if (!queryFutures.isEmpty()) {
+                state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds,
+                    queryFutures));
+            }
+        } finally {
+            state.getSixHourIndexEntriesLock().writeLock().unlock();
+        }
+    }
+
+    /**
+     * Blocks, polling every 50 ms, until the given counter is no longer positive.
+     *
+     * @param remainingData the counter to wait on
+     * @throws InterruptedException if interrupted while sleeping between polls
+     */
+    private void waitFor(AtomicInteger remainingData) throws InterruptedException {
+        for (;;) {
+            if (remainingData.get() <= 0) {
+                return;
+            }
+            Thread.sleep(50);
+        }
+    }
+
+    /**
+     * @return true when none of the raw, 1 hour and 6 hour data counters nor the pending index deletion
+     * counter is positive
+     */
+    private boolean isAggregationFinished() {
+        if (state.getRemainingRawData().get() > 0) {
+            return false;
+        }
+        if (state.getRemaining1HourData().get() > 0) {
+            return false;
+        }
+        if (state.getRemaining6HourData().get() > 0) {
+            return false;
+        }
+        return remainingIndexEntries.get() <= 0;
+    }
+
+    /**
+     * Asynchronously deletes the index entries of the given table for the matching time slice. The
+     * pending index deletion counter is decremented when the delete completes, whether or not it
+     * succeeded.
+     *
+     * @param table the table whose index entries are deleted
+     */
+    private void deleteIndexEntries(final MetricsTable table) {
+        final DateTime time = indexTimeSliceFor(table);
+        log.debug("Deleting " + table + " index entries for time slice " + time);
+        writePermits.acquire();
+        StorageResultSetFuture deletion = dao.deleteMetricsIndexEntriesAsync(table, time.getMillis());
+        Futures.addCallback(deletion, new FutureCallback<ResultSet>() {
+            @Override
+            public void onSuccess(ResultSet result) {
+                remainingIndexEntries.decrementAndGet();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                String summary = "Failed to delete index entries for table " + table + " at time [" + time +
+                    "]. An " + "unexpected error occurred";
+                if (log.isDebugEnabled()) {
+                    log.debug(summary + ".", t);
+                } else {
+                    log.warn(summary + ": " + ThrowableUtil.getRootMessage(t));
+                }
+                remainingIndexEntries.decrementAndGet();
+            }
+        });
+    }
+
+    /**
+     * @return the start time, the 6 hour time slice or (for any other table) the 24 hour time slice
+     * under which the index entries of <code>table</code> are looked up
+     */
+    private DateTime indexTimeSliceFor(MetricsTable table) {
+        switch (table) {
+        case ONE_HOUR:
+            return startTime;
+        case SIX_HOUR:
+            return state.getSixHourTimeSlice();
+        default:
+            return state.getTwentyFourHourTimeSlice();
+        }
+    }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
new file mode 100644
index 0000000..f130f75
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
@@ -0,0 +1,113 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Computes 1 hour data for a batch of raw data result sets. The generated 1 hour \
aggregates are inserted along with + * their corresponding index updates.
+ *
+ * @author John Sanda
+ */
+class Compute1HourData implements AsyncFunction<List<ResultSet>, List<ResultSet>> {
+
+    private final Log log = LogFactory.getLog(Compute1HourData.class);
+
+    /** Timestamp given to every 1 hour aggregate produced by this function. */
+    private final DateTime startTime;
+
+    /** Throttles writes; one permit is acquired per statement issued. */
+    private final RateLimiter writePermits;
+
+    private final MetricsDAO dao;
+
+    /** Time slice under which the 6 hour index is updated for each aggregated schedule. */
+    private final DateTime sixHourTimeSlice;
+
+    /** Caller-supplied set that receives every aggregate computed by {@link #apply(List)}. */
+    private final Set<AggregateNumericMetric> oneHourData;
+
+    public Compute1HourData(DateTime startTime, DateTime sixHourTimeSlice, RateLimiter writePermits,
+        MetricsDAO dao, Set<AggregateNumericMetric> oneHourData) {
+        this.startTime = startTime;
+        this.sixHourTimeSlice = sixHourTimeSlice;
+        this.writePermits = writePermits;
+        this.dao = dao;
+        this.oneHourData = oneHourData;
+    }
+
+    /**
+     * Computes one aggregate per raw data result set, records it in {@code oneHourData} and issues
+     * the asynchronous min/max/avg inserts plus the 6 hour index update for it.
+     * <p>
+     * Empty result sets are skipped with a warning. Previously they caused an
+     * {@link IndexOutOfBoundsException} that failed the whole batch.
+     *
+     * @param rawDataResultSets one result set of raw data per schedule
+     * @return a future over the results of all issued statements, in issue order
+     */
+    @Override
+    public ListenableFuture<List<ResultSet>> apply(List<ResultSet> rawDataResultSets) throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Computing and storing 1 hour data for " + rawDataResultSets.size() + " schedules");
+        }
+        Stopwatch stopwatch = new Stopwatch().start();
+        try {
+            // Four statements are issued per schedule, so size the list accordingly.
+            List<StorageResultSetFuture> insertFutures =
+                new ArrayList<StorageResultSetFuture>(rawDataResultSets.size() * 4);
+            for (ResultSet resultSet : rawDataResultSets) {
+                AggregateNumericMetric aggregate = calculateAggregatedRaw(resultSet);
+                if (aggregate == null) {
+                    log.warn("Skipping 1 hour aggregate for time slice " + startTime
+                        + ": raw data result set is empty");
+                    continue;
+                }
+                oneHourData.add(aggregate);
+                writePermits.acquire(4);
+                insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+                    AggregateType.MIN, aggregate.getMin()));
+                insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+                    AggregateType.MAX, aggregate.getMax()));
+                insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+                    AggregateType.AVG, aggregate.getAvg()));
+                insertFutures.add(dao.updateMetricsIndex(MetricsTable.SIX_HOUR, aggregate.getScheduleId(),
+                    sixHourTimeSlice.getMillis()));
+            }
+            return Futures.successfulAsList(insertFutures);
+        } finally {
+            if (log.isDebugEnabled()) {
+                stopwatch.stop();
+                log.debug("Finished computing and storing 1 hour data for " + rawDataResultSets.size()
+                    + " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+            }
+        }
+    }
+
+    /**
+     * Reduces the rows of one result set to a single aggregate stamped with {@code startTime}.
+     * Column 2 of each row is read as the value. Column 0 of the first row is passed as the first
+     * constructor argument (presumably the schedule id; confirm against the DAO query).
+     *
+     * @param resultSet the raw data rows of one schedule
+     * @return the aggregate, or {@code null} when the result set has no rows
+     */
+    private AggregateNumericMetric calculateAggregatedRaw(ResultSet resultSet) {
+        List<Row> rows = resultSet.all();
+        if (rows.isEmpty()) {
+            // There is no row to take the identifier from, so no aggregate can be built.
+            return null;
+        }
+
+        double min = rows.get(0).getDouble(2);
+        double max = min;
+        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+
+        for (Row row : rows) {
+            double value = row.getDouble(2);
+            if (value < min) {
+                min = value;
+            } else if (value > max) {
+                max = value;
+            }
+            mean.add(value);
+        }
+
+        return new AggregateNumericMetric(rows.get(0).getInt(0), mean.getArithmeticMean(), min, max,
+            startTime.getMillis());
+    }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
new file mode 100644
index 0000000..6fe9d79
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
@@ -0,0 +1,99 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+
+/**
+ * Computes 24 hour data for a batch of 6 hour data result sets. The generated 24 hour
+ * aggregates are inserted.
+ *
+ * @author John Sanda
+ */
+class Compute24HourData implements AsyncFunction<List<ResultSet>, List<ResultSet>> {
+
+    private final Log log = LogFactory.getLog(Compute24HourData.class);
+
+    /** Timestamp given to every 24 hour aggregate produced by this function. */
+    private final DateTime startTime;
+
+    /** Throttles writes; one permit is acquired per statement issued. */
+    private final RateLimiter writePermits;
+
+    private final MetricsDAO dao;
+
+    public Compute24HourData(DateTime startTime, RateLimiter writePermits, MetricsDAO dao) {
+        this.startTime = startTime;
+        this.writePermits = writePermits;
+        this.dao = dao;
+    }
+
+    /**
+     * Computes one aggregate per 6 hour data result set and issues the asynchronous min/max/avg
+     * inserts into the 24 hour data for it.
+     * <p>
+     * Result sets that are empty or do not consist of complete groups of three rows are skipped
+     * with a warning. Previously they caused an {@link IndexOutOfBoundsException} that failed the
+     * whole batch.
+     *
+     * @param sixHourDataResultSets one result set of 6 hour data per schedule
+     * @return a future over the results of all issued statements, in issue order
+     */
+    @Override
+    public ListenableFuture<List<ResultSet>> apply(List<ResultSet> sixHourDataResultSets) throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Computing and storing 24 hour data for " + sixHourDataResultSets.size() + " schedules");
+        }
+        Stopwatch stopwatch = new Stopwatch().start();
+        try {
+            // Three statements are issued per schedule, so size the list accordingly.
+            List<StorageResultSetFuture> insertFutures =
+                new ArrayList<StorageResultSetFuture>(sixHourDataResultSets.size() * 3);
+            for (ResultSet resultSet : sixHourDataResultSets) {
+                AggregateNumericMetric aggregate = calculateAggregate(resultSet);
+                if (aggregate == null) {
+                    log.warn("Skipping 24 hour aggregate for time slice " + startTime
+                        + ": 6 hour data result set is empty or is not made up of complete groups of three rows");
+                    continue;
+                }
+                writePermits.acquire(3);
+                insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(),
+                    aggregate.getTimestamp(), AggregateType.MIN, aggregate.getMin()));
+                insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(),
+                    aggregate.getTimestamp(), AggregateType.MAX, aggregate.getMax()));
+                insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(),
+                    aggregate.getTimestamp(), AggregateType.AVG, aggregate.getAvg()));
+            }
+            return Futures.successfulAsList(insertFutures);
+        } finally {
+            if (log.isDebugEnabled()) {
+                stopwatch.stop();
+                log.debug("Finished computing and storing 24 hour data for " + sixHourDataResultSets.size()
+                    + " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+            }
+        }
+    }
+
+    /**
+     * Reduces the rows of one result set to a single aggregate stamped with {@code startTime}.
+     * Rows are consumed in groups of three: column 3 of row {@code i} feeds the maximum, of row
+     * {@code i + 1} the minimum and of row {@code i + 2} the mean (presumably one row per
+     * aggregate type in that order; confirm against the DAO query).
+     *
+     * @param resultSet the 6 hour data rows of one schedule
+     * @return the aggregate, or {@code null} when the rows are empty or not a multiple of three
+     */
+    private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
+        List<Row> rows = resultSet.all();
+        if (rows.isEmpty() || rows.size() % 3 != 0) {
+            // Reading an incomplete group would run past the end of the list.
+            return null;
+        }
+
+        double max = rows.get(0).getDouble(3);
+        double min = rows.get(1).getDouble(3);
+        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+
+        for (int i = 0; i < rows.size(); i += 3) {
+            double groupMax = rows.get(i).getDouble(3);
+            double groupMin = rows.get(i + 1).getDouble(3);
+            if (groupMin < min) {
+                min = groupMin;
+            }
+            if (groupMax > max) {
+                max = groupMax;
+            }
+            mean.add(rows.get(i + 2).getDouble(3));
+        }
+        return new AggregateNumericMetric(rows.get(0).getInt(0), mean.getArithmeticMean(), min, max,
+            startTime.getMillis());
+    }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java \
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
new file mode 100644
index 0000000..ec1ee26
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
@@ -0,0 +1,106 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Computes 6 hour data for a batch of 1 hour data result sets. The generated 6 hour
+ * aggregates are inserted along with their corresponding index updates.
+ *
+ * @author John Sanda
+ */
+class Compute6HourData implements AsyncFunction<List<ResultSet>, List<ResultSet>> {
+
+    private final Log log = LogFactory.getLog(Compute6HourData.class);
+
+    /** Timestamp given to every 6 hour aggregate produced by this function. */
+    private final DateTime startTime;
+
+    /** Throttles writes; one permit is acquired per statement issued. */
+    private final RateLimiter writePermits;
+
+    private final MetricsDAO dao;
+
+    /** Time slice under which the 24 hour index is updated for each aggregated schedule. */
+    private final DateTime twentyFourHourTimeSlice;
+
+    public Compute6HourData(DateTime startTime, DateTime twentyFourHourTimeSlice, RateLimiter writePermits,
+        MetricsDAO dao) {
+        this.startTime = startTime;
+        this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
+        this.writePermits = writePermits;
+        this.dao = dao;
+    }
+
+    /**
+     * Computes one aggregate per 1 hour data result set and issues the asynchronous min/max/avg
+     * inserts into the 6 hour data plus the 24 hour index update for it.
+     * <p>
+     * Result sets that are empty or do not consist of complete groups of three rows are skipped
+     * with a warning. Previously they caused an {@link IndexOutOfBoundsException} that failed the
+     * whole batch.
+     *
+     * @param oneHourDataResultSets one result set of 1 hour data per schedule
+     * @return a future over the results of all issued statements, in issue order
+     */
+    @Override
+    public ListenableFuture<List<ResultSet>> apply(List<ResultSet> oneHourDataResultSets) throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Computing and storing 6 hour data for " + oneHourDataResultSets.size() + " schedules");
+        }
+        Stopwatch stopwatch = new Stopwatch().start();
+        try {
+            // Four statements are issued per schedule, so size the list accordingly.
+            List<StorageResultSetFuture> insertFutures =
+                new ArrayList<StorageResultSetFuture>(oneHourDataResultSets.size() * 4);
+            for (ResultSet resultSet : oneHourDataResultSets) {
+                AggregateNumericMetric aggregate = calculateAggregate(resultSet);
+                if (aggregate == null) {
+                    log.warn("Skipping 6 hour aggregate for time slice " + startTime
+                        + ": 1 hour data result set is empty or is not made up of complete groups of three rows");
+                    continue;
+                }
+                writePermits.acquire(4);
+                insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+                    AggregateType.MIN, aggregate.getMin()));
+                insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+                    AggregateType.MAX, aggregate.getMax()));
+                insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+                    AggregateType.AVG, aggregate.getAvg()));
+                insertFutures.add(dao.updateMetricsIndex(MetricsTable.TWENTY_FOUR_HOUR, aggregate.getScheduleId(),
+                    twentyFourHourTimeSlice.getMillis()));
+            }
+            return Futures.successfulAsList(insertFutures);
+        } finally {
+            if (log.isDebugEnabled()) {
+                stopwatch.stop();
+                log.debug("Finished computing and storing 6 hour data for " + oneHourDataResultSets.size()
+                    + " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+            }
+        }
+    }
+
+    /**
+     * Reduces the rows of one result set to a single aggregate stamped with {@code startTime}.
+     * Rows are consumed in groups of three: column 3 of row {@code i} feeds the maximum, of row
+     * {@code i + 1} the minimum and of row {@code i + 2} the mean (presumably one row per
+     * aggregate type in that order; confirm against the DAO query).
+     *
+     * @param resultSet the 1 hour data rows of one schedule
+     * @return the aggregate, or {@code null} when the rows are empty or not a multiple of three
+     */
+    private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
+        List<Row> rows = resultSet.all();
+        if (rows.isEmpty() || rows.size() % 3 != 0) {
+            // Reading an incomplete group would run past the end of the list.
+            return null;
+        }
+
+        double max = rows.get(0).getDouble(3);
+        double min = rows.get(1).getDouble(3);
+        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+
+        for (int i = 0; i < rows.size(); i += 3) {
+            double groupMax = rows.get(i).getDouble(3);
+            double groupMin = rows.get(i + 1).getDouble(3);
+            if (groupMin < min) {
+                min = groupMin;
+            }
+            if (groupMax > max) {
+                max = groupMax;
+            }
+            mean.add(rows.get(i + 2).getDouble(3));
+        }
+        return new AggregateNumericMetric(rows.get(0).getInt(0), mean.getArithmeticMean(), min, max,
+            startTime.getMillis());
+    }
+}
_______________________________________________
rhq-commits mailing list
rhq-commits@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/rhq-commits
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic